/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.RotateEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.glossary.column.BitColumn;
import com.google.code.or.common.glossary.column.BlobColumn;
import com.google.code.or.common.glossary.column.DateColumn;
import com.google.code.or.common.glossary.column.DatetimeColumn;
import com.google.code.or.common.glossary.column.DecimalColumn;
import com.google.code.or.common.glossary.column.DoubleColumn;
import com.google.code.or.common.glossary.column.EnumColumn;
import com.google.code.or.common.glossary.column.FloatColumn;
import com.google.code.or.common.glossary.column.Int24Column;
import com.google.code.or.common.glossary.column.LongColumn;
import com.google.code.or.common.glossary.column.LongLongColumn;
import com.google.code.or.common.glossary.column.NullColumn;
import com.google.code.or.common.glossary.column.SetColumn;
import com.google.code.or.common.glossary.column.ShortColumn;
import com.google.code.or.common.glossary.column.StringColumn;
import com.google.code.or.common.glossary.column.TimeColumn;
import com.google.code.or.common.glossary.column.TimestampColumn;
import com.google.code.or.common.glossary.column.TinyColumn;
import com.google.code.or.common.glossary.column.YearColumn;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.ds.DbChangeEntry;
import com.linkedin.databus2.producers.ds.KeyPair;
import com.linkedin.databus2.producers.ds.PerSourceTransaction;
import com.linkedin.databus2.producers.ds.PrimaryKeySchema;
import com.linkedin.databus2.producers.ds.Transaction;
import com.linkedin.databus2.schemas.NoSuchSchemaException;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import com.linkedin.databus2.schemas.VersionedSchema;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;

class ORListener
implements BinlogEventListener {
    // Table id taken from the most recent table-map event; -1 means no table is current (see reset()).
    private long _currTableId = -1L;
    // Lower-cased "database.table" name of the current table; empty means no table is current.
    private String _currTableName = "";
    // Transaction being assembled; created in startXtion() and cleared by reset().
    private Transaction _transaction = null;
    // Per-table slice of the current transaction that row changes are merged into.
    private PerSourceTransaction _perSourceTransaction = null;
    // Number parsed from the binlog file name; refreshed on rotate events and used as the high half of the SCN.
    private int _currFileNum;
    private final Logger _log;
    // Binlog file-name prefix; used to locate the numeric suffix inside a rotated file name.
    private final String _binlogFilePrefix;
    // Source of the Avro schema used to frame row changes.
    private final SchemaRegistryService _schemaRegistryService;
    // Maps "database.table" to the logical source id handed to PerSourceTransaction.
    private final Map<String, Short> _tableUriToSrcIdMap;
    // Maps "database.table" to the source name used for the schema lookup.
    private final Map<String, String> _tableUriToSrcNameMap;
    // Callback that receives each completed transaction.
    private final TransactionProcessor _txnProcessor;
    // Sum of the header event lengths seen since the last BEGIN.
    private long _currTxnSizeInBytes = 0L;
    // Header timestamp of the terminating event multiplied by 1,000,000 (set in endXtion()).
    private long _currTxnTimestamp = 0L;
    // System.nanoTime() captured when the transaction's BEGIN was processed.
    private long _currTxnStartReadTimestamp = 0L;
    // Becomes true at the first BEGIN; nothing in this class sets it back to false.
    private boolean _isBeginTxnSeen = false;

    /**
     * Builds a listener that turns binlog events into {@link Transaction} objects.
     *
     * @param currentFileNumber     number of the binlog file reading starts from
     * @param log                   logger used for all diagnostics of this listener
     * @param binlogFilePrefix      binlog file-name prefix used when parsing rotated file names
     * @param txnProcessor          callback that receives each completed transaction
     * @param tableUriToSrcIdMap    "database.table" to logical source id
     * @param tableUriToSrcNameMap  "database.table" to logical source name
     * @param schemaRegistryService registry used to look up the Avro schema of a source
     */
    public ORListener(int currentFileNumber, Logger log, String binlogFilePrefix, TransactionProcessor txnProcessor, Map<String, Short> tableUriToSrcIdMap, Map<String, String> tableUriToSrcNameMap, SchemaRegistryService schemaRegistryService) {
        // Collaborators first, then the mutable starting position.
        _log = log;
        _schemaRegistryService = schemaRegistryService;
        _txnProcessor = txnProcessor;
        _tableUriToSrcIdMap = tableUriToSrcIdMap;
        _tableUriToSrcNameMap = tableUriToSrcNameMap;
        _binlogFilePrefix = binlogFilePrefix;
        _currFileNum = currentFileNumber;
    }

    /**
     * Dispatches a single binlog event.
     *
     * <p>A query event whose SQL is {@code BEGIN} opens a transaction. Until the first
     * {@code BEGIN} has been seen every other event is skipped, except rotate events:
     * those are still applied so that the current file number (and every SCN derived
     * from it) stays correct. After that, {@code COMMIT} query events and XID events
     * end the transaction, {@code ROLLBACK} discards it, other query events are only
     * logged, and table-map / row events are forwarded to their handlers.
     *
     * @param event the event to process; {@code null} is logged and ignored
     */
    public void onEvents(BinlogEventV4 event) {
        if (event == null) {
            this._log.error((Object)"Received null event");
            return;
        }

        QueryEvent queryEvent = null;
        String sql = null;
        if (event instanceof QueryEvent) {
            queryEvent = (QueryEvent)event;
            sql = queryEvent.getSql().toString();
            if ("BEGIN".equalsIgnoreCase(sql)) {
                this._isBeginTxnSeen = true;
                this._log.info((Object)("BEGIN sql: " + sql));
                // The BEGIN event restarts the size counter for the new transaction.
                this._currTxnSizeInBytes = event.getHeader().getEventLength();
                this.startXtion(queryEvent);
                return;
            }
        }

        if (!this._isBeginTxnSeen) {
            if (event instanceof RotateEvent) {
                // A rotation ahead of the first BEGIN must not be dropped, otherwise
                // _currFileNum would be stale for every SCN computed afterwards.
                this.handleRotateEvent((RotateEvent)event);
                return;
            }
            if (this._log.isDebugEnabled()) {
                this._log.debug((Object)("Skipping event (" + event + ") as this is before the start of first transaction"));
            }
            return;
        }

        this._currTxnSizeInBytes += event.getHeader().getEventLength();

        if (queryEvent != null) {
            if ("COMMIT".equalsIgnoreCase(sql)) {
                this._log.debug((Object)("COMMIT sql: " + sql));
                this.endXtion((AbstractBinlogEventV4)queryEvent);
                return;
            }
            if ("ROLLBACK".equalsIgnoreCase(sql)) {
                this._log.debug((Object)("ROLLBACK sql: " + sql));
                this.rollbackXtion(queryEvent);
                return;
            }
            this._log.debug((Object)("Likely DDL statement sql: " + sql));
            return;
        }

        if (event instanceof RotateEvent) {
            this.handleRotateEvent((RotateEvent)event);
        } else if (event instanceof XidEvent) {
            XidEvent xe = (XidEvent)event;
            this._log.debug((Object)("Treating XID event with xid = " + xe.getXid() + " as commit for the transaction"));
            this.endXtion((AbstractBinlogEventV4)xe);
            return;
        } else if (event instanceof WriteRowsEvent) {
            // Each cast below targets the exact event class so the matching overload is chosen.
            this.insertRows((WriteRowsEvent)event);
        } else if (event instanceof WriteRowsEventV2) {
            this.insertRows((WriteRowsEventV2)event);
        } else if (event instanceof UpdateRowsEvent) {
            this.updateRows((UpdateRowsEvent)event);
        } else if (event instanceof UpdateRowsEventV2) {
            this.updateRows((UpdateRowsEventV2)event);
        } else if (event instanceof DeleteRowsEventV2) {
            this.deleteRows((DeleteRowsEventV2)event);
        } else if (event instanceof DeleteRowsEvent) {
            this.deleteRows((DeleteRowsEvent)event);
        } else if (event instanceof TableMapEvent) {
            this.processTableMapEvent((TableMapEvent)event);
        } else {
            this._log.warn((Object)("Skipping !! Unknown OR event e: " + event));
            return;
        }

        if (this._log.isDebugEnabled()) {
            this._log.debug((Object)("e: " + event));
        }
    }

    /**
     * Updates the current binlog file number from a rotate event.
     *
     * <p>The number is whatever follows the last occurrence of the configured prefix
     * plus one further character in the new file name.
     * NOTE(review): the skipped character is presumably the '.' separator - confirm.
     *
     * @param re the rotate event naming the new binlog file
     * @throws NumberFormatException if the remainder of the file name is not an integer
     */
    private void handleRotateEvent(RotateEvent re) {
        String fileName = re.getBinlogFileName().toString();
        this._log.info((Object)("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + this._binlogFilePrefix));
        String fileNumStr = fileName.substring(fileName.lastIndexOf(this._binlogFilePrefix) + this._binlogFilePrefix.length() + 1);
        this._currFileNum = Integer.parseInt(fileNumStr);
    }

    /**
     * Switches the current source according to a table-map event.
     *
     * <p>The first table-map event of a transaction starts a source; an event for a
     * different table ends the current source and starts the new one; an event that
     * repeats the current table name and id is rejected. A change of only the name or
     * only the id is reported as an error after the switch has been attempted.
     *
     * @param tme the table-map event
     * @throws DatabusRuntimeException on a repeated table-map event or on an inconsistent
     *         name/id change
     */
    private void processTableMapEvent(TableMapEvent tme) {
        String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
        long newTableId = tme.getTableId();

        // Snapshot the current values: startSource() overwrites the fields, and the
        // error messages below must describe the state *before* the switch.
        String prevTableName = this._currTableName;
        long prevTableId = this._currTableId;

        boolean areTableNamesEqual = prevTableName.equals(newTableName);
        boolean areTableIdsEqual = prevTableId == newTableId;
        boolean didTableChange = !areTableNamesEqual || !areTableIdsEqual;
        // Exactly one of name/id changed.
        boolean errorTransition = areTableNamesEqual != areTableIdsEqual;

        if (prevTableName.isEmpty() && prevTableId == -1L) {
            this.startSource(newTableName, newTableId);
        } else if (didTableChange) {
            this.endSource();
            this.startSource(newTableName, newTableId);
        } else {
            this._log.error((Object)("Unexpected : TableMap Event obtained :" + tme));
            throw new DatabusRuntimeException("Unexpected : TableMap Event obtained : _currTableName = " + prevTableName + " _curTableId = " + prevTableId + " newTableName = " + newTableName + " newTableId = " + newTableId);
        }
        if (errorTransition) {
            throw new DatabusRuntimeException("TableName and TableId should change simultaneously or not _currTableName = " + prevTableName + " _curTableId = " + prevTableId + " newTableName = " + newTableName + " newTableId = " + newTableId);
        }
    }

    /**
     * Opens a new transaction for a BEGIN event and records when reading it started.
     *
     * @param e the BEGIN query event (only used for logging)
     * @throws DatabusRuntimeException if the previous transaction was never closed
     */
    private void startXtion(QueryEvent e) {
        _currTxnStartReadTimestamp = System.nanoTime();
        _log.info("startXtion" + e);
        final boolean previousStillOpen = _transaction != null;
        if (previousStillOpen) {
            throw new DatabusRuntimeException("Got startXtion without an endXtion for previous transaction");
        }
        _transaction = new Transaction();
    }

    /**
     * Completes the current transaction and hands it to the transaction processor.
     *
     * <p>The transaction is stamped with its accumulated size, the terminating event's
     * header timestamp multiplied by 1,000,000, and the time elapsed since the BEGIN was
     * read. Listener state is reset whether or not the processor succeeds.
     * NOTE(review): the multiplication assumes the header timestamp is in milliseconds - confirm.
     *
     * @param e the terminating event; must be a {@link QueryEvent} or an {@link XidEvent}
     * @throws DatabusRuntimeException if {@code e} has another type, if no transaction is
     *         open, or if the transaction processor fails
     */
    private void endXtion(AbstractBinlogEventV4 e) {
        // Validate before touching any state.
        if (!(e instanceof QueryEvent || e instanceof XidEvent)) {
            throw new DatabusRuntimeException("endXtion should be called with either QueryEvent of XidEvent");
        }
        // Without this check a commit with no open transaction fails with a bare NullPointerException.
        if (this._transaction == null) {
            throw new DatabusRuntimeException("Got endXtion without a startXtion for the current transaction");
        }

        this._currTxnTimestamp = e.getHeader().getTimestamp() * 1000000L;
        long txnReadLatency = System.nanoTime() - this._currTxnStartReadTimestamp;

        this._transaction.setSizeInBytes(this._currTxnSizeInBytes);
        this._transaction.setTxnNanoTimestamp(this._currTxnTimestamp);
        this._transaction.setTxnReadLatencyNanos(txnReadLatency);
        try {
            this._txnProcessor.onEndTransaction(this._transaction);
        }
        catch (DatabusException e3) {
            this._log.error((Object)"Got exception in the transaction handler ", (Throwable)e3);
            throw new DatabusRuntimeException((Throwable)e3);
        }
        finally {
            // Always clear per-transaction state so the next BEGIN starts clean.
            this.reset();
            if (this._log.isDebugEnabled()) {
                this._log.debug((Object)("endXtion" + e));
            }
        }
    }

    /**
     * Discards the transaction in progress in response to a ROLLBACK.
     *
     * @param e the ROLLBACK query event (only used for logging)
     */
    private void rollbackXtion(QueryEvent e) {
        reset();
        final String message = "rollbackXtion" + e;
        _log.info(message);
    }

    /**
     * Clears all per-transaction state: the open transaction, its current per-source
     * part, the current table (name and id) and the accumulated size.
     */
    private void reset() {
        // Transaction objects.
        _transaction = null;
        _perSourceTransaction = null;
        // Current-table markers go back to their "no table" values.
        _currTableId = -1L;
        _currTableName = "";
        _currTxnSizeInBytes = 0L;
    }

    /**
     * Makes the given table the current source and attaches a new per-source
     * transaction for it to the open transaction.
     *
     * @param newTableName lower-cased "database.table" name of the table
     * @param newTableId   table id from the table-map event
     * @throws DatabusRuntimeException if the previous source was not ended, or if the
     *         table has no logical source id configured
     */
    private void startSource(String newTableName, long newTableId) {
        // The current-table fields are updated before any validation.
        _currTableName = newTableName;
        _currTableId = newTableId;

        if (_perSourceTransaction != null) {
            throw new DatabusRuntimeException("Seems like a startSource has been received without an endSource for previous source");
        }

        final Short srcId = _tableUriToSrcIdMap.get(_currTableName);
        if (srcId == null) {
            throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")");
        }
        assert (_transaction != null);

        _perSourceTransaction = new PerSourceTransaction((int)srcId.shortValue());
        _transaction.mergePerSourceTransaction(_perSourceTransaction);
    }

    /**
     * Closes the current per-source transaction.
     *
     * @throws DatabusRuntimeException if no per-source transaction is open
     */
    private void endSource() {
        final boolean sourceOpen = _perSourceTransaction != null;
        if (!sourceOpen) {
            throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()");
        }
        _perSourceTransaction = null;
    }

    /**
     * Frames every row of a V2 delete event as a DELETE change.
     *
     * @param dre the delete-rows event
     */
    private void deleteRows(DeleteRowsEventV2 dre) {
        frameAvroRecord(dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
    }

    /**
     * Frames every row of a delete event as a DELETE change.
     *
     * @param dre the delete-rows event
     */
    private void deleteRows(DeleteRowsEvent dre) {
        frameAvroRecord(dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
    }

    /**
     * Frames the after-image of every updated row as an UPSERT change; the
     * before-images are not used.
     *
     * @param ure the update-rows event
     */
    private void updateRows(UpdateRowsEvent ure) {
        // Typed as before/after pairs so the loop compiles without raw-type casts.
        List<Pair<Row>> lp = ure.getRows();
        List<Row> lr = new ArrayList<Row>(lp.size());
        for (Pair<Row> pr : lp) {
            lr.add(pr.getAfter());
        }
        this.frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
    }

    /**
     * Frames the after-image of every row of a V2 update event as an UPSERT change;
     * the before-images are not used.
     *
     * @param ure the update-rows event
     */
    private void updateRows(UpdateRowsEventV2 ure) {
        // Typed as before/after pairs so the loop compiles without raw-type casts.
        List<Pair<Row>> lp = ure.getRows();
        List<Row> lr = new ArrayList<Row>(lp.size());
        for (Pair<Row> pr : lp) {
            lr.add(pr.getAfter());
        }
        this.frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
    }

    /**
     * Frames every row of a write event as an UPSERT change.
     *
     * @param wre the write-rows event
     */
    private void insertRows(WriteRowsEvent wre) {
        final BinlogEventV4Header header = wre.getHeader();
        final List<Row> insertedRows = wre.getRows();
        frameAvroRecord(header, insertedRows, DbusOpcode.UPSERT);
    }

    /**
     * Frames every row of a V2 write event as an UPSERT change.
     *
     * @param wre the write-rows event
     */
    private void insertRows(WriteRowsEventV2 wre) {
        final BinlogEventV4Header header = wre.getHeader();
        final List<Row> insertedRows = wre.getRows();
        frameAvroRecord(header, insertedRows, DbusOpcode.UPSERT);
    }

    /**
     * Converts each row into an Avro record plus key pairs and merges the resulting
     * change entry into the current per-source transaction.
     *
     * <p>All rows of one event share the same SCN (current file number combined with
     * the header position) and the same timestamp (header timestamp multiplied by
     * 1,000,000). The schema is the latest one registered for the current table's source.
     * NOTE(review): the multiplication assumes the header timestamp is in milliseconds - confirm.
     *
     * @param bh  header of the row event
     * @param rl  rows to frame
     * @param doc opcode recorded on every produced change entry
     * @throws DatabusRuntimeException wrapping any schema-lookup or conversion failure
     */
    private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, DbusOpcode doc) {
        try {
            final long eventTimeNanos = bh.getTimestamp() * 1000000L;
            final int binlogOffset = (int)bh.getPosition();
            final long scn = ORListener.scn(_currFileNum, binlogOffset);
            final boolean isReplicated = false;

            final String sourceName = _tableUriToSrcNameMap.get(_currTableName);
            final VersionedSchema versionedSchema = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(sourceName);
            final Schema schema = versionedSchema.getSchema();

            if (_log.isDebugEnabled()) {
                _log.debug("File Number :" + _currFileNum + ", Position :" + binlogOffset + ", SCN =" + scn);
            }

            for (Row row : rl) {
                final List<Column> columns = row.getColumns();
                final GenericRecord avroRecord = new GenericData.Record(schema);
                generateAvroEvent(schema, columns, avroRecord);
                final List<KeyPair> keys = generateKeyPair(columns, schema);
                _perSourceTransaction.mergeDbChangeEntrySet(
                        new DbChangeEntry(scn, eventTimeNanos, avroRecord, doc, isReplicated, schema, keys));
            }
        }
        catch (NoSuchSchemaException missingSchema) {
            throw new DatabusRuntimeException(missingSchema);
        }
        catch (DatabusException failure) {
            throw new DatabusRuntimeException(failure);
        }
    }

    /**
     * Builds the key pairs of a row from the schema's {@code pk} meta field.
     *
     * <p>For every schema field that is part of the primary key, the value of the
     * column at the same index is paired with the field's Avro type.
     * NOTE(review): columns are indexed by the schema's declaration order here, whereas
     * generateAvroEvent() orders fields by {@code dbFieldPosition}; verify the two orders
     * always agree.
     *
     * @param cl     column values of the row
     * @param schema Avro schema of the source
     * @return one key pair per primary-key field, in schema field order
     * @throws DatabusException if the schema does not declare a primary key
     */
    private List<KeyPair> generateKeyPair(List<Column> cl, Schema schema) throws DatabusException {
        String pkFieldName = SchemaHelper.getMetaField(schema, "pk");
        if (pkFieldName == null) {
            throw new DatabusException("No primary key specified in the schema");
        }
        PrimaryKeySchema pkSchema = new PrimaryKeySchema(pkFieldName);
        // Element type restored so the loop compiles without raw types.
        List<Schema.Field> fields = schema.getFields();
        List<KeyPair> keyPairs = new ArrayList<KeyPair>();
        int columnIndex = 0;
        for (Schema.Field field : fields) {
            if (pkSchema.isPartOfPrimaryKey(field)) {
                Object keyValue = cl.get(columnIndex).getValue();
                Schema.Type keyType = field.schema().getType();
                keyPairs.add(new KeyPair(keyValue, keyType));
            }
            ++columnIndex;
        }
        return keyPairs;
    }

    /**
     * Fills an Avro record from the column values of one row.
     *
     * <p>Schema fields are ordered by the numeric value of their {@code dbFieldPosition}
     * meta field and matched positionally with the columns; each value is then
     * converted and stored under the Avro field name.
     *
     * @param schema Avro schema of the source
     * @param cols   column values of the row
     * @param record record to populate
     * @throws DatabusException if the number of fields and columns differ, if a field has
     *         an ARRAY type, or if a column value cannot be converted
     */
    private void generateAvroEvent(Schema schema, List<Column> cols, GenericRecord record) throws DatabusException {
        // Element type restored so the loops compile without raw types.
        List<Schema.Field> orderedFields = SchemaHelper.getOrderedFieldsByMetaField(schema, "dbFieldPosition", new Comparator<String>(){

            @Override
            public int compare(String o1, String o2) {
                // Positions are compared numerically, not lexicographically.
                Integer pos1 = Integer.valueOf(o1);
                Integer pos2 = Integer.valueOf(o2);
                return pos1.compareTo(pos2);
            }
        });
        if (orderedFields.size() != cols.size()) {
            throw new DatabusException("Mismatch in db schema vs avro schema");
        }

        // The i-th ordered field takes the i-th column.
        Map<String, Column> avroFieldCol = new HashMap<String, Column>();
        int cnt = 0;
        for (Schema.Field field : orderedFields) {
            avroFieldCol.put(field.name(), cols.get(cnt));
            ++cnt;
        }

        final boolean debugEnabled = this._log.isDebugEnabled();
        for (Schema.Field field : orderedFields) {
            if (field.schema().getType() == Schema.Type.ARRAY) {
                throw new DatabusException("The parser cannot handle ARRAY datatypes. Found in field: " + field);
            }
            String databaseFieldName = SchemaHelper.getMetaField(field, "dbFieldName").toLowerCase();
            if (debugEnabled) {
                // Guarded: this runs once per field of every row.
                this._log.debug((Object)("databaseFieldName = " + databaseFieldName));
            }
            this.insertFieldIntoRecord(avroFieldCol, record, databaseFieldName, field);
        }
        if (debugEnabled) {
            this._log.debug((Object)("Generic record = " + record));
        }
    }

    /**
     * Converts the column mapped to an Avro field and stores it in the record.
     * A field with no mapped column is stored as {@code null}.
     *
     * @param eventFields columns keyed by Avro field name
     * @param record      record being populated
     * @param dbFieldName database field name (not used by this method)
     * @param avroField   Avro field to populate
     * @throws DatabusException if the conversion fails; the field name is logged first
     */
    private void insertFieldIntoRecord(Map<String, Column> eventFields, GenericRecord record, String dbFieldName, Schema.Field avroField) throws DatabusException {
        final String avroName = avroField.name();
        final Column column = eventFields.get(avroName);
        try {
            Object converted = null;
            if (column != null) {
                converted = orToAvroType(column);
            }
            record.put(avroName, converted);
        }
        catch (DatabusException conversionFailure) {
            _log.error("Unable to process field: " + avroName);
            throw conversionFailure;
        }
    }

    /**
     * Converts a binlog column into the Java value stored in the Avro record.
     *
     * <ul>
     *   <li>bit / blob: {@link ByteBuffer} wrapping the raw bytes</li>
     *   <li>date / timestamp: {@code getTime()} of the value as {@code Long}</li>
     *   <li>datetime: {@code getTime()} with the sub-second part truncated</li>
     *   <li>decimal: the value's {@code toString()}</li>
     *   <li>double / float / integer-like columns / set: the boxed value</li>
     *   <li>string: the bytes decoded with the platform default charset</li>
     *   <li>time: {@code getTime()} relative to a calendar set to year 70, January 1st, 00:00:00</li>
     *   <li>null column: {@code null}</li>
     * </ul>
     *
     * @param s the column to convert
     * @return the converted value, or {@code null} for a null column
     * @throws DatabusException declared for callers; not thrown directly here
     * @throws DatabusRuntimeException if the column type is not recognised
     */
    private Object orToAvroType(Column s) throws DatabusException {
        final Object avroValue;
        if (s instanceof BitColumn) {
            avroValue = ByteBuffer.wrap(((BitColumn)s).getValue());
        } else if (s instanceof BlobColumn) {
            avroValue = ByteBuffer.wrap(((BlobColumn)s).getValue());
        } else if (s instanceof DateColumn) {
            avroValue = ((DateColumn)s).getValue().getTime();
        } else if (s instanceof DatetimeColumn) {
            // Drop the sub-second part.
            avroValue = ((DatetimeColumn)s).getValue().getTime() / 1000L * 1000L;
        } else if (s instanceof DecimalColumn) {
            // NOTE(review): two INFO lines are emitted for every decimal value.
            final DecimalColumn decimal = (DecimalColumn)s;
            _log.info("dc Value is :" + decimal.getValue());
            final String text = decimal.getValue().toString();
            _log.info("Str : " + text);
            avroValue = text;
        } else if (s instanceof DoubleColumn) {
            avroValue = ((DoubleColumn)s).getValue();
        } else if (s instanceof EnumColumn) {
            avroValue = ((EnumColumn)s).getValue();
        } else if (s instanceof FloatColumn) {
            avroValue = ((FloatColumn)s).getValue();
        } else if (s instanceof Int24Column) {
            avroValue = ((Int24Column)s).getValue();
        } else if (s instanceof LongColumn) {
            avroValue = ((LongColumn)s).getValue();
        } else if (s instanceof LongLongColumn) {
            avroValue = ((LongLongColumn)s).getValue();
        } else if (s instanceof NullColumn) {
            avroValue = null;
        } else if (s instanceof SetColumn) {
            avroValue = ((SetColumn)s).getValue();
        } else if (s instanceof ShortColumn) {
            avroValue = ((ShortColumn)s).getValue();
        } else if (s instanceof StringColumn) {
            // NOTE(review): decoding depends on the JVM's default charset.
            avroValue = new String(((StringColumn)s).getValue(), Charset.defaultCharset());
        } else if (s instanceof TimeColumn) {
            // The reference instant is year 70 (not 1970), January 1st, in the default time zone.
            // NOTE(review): presumably this mirrors how the library builds its Time values - confirm.
            final Calendar reference = Calendar.getInstance();
            reference.set(70, 0, 1, 0, 0, 0);
            // Both sides are truncated to whole seconds; Calendar keeps the current milliseconds.
            final long referenceMillis = reference.getTimeInMillis() / 1000L * 1000L;
            final long valueMillis = ((TimeColumn)s).getValue().getTime() / 1000L * 1000L;
            avroValue = valueMillis - referenceMillis;
        } else if (s instanceof TimestampColumn) {
            avroValue = ((TimestampColumn)s).getValue().getTime();
        } else if (s instanceof TinyColumn) {
            avroValue = ((TinyColumn)s).getValue();
        } else if (s instanceof YearColumn) {
            avroValue = ((YearColumn)s).getValue();
        } else {
            throw new DatabusRuntimeException("Unknown MySQL type in the event" + s.getClass() + " Object = " + s);
        }
        return avroValue;
    }

    /**
     * Packs a binlog file number and a position into one 64-bit SCN: the file number
     * occupies the upper 32 bits and the position the lower 32 bits.
     *
     * <p>The offset is treated as an unsigned 32-bit value. A negative {@code int}
     * (i.e. a position of 2^31 or more that was narrowed to {@code int}) is masked so
     * that its sign extension cannot overwrite the file-number half.
     *
     * @param logId  binlog file number
     * @param offset position within the file, interpreted as unsigned 32 bits
     * @return {@code (logId << 32) | (offset & 0xFFFFFFFF)}
     */
    public static long scn(int logId, int offset) {
        long scn = logId;
        scn <<= 32;
        return scn | ((long)offset & 0xFFFFFFFFL);
    }

    /**
     * Receives each transaction once the listener has finished assembling it.
     */
    public interface TransactionProcessor {
        /**
         * Called once per completed transaction.
         *
         * @param txn the completed transaction
         * @throws DatabusException if the transaction cannot be processed
         */
        void onEndTransaction(Transaction txn) throws DatabusException;
    }
}

