/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.BasePullThread;
import com.linkedin.databus.client.CheckpointMessage;
import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.ConnectionState;
import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.ConnectionStateMessage;
import com.linkedin.databus.client.DatabusBootstrapConnection;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.SourcesMessage;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.BootstrapCheckpointHandler;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.DbusEventInternalReadable;
import com.linkedin.databus.core.InvalidCheckpointException;
import com.linkedin.databus.core.InvalidEventException;
import com.linkedin.databus.core.PendingEventTooLargeException;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus.core.async.LifecycleMessage;
import com.linkedin.databus.core.util.IdNamePair;
import com.linkedin.databus2.core.BackoffTimer;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilter;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig;
import com.linkedin.databus2.core.filter.DbusKeyFilter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

public class BootstrapPullThread
extends BasePullThread {
    // Negative source-id marker constants. NOTE(review): their consumers are not visible in
    // this part of the file; judging by the names only, they tag the start of the snapshot
    // phase, the start of the catch-up phase and the end of bootstrap — confirm.
    public static final Short START_OF_SNAPSHOT_SRCID = -20001;
    public static final Short START_OF_CATCHUP_SRCID = -20002;
    public static final Short END_OF_BOOTSTRAP_SRCID = -20003;
    // Connection states for which shouldDelayTearConnection(...) answers true.
    private static final EnumSet<ConnectionState.StateId> SHOULD_TEAR_CONNECTION = EnumSet.of(ConnectionState.StateId.START_SCN_REQUEST_SENT, new ConnectionState.StateId[]{ConnectionState.StateId.START_SCN_RESPONSE_SUCCESS, ConnectionState.StateId.START_SCN_REQUEST_ERROR, ConnectionState.StateId.START_SCN_RESPONSE_ERROR, ConnectionState.StateId.TARGET_SCN_REQUEST_SENT, ConnectionState.StateId.TARGET_SCN_RESPONSE_SUCCESS, ConnectionState.StateId.TARGET_SCN_REQUEST_ERROR, ConnectionState.StateId.TARGET_SCN_RESPONSE_ERROR, ConnectionState.StateId.STREAM_REQUEST_SENT, ConnectionState.StateId.STREAM_REQUEST_SUCCESS, ConnectionState.StateId.STREAM_REQUEST_ERROR, ConnectionState.StateId.STREAM_RESPONSE_ERROR, ConnectionState.StateId.BOOTSTRAP_DONE});
    // Checkpoint to resume bootstrapping from; assigned by doSetResumeCheckpoint and required
    // (non-null) by doPickBootstrapServer.
    private Checkpoint _resumeCkpt;
    // Created in the constructor; handed to every new bootstrap connection and used to inspect
    // remote errors when reading bootstrap events.
    private final RemoteExceptionHandler _remoteExceptionHandler;
    // Built lazily on the first stream request from _bootstrapFilterConfigs.
    private DbusKeyCompositeFilter _bootstrapFilter;
    // Filter configurations supplied through the constructor (stored as given, not copied).
    private final List<DbusKeyCompositeFilterConfig> _bootstrapFilterConfigs;
    // Running count of events read; reset to 0 when a "PhaseCompleted" response is processed.
    private long numEventsInCurrentState = 0L;
    // Percentage (divided/multiplied by 100 where used) that scales the free-buffer threshold
    // and the fetch size of stream requests.
    private final double _pullerBufferUtilizationPct;
    // Most recently selected bootstrap connection; closed and cleared in onShutdown.
    private DatabusBootstrapConnection _lastOpenConnection;
    // Retry budget for reconnecting to the server recorded in the checkpoint before the
    // checkpoint is reset for a server change.
    private final BackoffTimer _retriesBeforeCkptCleanup;
    // Lock supplied through the constructor; the 9-argument constructor passes null.
    // NOTE(review): lockV3Bootstrap/unlockV3Bootstrap are defined outside this chunk —
    // presumably they tolerate a null lock; confirm.
    private ReentrantLock _v3BootstrapLock = null;

    /**
     * Convenience constructor: delegates to the 11-argument constructor, passing {@code null}
     * for both the v3 bootstrap lock and the logger.
     *
     * @param name                   forwarded unchanged to the full constructor
     * @param sourcesConn            forwarded unchanged to the full constructor
     * @param dbusEventBuffer        forwarded unchanged to the full constructor
     * @param connStateFactory       forwarded unchanged to the full constructor
     * @param bootstrapServers       forwarded unchanged to the full constructor
     * @param bootstrapFilterConfigs forwarded unchanged to the full constructor
     * @param pullerBufferUtilPct    forwarded unchanged to the full constructor
     * @param mbeanServer            forwarded unchanged to the full constructor
     * @param eventFactory           forwarded unchanged to the full constructor
     */
    public BootstrapPullThread(String name, DatabusSourcesConnection sourcesConn, DbusEventBuffer dbusEventBuffer, ConnectionStateFactory connStateFactory, Set<ServerInfo> bootstrapServers, List<DbusKeyCompositeFilterConfig> bootstrapFilterConfigs, double pullerBufferUtilPct, MBeanServer mbeanServer, DbusEventFactory eventFactory) {
        this(name, sourcesConn, dbusEventBuffer, connStateFactory, bootstrapServers, bootstrapFilterConfigs, pullerBufferUtilPct, mbeanServer, eventFactory, null, null);
    }

    /**
     * Full constructor.
     *
     * <p>Hands the common puller arguments to the base class (using the connection config's
     * {@code getBstPullerRetries()} as the retry configuration) and then initialises the
     * bootstrap-specific state: the "retries before checkpoint cleanup" backoff timer, the
     * filter configurations, the remote exception handler, the buffer utilisation percentage
     * and the v3 bootstrap lock.
     *
     * @param name                   forwarded to the base class
     * @param sourcesConn            owning sources connection; also supplies the connection config
     * @param dbusEventBuffer        forwarded to the base class and to the remote exception handler
     * @param connStateFactory       forwarded to the base class
     * @param bootstrapServers       forwarded to the base class
     * @param bootstrapFilterConfigs filter configurations, stored as given
     * @param pullerBufferUtilPct    buffer utilisation percentage, stored as given
     * @param mbeanServer            forwarded to the base class
     * @param eventFactory           forwarded to the base class and to the remote exception handler
     * @param v3BootstrapLock        lock stored as given; the shorter constructor passes {@code null}
     * @param log                    forwarded to the base class; the shorter constructor passes {@code null}
     */
    public BootstrapPullThread(String name, DatabusSourcesConnection sourcesConn, DbusEventBuffer dbusEventBuffer, ConnectionStateFactory connStateFactory, Set<ServerInfo> bootstrapServers, List<DbusKeyCompositeFilterConfig> bootstrapFilterConfigs, double pullerBufferUtilPct, MBeanServer mbeanServer, DbusEventFactory eventFactory, ReentrantLock v3BootstrapLock, Logger log) {
        super(name,
              sourcesConn.getConnectionConfig().getBstPullerRetries(),
              sourcesConn,
              dbusEventBuffer,
              connStateFactory,
              bootstrapServers,
              mbeanServer,
              eventFactory,
              log);
        _retriesBeforeCkptCleanup = new BackoffTimer(
                "BSPullerRetriesBeforeCkptCleanup",
                sourcesConn.getConnectionConfig().getBsPullerRetriesBeforeCkptCleanup());
        _bootstrapFilterConfigs = bootstrapFilterConfigs;
        _remoteExceptionHandler = new RemoteExceptionHandler(sourcesConn, dbusEventBuffer, eventFactory);
        _pullerBufferUtilizationPct = pullerBufferUtilPct;
        _v3BootstrapLock = v3BootstrapLock;
    }

    /**
     * Tells whether {@code stateId} is a member of {@code SHOULD_TEAR_CONNECTION}.
     *
     * @param stateId the connection state to look up
     * @return {@code true} when the state is contained in the set, {@code false} otherwise
     */
    @Override
    protected boolean shouldDelayTearConnection(ConnectionState.StateId stateId) {
        return SHOULD_TEAR_CONNECTION.contains(stateId);
    }

    /**
     * Dispatches one actor message to the matching handler.
     *
     * <ul>
     *   <li>{@link ConnectionStateMessage}: ignored with a warning unless the component status is
     *       {@code RUNNING}; otherwise routed by state id to the corresponding {@code do...} /
     *       {@code process...} method. {@code INITIAL} and {@code BOOTSTRAP_DONE} are no-ops,
     *       {@code CLOSED} triggers {@code shutdown()}.</li>
     *   <li>{@link CheckpointMessage}: only {@code SET_CHECKPOINT} is handled.</li>
     *   <li>{@link SourcesMessage}: {@code SET_SOURCES_IDS} and {@code SET_SOURCES_SCHEMAS} are handled.</li>
     *   <li>Anything else is delegated to the base class.</li>
     * </ul>
     *
     * @param message the message taken from the actor queue
     * @return {@code false} when the message carried an unrecognised state/type id, the base
     *         class result for messages delegated to it, {@code true} otherwise
     */
    @Override
    protected boolean executeAndChangeState(Object message) {
        boolean success = true;
        if (message instanceof ConnectionStateMessage) {
            if (_componentStatus.getStatus() != DatabusComponentStatus.Status.RUNNING) {
                // State transitions are dropped while the puller is not running.
                _log.warn("not running: " + message.toString());
            } else {
                ConnectionStateMessage stateMsg = (ConnectionStateMessage)message;
                ConnectionState currentState = stateMsg.getConnState();
                switch (stateMsg.getStateId()) {
                    case INITIAL:
                    case BOOTSTRAP_DONE: {
                        // Nothing to do for these states.
                        break;
                    }
                    case CLOSED: {
                        shutdown();
                        break;
                    }
                    case BOOTSTRAP:
                    case PICK_SERVER: {
                        doPickBootstrapServer(currentState);
                        break;
                    }
                    case REQUEST_START_SCN: {
                        doRequestStartScn(currentState);
                        break;
                    }
                    case START_SCN_RESPONSE_SUCCESS: {
                        doStartScnResponseSuccess(currentState);
                        break;
                    }
                    case REQUEST_TARGET_SCN: {
                        doRequestTargetScn(currentState);
                        break;
                    }
                    case TARGET_SCN_RESPONSE_SUCCESS: {
                        doTargetScnResponseSuccess(currentState);
                        break;
                    }
                    case REQUEST_STREAM: {
                        doRequestBootstrapStream(currentState);
                        break;
                    }
                    case STREAM_REQUEST_SUCCESS: {
                        doReadBootstrapEvents(currentState);
                        break;
                    }
                    case STREAM_RESPONSE_DONE: {
                        doStreamResponseDone(currentState);
                        break;
                    }
                    case STREAM_REQUEST_ERROR: {
                        processStreamRequestError(currentState);
                        break;
                    }
                    case STREAM_RESPONSE_ERROR: {
                        processStreamResponseError(currentState);
                        break;
                    }
                    case START_SCN_REQUEST_ERROR: {
                        processStartScnRequestError(currentState);
                        break;
                    }
                    case START_SCN_RESPONSE_ERROR: {
                        processStartScnResponseError(currentState);
                        break;
                    }
                    case TARGET_SCN_REQUEST_ERROR: {
                        processTargetScnRequestError(currentState);
                        break;
                    }
                    case TARGET_SCN_RESPONSE_ERROR: {
                        processTargetScnResponseError(currentState);
                        break;
                    }
                    default: {
                        _log.error("Unknown state in BootstrapPullThread: " + currentState.getStateId());
                        success = false;
                        break;
                    }
                }
            }
        } else if (message instanceof CheckpointMessage) {
            CheckpointMessage cpMessage = (CheckpointMessage)message;
            switch (cpMessage.getTypeId()) {
                case SET_CHECKPOINT: {
                    doSetResumeCheckpoint(cpMessage);
                    break;
                }
                default: {
                    _log.error("Unkown CheckpointMessage in BootstrapPullThread: " + cpMessage.getTypeId());
                    success = false;
                    break;
                }
            }
        } else if (message instanceof SourcesMessage) {
            SourcesMessage sourcesMessage = (SourcesMessage)message;
            switch (sourcesMessage.getTypeId()) {
                case SET_SOURCES_IDS: {
                    doSetSourcesIds(sourcesMessage);
                    break;
                }
                case SET_SOURCES_SCHEMAS: {
                    doSetSourcesSchemas(sourcesMessage);
                    break;
                }
                default: {
                    // This branch handles SourcesMessage, so the log must say so (it used to
                    // claim an unknown CheckpointMessage, which misdirects troubleshooting).
                    _log.error("Unknown SourcesMessage in BootstrapPullThread: " + sourcesMessage.getTypeId());
                    success = false;
                    break;
                }
            }
        } else {
            success = super.executeAndChangeState(message);
        }
        return success;
    }

    /**
     * Installs the schemas carried by {@code sourcesMessage} into the current state and
     * forwards the message to the bootstrap dispatcher.
     *
     * <p>If the current state already has schemas, the new schema map must contain every
     * source id of the existing one. Otherwise the error is logged, the state is switched to
     * closed, a suspend-on-error lifecycle message is enqueued and nothing is installed.
     *
     * @param sourcesMessage message carrying the new sources-to-schemas map
     */
    private void doSetSourcesSchemas(SourcesMessage sourcesMessage) {
        if (null != _currentState.getSourcesSchemas()) {
            Set<Long> incomingIds = sourcesMessage.getSourcesSchemas().keySet();
            Set<Long> knownIds = _currentState.getSourcesSchemas().keySet();
            if (!incomingIds.containsAll(knownIds)) {
                String msg = "Expected schemas for sources " + knownIds + "; got: " + incomingIds;
                _log.error(msg);
                _currentState.switchToClosed();
                enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new DatabusException(msg)));
                return;
            }
        }
        _currentState.setSourcesSchemas(sourcesMessage.getSourcesSchemas());
        _sourcesConn.getBootstrapDispatcher().enqueueMessage(sourcesMessage);
    }

    /**
     * Copies the sources and the source-id list string from {@code sourcesMessage} into the
     * current state, then forwards the message to the bootstrap dispatcher.
     *
     * @param sourcesMessage message carrying the source ids
     */
    private void doSetSourcesIds(SourcesMessage sourcesMessage) {
        _currentState.setSourcesIds(sourcesMessage.getSources());
        _currentState.setSourcesIdListString(sourcesMessage.getSourcesIdListString());
        _sourcesConn.getBootstrapDispatcher().enqueueMessage(sourcesMessage);
    }

    /**
     * Records the checkpoint from {@code cpMessage} as the resume checkpoint and, when it is
     * non-null, injects a checkpoint event for it into the current state's data events buffer.
     *
     * <p>An {@link InvalidEventException} from the injection is logged and treated like a
     * failed injection; a failed injection is only logged. The resume checkpoint is always
     * logged at the end.
     *
     * @param cpMessage message carrying the checkpoint (may carry {@code null})
     */
    private void doSetResumeCheckpoint(CheckpointMessage cpMessage) {
        _resumeCkpt = cpMessage.getCheckpoint();
        if (_resumeCkpt != null) {
            DbusEventInternalReadable cpEvent = getEventFactory().createCheckpointEvent(_resumeCkpt);
            boolean injected = false;
            try {
                injected = _currentState.getDataEventsBuffer().injectEvent(cpEvent);
            }
            catch (InvalidEventException e) {
                _log.error("unable to create checkpoint event for checkpoint " + _resumeCkpt + "; error: " + e, (Throwable)e);
            }
            if (!injected) {
                _log.error("Unable to write bootstrap phase marker");
            }
        }
        _log.info("resume checkpoint: " + _resumeCkpt);
    }

    /**
     * Handles the start lifecycle message: calls {@code lockV3Bootstrap()}, runs the base-class
     * start logic, clears the bootstrap state of the current connection state, switches it to
     * pick-server and enqueues it.
     *
     * @param lcMessage the lifecycle message, passed on to the base class
     */
    protected void doStart(LifecycleMessage lcMessage) {
        lockV3Bootstrap();
        super.doStart(lcMessage);
        _currentState.clearBootstrapState();
        _currentState.switchToPickServer();
        enqueueMessage(_currentState);
    }

    /**
     * Handles the resume lifecycle message: calls {@code lockV3Bootstrap()} and then runs the
     * base-class resume logic.
     *
     * @param lcMessage the lifecycle message, passed on to the base class
     */
    protected void doResume(LifecycleMessage lcMessage) {
        lockV3Bootstrap();
        super.doResume(lcMessage);
    }

    /**
     * Shutdown hook: closes the last opened bootstrap connection (if any), clears the reference
     * to it, and always calls {@code unlockV3Bootstrap(true)}, even when closing throws.
     * Logs completion afterwards.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; compare with the original source if available.
     */
    protected void onShutdown() {
        try {
            if (_lastOpenConnection != null) {
                _log.info("closing open connection");
                _lastOpenConnection.close();
                _lastOpenConnection = null;
            }
        }
        finally {
            unlockV3Bootstrap(true);
        }
        _log.info("shutdown complete.");
    }

    /**
     * Handles the pause lifecycle message: runs the base-class pause logic and always calls
     * {@code unlockV3Bootstrap()} afterwards, even when the base class throws.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; compare with the original source if available.
     *
     * @param lcMessage the lifecycle message, passed on to the base class
     */
    protected void doPause(LifecycleMessage lcMessage) {
        try {
            super.doPause(lcMessage);
        }
        finally {
            unlockV3Bootstrap();
        }
    }

    /**
     * Handles the suspend-on-error lifecycle message: runs the base-class logic, sends a
     * heartbeat with the value {@code -1} to the unified client stats, and always calls
     * {@code unlockV3Bootstrap()} afterwards, even when one of the earlier steps throws.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; compare with the original source if available.
     *
     * @param lcMessage the lifecycle message, passed on to the base class
     */
    protected void doSuspendOnError(LifecycleMessage lcMessage) {
        try {
            super.doSuspendOnError(lcMessage);
            UnifiedClientStats stats = _sourcesConn.getUnifiedClientStats();
            BootstrapPullThread.sendHeartbeat(stats, -1L);
        }
        finally {
            unlockV3Bootstrap();
        }
    }

    /**
     * Resume hook: switches the current connection state to pick-server and enqueues it so a
     * bootstrap server is selected again.
     */
    protected void onResume() {
        _currentState.switchToPickServer();
        enqueueMessage(_currentState);
    }

    /**
     * Selects a bootstrap server, opens a connection to it and moves the state machine on
     * from the resume checkpoint.
     *
     * <p>Outline, as implemented below:
     * <ol>
     *   <li>Suspend with an error when no servers are configured or no resume checkpoint is set.</li>
     *   <li>If the checkpoint names the server it was last read from, retry connecting to that
     *       same server within the {@code _retriesBeforeCkptCleanup} budget.</li>
     *   <li>If that yields no connection, reset the checkpoint for a server change and pick
     *       among the configured servers (random first pick, then round-robin) within the
     *       regular retry budget.</li>
     *   <li>On success, register the connection with {@code curState}, set the checkpoint and
     *       enqueue the next state; on exhaustion, enqueue a suspend-on-error message.</li>
     * </ol>
     * Shutdown requests are checked between the steps and cause an early return.
     *
     * @param curState the connection state to update and re-enqueue
     */
    protected void doPickBootstrapServer(ConnectionState curState) {
        int serversNum = this._servers.size();
        // No servers configured: nothing can be picked, so suspend.
        if (0 == serversNum) {
            this._sourcesConn.getConnectionStatus().suspendOnError((Throwable)new DatabusException("No bootstrap services specified"));
            return;
        }
        // A resume checkpoint must have been set (see doSetResumeCheckpoint) before picking.
        if (null == this._resumeCkpt) {
            this._sourcesConn.getConnectionStatus().suspendOnError((Throwable)new DatabusException("Bootstrapping checkpoint is not set!"));
            return;
        }
        boolean restartBootstrap = false;
        // Try to recover the server recorded in the checkpoint ("host:port" form parsed with ":").
        String bsServerInfo = this._resumeCkpt.getBootstrapServerInfo();
        ServerInfo lastReadBS = null;
        if (null != bsServerInfo) {
            try {
                lastReadBS = ServerInfo.buildServerInfoFromHostPort((String)bsServerInfo, (String)":");
            }
            catch (Exception ex) {
                // Parsing failure is logged only; lastReadBS stays null and the bootstrap restarts.
                this._log.error((Object)("Unable to fetch bootstrap serverInfo from checkpoint, ServerInfo :" + bsServerInfo), (Throwable)ex);
            }
        }
        // Without a usable last-read server the same-server retry phase is skipped.
        if (null == lastReadBS) {
            restartBootstrap = true;
        }
        int retriesLeft = 0;
        DatabusBootstrapConnection bootstrapConn = null;
        ServerInfo serverInfo = lastReadBS;
        if (!restartBootstrap) {
            // Phase 1: keep retrying the server from the checkpoint until connected, the
            // same-server retry budget is exhausted, or shutdown is requested.
            while (null == bootstrapConn && (retriesLeft = this._retriesBeforeCkptCleanup.getRemainingRetriesNum()) >= 0 && !this.checkForShutdownRequest()) {
                this._log.info((Object)("Retry picking last used bootstrap server :" + serverInfo + "; retries left:" + retriesLeft));
                // Back off only when the last-read server equals the current server.
                if (lastReadBS.equals((Object)this._curServer)) {
                    this._retriesBeforeCkptCleanup.backoffAndSleep();
                }
                try {
                    bootstrapConn = this._sourcesConn.getBootstrapConnFactory().createConnection(serverInfo, (ActorMessageQueue)this, this._remoteExceptionHandler);
                    this._log.info((Object)("picked last used bootstrap server:" + serverInfo));
                }
                catch (Exception e) {
                    // Connection failure is logged; the loop condition decides whether to retry.
                    this._log.error((Object)("Unable to get connection to bootstrap server:" + serverInfo), (Throwable)e);
                }
            }
            if (null == bootstrapConn && this._retriesBeforeCkptCleanup.getRemainingRetriesNum() < 0) {
                this._log.info((Object)("Exhausted retrying the same bootstrap server :" + lastReadBS));
            }
        }
        if (this.checkForShutdownRequest()) {
            this._log.info((Object)"Shutting down bootstrap");
            return;
        }
        Random rng = new Random();
        if (null == bootstrapConn) {
            // No connection to the previous server: reset the checkpoint for a server change.
            // NOTE(review): the before/after logging of the same reference suggests
            // resetForServerChange mutates _resumeCkpt in place — confirm.
            this._log.info((Object)"Restarting bootstrap as client might be getting bootstrap data from different server instance !!");
            this._log.info((Object)("Old Checkpoint :" + this._resumeCkpt));
            curState.getBstCheckpointHandler().resetForServerChange(this._resumeCkpt);
            this._log.info((Object)("New Checkpoint :" + this._resumeCkpt));
            this._retriesBeforeCkptCleanup.reset();
        } else {
            this._curServer = serverInfo;
        }
        // Phase 2 (skipped when phase 1 connected): pick among the configured servers within
        // the regular retry budget.
        while (null == bootstrapConn && (retriesLeft = this._status.getRetriesLeft()) >= 0 && !this.checkForShutdownRequest()) {
            this._log.info((Object)("picking a bootstrap server; retries left:" + retriesLeft));
            this.backoffOnPullError();
            // First pick (negative index) is random; later picks advance round-robin.
            this._curServerIdx = this._curServerIdx < 0 ? rng.nextInt(serversNum) : (this._curServerIdx + 1) % serversNum;
            // Walk the server collection up to the chosen index.
            Iterator setIter = this._servers.iterator();
            for (int i = 0; i <= this._curServerIdx; ++i) {
                serverInfo = (ServerInfo)setIter.next();
            }
            this._curServer = serverInfo;
            try {
                bootstrapConn = this._sourcesConn.getBootstrapConnFactory().createConnection(serverInfo, (ActorMessageQueue)this, this._remoteExceptionHandler);
                this._log.info((Object)("picked a bootstrap server:" + serverInfo.toSimpleString()));
            }
            catch (Exception e) {
                this._log.error((Object)("Unable to get connection to bootstrap server:" + serverInfo), (Throwable)e);
            }
        }
        // If the state still holds an earlier connection, reset it before installing the new one.
        DatabusBootstrapConnection oldBootstrapConn = curState.getBootstrapConnection();
        if (null != oldBootstrapConn) {
            this.resetConnectionAndSetFlag();
        }
        // Remember the connection (possibly null) so onShutdown can close it.
        this._lastOpenConnection = bootstrapConn;
        if (this.checkForShutdownRequest()) {
            return;
        }
        if (null == bootstrapConn) {
            this._log.error((Object)"bootstrap server retries exhausted");
            this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new DatabusException("bootstrap server retries exhausted")));
            return;
        }
        BootstrapPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats());
        curState.bootstrapServerSelected(serverInfo.getAddress(), bootstrapConn, this._curServer);
        this._log.info((Object)("resuming bootstrap from checkpoint: " + this._resumeCkpt));
        curState.setCheckpoint(this._resumeCkpt);
        // The checkpoint contents decide which request state comes next.
        this.determineNextStateFromCheckpoint(curState);
        this.enqueueMessage(curState);
    }

    /**
     * Marks the state as "target SCN request sent", emits a heartbeat and issues the target
     * SCN request on the state's bootstrap connection with the state's checkpoint.
     *
     * @param curState the connection state; also passed to the connection with the request
     */
    private void doRequestTargetScn(ConnectionState curState) {
        _log.debug("Sending /targetScn request");
        curState.switchToTargetScnRequestSent();
        BootstrapPullThread.sendHeartbeat(_sourcesConn.getUnifiedClientStats());
        DatabusBootstrapConnection conn = curState.getBootstrapConnection();
        conn.requestTargetScn(curState.getCheckpoint(), curState);
    }

    /**
     * Handles a successful target SCN response.
     *
     * <p>When {@code toTearConnAfterHandlingResponse()} is true, the connection is torn down
     * via {@code tearConnectionAndEnqueuePickServer()} and nothing else happens. Otherwise the
     * checkpoint is advanced past the snapshot phase and past the target SCN step, the state is
     * switched to request-stream and re-enqueued.
     *
     * @param curState the connection state that received the response
     */
    protected void doTargetScnResponseSuccess(ConnectionState curState) {
        if (toTearConnAfterHandlingResponse()) {
            tearConnectionAndEnqueuePickServer();
            return;
        }
        Checkpoint ckpt = curState.getCheckpoint();
        curState.getBstCheckpointHandler().advanceAfterSnapshotPhase(ckpt);
        curState.getBstCheckpointHandler().advanceAfterTargetScn(ckpt);
        curState.switchToRequestStream(curState.getCheckpoint());
        enqueueMessage(curState);
    }

    /**
     * Marks the state as "start SCN request sent", emits a heartbeat and issues the start SCN
     * request on the state's bootstrap connection. The source name list is read from the state
     * before the state switch.
     *
     * @param curState the connection state; also passed to the connection with the request
     */
    private void doRequestStartScn(ConnectionState curState) {
        _log.debug("Sending /startScn request");
        String sourceNames = curState.getSourcesNameList();
        curState.switchToStartScnRequestSent();
        BootstrapPullThread.sendHeartbeat(_sourcesConn.getUnifiedClientStats());
        DatabusBootstrapConnection conn = curState.getBootstrapConnection();
        conn.requestStartScn(curState.getCheckpoint(), curState, sourceNames);
    }

    /**
     * Handles a successful start SCN response.
     *
     * <p>When {@code toTearConnAfterHandlingResponse()} is true, the connection is torn down
     * and nothing else happens. Otherwise one of three transitions is made before the state is
     * re-enqueued:
     * <ul>
     *   <li>the response carried no server info: switch to start-SCN-response-error;</li>
     *   <li>the responding server equals the current server: switch to request-stream;</li>
     *   <li>the responding server differs: reset the resume checkpoint for a server change
     *       while keeping its previous start SCN and server info string, then switch to
     *       pick-server.</li>
     * </ul>
     *
     * @param curState the connection state that received the response
     */
    private void doStartScnResponseSuccess(ConnectionState curState) {
        if (toTearConnAfterHandlingResponse()) {
            tearConnectionAndEnqueuePickServer();
            return;
        }
        ServerInfo respondedServer = curState.getCurrentBSServerInfo();
        if (respondedServer == null) {
            String msg = "Bootstrap Server did not provide its server info in StartSCN !! Switching to PICK_SERVER. CurrentServer :" + _curServer;
            _log.error(msg);
            curState.switchToStartScnResponseError();
        } else if (respondedServer.equals((Object)_curServer)) {
            curState.switchToRequestStream(curState.getCheckpoint());
        } else {
            _log.info("Bootstrap server responded and current server does not match. Switching to Pick Server !!  curServer: " + _curServer + ", Responded Server :" + respondedServer);
            _log.info("Checkpoint before clearing :" + _resumeCkpt);
            // Preserve the server info string and start SCN across the reset.
            String savedServerInfo = _resumeCkpt.getBootstrapServerInfo();
            Long savedStartScn = _resumeCkpt.getBootstrapStartScn();
            curState.getBstCheckpointHandler().resetForServerChange(_resumeCkpt);
            curState.getBstCheckpointHandler().setStartScnAfterServerChange(_resumeCkpt, savedStartScn);
            _resumeCkpt.setBootstrapServerInfo(savedServerInfo);
            _log.info("Checkpoint after clearing :" + _resumeCkpt);
            curState.switchToPickServer();
        }
        enqueueMessage(curState);
    }

    /**
     * Sends a bootstrap stream request if the data events buffer has enough free read space;
     * otherwise sleeps 50 ms and re-enqueues the state to try again.
     *
     * <p>The free-space threshold is the configured free-buffer threshold scaled by
     * {@code 100 / _pullerBufferUtilizationPct}. The fetch size is the free read space scaled
     * by {@code _pullerBufferUtilizationPct / 100}, but never below that threshold. The
     * composite key filter is built once, on first use, from the configured filter configs by
     * translating source names to source ids; sources without an id mapping are skipped.
     *
     * @param curState the connection state whose buffer, checkpoint and connection are used
     */
    protected void doRequestBootstrapStream(ConnectionState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Checking for free space");
        }
        int freeBufferThreshold = (int)((double)_sourcesConn.getConnectionConfig().getFreeBufferThreshold() * 100.0 / _pullerBufferUtilizationPct);
        int freeSpace = curState.getDataEventsBuffer().getBufferFreeReadSpace();

        if (freeSpace < freeBufferThreshold) {
            // Not enough room yet: pause briefly and retry through the message queue.
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException ignored) {
                // Interruption only shortens the pause; the state is re-enqueued regardless.
            }
            enqueueMessage(curState);
            return;
        }

        Checkpoint cp = curState.getCheckpoint();
        if (debugOn) {
            _log.debug("Checkpoint at RequestBootstrapData: " + cp.toString());
        }
        _log.debug("Sending /bootstrap request");
        Map<String, IdNamePair> srcNameMap = curState.getSourcesNameMap();

        // The source being bootstrapped depends on the current phase.
        String curSrcName;
        if (cp.getConsumptionMode() == DbusClientMode.BOOTSTRAP_SNAPSHOT) {
            curSrcName = cp.getSnapshotSource();
        } else {
            curSrcName = cp.getCatchupSource();
        }

        if (_bootstrapFilter == null) {
            // Lazily build the composite filter, re-keyed from source name to source id.
            _bootstrapFilter = new DbusKeyCompositeFilter();
            Map<String, IdNamePair> srcNameIdMap = curState.getSourcesNameMap();
            for (DbusKeyCompositeFilterConfig conf : _bootstrapFilterConfigs) {
                Map byName = conf.getConfigMap();
                HashMap byId = new HashMap();
                for (Object rawEntry : byName.entrySet()) {
                    Map.Entry entry = (Map.Entry)rawEntry;
                    IdNamePair idName = srcNameIdMap.get(entry.getKey());
                    if (idName != null) {
                        byId.put(idName.getId(), entry.getValue());
                    }
                }
                _bootstrapFilter.merge(new DbusKeyCompositeFilter(byId));
            }
            _bootstrapFilter.dedupe();
        }

        // Look up the filter for the current source, if one is configured.
        DbusKeyFilter filter = null;
        IdNamePair srcEntry = srcNameMap.get(curSrcName);
        if (srcEntry != null) {
            Map fMap = _bootstrapFilter.getFilterMap();
            if (fMap != null) {
                filter = (DbusKeyFilter)fMap.get(srcEntry.getId());
            }
        }

        int scaledFreeSpace = (int)((double)curState.getDataEventsBuffer().getBufferFreeReadSpace() / 100.0 * _pullerBufferUtilizationPct);
        int fetchSize = Math.max(freeBufferThreshold, scaledFreeSpace);
        curState.switchToStreamRequestSent();
        BootstrapPullThread.sendHeartbeat(_sourcesConn.getUnifiedClientStats());
        curState.getBootstrapConnection().requestStream(curState.getSourcesIdListString(), filter, fetchSize, cp, curState);
    }

    /**
     * Reads the bootstrap events of a successful stream response into the data events buffer.
     *
     * <p>Steps:
     * <ol>
     *   <li>Write a checkpoint event for the current checkpoint into the buffer; if no event
     *       is written, log it and treat the read as failed.</li>
     *   <li>If the response carries a remote error, switch to stream-response-error (a
     *       {@link BootstrapDatabaseTooOldException} is additionally passed to the remote
     *       exception handler).</li>
     *   <li>Otherwise read the events. If none were read and the pending event is larger than
     *       the buffer's maximum read capacity, enqueue a suspend-on-error message and return
     *       immediately. Else reset server retries, update the event counter, finalize the
     *       phase when the response carries "PhaseCompleted" metadata (or advance the
     *       checkpoint when events were read), and switch to stream-response-done.</li>
     * </ol>
     * Interrupted, invalid-event and runtime exceptions are logged and treated as a failed
     * read. Afterwards the connection is torn down if {@code toTearConnAfterHandlingResponse()}
     * says so; otherwise a failed read switches to pick-server and the state is re-enqueued.
     *
     * @param curState the connection state holding checkpoint, buffer and read channel
     */
    protected void doReadBootstrapEvents(ConnectionState curState) {
        boolean success = true;
        final boolean debugOn = _log.isDebugEnabled();
        boolean reEnqueue = true;
        try {
            Checkpoint cp = curState.getCheckpoint();
            DbusEventBuffer eventBuffer = curState.getDataEventsBuffer();
            if (debugOn) {
                _log.debug("Sending bootstrap events to buffer");
            }

            // Serialize a checkpoint event and feed it to the buffer ahead of the data events.
            DbusEventInternalReadable cpEvent = getEventFactory().createCheckpointEvent(cp);
            byte[] cpEventBytes = new byte[cpEvent.size()];
            if (debugOn) {
                _log.debug("checkpoint event size: " + cpEventBytes.length);
                _log.debug("checkpoint event:" + cpEvent.toString());
            }
            cpEvent.getRawBytes().get(cpEventBytes);
            ReadableByteChannel cpRbc = Channels.newChannel(new ByteArrayInputStream(cpEventBytes));
            UnifiedClientStats unifiedClientStats = _sourcesConn.getUnifiedClientStats();
            BootstrapPullThread.sendHeartbeat(unifiedClientStats);
            success = eventBuffer.readEvents(cpRbc) > 0;

            if (!success) {
                _log.error("Unable to write bootstrap phase marker");
            } else {
                ChunkedBodyReadableByteChannel readChannel = curState.getReadChannel();
                String remoteErrorName = RemoteExceptionHandler.getExceptionName(readChannel);
                Throwable remoteError = _remoteExceptionHandler.getException(readChannel);
                if (remoteError instanceof BootstrapDatabaseTooOldException) {
                    _log.error("Bootstrap database is too old!");
                    _remoteExceptionHandler.handleException(remoteError);
                    curState.switchToStreamResponseError();
                } else if (remoteErrorName != null) {
                    _log.error("read events error: " + RemoteExceptionHandler.getExceptionMessage(readChannel));
                    curState.switchToStreamResponseError();
                } else {
                    BootstrapPullThread.sendHeartbeat(unifiedClientStats);
                    int eventsNum = eventBuffer.readEvents((ReadableByteChannel)readChannel, curState.getListeners(), _sourcesConn.getBootstrapEventsStatsCollector());
                    if (eventsNum == 0 && _remoteExceptionHandler.getPendingEventSize(readChannel) > eventBuffer.getMaxReadBufferCapacity()) {
                        // The pending event can never fit: suspend instead of retrying forever.
                        String err = "ReadBuffer max capacity(" + eventBuffer.getMaxReadBufferCapacity() + ") is less than event size(" + _remoteExceptionHandler.getPendingEventSize(readChannel) + "). Increase databus.client.connectionDefaults.bstEventBuffer.maxEventSize and restart.";
                        _log.fatal(err);
                        enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new PendingEventTooLargeException(err)));
                        return;
                    }
                    resetServerRetries();
                    if (debugOn) {
                        _log.debug("Sending events to buffer");
                    }
                    numEventsInCurrentState += (long)eventsNum;
                    _log.info("Bootstrap events read so far: " + numEventsInCurrentState);
                    String phaseCompleted = readChannel.getMetadata("PhaseCompleted");
                    BootstrapCheckpointHandler ckptHandler = curState.getBstCheckpointHandler();
                    if (phaseCompleted != null) {
                        // The server reported the end of the current phase.
                        if (cp.getConsumptionMode() == DbusClientMode.BOOTSTRAP_CATCHUP) {
                            ckptHandler.finalizeCatchupPhase(cp);
                        } else if (cp.getConsumptionMode() == DbusClientMode.BOOTSTRAP_SNAPSHOT) {
                            ckptHandler.finalizeSnapshotPhase(cp);
                        } else {
                            throw new RuntimeException("Invalid bootstrap phase: " + cp.getConsumptionMode());
                        }
                        _log.info("Bootstrap events read :" + numEventsInCurrentState + " during phase:" + cp.getConsumptionMode() + " [" + cp.getBootstrapSnapshotSourceIndex() + "," + cp.getBootstrapCatchupSourceIndex() + "]");
                        numEventsInCurrentState = 0L;
                    } else if (eventsNum > 0) {
                        cp.bootstrapCheckPoint();
                    }
                    curState.switchToStreamResponseDone();
                }
            }
        }
        catch (InterruptedException ie) {
            _log.error("interupted", (Throwable)ie);
            success = false;
        }
        catch (InvalidEventException e) {
            _log.error("error reading events from server: " + e.getMessage(), (Throwable)e);
            success = false;
        }
        catch (RuntimeException e) {
            _log.error("runtime error reading events from server: " + e.getMessage(), (Throwable)e);
            success = false;
        }

        if (toTearConnAfterHandlingResponse()) {
            tearConnectionAndEnqueuePickServer();
            reEnqueue = false;
        } else if (!success) {
            curState.switchToPickServer();
        }
        if (reEnqueue) {
            enqueueMessage(curState);
        }
    }

    /**
     * Finishes the handling of a stream response: derives the next state from the checkpoint,
     * resets the same-server retry timer and re-enqueues the state.
     *
     * @param curState the connection state whose stream response has been consumed
     */
    protected void doStreamResponseDone(ConnectionState curState) {
        Checkpoint cp = curState.getCheckpoint();
        if (_log.isDebugEnabled()) {
            _log.debug("Checkpoint at EventsDone: " + cp);
        }
        determineNextStateFromCheckpoint(curState);
        _retriesBeforeCkptCleanup.reset();
        enqueueMessage(curState);
    }

    /**
     * Finalizes a completed bootstrap.
     *
     * <p>Logs the completion, sets the checkpoint's window SCN and previous SCN to the bootstrap
     * target SCN, switches it to {@code ONLINE_CONSUMPTION} and resets its bootstrap fields.
     * Then injects a checkpoint event and, if that succeeds, an end-of-period event into the
     * data events buffer. Injection failures are logged only. Finally calls
     * {@code unlockV3Bootstrap()}.
     *
     * @param cp       the checkpoint to convert to online consumption (modified in place)
     * @param curState the connection state providing the buffer and the EOP event
     * @throws IOException      declared by the signature; not thrown by the visible code
     * @throws DatabusException declared by the signature; not thrown directly by the visible code
     */
    protected void processBootstrapComplete(Checkpoint cp, ConnectionState curState) throws IOException, DatabusException {
        logBootstrapPhase(DbusClientMode.BOOTSTRAP_CATCHUP, cp.getBootstrapSnapshotSourceIndex(), cp.getBootstrapCatchupSourceIndex());
        _log.info("Bootstrap got completed !! Checkpoint is :" + cp.toString());

        // Turn the bootstrap checkpoint into an online-consumption checkpoint.
        cp.setWindowScn(cp.getBootstrapTargetScn());
        cp.setPrevScn(cp.getBootstrapTargetScn());
        cp.setConsumptionMode(DbusClientMode.ONLINE_CONSUMPTION);
        cp.resetBootstrap();

        DbusEventBuffer eventBuffer = curState.getDataEventsBuffer();
        try {
            DbusEventInternalReadable cpEvent = getEventFactory().createCheckpointEvent(cp);
            if (!eventBuffer.injectEvent(cpEvent)) {
                _log.error("Unable to write bootstrap phase marker");
            } else if (!eventBuffer.injectEvent(curState.createEopEvent(cp, getEventFactory()))) {
                _log.error("Unable to write bootstrap EOP marker");
            }
        }
        catch (InvalidEventException iee) {
            _log.error("Unable to write bootstrap phase marker", (Throwable)iee);
        }
        unlockV3Bootstrap();
    }

    /**
     * No-op in this class: the body is empty, so nothing is sent anywhere.
     * NOTE(review): presumably kept as an overridable hook or to satisfy a
     * base-class contract — confirm against the parent type.
     *
     * @param curState the current connection state (unused here)
     */
    protected void sendErrorEventToDispatcher(ConnectionState curState) {
    }

    /**
     * Logs at INFO level that a bootstrap phase has completed.
     *
     * @param mode          the phase that completed
     * @param snapshotSrcId snapshot source index to include in the message
     * @param catchupSrcId  catch-up source index to include in the message
     */
    private void logBootstrapPhase(DbusClientMode mode, int snapshotSrcId, int catchupSrcId) {
        final StringBuilder message = new StringBuilder("Bootstrap phase completed - ");
        message.append(mode)
               .append(" [")
               .append(snapshotSrcId)
               .append(", ")
               .append(catchupSrcId)
               .append("]");
        this._log.info((Object)message.toString());
    }

    /**
     * Recovers from a stream request error: either tears the connection down
     * (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param state the connection state to redirect
     */
    private void processStreamRequestError(ConnectionState state) {
        if (!this.toTearConnAfterHandlingResponse()) {
            state.switchToPickServer();
            this.enqueueMessage(state);
            return;
        }
        this.tearConnectionAndEnqueuePickServer();
    }

    /**
     * Recovers from a stream response error: either tears the connection down
     * (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param state the connection state to redirect
     */
    private void processStreamResponseError(ConnectionState state) {
        if (!this.toTearConnAfterHandlingResponse()) {
            state.switchToPickServer();
            this.enqueueMessage(state);
            return;
        }
        this.tearConnectionAndEnqueuePickServer();
    }

    /**
     * Recovers from a target-SCN response error: either tears the connection
     * down (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param currentState the connection state to redirect
     */
    private void processTargetScnResponseError(ConnectionState currentState) {
        final boolean tearDown = this.toTearConnAfterHandlingResponse();
        if (tearDown) {
            this.tearConnectionAndEnqueuePickServer();
            return;
        }
        currentState.switchToPickServer();
        this.enqueueMessage(currentState);
    }

    /**
     * Recovers from a target-SCN request error: either tears the connection
     * down (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param currentState the connection state to redirect
     */
    private void processTargetScnRequestError(ConnectionState currentState) {
        if (!this.toTearConnAfterHandlingResponse()) {
            currentState.switchToPickServer();
            this.enqueueMessage(currentState);
            return;
        }
        this.tearConnectionAndEnqueuePickServer();
    }

    /**
     * Recovers from a start-SCN response error: either tears the connection
     * down (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param currentState the connection state to redirect
     */
    private void processStartScnResponseError(ConnectionState currentState) {
        final boolean tearDown = this.toTearConnAfterHandlingResponse();
        if (tearDown) {
            this.tearConnectionAndEnqueuePickServer();
            return;
        }
        currentState.switchToPickServer();
        this.enqueueMessage(currentState);
    }

    /**
     * Recovers from a start-SCN request error: either tears the connection
     * down (which also enqueues pick-server) or switches the given state to
     * pick-server and enqueues it.
     *
     * @param currentState the connection state to redirect
     */
    private void processStartScnRequestError(ConnectionState currentState) {
        if (!this.toTearConnAfterHandlingResponse()) {
            currentState.switchToPickServer();
            this.enqueueMessage(currentState);
            return;
        }
        this.tearConnectionAndEnqueuePickServer();
    }

    /**
     * Closes the bootstrap connection held by the current state, if there is
     * one, and clears the reference on the state.
     */
    @Override
    protected void resetConnection() {
        final DatabusBootstrapConnection openConnection = this._currentState.getBootstrapConnection();
        if (openConnection == null) {
            // Nothing to close.
            return;
        }
        openConnection.close();
        this._currentState.setBootstrapConnection(null);
    }

    /**
     * Exposes the backoff timer that is reset each time a stream response is
     * fully handled.
     *
     * @return the retries-before-checkpoint-cleanup timer instance held by this thread
     */
    protected BackoffTimer getRetriesBeforeCkptCleanup() {
        final BackoffTimer retriesTimer = this._retriesBeforeCkptCleanup;
        return retriesTimer;
    }

    /**
     * Chooses the next state for {@code curState} based on its checkpoint.
     *
     * <p>The checkpoint is first validated by the state's bootstrap checkpoint
     * handler. Snapshot and catch-up checkpoints are delegated to their
     * dedicated helpers; any other consumption mode, or a checkpoint that
     * fails validation, is logged as fatal and closes the state.
     *
     * @param curState the connection state whose checkpoint drives the transition
     */
    private void determineNextStateFromCheckpoint(ConnectionState curState) {
        try {
            final Checkpoint checkpoint = curState.getCheckpoint();
            final BootstrapCheckpointHandler handler = curState.getBstCheckpointHandler();
            handler.assertBootstrapCheckpoint(checkpoint);

            switch (checkpoint.getConsumptionMode()) {
                case BOOTSTRAP_SNAPSHOT:
                    this.determineNextStateFromSnapshotCheckpoint(checkpoint, handler, curState);
                    return;
                case BOOTSTRAP_CATCHUP:
                    this.determineNextStateFromCatchupCheckpoint(checkpoint, handler, curState);
                    return;
                default:
                    // Not a bootstrap mode: nothing sensible to request next.
                    this._log.fatal((Object)("unexpected bootstrap checkpoint type: " + checkpoint + "; shutting down"));
                    curState.switchToClosed();
                    return;
            }
        }
        catch (InvalidCheckpointException invalidCheckpoint) {
            this._log.fatal((Object)"invalid bootstrap checkpoint:", (Throwable)invalidCheckpoint);
            curState.switchToClosed();
        }
    }

    /**
     * Chooses the next state for a snapshot-phase checkpoint.
     *
     * <ul>
     *   <li>no bootstrap start SCN set: request the start SCN;</li>
     *   <li>snapshot source not completed: request more stream data;</li>
     *   <li>otherwise: log the completed snapshot phase and request the target SCN.</li>
     * </ul>
     *
     * @param cp        the snapshot-phase checkpoint
     * @param cpHandler the bootstrap checkpoint handler (not used by this method)
     * @param curState  the connection state to transition
     */
    private void determineNextStateFromSnapshotCheckpoint(Checkpoint cp, BootstrapCheckpointHandler cpHandler, ConnectionState curState) {
        if (!cp.isBootstrapStartScnSet()) {
            curState.switchToRequestStartScn(cp);
            return;
        }
        if (!cp.isSnapShotSourceCompleted()) {
            curState.switchToRequestStream(cp);
            return;
        }
        this.logBootstrapPhase(DbusClientMode.BOOTSTRAP_SNAPSHOT, cp.getBootstrapSnapshotSourceIndex(), cp.getBootstrapCatchupSourceIndex());
        curState.switchToRequestTargetScn(cp);
    }

    /**
     * Chooses the next state for a catch-up-phase checkpoint.
     *
     * <ul>
     *   <li>catch-up source not completed: request more stream data;</li>
     *   <li>catch-up source completed: log the phase, advance the checkpoint
     *       via the handler, and request more stream data if the handler
     *       reports that more catch-up or more snapshot work remains;</li>
     *   <li>otherwise: inject an EOP event at the bootstrap target SCN, run
     *       {@link #processBootstrapComplete} and switch to bootstrap-done.</li>
     * </ul>
     *
     * <p>Every failure on the completion path (EOP injection rejected, EOP
     * event invalid, or completion throwing) switches the state to
     * pick-server, so the caller never re-enqueues an unchanged state.
     *
     * @param cp        the catch-up-phase checkpoint; may be advanced in place
     * @param cpHandler the bootstrap checkpoint handler used to advance and query {@code cp}
     * @param curState  the connection state to transition
     */
    private void determineNextStateFromCatchupCheckpoint(Checkpoint cp, BootstrapCheckpointHandler cpHandler, ConnectionState curState) {
        if (!cp.isCatchupSourceCompleted()) {
            curState.switchToRequestStream(cp);
            return;
        }

        this.logBootstrapPhase(DbusClientMode.BOOTSTRAP_CATCHUP, cp.getBootstrapSnapshotSourceIndex(), cp.getBootstrapCatchupSourceIndex());
        cpHandler.advanceAfterCatchupPhase(cp);

        // Short-circuit keeps the original evaluation order: needsMoreSnapshot
        // is only consulted when no more catch-up is needed.
        if (cpHandler.needsMoreCatchup(cp) || cpHandler.needsMoreSnapshot(cp)) {
            curState.switchToRequestStream(cp);
            return;
        }

        DbusEventInternalReadable eopEvent = this.getEventFactory().createLongKeyEOPEvent(cp.getBootstrapTargetScn().longValue(), (short)0);
        try {
            boolean success = curState.getDataEventsBuffer().injectEvent(eopEvent);
            if (!success) {
                this._log.error((Object)"Unable to write bootstrap EOP marker");
                curState.switchToPickServer();
                return;
            }
            try {
                this.processBootstrapComplete(cp, curState);
                curState.switchToBootstrapDone();
            }
            catch (IOException e) {
                this._log.error((Object)"Unable to persist checkpoint at the end of bootstrap", (Throwable)e);
                curState.switchToPickServer();
            }
            catch (DatabusException e) {
                this._log.error((Object)"Unable to complete bootstrap", (Throwable)e);
                curState.switchToPickServer();
            }
        }
        catch (InvalidEventException e1) {
            this._log.error((Object)"Unable to write bootstrap EOP marker", (Throwable)e1);
            // Previously the state was left untouched here, unlike the
            // sibling failure branches; recover the same way they do.
            curState.switchToPickServer();
        }
    }

    /**
     * Acquires the V3 bootstrap lock, blocking until it is available.
     *
     * <p>Does nothing when no lock is configured. When the current thread
     * already holds the lock, only a warning is logged and the lock is not
     * acquired again.
     */
    private void lockV3Bootstrap() {
        if (this._v3BootstrapLock == null) {
            return;
        }
        if (this._v3BootstrapLock.isHeldByCurrentThread()) {
            this._log.warn((Object)("lockV3Bootstrap is a no-op as the thread is already owner of bootstrap lock. Lock state = " + this._v3BootstrapLock.toString()));
            return;
        }
        final String self = this.toString();
        this._log.info((Object)("Waiting for bootstrap lock " + self));
        this._v3BootstrapLock.lock();
        this._log.info((Object)("Obtained the bootstrap lock " + this.toString()));
    }

    /**
     * Releases the V3 bootstrap lock if the current thread holds it.
     *
     * <p>Does nothing when no lock is configured. When the current thread is
     * not the owner, the lock is left untouched and a message is logged: at
     * INFO level for the shutdown case, at WARN level otherwise.
     *
     * @param shutdownCase {@code true} to log a non-owner call at INFO instead of WARN
     */
    private void unlockV3Bootstrap(boolean shutdownCase) {
        if (this._v3BootstrapLock == null) {
            return;
        }
        if (this._v3BootstrapLock.isHeldByCurrentThread()) {
            this._v3BootstrapLock.unlock();
            this._log.info((Object)("Unlocked BootstrapPuller " + this.toString()));
            return;
        }

        // Not the owner: report it, with severity depending on the caller's context.
        final String notOwnerMsg = "unlockV3Bootstrap is a no-op as current thread is NOT owner of bootstrap lock. Lock state = " + this._v3BootstrapLock.toString();
        if (shutdownCase) {
            this._log.info((Object)notOwnerMsg);
        } else {
            this._log.warn((Object)notOwnerMsg);
        }
    }

    /**
     * Releases the V3 bootstrap lock in the non-shutdown case; equivalent to
     * {@code unlockV3Bootstrap(false)}.
     */
    private void unlockV3Bootstrap() {
        final boolean shutdownCase = false;
        this.unlockV3Bootstrap(shutdownCase);
    }

    /**
     * Exposes the V3 bootstrap lock used by this thread.
     *
     * @return the lock instance; may be {@code null}, since the lock/unlock helpers guard against that
     */
    protected ReentrantLock getV3BootstrapLock() {
        final ReentrantLock bootstrapLock = this._v3BootstrapLock;
        return bootstrapLock;
    }
}

