/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.bootstrap.mysql.txnprocessor.impl;

import com.flipkart.aesop.bootstrap.mysql.MysqlEvent;
import com.flipkart.aesop.bootstrap.mysql.MysqlEventProducer;
import com.flipkart.aesop.bootstrap.mysql.mapper.BinLogEventMapper;
import com.flipkart.aesop.bootstrap.mysql.mapper.impl.DefaultBinLogEventMapper;
import com.flipkart.aesop.bootstrap.mysql.txnprocessor.MysqlTransactionManager;
import com.flipkart.aesop.bootstrap.mysql.txnprocessor.impl.PerSourceTransaction;
import com.flipkart.aesop.bootstrap.mysql.txnprocessor.impl.Transaction;
import com.flipkart.aesop.bootstrap.mysql.utils.ORToMysqlMapper;
import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.runtime.bootstrap.consumer.SourceEventConsumer;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.common.glossary.Row;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import com.linkedin.databus2.schemas.VersionedSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/**
 * {@link MysqlTransactionManager} implementation that accumulates row changes read from a
 * MySQL binlog into a {@link Transaction} (grouped per source in {@link PerSourceTransaction}s)
 * and, when the transaction ends, forwards every accumulated event to the configured
 * {@link SourceEventConsumer} and reports the transaction's SCN to the {@link MysqlEventProducer}.
 *
 * <p>Apart from the shutdown flag, the instance keeps its state in plain, unsynchronized fields.
 *
 * @param <T> concrete event type produced by the bin log event mapper
 */
public class MysqlTransactionManagerImpl<T extends AbstractEvent>
implements MysqlTransactionManager {
    private static final Logger LOGGER = LogFactory.getLogger(MysqlTransactionManagerImpl.class);

    /** Mask that reinterprets an {@code int} as an unsigned 32-bit value when widened to {@code long}. */
    private static final long UNSIGNED_INT_MASK = 0xFFFFFFFFL;

    /** When set, {@link #endXtion(long)} skips handing the transaction to the consumer. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    /** Number of the binlog file currently being read; forms the upper half of generated SCNs. */
    private int currFileNum;
    /** Name of the table whose changes are currently being collected; empty when none. */
    private String currTableName = "";
    /** Id of the table whose changes are currently being collected; -1 when none. */
    private long currTableId = -1L;
    private boolean beginTxnSeen = false;
    private long currTxnSizeInBytes = 0L;
    /** {@link System#nanoTime()} reading taken when the current transaction was started. */
    private long currTxnStartReadTimestamp = 0L;
    /** Table id to table name lookup; exposed as-is through {@link #getMysqlTableIdToTableNameMap()}. */
    private final Map<Long, String> mysqlTableIdToTableNameMap;
    private final Map<String, Short> tableUriToSrcIdMap;
    private final Map<String, String> tableUriToSrcNameMap;
    private final BinLogEventMapper<T> binLogEventMapper;
    /** Transaction currently being assembled; {@code null} between transactions. */
    private Transaction transaction = null;
    /** Per-source part of the transaction for the current table; {@code null} when no source is open. */
    private PerSourceTransaction perSourceTransaction = null;
    private final SchemaRegistryService schemaRegistryService;
    private final MysqlEventProducer mySqlEventProducer;
    private final SourceEventConsumer sourceEventConsumer;

    /**
     * Creates a transaction manager with an empty table id to table name map and a
     * {@link DefaultBinLogEventMapper} backed by a new {@link ORToMysqlMapper}.
     *
     * @param currFileNum number of the binlog file reading starts from
     * @param tableUriToSrcIdMap lookup from table name to source id
     * @param tableUriToSrcNameMap lookup from table name to the source name used for schema lookup
     * @param schemaRegistryService registry queried for the latest schema of a source
     * @param mysqlEventProducer producer whose SCN is updated after each completed transaction
     * @param sourceEventConsumer consumer that receives the events of each completed transaction
     */
    public MysqlTransactionManagerImpl(int currFileNum, Map<String, Short> tableUriToSrcIdMap, Map<String, String> tableUriToSrcNameMap, SchemaRegistryService schemaRegistryService, MysqlEventProducer mysqlEventProducer, SourceEventConsumer sourceEventConsumer) {
        this.currFileNum = currFileNum;
        this.tableUriToSrcIdMap = tableUriToSrcIdMap;
        this.tableUriToSrcNameMap = tableUriToSrcNameMap;
        this.schemaRegistryService = schemaRegistryService;
        this.mysqlTableIdToTableNameMap = new HashMap<Long, String>();
        this.sourceEventConsumer = sourceEventConsumer;
        this.binLogEventMapper = new DefaultBinLogEventMapper(new ORToMysqlMapper());
        this.mySqlEventProducer = mysqlEventProducer;
    }

    /**
     * Begins a new transaction and records the time at which reading it started.
     *
     * @throws DatabusRuntimeException if a previous transaction has not been ended or reset
     */
    @Override
    public void startXtion() {
        this.currTxnStartReadTimestamp = System.nanoTime();
        if (this.transaction != null) {
            LOGGER.warn("Illegal Start Transaction State");
            throw new DatabusRuntimeException("Got startXtion without an endXtion for previous transaction");
        }
        this.transaction = new Transaction();
    }

    /**
     * Completes the current transaction: stamps it with its size, timestamp and read latency,
     * hands its events to the consumer, updates the producer's SCN and finally resets the
     * per-transaction state (also when the hand-off fails).
     *
     * <p>If shutdown has been requested nothing is written and the per-transaction state is left
     * untouched.
     *
     * @param eventTimeStamp timestamp of the terminating event; multiplied by 1,000,000 to obtain
     *        the transaction's nano timestamp (presumably milliseconds - confirm against caller)
     * @throws DatabusRuntimeException wrapping any {@link DatabusException} raised while handling
     *         the completed transaction
     */
    @Override
    public void endXtion(long eventTimeStamp) {
        if (this.shutdownRequested.get()) {
            LOGGER.info("Not writing event to buffer as shutdown has been requested");
            return;
        }
        long currTxnTimestamp = eventTimeStamp * 1000000L;
        long txnReadLatency = System.nanoTime() - this.currTxnStartReadTimestamp;
        try {
            this.transaction.setSizeInBytes(this.currTxnSizeInBytes);
            this.transaction.setTxnNanoTimestamp(currTxnTimestamp);
            this.transaction.setTxnReadLatencyNanos(txnReadLatency);
            this.onEndTransaction(this.transaction);
        }
        catch (DatabusException e) {
            LOGGER.error("Got exception in the transaction handler ", (Throwable)e);
            throw new DatabusRuntimeException((Throwable)e);
        }
        finally {
            // Always clear the state so the next startXtion() does not trip over a stale transaction.
            this.resetTxn();
        }
    }

    /**
     * Discards the current transaction and clears the current table, per-source transaction and
     * accumulated transaction size.
     */
    @Override
    public void resetTxn() {
        this.currTableName = "";
        this.currTableId = -1L;
        this.perSourceTransaction = null;
        this.transaction = null;
        this.currTxnSizeInBytes = 0L;
    }

    /**
     * Makes the table identified by {@code newTableId} the current source. If no source is
     * current a new one is started; if a different table (by name or id) is current, that source
     * is ended first. Nothing happens when the table is already the current source.
     *
     * @param newTableId id of the table the following change events belong to
     * @throws DatabusRuntimeException if the id is missing from the table id to table name map,
     *         or if a source is started while the previous one is still open
     */
    @Override
    public void setSource(long newTableId) {
        String newTableName = this.mysqlTableIdToTableNameMap.get(newTableId);
        if (newTableName == null) {
            LOGGER.error("TableMap Event not received for the change event tableId: " + newTableId);
            throw new DatabusRuntimeException("TableMap Event not received for the change event tableId: " + newTableId);
        }
        if (this.currTableName.isEmpty() && this.currTableId == -1L) {
            this.startSource(newTableName, newTableId);
        } else if (!this.currTableName.equals(newTableName) || this.currTableId != newTableId) {
            LOGGER.debug("Table name changed from " + this.currTableName + " to " + newTableName);
            this.endSource();
            this.startSource(newTableName, newTableId);
        }
    }

    /**
     * Returns the live, mutable table id to table name map held by this manager (not a copy).
     *
     * @return the internal table id to table name map
     */
    @Override
    public Map<Long, String> getMysqlTableIdToTableNameMap() {
        return this.mysqlTableIdToTableNameMap;
    }

    /**
     * Maps each row of a change event to an event of type {@code T} and adds it to the current
     * per-source transaction, tagged with an SCN built from the current file number and the
     * event header's position. Rows are skipped (with an info log) when no schema is found for
     * the table's source.
     *
     * <p>Any exception raised while processing is logged together with its stack trace and is
     * not propagated to the caller.
     *
     * @param tableId id of the table the rows belong to
     * @param eventHeader header of the binlog event; supplies the position used in the SCN
     * @param rowList rows carried by the binlog event
     * @param databusOpcode operation the rows represent
     */
    @Override
    public void performChanges(long tableId, BinlogEventV4Header eventHeader, List<Row> rowList, DbusOpcode databusOpcode) {
        try {
            this.setSource(tableId);
            VersionedSchema schema = this.schemaRegistryService.fetchLatestVersionedSchemaBySourceName(this.tableUriToSrcNameMap.get(this.currTableName));
            LOGGER.debug("Schema obtained for table " + this.currTableName + " = " + schema);
            if (schema != null) {
                for (Row row : rowList) {
                    T abstractEvent = this.binLogEventMapper.mapBinLogEvent(row, schema.getSchema(), databusOpcode);
                    this.perSourceTransaction.mergeDbChangeEntrySet(new MysqlEvent(this.frameSCN(this.currFileNum, (int)eventHeader.getPosition()), (AbstractEvent)abstractEvent));
                }
                if (!this.perSourceTransaction.equals(this.transaction.getPerSourceTransaction(this.perSourceTransaction.getSrcId()))) {
                    this.transaction.mergePerSourceTransaction(this.perSourceTransaction);
                }
            } else {
                LOGGER.info("Events recieved from uninterested sources " + this.currTableName);
            }
        }
        catch (Exception e) {
            // Hand the throwable to the logger so the stack trace ends up in the application log
            // rather than on stderr.
            LOGGER.error("Exception occurred while persisting changes to transaction " + e.getMessage(), (Throwable)e);
        }
    }

    /**
     * Sets or clears the shutdown flag consulted by {@link #endXtion(long)}.
     *
     * @param shutdownRequested {@code true} to stop writing completed transactions
     */
    @Override
    public void setShutdownRequested(boolean shutdownRequested) {
        this.shutdownRequested.set(shutdownRequested);
    }

    /**
     * Records the given table as the current one and opens a per-source transaction for it.
     * When the table has no entry in the source id map a warning is logged and no per-source
     * transaction is opened.
     *
     * @throws DatabusRuntimeException if both a transaction and a per-source transaction are
     *         already open, i.e. the previous source was not ended
     */
    private void startSource(String newTableName, long newTableId) {
        this.currTableName = newTableName;
        this.currTableId = newTableId;
        if (this.perSourceTransaction != null && this.transaction != null) {
            String errorMessage = "Seems like a startSource has been received without an endSource for previous source";
            LOGGER.error(errorMessage);
            throw new DatabusRuntimeException(errorMessage);
        }
        Short srcId = this.tableUriToSrcIdMap.get(this.currTableName);
        if (srcId == null) {
            LOGGER.warn("Could not find a matching logical source for table Uri (" + this.currTableName + ")");
            return;
        }
        this.perSourceTransaction = new PerSourceTransaction(srcId.shortValue());
        this.transaction.mergePerSourceTransaction(this.perSourceTransaction);
    }

    /** Closes the current source by dropping the reference to its per-source transaction. */
    private void endSource() {
        this.perSourceTransaction = null;
    }

    /**
     * Forwards the transaction's events to the consumer, then updates the producer's SCN.
     *
     * @throws DatabusException declared for the hand-off; wrapped by {@link #endXtion(long)}
     */
    private void onEndTransaction(Transaction txn) throws DatabusException {
        this.sendEventsToSourceEventConsumer(txn);
        this.mySqlEventProducer.updateSCN(txn.getScn());
    }

    /** Delivers every event of every per-source transaction, in the transaction's order, to the consumer. */
    private void sendEventsToSourceEventConsumer(Transaction txn) {
        for (PerSourceTransaction t : txn.getOrderedPerSourceTransactions()) {
            for (MysqlEvent mysqlEvent : t.getSourceEventChangeSet()) {
                this.sourceEventConsumer.onEvent(mysqlEvent.getAbstractEvent());
            }
        }
    }

    /**
     * Builds an SCN with {@code logId} in the upper 32 bits and {@code offset} in the lower 32.
     *
     * <p>The offset is treated as an unsigned 32-bit value. The caller narrows a wider position
     * to {@code int}, so positions of 2^31 and above arrive negative; without the mask the
     * widening to {@code long} would sign-extend and overwrite the {@code logId} bits.
     *
     * @param logId binlog file number
     * @param offset position within the binlog file, as the low 32 bits of the real position
     * @return the combined SCN
     */
    private long frameSCN(int logId, int offset) {
        long scn = logId;
        scn <<= 32;
        return scn | ((long)offset & UNSIGNED_INT_MASK);
    }

    /** @return name of the table currently being processed, or an empty string when none */
    @Override
    public String getCurrTableName() {
        return this.currTableName;
    }

    /** @param currFileNum number of the binlog file subsequent events are read from */
    @Override
    public void setCurrFileNum(int currFileNum) {
        this.currFileNum = currFileNum;
    }

    /** @return id of the table currently being processed, or -1 when none */
    @Override
    public long getCurrTableId() {
        return this.currTableId;
    }

    /** @return the flag last stored through {@link #setBeginTxnSeen(boolean)}; initially {@code false} */
    @Override
    public boolean isBeginTxnSeen() {
        return this.beginTxnSeen;
    }

    /** @param beginTxnSeen value to store for later retrieval through {@link #isBeginTxnSeen()} */
    @Override
    public void setBeginTxnSeen(boolean beginTxnSeen) {
        this.beginTxnSeen = beginTxnSeen;
    }
}

