/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.producer;

import com.linkedin.databus.bootstrap.api.BootstrapProducerStatus;
import com.linkedin.databus.bootstrap.common.BootstrapConn;
import com.linkedin.databus.bootstrap.common.BootstrapDBMetaDataDAO;
import com.linkedin.databus.bootstrap.common.BootstrapProducerStatsCollector;
import com.linkedin.databus.bootstrap.common.BootstrapReadOnlyConfig;
import com.linkedin.databus.bootstrap.monitoring.producer.mbean.DbusBootstrapProducerStatsMBean;
import com.linkedin.databus.core.DatabusThreadBase;
import com.linkedin.databus.core.util.RateMonitor;
import com.linkedin.databus.core.util.RngUtils;
import com.linkedin.databus2.core.BackoffTimer;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import com.linkedin.databus2.util.DBHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Formatter;
import java.util.Random;
import org.apache.log4j.Logger;

public class BootstrapApplierThread
extends DatabusThreadBase {
    /** Fully-qualified name of this class; used as the logger name. */
    public static final String MODULE = BootstrapApplierThread.class.getName();
    /** Logger shared by the thread and its nested classes. */
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    // NOTE(review): the four constants below are never referenced by name in this decompiled
    // file, but the same literal values appear in run() (1000 and 5), applyLog() (2) and
    // sourcePositions.save() (10) -- presumably these constants inlined by the compiler; confirm.
    private static final int MAX_EVENT_WAIT_TIME = 1000;
    private static final int INITIAL_EVENT_WAIT_TIME = 5;
    private static final int DEFAULT_LOG_SAMPLING_PERCENTAGE = 2;
    private static final int DEFAULT_MINSCN_TIMEOUT_SEC = 10;
    /** Random source used by applyLog() to decide whether to emit its sampled info message. */
    private static Random _sSampler = new Random();
    /** Created lazily by getConnection(); null until the first successful call. */
    private BootstrapDBMetaDataDAO _bootstrapDao;
    /** Name of the source this thread applies; passed in through the constructor. */
    private final String _source;
    /** Cached statement built by getTabPositionUpdateStmt(); closed and cleared by reset(). */
    private PreparedStatement _tabScnStmt;
    /** Cached statement built by getPositionsStmt(); closed and cleared by reset(). */
    private PreparedStatement _getScnStmt;
    /** Configuration supplying the retry settings and the bootstrap DB coordinates. */
    private final BootstrapReadOnlyConfig _config;
    /** Position tracker for _source; created at the start of run(). */
    private sourcePositions _sourcePositions = null;
    /** Last line emitted by log(), used to suppress identical consecutive lines. */
    private String _lastLogLine = "";
    /** Number of consecutive lines suppressed by log() since the last emitted one. */
    private int _lastLogLineRepeatCount = 0;
    /** Format used by setTabPosition(): source id, log id, row id, window SCN. */
    private static final String APPLIER_STATE_LINE_FORMAT = "Applier state : %d %d %d %d";
    // NOTE(review): log() compares against the literal 1000 -- presumably this constant inlined.
    private static final int MAX_SKIPPED_LOG_LINES = 1000;
    /** Optional statistics sink; every use in this class is guarded by a null check. */
    private BootstrapProducerStatsCollector _statsCollector = null;
    /** Started and stopped around each applyLog() call. */
    private final RateMonitor _srcRm;
    /** Started and stopped around each iteration of the run() loop. */
    private final RateMonitor _totalRm;
    /** Back-off timer used by reset() between reconnection attempts. */
    private final BackoffTimer _retryTimer;
    /** Set by sourcePositions.init() and save(); save() only recomputes it while it is -1. */
    private long _minScn = -1L;
    // NOTE(review): written by run() and read through the public isRunning(); the field is not
    // volatile, so a reader on another thread has no visibility guarantee -- confirm intended use.
    private boolean _isRunning = false;

    /**
     * Creates an applier thread that does not report statistics.
     * Delegates to the four-argument constructor with a {@code null} stats collector.
     *
     * @param name   name handed to the superclass constructor and used as a prefix for the
     *               names of the retry timer and rate monitors
     * @param source name of the source whose log rows this thread applies
     * @param config configuration providing retry settings and bootstrap DB coordinates
     */
    public BootstrapApplierThread(String name, String source, BootstrapReadOnlyConfig config) {
        this(name, source, config, null);
    }

    /**
     * Creates an applier thread for one source.
     *
     * @param name           name handed to the superclass constructor; also prefixes the names
     *                       of the retry timer and the two rate monitors
     * @param source         name of the source whose log rows this thread applies
     * @param config         configuration; its retry config is read here, its DB settings are
     *                       read later by getConnection()
     * @param statsCollector optional statistics sink, may be {@code null}
     */
    public BootstrapApplierThread(String name, String source, BootstrapReadOnlyConfig config, BootstrapProducerStatsCollector statsCollector) {
        super(name);
        _config = config;
        _source = source;
        _statsCollector = statsCollector;
        // The DAO and its connection are created on demand by getConnection().
        _bootstrapDao = null;
        _retryTimer = new BackoffTimer(name + "RetryTimer", config.getRetryConfig());
        _srcRm = new RateMonitor(name + "ProducerSourceRateMonitor");
        _totalRm = new RateMonitor(name + "ProducerTotalRateMonitor");
    }

    /**
     * Returns a diagnostic description containing the source name, the configuration and
     * the current source positions.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("BootstrapApplierThread [_source=");
        text.append(_source);
        text.append(", _config=").append(_config);
        text.append(", _sourcePositions=").append(_sourcePositions);
        text.append("]");
        return text.toString();
    }

    /**
     * Starts the thread by delegating to the superclass {@code start()}.
     * The override adds nothing but the {@code synchronized} modifier.
     */
    public synchronized void start() {
        super.start();
    }

    /**
     * Main loop of the applier thread.
     *
     * <p>Creates the {@code sourcePositions} tracker for the configured source, then, until
     * shutdown is requested or the DB connection cannot be re-established, repeatedly:
     * honours a pending pause request, applies one batch through {@link #applyLog(String)},
     * commits (rolling back if the commit fails) and reports the batch to the stats
     * collector. When nothing was applied the loop sleeps, multiplying the sleep time by 10
     * each idle round from 5 ms up to a cap of 1000 ms.
     *
     * <p>Any exception in an iteration is logged; only a {@link SQLException} triggers
     * {@code reset(true)}, and a failed reset ends the loop. On exit the DB resources are
     * released with {@code reset(false)} and {@code doShutdownNotify()} is called.
     * {@code _isRunning} is cleared on every exit path so {@link #isRunning()} does not keep
     * reporting a thread that has already finished.
     */
    public void run() {
        this._isRunning = true;
        try {
            // The sourcePositions constructor already calls init(). A second explicit init()
            // here repeated every start-up query and replaced the freshly prepared apply
            // statement without closing the first one.
            this._sourcePositions = new sourcePositions(this._source);
            if (null != this._statsCollector) {
                DbusBootstrapProducerStatsMBean stats = this._statsCollector.getSourceStats(this._source);
                stats.registerBatch(0L, 0L, -1L, (long)this._sourcePositions.getApplyId(), (long)this._sourcePositions.getLogPos());
            }
        }
        catch (Exception e) {
            if (null != this._statsCollector) {
                this._statsCollector.getTotalStats().registerSQLException();
            }
            LOG.error((Object)"Error occurred in initializing source position", (Throwable)e);
            // The thread ends here; do not leave isRunning() reporting true.
            this._isRunning = false;
            return;
        }
        // Idle back-off bounds, in milliseconds (passed to Thread.sleep below).
        final int initialSleepMs = 5;
        final int maxSleepMs = 1000;
        int sleepTime = initialSleepMs;
        int totalRowsApplied = 0;
        while (this._isRunning && !this.isShutdownRequested()) {
            try {
                if (this.isPauseRequested()) {
                    LOG.info((Object)"Pause requested for applier. Pausing !!");
                    this.signalPause();
                    LOG.info((Object)"Pausing. Waiting for resume command");
                    this.awaitUnPauseRequest();
                    LOG.info((Object)"Resume requested for applier. Resuming !!");
                    this.signalResumed();
                    LOG.info((Object)"Applier resumed !!");
                }
                this._totalRm.start();
                Connection conn = this.getConnection();
                try {
                    totalRowsApplied += this.applyLog(this._source);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Number of rows applier so far = " + totalRowsApplied + " for source = " + this._source));
                    }
                }
                catch (Exception e) {
                    if (null != this._statsCollector) {
                        this._statsCollector.getTotalStats().registerSQLException();
                        this._statsCollector.getSourceStats(this._source).registerSQLException();
                    }
                    LOG.error((Object)"apply error:", (Throwable)e);
                    throw e;
                }
                try {
                    DBHelper.commit((Connection)conn);
                }
                catch (SQLException s) {
                    DBHelper.rollback((Connection)conn);
                    throw s;
                }
                this._totalRm.stop();
                if (null != this._statsCollector) {
                    this._statsCollector.getTotalStats().registerBatch(this._totalRm.getDuration() / 1000000L, (long)totalRowsApplied, -1L, -1L, -1L);
                }
                if (0 == totalRowsApplied) {
                    // Nothing to apply: back off before polling again.
                    Thread.sleep(sleepTime);
                    sleepTime = Math.min(sleepTime * 10, maxSleepMs);
                    continue;
                }
                sleepTime = initialSleepMs;
                totalRowsApplied = 0;
            }
            catch (Exception e) {
                LOG.error((Object)"Error occured in bootstrap applier", (Throwable)e);
                if (null != this._statsCollector) {
                    this._statsCollector.getTotalStats().registerSQLException();
                }
                // Only SQL failures trigger a reconnect; everything else just retries the loop.
                if (!(e instanceof SQLException) || this.reset(true)) continue;
                LOG.fatal((Object)"Unable to reset Bootstrap DB connections. Stopping Applier Thread !!", (Throwable)e);
                this._isRunning = false;
            }
        }
        // The loop may also end because shutdown was requested; reflect that in isRunning().
        this._isRunning = false;
        this.reset(false);
        this.doShutdownNotify();
    }

    /**
     * Returns the value of the {@code _isRunning} flag, which run() sets to {@code true}
     * on entry and to {@code false} when it cannot reset its DB connections.
     */
    public boolean isRunning() {
        return this._isRunning;
    }

    /**
     * Closes the apply statement cached by {@code _sourcePositions}.
     * The declared {@code SQLException} is not thrown by the call made here, which declares
     * no checked exceptions.
     */
    private void closeApplyStatements() throws SQLException {
        this._sourcePositions.close();
    }

    /**
     * Applies the next batch of log rows for the source and persists the new applier state.
     *
     * <p>Asks {@code _sourcePositions} for the next batch; when the batch is non-empty the
     * cached apply statement is executed with the batch bounds. The positions are saved in
     * either case. The source rate monitor is stopped and the batch is reported to the stats
     * collector (if any) even when the apply fails.
     *
     * @param source source name, used for the log message and the stats lookup
     * @return the batch size, i.e. the batch's to-row id minus its from-row id
     * @throws SQLException if executing the statement or saving the positions fails
     */
    private int applyLog(String source) throws BootstrapDatabaseTooOldException, SQLException {
        final sourcePositions positions = _sourcePositions;
        final applyBatch nextBatch = positions.getNextApplyBatch();
        final int toRid = nextBatch.getTorid();
        final int fromRid = nextBatch.getFromrid();
        final int rowCount = toRid - fromRid;
        try {
            _srcRm.start();
            if (rowCount > 0) {
                PreparedStatement applyStmt = positions.getApplyStmt();
                applyStmt.setInt(1, fromRid);
                applyStmt.setInt(2, toRid);
                applyStmt.executeUpdate();
                // Only a sample of the successful applies is logged at info level.
                if (RngUtils.randomPositiveInt((Random)_sSampler) % 100 < 2) {
                    LOG.info((Object)("Applied Log " + nextBatch + " for " + source));
                }
            }
            positions.save();
        }
        catch (SQLException e) {
            LOG.error((Object)"Error occured during apply log", (Throwable)e);
            throw e;
        }
        finally {
            _srcRm.stop();
            if (null != _statsCollector) {
                DbusBootstrapProducerStatsMBean sourceStats = _statsCollector.getSourceStats(source);
                sourceStats.registerBatch(_srcRm.getDuration() / 1000000L, (long)rowCount, positions.getApplyWindowSCN(), (long)positions.getApplyId(), (long)positions.getLogPos());
            }
        }
        return rowCount;
    }

    /**
     * Looks up the window SCN that corresponds to the applier's current position.
     *
     * <p>When {@code tabRid} is 0 the value is the {@code maxwindowscn} recorded in
     * {@code bootstrap_loginfo} for the previous log ({@code applyLogId - 1}, floored at 0);
     * otherwise it is the {@code windowscn} of row {@code tabRid} in the current log table.
     *
     * @param srcid      source id
     * @param applyLogId id of the log currently being applied
     * @param tabRid     applier row position within that log
     * @return the window SCN found, or 0 when the query returns no row
     * @throws SQLException if preparing, executing or closing the query fails
     */
    private long getWindowScnforSource(int srcid, int applyLogId, int tabRid) throws SQLException {
        long windowScn = 0L;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
            if (0 == tabRid) {
                stmt = this.getMaxWindowScnStatement();
                stmt.setInt(1, srcid);
                stmt.setInt(2, Math.max(0, applyLogId - 1));
            } else {
                stmt = this.getWindowScnStatement(applyLogId, srcid);
                stmt.setInt(1, tabRid);
            }
            rs = stmt.executeQuery();
            if (rs.next()) {
                windowScn = rs.getLong(1);
            }
        }
        catch (SQLException e) {
            LOG.error((Object)"Error ocurred during getWindowScnforSource", (Throwable)e);
            throw e;
        }
        finally {
            // Close the statement even when closing the result set throws; previously a
            // failing rs.close() skipped stmt.close() and leaked the statement.
            try {
                if (null != rs) {
                    rs.close();
                }
            }
            finally {
                if (null != stmt) {
                    stmt.close();
                }
            }
        }
        return windowScn;
    }

    /**
     * Writes the applier position for a source to {@code bootstrap_applier_state} and logs
     * the new state through {@link #log(int, Formatter)}, which suppresses repeated lines.
     *
     * @param srcid     source id whose row is updated
     * @param logid     log id to store
     * @param tabRid    row id to store
     * @param windowScn window SCN to store
     * @throws SQLException if the update fails
     */
    private void setTabPosition(int srcid, int logid, int tabRid, long windowScn) throws SQLException {
        PreparedStatement update = getTabPositionUpdateStmt();
        update.setInt(1, logid);
        update.setInt(2, tabRid);
        update.setLong(3, windowScn);
        update.setInt(4, srcid);
        update.executeUpdate();

        Formatter stateLine = new Formatter(new StringBuilder(1024));
        stateLine.format(APPLIER_STATE_LINE_FORMAT, srcid, logid, tabRid, windowScn);
        log(srcid, stateLine);
    }

    /**
     * Prepares the query that reads {@code maxwindowscn} from {@code bootstrap_loginfo} for
     * a given source id (parameter 1) and log id (parameter 2).
     *
     * <p>A new statement is prepared on every call; the caller is responsible for closing it.
     *
     * @return the prepared statement
     * @throws SQLException if the connection cannot be obtained or the statement prepared
     */
    private PreparedStatement getMaxWindowScnStatement() throws SQLException {
        try {
            Connection conn = this.getConnection();
            return conn.prepareStatement("select maxwindowscn from bootstrap_loginfo where srcid = ? and logid = ?");
        }
        catch (SQLException e) {
            // No statement exists when we get here, so there is nothing to close. The message
            // now names this method rather than its sibling getWindowScnStatement.
            LOG.error((Object)"error occurred during getMaxWindowScnStatement", (Throwable)e);
            throw e;
        }
    }

    /**
     * Prepares the query that reads {@code windowscn} of a single row (parameter 1 is the
     * row id) from the log table of the given log and source.
     *
     * <p>A new statement is prepared on every call; the caller is responsible for closing it.
     *
     * @param applyLogId log id used to resolve the log table name
     * @param srcid      source id used to resolve the log table name
     * @return the prepared statement
     * @throws SQLException if the connection, table name lookup or prepare fails
     */
    private PreparedStatement getWindowScnStatement(int applyLogId, int srcid) throws SQLException {
        PreparedStatement prepared = null;
        try {
            final Connection dbConn = getConnection();
            final String query = "select windowscn from " + getLogTableName(applyLogId, srcid) + " where id = ?";
            prepared = dbConn.prepareStatement(query);
        }
        catch (SQLException e) {
            DBHelper.close(prepared);
            LOG.error((Object)"error occurred during getWindowScnStatement", (Throwable)e);
            throw e;
        }
        return prepared;
    }

    /**
     * Returns the cached statement that updates {@code bootstrap_applier_state}
     * (parameters: logid, rid, windowscn, srcid), preparing it on first use.
     *
     * <p>The statement stays cached in {@code _tabScnStmt} until reset() closes it.
     *
     * @return the prepared update statement, never {@code null}
     * @throws SQLException if the connection cannot be obtained or the statement prepared
     */
    private PreparedStatement getTabPositionUpdateStmt() throws SQLException {
        if (this._tabScnStmt != null) {
            return this._tabScnStmt;
        }
        try {
            Connection conn = this.getConnection();
            this._tabScnStmt = conn.prepareStatement("update bootstrap_applier_state set logid = ?, rid = ? , windowscn = ?  where srcid = ?");
        }
        catch (SQLException e) {
            LOG.error((Object)"Error ocurred in getTabPositionUpdateStmt", (Throwable)e);
            // _tabScnStmt is still null here (it is only assigned by the last statement of the
            // try block), so the old handler's _tabScnStmt.close() always threw a
            // NullPointerException that hid this SQLException and kept run() from reconnecting.
            throw e;
        }
        return this._tabScnStmt;
    }

    /**
     * Returns the cached query that reads producer and applier positions for a source name
     * (parameter 1), preparing it on first use.
     *
     * <p>Result columns, in order: producer logid, producer rid, applier logid, applier rid,
     * and the {@code maxrid} and {@code maxwindowscn} of the applier's log in
     * {@code bootstrap_loginfo}. The statement stays cached in {@code _getScnStmt} until
     * reset() closes it.
     *
     * @return the prepared query, never {@code null}
     * @throws SQLException if the connection cannot be obtained or the statement prepared
     */
    private PreparedStatement getPositionsStmt() throws SQLException {
        if (this._getScnStmt != null) {
            return this._getScnStmt;
        }
        try {
            Connection conn = this.getConnection();
            StringBuilder sql = new StringBuilder();
            sql.append("select p.logid, p.rid, a.logid, a.rid, l.maxrid, l.maxwindowscn ");
            sql.append("from bootstrap_sources s, bootstrap_producer_state p, bootstrap_applier_state a, bootstrap_loginfo l ");
            sql.append("where p.srcid = s.id and a.srcid = s.id and l.srcid = s.id and l.logid = a.logid and s.src = ?");
            this._getScnStmt = conn.prepareStatement(sql.toString());
        }
        catch (SQLException e) {
            // _getScnStmt is still null here, so the old handler's _getScnStmt.close() always
            // threw a NullPointerException that replaced the real failure. Propagate it instead.
            LOG.error((Object)"Error occured in getPositionsstatement", (Throwable)e);
            throw e;
        }
        catch (RuntimeException e) {
            // The original caught Exception; keep logging unchecked failures too.
            LOG.error((Object)"Error occured in getPositionsstatement", (Throwable)e);
            throw e;
        }
        return this._getScnStmt;
    }

    /**
     * Releases the DB connection and all cached statements, optionally re-creating them.
     *
     * <p>Each attempt closes the bootstrap connection, the two cached statements and the
     * apply statement. With {@code recreate} set it then obtains a connection again, runs
     * the dummy bootstrap DB query and re-initialises the source positions. A
     * {@link SQLException} is logged and counted, and the attempt is repeated after a
     * back-off until the retry timer has no retries left.
     *
     * @param recreate whether to re-establish the connection and positions after closing
     * @return {@code true} once an attempt completes, {@code false} when retries ran out
     */
    private boolean reset(boolean recreate) {
        _retryTimer.reset();
        while (true) {
            try {
                _bootstrapDao.getBootstrapConn().close();
                DBHelper.close((Statement)_getScnStmt);
                _getScnStmt = null;
                DBHelper.close((Statement)_tabScnStmt);
                _tabScnStmt = null;
                closeApplyStatements();
                _sourcePositions.close();
                if (recreate) {
                    getConnection();
                    _bootstrapDao.getBootstrapConn().executeDummyBootstrapDBQuery();
                    _sourcePositions.init();
                }
                return true;
            }
            catch (SQLException sqlEx) {
                LOG.error((Object)"Unable to reset DBConnections in Applier", (Throwable)sqlEx);
                if (null != _statsCollector) {
                    _statsCollector.getTotalStats().registerSQLException();
                }
                if (_retryTimer.getRemainingRetriesNum() <= 0) {
                    LOG.fatal((Object)"Applier Thread reached max retries trying to reset the MySQL Connections. Stopping !!");
                    return false;
                }
                _retryTimer.backoffAndSleep();
            }
        }
    }

    /**
     * Returns the JDBC connection of the bootstrap DB, creating the {@code BootstrapConn}
     * and the metadata DAO on the first call.
     *
     * <p>Initialisation failures that are not {@link SQLException}s are wrapped in one with
     * the original exception as cause. Previously such failures made this method return
     * {@code null} with {@code _bootstrapDao} still unset, so callers failed later with a
     * {@code NullPointerException} that no longer carried the real reason.
     *
     * @return the connection obtained from the bootstrap connection wrapper
     * @throws SQLException if the bootstrap connection cannot be initialised or obtained
     */
    private Connection getConnection() throws SQLException {
        if (this._bootstrapDao == null) {
            BootstrapConn bsConn = new BootstrapConn();
            try {
                // NOTE(review): the literal 2 equals java.sql.Connection.TRANSACTION_READ_COMMITTED
                // and the leading false was a local named autoCommit in the decompiled code --
                // presumably auto-commit flag and isolation level; confirm against BootstrapConn.
                bsConn.initBootstrapConn(false, 2, this._config.getBootstrapDBUsername(), this._config.getBootstrapDBPassword(), this._config.getBootstrapDBHostname(), this._config.getBootstrapDBName());
                this._bootstrapDao = new BootstrapDBMetaDataDAO(bsConn, this._config.getBootstrapDBHostname(), this._config.getBootstrapDBUsername(), this._config.getBootstrapDBPassword(), this._config.getBootstrapDBName(), false);
            }
            catch (SQLException e) {
                LOG.fatal((Object)"Unable to get Bootstrap DB Connection", (Throwable)e);
                throw e;
            }
            catch (Exception ex) {
                LOG.fatal((Object)"Unable to get Bootstrap DB Connection", (Throwable)ex);
                throw new SQLException("Unable to get Bootstrap DB Connection", ex);
            }
        }
        try {
            return this._bootstrapDao.getBootstrapConn().getDBConn();
        }
        catch (SQLException sqlEx) {
            LOG.fatal((Object)"NOT able to get Bootstrap DB Connection", (Throwable)sqlEx);
            throw sqlEx;
        }
    }

    /**
     * Returns the log table name for the given log id and source id, as resolved by the
     * bootstrap connection wrapper.
     */
    private String getLogTableName(int applyLogId, int srcid) throws SQLException {
        return this.getBootstrapConn().getLogTableName(applyLogId, srcid);
    }

    /**
     * Returns the table name for the given source id, as resolved by the bootstrap
     * connection wrapper. createApplyStatement() uses it as the insert target.
     */
    private String getSrcTableName(int srcid) throws SQLException, BootstrapDatabaseTooOldException {
        return this.getBootstrapConn().getSrcTableName(srcid);
    }

    /**
     * Returns the bootstrap connection wrapper held by the DAO.
     * Dereferences {@code _bootstrapDao} without a null check, so getConnection() must have
     * succeeded before this is called.
     */
    private BootstrapConn getBootstrapConn() {
        return this._bootstrapDao.getBootstrapConn();
    }

    /**
     * Logs a state line unless it is identical to the previously logged one.
     *
     * <p>An identical line is suppressed and counted, up to 1000 times in a row. A different
     * line, or the repetition after 1000 suppressed ones, is logged together with the number
     * of suppressed repetitions, and the counter starts over.
     *
     * @param srcid   source id; not used by this method
     * @param logLine formatter holding the line to log; it is flushed before being read
     */
    private void log(int srcid, Formatter logLine) {
        logLine.flush();
        final String candidate = logLine.toString();
        final int repeatsBefore = _lastLogLineRepeatCount;
        final boolean sameAsLast = candidate.equals(_lastLogLine);
        if (sameAsLast && repeatsBefore < 1000) {
            // Same line again and still under the cap: just count it.
            _lastLogLineRepeatCount = repeatsBefore + 1;
            return;
        }
        _lastLogLineRepeatCount = 0;
        _lastLogLine = candidate;
        LOG.info((Object)("skipLog = false, last line repeated: " + repeatsBefore));
        LOG.info((Object)("newLogLine = " + candidate));
    }

    /**
     * Tracks, for one source, the producer and applier positions read from the bootstrap DB
     * and hands out the next range of log rows to apply.
     *
     * <p>Positions come from the query built by getPositionsStmt(): producer log id and row
     * id, applier log id and row id, plus the {@code maxrid} and {@code maxwindowscn} of the
     * applier's log. The class also owns the cached insert-select "apply" statement for the
     * log currently being applied.
     */
    class sourcePositions {
        /** Source id resolved from the source name in init(). */
        private int _srcid;
        /** Applier row position within the apply log; advanced by getNextApplyBatch(). */
        private int _tabrid;
        /** Producer row position as of the last refresh(). */
        private int _logrid;
        /** Id of the log the applier reads from. */
        private int _applylogid;
        /** Id of the log the producer writes to, as of the last refresh(). */
        private int _producelogid;
        /** {@code maxrid} of the apply log as of the last refresh(); zeroed on log roll-over. */
        private int _logmaxrid;
        /** Window SCN computed by the most recent save(). */
        private long _logwindowscn;
        /** Cached apply statement for {@code _applylogid}; {@code null} when it must be rebuilt. */
        private PreparedStatement _applyStmt;
        /** Largest positive {@code maxwindowscn} seen by refresh(); must never decrease. */
        private long _lastlogmaxscn;
        /** Name of the source these positions belong to. */
        private final String _source;

        /**
         * Creates the tracker and immediately initialises it from the DB via {@link #init()}.
         *
         * @param source source name
         * @throws Exception if initialisation fails
         */
        sourcePositions(String source) throws Exception {
            this._source = source;
            this.init();
        }

        /**
         * Loads the source id and positions from the DB, prepares the apply statement and
         * reads the minimum SCN of the source's snapshot into the enclosing thread.
         *
         * <p>May be called repeatedly on the same instance (the enclosing reset() does so);
         * any apply statement left over from an earlier call is closed before being replaced.
         *
         * @throws SQLException     on DB errors
         * @throws RuntimeException wrapping a {@code BootstrapDatabaseTooOldException}, which is
         *                          raised here when the state check is enabled and the source
         *                          status is not ready for consumption
         */
        public void init() throws SQLException {
            try {
                BootstrapApplierThread.this.getConnection();
                BootstrapDBMetaDataDAO.SourceStatusInfo srcIdStatus = BootstrapApplierThread.this._bootstrapDao.getSrcIdStatusFromDB(this._source, false);
                this._srcid = srcIdStatus.getSrcId();
                if (BootstrapApplierThread.this._config.isBootstrapDBStateCheck() && !BootstrapProducerStatus.isReadyForConsumption((int)srcIdStatus.getStatus())) {
                    throw new BootstrapDatabaseTooOldException("Bootstrap DB not ready to read events from relay, Status :" + srcIdStatus);
                }
                this.refresh();
                // Do not overwrite a live statement: a repeated init() used to leak it.
                if (null != this._applyStmt) {
                    this.close();
                }
                this._applyStmt = this.createApplyStatement();
                BootstrapApplierThread.this._minScn = BootstrapApplierThread.this._bootstrapDao.getMinScnOfSnapshots(new int[]{srcIdStatus.getSrcId()});
            }
            catch (BootstrapDatabaseTooOldException e) {
                throw new RuntimeException(e);
            }
        }

        /** Closes and forgets the cached apply statement. */
        public void close() {
            DBHelper.close((Statement)this._applyStmt);
            this._applyStmt = null;
        }

        /**
         * Re-reads all positions for the source from the DB.
         *
         * <p>If no row is returned the fields keep their previous values. A positive
         * {@code maxwindowscn} that is smaller than the last one seen is treated as a fatal
         * inconsistency.
         *
         * @throws SQLException     if the query or closing its result set fails
         * @throws RuntimeException if {@code maxwindowscn} went backwards
         */
        void refresh() throws SQLException {
            ResultSet rs = null;
            try {
                PreparedStatement stmt = BootstrapApplierThread.this.getPositionsStmt();
                stmt.setString(1, this._source);
                rs = stmt.executeQuery();
                if (rs.next()) {
                    this._producelogid = rs.getInt(1);
                    this._logrid = rs.getInt(2);
                    this._applylogid = rs.getInt(3);
                    this._tabrid = rs.getInt(4);
                    this._logmaxrid = rs.getInt(5);
                    long currentLogMaxScn = rs.getLong(6);
                    if (currentLogMaxScn > 0L) {
                        if (this._lastlogmaxscn > currentLogMaxScn) {
                            throw new RuntimeException("_lastlogmaxscn=" + this._lastlogmaxscn + " currentLogMaxScn=" + currentLogMaxScn + " applylogid=" + this._applylogid + " producerlogid=" + this._producelogid + " tabrid=" + this._tabrid + " logrid=" + this._logrid + " logmaxrid=" + this._logmaxrid);
                        }
                        this._lastlogmaxscn = currentLogMaxScn;
                    }
                }
            }
            catch (SQLException e) {
                LOG.error((Object)"Error occured during refresh of sourcePositions", (Throwable)e);
                throw e;
            }
            finally {
                // Close exactly once on every path. The RuntimeException above used to skip
                // the close, and the SQLException path could close the result set twice.
                if (null != rs) {
                    rs.close();
                }
            }
        }

        /** Returns the source id and the current positions as a single diagnostic line. */
        public String toString() {
            return "SrcId: " + this._srcid + " ProducerLogId: " + this._producelogid + " Logrid: " + this._logrid + " ApplyLogId: " + this._applylogid + " Tabrid: " + this._tabrid + " LogMaxrid: " + this._logmaxrid;
        }

        /**
         * Persists the applier position and, while the thread's minimum SCN is still unset
         * (-1), tries to establish it.
         *
         * <p>The window SCN for the current position is looked up and written together with
         * the apply log id and row id. For the minimum SCN, a seeded source yields 0;
         * otherwise the value comes from {@code getMinWindowScnFromSnapshot}. A non-zero
         * result is decremented by one before being stored; a result of -1 is ignored.
         *
         * @throws SQLException on DB errors
         */
        void save() throws SQLException {
            LOG.debug((Object)("Saving state " + this));
            this._logwindowscn = BootstrapApplierThread.this.getWindowScnforSource(this._srcid, this._applylogid, this._tabrid);
            BootstrapApplierThread.this.setTabPosition(this._srcid, this._applylogid, this._tabrid, this._logwindowscn);
            if (BootstrapApplierThread.this._minScn == -1L) {
                long newMinScn = BootstrapApplierThread.this._bootstrapDao.isSeeded(this._srcid) ? 0L : BootstrapApplierThread.this._bootstrapDao.getMinWindowScnFromSnapshot(this._srcid, this._tabrid, 10);
                if (BootstrapApplierThread.this._minScn != newMinScn && newMinScn != -1L) {
                    // The message reports the value before the decrement below.
                    LOG.info((Object)("Applier setting minScn=" + newMinScn));
                    if (newMinScn != 0L) {
                        --newMinScn;
                    }
                    BootstrapApplierThread.this._bootstrapDao.updateMinScnOfSnapshot(this._srcid, newMinScn);
                    BootstrapApplierThread.this._minScn = newMinScn;
                }
            }
        }

        /** Returns the applier row position. */
        public int getTabPos() {
            return this._tabrid;
        }

        /** Returns the producer row position as of the last refresh(). */
        public int getLogPos() {
            return this._logrid;
        }

        /** Returns the source id. */
        public int getSrcId() {
            return this._srcid;
        }

        /** Returns the id of the log currently being applied. */
        public int getApplyId() {
            return this._applylogid;
        }

        /** Returns the window SCN computed by the most recent save(). */
        public long getApplyWindowSCN() {
            return this._logwindowscn;
        }

        /**
         * Returns the cached apply statement, creating it for the current apply log when it
         * has been dropped by close() or by a log roll-over.
         */
        public PreparedStatement getApplyStmt() throws BootstrapDatabaseTooOldException, SQLException {
            if (null != this._applyStmt) {
                return this._applyStmt;
            }
            this._applyStmt = this.createApplyStatement();
            return this._applyStmt;
        }

        /**
         * Computes the next range of log rows to apply and advances the applier row position
         * to the end of that range.
         *
         * <p>A range never spans more than 1000 rows. The returned batch may be empty
         * (from == to), which is the case when there is nothing new or when the applier
         * moves on to the next log.
         *
         * @return the batch; its log id is the apply log id after any roll-over
         * @throws SQLException if refreshing the positions fails
         */
        public applyBatch getNextApplyBatch() throws SQLException {
            // Upper bound captured before any refresh(); stays at the current position unless
            // one of the branches below moves it.
            int _torid = this._tabrid;
            if (this._applylogid == this._producelogid) {
                // Applier and producer are on the same log.
                if (this._tabrid == this._logrid) {
                    // Caught up with the last known producer position: reload the positions.
                    // _torid keeps the value captured above.
                    this.refresh();
                } else {
                    _torid = Math.min(this._logrid, this._tabrid + 1000);
                }
            } else {
                // Applier is on a different log than the producer.
                if (this._logmaxrid == 0) {
                    // maxrid of the apply log not known (initially, or after a roll-over).
                    this.refresh();
                }
                if (this._tabrid == this._logmaxrid) {
                    // Apply log fully consumed: move to the next log, start at row 0 and drop
                    // the statement, which was built for the old log's table.
                    ++this._applylogid;
                    _torid = 0;
                    this._tabrid = 0;
                    this._logmaxrid = 0;
                    DBHelper.close((Statement)this._applyStmt);
                    this._applyStmt = null;
                } else {
                    _torid = Math.min(this._logmaxrid, this._tabrid + 1000);
                }
            }
            applyBatch nextBatch = new applyBatch(this._applylogid, this._tabrid, _torid);
            this._tabrid = _torid;
            return nextBatch;
        }

        /**
         * Prepares the insert-select that copies rows with {@code id} in (param 1, param 2]
         * from the current apply log table into the source table, updating scn, srckey and
         * val when the key already exists.
         */
        private PreparedStatement createApplyStatement() throws SQLException, BootstrapDatabaseTooOldException {
            Connection conn = BootstrapApplierThread.this.getConnection();
            PreparedStatement applyStmt = null;
            StringBuilder sql = new StringBuilder();
            sql.append("insert into ");
            sql.append(BootstrapApplierThread.this.getSrcTableName(this._srcid));
            sql.append(" (scn, srckey, val) ");
            sql.append("select windowscn, srckey, val from ");
            sql.append(BootstrapApplierThread.this.getLogTableName(this._applylogid, this._srcid) + " B ");
            sql.append(" where B.id > ? and B.id <= ?");
            sql.append(" on duplicate key update scn = B.windowscn, srckey=B.srckey, val=B.val");
            applyStmt = conn.prepareStatement(sql.toString());
            LOG.info((Object)("Created apply statement: " + sql.toString()));
            return applyStmt;
        }
    }

    /**
     * Immutable description of one apply step: the log it reads from and the row-id range.
     * The difference between the to-row id and the from-row id is used by applyLog() as the
     * number of rows in the batch.
     */
    static class applyBatch {
        private final int _logid;
        private final int _fromrid;
        private final int _torid;

        /**
         * @param logid   id of the log the rows are read from
         * @param fromrid row id the range starts after
         * @param torid   last row id of the range
         */
        applyBatch(int logid, int fromrid, int torid) {
            _logid = logid;
            _fromrid = fromrid;
            _torid = torid;
        }

        /** Returns the log id. */
        public int getLogId() {
            return _logid;
        }

        /** Returns the lower bound of the row range. */
        public int getFromrid() {
            return _fromrid;
        }

        /** Returns the upper bound of the row range. */
        public int getTorid() {
            return _torid;
        }

        /** Returns the log id and both bounds in a single line for logging. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("LogId: ");
            text.append(_logid);
            text.append(" From: ").append(_fromrid);
            text.append("  To: ").append(_torid);
            return text.toString();
        }
    }
}

