/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.BasePullThread;
import com.linkedin.databus.client.BootstrapResultMessage;
import com.linkedin.databus.client.CheckpointMessage;
import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.ConnectionState;
import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.ConnectionStateMessage;
import com.linkedin.databus.client.DatabusRelayConnection;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.SourcesMessage;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.CheckpointMult;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.DbusEventInternalReadable;
import com.linkedin.databus.core.InvalidEventException;
import com.linkedin.databus.core.PendingEventTooLargeException;
import com.linkedin.databus.core.PullerRetriesExhaustedException;
import com.linkedin.databus.core.SCNRegressMessage;
import com.linkedin.databus.core.ScnNotFoundException;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus.core.async.LifecycleMessage;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.data_model.LogicalSource;
import com.linkedin.databus.core.data_model.LogicalSourceId;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.IdNamePair;
import com.linkedin.databus2.core.BackoffTimer;
import com.linkedin.databus2.core.BackoffTimerStaticConfig;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.monitoring.mbean.HttpStatisticsCollector;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import com.linkedin.databus2.core.container.request.RegisterResponseEntry;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilter;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig;
import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

public class RelayPullThread
extends BasePullThread {
    private static final int RELAY_CALLS_MERGE_FREQ = 10;
    private static final ArrayList<RegisterResponseEntry> EMPTY_REGISTER_LIST = new ArrayList();
    private static final ArrayList<Integer> EMPTY_STREAM_LIST = new ArrayList();
    private final boolean _isConsumeCurrent;
    private final HttpStatisticsCollector _relayCallsStats;
    private DatabusRelayConnection _lastOpenConnection;
    private DbusKeyCompositeFilter _relayFilter;
    private final List<DbusKeyCompositeFilterConfig> _relayFilterConfigs;
    private int _unmergedHttpCallsStats = 0;
    private long _streamCallStartMs = 0L;
    private long _noEventsConnectionResetTimeSec;
    private long _timeSinceEventsSec = System.currentTimeMillis();
    private final BackoffTimer _retriesOnFallOff;
    private final RemoteExceptionHandler _remoteExceptionHandler;
    private final boolean _isReadLatestScnOnErrorEnabled;
    private final double _pullerBufferUtilizationPct;

    public RelayPullThread(String name, DatabusSourcesConnection sourcesConn, DbusEventBuffer dbusEventBuffer, ConnectionStateFactory connStateFactory, Set<ServerInfo> relays, List<DbusKeyCompositeFilterConfig> relayFilterConfigs, boolean isConsumeCurrent, boolean isReadLatestScnOnErrorEnabled, double pullerBufferUtilPct, int noEventsConnectionResetTimeSec, MBeanServer mbeanServer, DbusEventFactory eventFactory, Logger log) {
        super(name, sourcesConn.getConnectionConfig().getPullerRetries(), sourcesConn, dbusEventBuffer, connStateFactory, relays, mbeanServer, eventFactory, log);
        this._relayFilterConfigs = relayFilterConfigs;
        this._isConsumeCurrent = isConsumeCurrent;
        this._remoteExceptionHandler = new RemoteExceptionHandler(sourcesConn, dbusEventBuffer, eventFactory);
        this._relayCallsStats = this._sourcesConn.getLocalRelayCallsStatsCollector();
        this._isReadLatestScnOnErrorEnabled = isReadLatestScnOnErrorEnabled;
        this._pullerBufferUtilizationPct = pullerBufferUtilPct;
        this._retriesOnFallOff = new BackoffTimer("RetriesOnFallOff", new BackoffTimerStaticConfig(0L, 0L, 1.0, 0L, sourcesConn.getConnectionConfig().getNumRetriesOnFallOff()));
        this._noEventsConnectionResetTimeSec = noEventsConnectionResetTimeSec;
    }

    protected void onResume() {
        this._currentState.switchToPickServer();
        this.enqueueMessage(this._currentState);
    }

    protected void onShutdown() {
        if (null != this._lastOpenConnection) {
            this._log.info((Object)"closing open connection during onShutdown()");
            this._lastOpenConnection.close();
            this._lastOpenConnection = null;
            this._log.info((Object)"Closed open connection during onShutdown()");
        }
    }

    protected void doSuspendOnError(LifecycleMessage lcMessage) {
        super.doSuspendOnError(lcMessage);
        RelayPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats(), -1L);
    }

    @Override
    protected boolean shouldDelayTearConnection(ConnectionState.StateId stateId) {
        boolean delayTear = false;
        switch (stateId) {
            case SOURCES_REQUEST_SENT: 
            case SOURCES_RESPONSE_SUCCESS: 
            case SOURCES_RESPONSE_ERROR: 
            case SOURCES_REQUEST_ERROR: 
            case REGISTER_REQUEST_SENT: 
            case REGISTER_RESPONSE_SUCCESS: 
            case REGISTER_RESPONSE_ERROR: 
            case REGISTER_REQUEST_ERROR: 
            case STREAM_REQUEST_SENT: 
            case STREAM_REQUEST_SUCCESS: 
            case STREAM_RESPONSE_ERROR: 
            case STREAM_REQUEST_ERROR: 
            case BOOTSTRAP_REQUESTED: {
                delayTear = true;
                break;
            }
            default: {
                delayTear = false;
            }
        }
        return delayTear;
    }

    @Override
    protected boolean executeAndChangeState(Object message) {
        boolean success = true;
        if (message instanceof ConnectionStateMessage) {
            if (this._componentStatus.getStatus() != DatabusComponentStatus.Status.RUNNING) {
                this._log.warn((Object)(" not running: " + message.toString()));
            } else {
                ConnectionStateMessage stateMsg = (ConnectionStateMessage)message;
                ConnectionState currentState = stateMsg.getConnState();
                switch (stateMsg.getStateId()) {
                    case INITIAL: {
                        break;
                    }
                    case CLOSED: {
                        this.shutdown();
                        break;
                    }
                    case PICK_SERVER: {
                        this.doPickRelay(currentState);
                        break;
                    }
                    case REQUEST_SOURCES: {
                        this.doRequestSources(currentState);
                        break;
                    }
                    case SOURCES_RESPONSE_SUCCESS: {
                        this.doSourcesResponseSuccess(currentState);
                        break;
                    }
                    case REQUEST_REGISTER: {
                        this.doRequestRegister(currentState);
                        break;
                    }
                    case REGISTER_RESPONSE_SUCCESS: {
                        this.doRegisterResponseSuccess(currentState);
                        break;
                    }
                    case BOOTSTRAP: {
                        this.doBootstrap(currentState);
                        break;
                    }
                    case REQUEST_STREAM: {
                        this.doRequestStream(currentState);
                        break;
                    }
                    case STREAM_REQUEST_SUCCESS: {
                        this.doReadDataEvents(currentState);
                        break;
                    }
                    case STREAM_RESPONSE_DONE: {
                        this.doStreamResponseDone(currentState);
                        break;
                    }
                    case SOURCES_REQUEST_ERROR: {
                        this.processSourcesRequestError(currentState);
                        break;
                    }
                    case SOURCES_RESPONSE_ERROR: {
                        this.processSourcesResponseError(currentState);
                        break;
                    }
                    case REGISTER_REQUEST_ERROR: {
                        this.processRegisterRequestError(currentState);
                        break;
                    }
                    case REGISTER_RESPONSE_ERROR: {
                        this.processRegisterResponseError(currentState);
                        break;
                    }
                    case STREAM_REQUEST_ERROR: {
                        this.processStreamRequestError(currentState);
                        break;
                    }
                    case STREAM_RESPONSE_ERROR: {
                        this.processStreamResponseError(currentState);
                        break;
                    }
                    default: {
                        this._log.error((Object)("Unknown state in RelayPullThread: " + (Object)((Object)currentState.getStateId())));
                        success = false;
                        break;
                    }
                }
            }
        } else if (message instanceof BootstrapResultMessage) {
            BootstrapResultMessage bootstrapResultMessage = (BootstrapResultMessage)message;
            switch (bootstrapResultMessage.getTypeId()) {
                case BOOTSTRAP_COMPLETE: {
                    this.doBootstrapComplete(bootstrapResultMessage);
                    break;
                }
                case BOOTSTRAP_FAILED: {
                    this.doBootstrapFailed(bootstrapResultMessage);
                    break;
                }
                default: {
                    this._log.error((Object)("Unknown BootstrapResultChangeMessage in RelayPullThread: " + (Object)((Object)bootstrapResultMessage.getTypeId())));
                    success = false;
                    break;
                }
            }
        } else {
            success = super.executeAndChangeState(message);
        }
        return success;
    }

    private void doBootstrapFailed(BootstrapResultMessage bootstrapResultMessage) {
        this._log.error((Object)"bootstrap failed", bootstrapResultMessage.getFailureReason());
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            this._currentState.switchToBootstrap(this._currentState.getCheckpoint());
            this.enqueueMessage(this._currentState);
        }
    }

    private void doBootstrapComplete(BootstrapResultMessage bootstrapResultMessage) {
        Checkpoint cp = bootstrapResultMessage.getBootstrapCheckpoint();
        if (null == cp) {
            bootstrapResultMessage.switchToBootstrapFailed(new RuntimeException("No persistent checkpoint found at the end of bootstrap!"));
            this.doBootstrapFailed(bootstrapResultMessage);
        } else {
            this._currentState.setRelayFellOff(false);
            DbusClientMode consumptionMode = cp.getConsumptionMode();
            this._log.info((Object)("Bootstrap completed: Consumption Mode=" + consumptionMode + " startScn=" + cp.getBootstrapStartScn() + " targetScn=" + cp.getBootstrapTargetScn() + " sinceScn=" + cp.getBootstrapSinceScn() + " windowScn=" + cp.getWindowScn()));
            UnifiedClientStats unifiedClientStats = this._sourcesConn.getUnifiedClientStats();
            if (unifiedClientStats != null) {
                boolean isBootstrapping = consumptionMode == DbusClientMode.BOOTSTRAP_SNAPSHOT || consumptionMode == DbusClientMode.BOOTSTRAP_CATCHUP;
                unifiedClientStats.setBootstrappingState(isBootstrapping);
            }
            cp.resetBootstrap();
            if (this.toTearConnAfterHandlingResponse()) {
                this.tearConnectionAndEnqueuePickServer();
            } else {
                this._currentState.switchToRequestStream(cp);
                this.enqueueMessage(this._currentState);
            }
        }
    }

    protected void doStart(LifecycleMessage lcMessage) {
        this._log.info((Object)"Relay Puller doStart ");
        if (this._currentState.getStateId() != ConnectionState.StateId.INITIAL) {
            return;
        }
        super.doStart(lcMessage);
        this._currentState.switchToPickServer();
        this.enqueueMessage(this._currentState);
    }

    protected void doPickRelay(ConnectionState curState) {
        int retriesLeft;
        int serversNum = this._servers.size();
        if (0 == serversNum) {
            this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new DatabusException("No relays specified")));
            return;
        }
        Random rng = new Random();
        DatabusRelayConnection relayConn = null;
        ServerInfo serverInfo = null;
        BackoffTimer originalCounter = this._status.getRetriesCounter();
        if (curState.isRelayFellOff()) {
            this._status.setRetriesCounter(this._retriesOnFallOff);
        }
        while (null == relayConn && (retriesLeft = this._status.getRetriesLeft()) >= 0 && !this.checkForShutdownRequest()) {
            this._log.info((Object)("picking a relay; retries left:" + retriesLeft + ", Backoff Timer :" + this._status.getRetriesCounter() + ", Are we retrying because of SCNNotFoundException : " + curState.isRelayFellOff()));
            this.backoffOnPullError();
            this._curServerIdx = this._curServerIdx < 0 ? rng.nextInt(serversNum) : (this._curServerIdx + 1) % serversNum;
            Iterator setIter = this._servers.iterator();
            for (int i = 0; i <= this._curServerIdx; ++i) {
                serverInfo = (ServerInfo)setIter.next();
            }
            try {
                relayConn = this._sourcesConn.getRelayConnFactory().createRelayConnection(serverInfo, (ActorMessageQueue)this, this._remoteExceptionHandler);
                this._log.info((Object)("picked a relay:" + serverInfo.toSimpleString()));
            }
            catch (Exception e) {
                this._log.error((Object)("Unable to get connection to relay:" + serverInfo.toSimpleString()), (Throwable)e);
            }
        }
        this._status.setRetriesCounter(originalCounter);
        if (!this.checkForShutdownRequest()) {
            this._curServer = serverInfo;
            if (null == relayConn) {
                if (this._currentState.isRelayFellOff()) {
                    boolean enqueueMessage = false;
                    try {
                        enqueueMessage = this.onRelayFellOff(curState, curState.getCheckpoint(), (Throwable)new ScnNotFoundException("Retries on SCNNotFoundException exhausted !!"));
                    }
                    catch (InterruptedException ie) {
                        this._log.error((Object)"interrupted while processing onRelayFellOff", (Throwable)ie);
                        curState.switchToPickServer();
                        this.enqueueMessage(curState);
                    }
                    catch (InvalidEventException e) {
                        this._log.error((Object)("error trying to notify dispatcher of bootstrapping :" + e.getMessage()), (Throwable)e);
                        curState.switchToPickServer();
                        this.enqueueMessage(curState);
                    }
                    if (enqueueMessage) {
                        this.enqueueMessage(curState);
                    }
                } else {
                    try {
                        this._log.info((Object)"Puller retries exhausted. Injecting an error event on dispatcher queue to invoke onError callback");
                        this._remoteExceptionHandler.handleException((Throwable)new PullerRetriesExhaustedException());
                    }
                    catch (InterruptedException ie) {
                        this._log.error((Object)"Interrupted while processing retries exhausted", (Throwable)ie);
                    }
                    catch (InvalidEventException e) {
                        this._log.error((Object)"Error trying to notify dispatcher of puller retries getting exhausted", (Throwable)e);
                    }
                    this._log.error((Object)"Cannot find a relay");
                }
            } else {
                DatabusRelayConnection oldRelayConn = curState.getRelayConnection();
                if (null != oldRelayConn) {
                    this.resetConnectionAndSetFlag();
                }
                RelayPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats());
                this._log.info((Object)"Relay Puller switching to request sources");
                curState.switchToRequestSources(serverInfo, serverInfo.getAddress(), relayConn);
                this._lastOpenConnection = relayConn;
                this.enqueueMessage(curState);
            }
        }
    }

    protected void doRequestSources(ConnectionState curState) {
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerSourcesCall();
        }
        this._log.debug((Object)"Sending /sources request");
        curState.switchToSourcesRequestSent();
        RelayPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats());
        curState.getRelayConnection().requestSources(curState);
    }

    private String buildSubsList(List<DatabusSubscription> subs, Map<String, IdNamePair> sourceNameMap) {
        StringBuilder sb = new StringBuilder(128);
        sb.append("[");
        boolean first = true;
        for (DatabusSubscription sub : subs) {
            if (!first) {
                sb.append(',');
            }
            DatabusSubscription realSub = sub;
            LogicalSource ls = sub.getLogicalSource();
            if (!ls.idKnown() && !ls.isWildcard()) {
                IdNamePair sourceEntry = sourceNameMap.get(ls.getName());
                if (null == sourceEntry) {
                    throw new RuntimeException("FATAL! unable to find logical source " + ls.getName() + " in " + sourceNameMap);
                }
                realSub = new DatabusSubscription(sub.getPhysicalSource(), sub.getPhysicalPartition(), new LogicalSourceId(new LogicalSource(Integer.valueOf(sourceEntry.getId().intValue()), ls.getName()), sub.getLogicalPartition().getId()));
            }
            sb.append(realSub.toJsonString());
            first = false;
        }
        sb.append("]");
        return sb.toString();
    }

    protected void doSourcesResponseSuccess(ConnectionState curState) {
        String subsString;
        this.mergeRelayCallsStats();
        Map<String, IdNamePair> sourceNameMap = curState.getSourcesNameMap();
        StringBuilder sb = new StringBuilder();
        boolean firstSource = true;
        boolean error = false;
        ArrayList<IdNamePair> sourcesList = new ArrayList<IdNamePair>(this._sourcesConn.getSourcesNames().size());
        if (curState.getRelayConnection().getProtocolVersion() < 3) {
            for (String sourceName : this._sourcesConn.getSourcesNames()) {
                IdNamePair source = sourceNameMap.get(sourceName);
                if (null == source) {
                    this._log.error((Object)("Source not found on server: " + sourceName));
                    error = true;
                    break;
                }
                if (!firstSource) {
                    sb.append(',');
                }
                sb.append(source.getId().toString());
                firstSource = false;
                sourcesList.add(source);
            }
        }
        String sourcesIdList = sb.toString();
        String string = subsString = error ? "ERROR" : this.buildSubsList(curState.getSubscriptions(), sourceNameMap);
        if (this._log.isDebugEnabled()) {
            this._log.debug((Object)("Source ids: " + sourcesIdList));
            this._log.debug((Object)("Subs : " + subsString));
        }
        this._sourcesConn.getRelayDispatcher().enqueueMessage(SourcesMessage.createSetSourcesIdsMessage(curState.getSources(), sourcesIdList));
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            if (error) {
                curState.switchToPickServer();
            } else {
                curState.switchToRequestSourcesSchemas(sourcesIdList, subsString);
                if (this._sourcesConn.isBootstrapEnabled()) {
                    this._sourcesConn.getBootstrapPuller().enqueueMessage(SourcesMessage.createSetSourcesIdsMessage(curState.getSources(), curState.getSourcesIdListString()));
                }
                String hostHdr = curState.getHostName();
                String svcHdr = curState.getSvcName();
                this._log.info((Object)("Connected to relay " + hostHdr + " with a service identifier " + svcHdr));
            }
            this.enqueueMessage(curState);
        }
    }

    protected void doRequestRegister(ConnectionState curState) {
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerRegisterCall(EMPTY_REGISTER_LIST);
        }
        this._log.debug((Object)"Sending /sources request");
        curState.swichToRegisterRequestSent();
        RelayPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats());
        curState.getRelayConnection().requestRegister(curState.getSourcesIdListString(), curState);
    }

    protected void doRegisterResponseSuccess(ConnectionState curState) {
        boolean enqueueMessage = true;
        this.mergeRelayCallsStats();
        if (curState.getSourcesSchemas().size() < this._sourcesConn.getSourcesNames().size()) {
            this._log.error((Object)("Expected " + this._sourcesConn.getSourcesNames().size() + " schemas, got: " + curState.getSourcesSchemas().size()));
            curState.switchToPickServer();
        } else {
            this._sourcesConn.getRelayDispatcher().enqueueMessage(SourcesMessage.createSetSourcesSchemasMessage(curState.getSourcesSchemas(), curState.getMetadataSchemas()));
            Checkpoint cp = this._currentState.getCheckpoint();
            if (null == cp) {
                this._log.info((Object)"no existing checkpoint found; attempting to load persistent checkpoint");
                cp = this._sourcesConn.loadPersistentCheckpoint();
            }
            if (null == cp) {
                this._log.info((Object)(this.getName() + ": no checkpoint found"));
                cp = new Checkpoint();
                cp.setConsumptionMode(DbusClientMode.ONLINE_CONSUMPTION);
                cp.setWindowScn(Long.valueOf(0L));
                cp.clearBootstrapStartScn();
                if (this._isConsumeCurrent) {
                    cp.setFlexible();
                    this._log.info((Object)"Setting flexible checkpoint: consumeCurrent is true");
                }
            } else {
                this._log.info((Object)("persisted checkpoint loaded: " + cp.toString()));
            }
            if (cp.getFlexible()) {
                curState.setFlexibleCheckpointRequest(true);
            }
            if (this.toTearConnAfterHandlingResponse()) {
                this.tearConnectionAndEnqueuePickServer();
                enqueueMessage = false;
            } else {
                if (this._sourcesConn.isBootstrapEnabled()) {
                    this._sourcesConn.getBootstrapPuller().enqueueMessage(SourcesMessage.createSetSourcesSchemasMessage(curState.getSourcesSchemas(), curState.getMetadataSchemas()));
                }
                if (DbusClientMode.BOOTSTRAP_SNAPSHOT == cp.getConsumptionMode() || DbusClientMode.BOOTSTRAP_CATCHUP == cp.getConsumptionMode()) {
                    curState.setRelayFellOff(true);
                    if (this._sourcesConn.isBootstrapEnabled()) {
                        curState.switchToBootstrap(cp);
                    } else {
                        String message = "bootstrap checkpoint found but bootstrapping is disabled:" + cp;
                        this._log.error((Object)message);
                        this._status.suspendOnError((Throwable)new DatabusException(message));
                        enqueueMessage = false;
                    }
                } else {
                    if (cp.getWindowOffset() > 0L) {
                        this._log.info((Object)("RelayPuller reconnecting when in middle of event window. Will regress. Current Checkpoint :" + cp));
                        if (cp.getPrevScn() > 0L) {
                            cp.setWindowScn(Long.valueOf(cp.getPrevScn()));
                            cp.setWindowOffset(-1L);
                            curState.setSCNRegress(true);
                        } else if (curState.isFlexibleCheckpointRequest()) {
                            this._log.info((Object)"Switched relays after reading partial window with flexible checkpoint !!");
                            cp.setFlexible();
                            curState.setSCNRegress(true);
                        } else {
                            this._log.fatal((Object)("Checkpoint does not have prevSCN !!. Suspending !! Checkpoint :" + cp));
                            this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new Exception("Checkpoint does not have prevSCN !!. Suspending !! Checkpoint :" + cp)));
                            enqueueMessage = false;
                        }
                    }
                    if (enqueueMessage) {
                        curState.switchToRequestStream(cp);
                    }
                }
            }
        }
        if (enqueueMessage) {
            this.enqueueMessage(curState);
        }
    }

    protected void doRequestStream(ConnectionState curState) {
        String args;
        boolean debugEnabled = this._log.isDebugEnabled();
        if (debugEnabled) {
            this._log.debug((Object)"Checking for free space in buffer");
        }
        int freeBufferThreshold = (int)((double)this._sourcesConn.getConnectionConfig().getFreeBufferThreshold() * 100.0 / this._pullerBufferUtilizationPct);
        try {
            curState.getDataEventsBuffer().waitForFreeSpace((long)freeBufferThreshold);
        }
        catch (InterruptedException ie) {
            this.enqueueMessage(curState);
            return;
        }
        Checkpoint cp = curState.getCheckpoint();
        if (debugEnabled) {
            this._log.debug((Object)("Checkpoint at RequestDataEvents: " + cp.toString()));
        }
        if (null == this._relayFilter) {
            if (debugEnabled) {
                this._log.debug((Object)"Initializing relay filter config");
            }
            this._relayFilter = new DbusKeyCompositeFilter();
            Map<String, IdNamePair> srcNameIdMap = curState.getSourcesNameMap();
            for (DbusKeyCompositeFilterConfig conf : this._relayFilterConfigs) {
                Map cMap = conf.getConfigMap();
                HashMap fConfMap = new HashMap();
                for (Map.Entry e : cMap.entrySet()) {
                    IdNamePair idName = srcNameIdMap.get(e.getKey());
                    if (null == idName) continue;
                    fConfMap.put(idName.getId(), e.getValue());
                }
                if (debugEnabled) {
                    this._log.debug((Object)("FilterConfMap is :" + fConfMap));
                }
                this._relayFilter.merge(new DbusKeyCompositeFilter(fConfMap));
            }
            if (debugEnabled) {
                this._log.debug((Object)("Merged Filter (before deduping) is :" + this._relayFilter));
            }
            this._relayFilter.dedupe();
            if (debugEnabled) {
                this._log.debug((Object)("Merged Filter (after deduping) is :" + this._relayFilter));
            }
        }
        this._streamCallStartMs = System.currentTimeMillis();
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerStreamRequest(cp, EMPTY_STREAM_LIST);
        }
        int fetchSize = (int)((double)curState.getDataEventsBuffer().getBufferFreeReadSpace() / 100.0 * this._pullerBufferUtilizationPct);
        fetchSize = Math.max(freeBufferThreshold, fetchSize);
        CheckpointMult cpMult = new CheckpointMult();
        if (curState.getRelayConnection().getProtocolVersion() >= 3) {
            args = curState.getSubsListString();
            for (DatabusSubscription sub : curState.getSubscriptions()) {
                PhysicalPartition p = sub.getPhysicalPartition();
                cpMult.addCheckpoint(p, cp);
            }
        } else {
            args = curState.getSourcesIdListString();
            cpMult.addCheckpoint(PhysicalPartition.ANY_PHYSICAL_PARTITION, cp);
        }
        curState.switchToStreamRequestSent();
        RelayPullThread.sendHeartbeat(this._sourcesConn.getUnifiedClientStats());
        curState.getRelayConnection().requestStream(args, this._relayFilter, fetchSize, cpMult, this._sourcesConn.getConnectionConfig().getKeyRange(), curState);
    }

    protected void doReadDataEvents(ConnectionState curState) {
        boolean debugEnabled = this._log.isDebugEnabled();
        boolean enqueueMessage = true;
        try {
            ChunkedBodyReadableByteChannel readChannel = curState.getReadChannel();
            Checkpoint cp = curState.getCheckpoint();
            curState.setRelayFellOff(false);
            String remoteErrorName = RemoteExceptionHandler.getExceptionName(readChannel);
            Throwable knownRemoteError = this._remoteExceptionHandler.getException(readChannel);
            if (null != knownRemoteError && knownRemoteError instanceof ScnNotFoundException) {
                if (this.toTearConnAfterHandlingResponse()) {
                    this.tearConnectionAndEnqueuePickServer();
                    enqueueMessage = false;
                } else {
                    curState.setRelayFellOff(true);
                    if (this._retriesOnFallOff.getRemainingRetriesNum() > 0) {
                        this._log.error((Object)("Got SCNNotFoundException. Retry (" + this._retriesOnFallOff.getRetriesNum() + ") out of " + this._retriesOnFallOff.getConfig().getMaxRetryNum()));
                        curState.switchToPickServer();
                    } else {
                        enqueueMessage = this.onRelayFellOff(curState, cp, knownRemoteError);
                    }
                }
            } else if (null != remoteErrorName) {
                if (this.toTearConnAfterHandlingResponse()) {
                    this.tearConnectionAndEnqueuePickServer();
                    enqueueMessage = false;
                } else {
                    this._log.error((Object)("read events error: " + RemoteExceptionHandler.getExceptionMessage(readChannel)));
                    curState.switchToStreamResponseError();
                }
            } else {
                UnifiedClientStats unifiedClientStats;
                if (debugEnabled) {
                    this._log.debug((Object)"Sending events to buffer");
                }
                DbusEventsStatisticsCollector connCollector = this._sourcesConn.getInboundEventsStatsCollector();
                if (curState.isSCNRegress()) {
                    this._log.info((Object)("SCN Regress requested !! Sending a SCN Regress Message to dispatcher. Curr Ckpt :" + curState.getCheckpoint()));
                    DbusEvent regressEvent = this.getEventFactory().createSCNRegressEvent(new SCNRegressMessage(curState.getCheckpoint()));
                    this.writeEventToRelayDispatcher(curState, regressEvent, "SCN Regress Event from ckpt :" + curState.getCheckpoint());
                    curState.setSCNRegress(false);
                }
                if ((unifiedClientStats = this._sourcesConn.getUnifiedClientStats()) != null) {
                    unifiedClientStats.setBootstrappingState(false);
                    RelayPullThread.sendHeartbeat(unifiedClientStats);
                }
                int eventsNum = curState.getDataEventsBuffer().readEvents((ReadableByteChannel)readChannel, curState.getListeners(), connCollector);
                boolean resetConnection = false;
                if (eventsNum > 0) {
                    this._timeSinceEventsSec = System.currentTimeMillis();
                    cp.checkPoint();
                } else {
                    if (this._remoteExceptionHandler.getPendingEventSize(readChannel) > curState.getDataEventsBuffer().getMaxReadBufferCapacity()) {
                        String err = "ReadBuffer max capacity(" + curState.getDataEventsBuffer().getMaxReadBufferCapacity() + ") is less than event size(" + this._remoteExceptionHandler.getPendingEventSize(readChannel) + "). Increase databus.client.connectionDefaults.eventBuffer.maxEventSize and restart.";
                        this._log.fatal((Object)err);
                        this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)new PendingEventTooLargeException(err)));
                        return;
                    }
                    if (this._noEventsConnectionResetTimeSec > 0L) {
                        boolean bl = resetConnection = (System.currentTimeMillis() - this._timeSinceEventsSec) / 1000L > this._noEventsConnectionResetTimeSec;
                        if (resetConnection) {
                            this._timeSinceEventsSec = System.currentTimeMillis();
                            this._log.warn((Object)("about to reset connection to relay " + curState.getServerInetAddress() + ", because there were no events for " + this._noEventsConnectionResetTimeSec + "secs"));
                        }
                    }
                }
                if (debugEnabled) {
                    this._log.debug((Object)("Events read: " + eventsNum));
                }
                if (this.toTearConnAfterHandlingResponse() || resetConnection) {
                    this.tearConnectionAndEnqueuePickServer();
                    enqueueMessage = false;
                } else {
                    curState.switchToStreamResponseDone();
                    this.resetServerRetries();
                }
            }
            if (enqueueMessage) {
                this.enqueueMessage(curState);
            }
        }
        catch (InterruptedException ie) {
            this._log.warn((Object)"interrupted", (Throwable)ie);
            curState.switchToStreamResponseError();
            this.enqueueMessage(curState);
        }
        catch (InvalidEventException e) {
            this._log.error((Object)("error reading events from server:" + (Object)((Object)e)), (Throwable)e);
            curState.switchToStreamResponseError();
            this.enqueueMessage(curState);
        }
        catch (RuntimeException e) {
            this._log.error((Object)("runtime error reading events from server: " + e), (Throwable)e);
            curState.switchToStreamResponseError();
            this.enqueueMessage(curState);
        }
    }

    protected void doStreamResponseDone(ConnectionState curState) {
        Checkpoint cp = curState.getCheckpoint();
        if (this._log.isDebugEnabled()) {
            this._log.debug((Object)"Data events done");
            this._log.debug((Object)("Checkpoint at EventsDone: " + cp.toString()));
        }
        if (curState.isFlexibleCheckpointRequest() && curState.getDataEventsBuffer().getSeenEndOfPeriodScn() > 0L) {
            curState.setFlexibleCheckpointRequest(false);
        }
        if (null != this._sourcesConn.getLocalRelayCallsStatsCollector()) {
            long streamDuration = System.currentTimeMillis() - this._streamCallStartMs;
            this._sourcesConn.getLocalRelayCallsStatsCollector().registerStreamResponse(streamDuration);
            this.mergeRelayCallsStats();
        }
        this._status.getRetriesCounter().sleep();
        curState.switchToRequestStream(cp);
        this.enqueueMessage(curState);
    }

    protected void doBootstrap(ConnectionState curState) {
        if (null != this._lastOpenConnection) {
            this._lastOpenConnection.close();
            this._lastOpenConnection = null;
        }
        Checkpoint bootstrapCkpt = null;
        if (this._sourcesConn.getBootstrapPuller() == null) {
            this._log.warn((Object)"doBootstrap got called, but BootstrapPullThread is null. Is bootstrap disabled?");
            return;
        }
        try {
            bootstrapCkpt = curState.getCheckpoint().clone();
        }
        catch (Exception e) {
            String msg = "Error copying checkpoint:" + curState.getCheckpoint();
            this._log.error((Object)msg, (Throwable)e);
            BootstrapResultMessage bootstrapResultMessage = BootstrapResultMessage.createBootstrapFailedMessage(e);
            this.doBootstrapFailed(bootstrapResultMessage);
            return;
        }
        if (!bootstrapCkpt.isBootstrapStartScnSet()) {
            bootstrapCkpt = curState.getBstCheckpointHandler().createInitialBootstrapCheckpoint(bootstrapCkpt, Long.valueOf(bootstrapCkpt.getWindowScn()));
        }
        this._log.info((Object)("Bootstrap begin: sinceScn=" + bootstrapCkpt.getWindowScn()));
        CheckpointMessage bootstrapCpMessage = CheckpointMessage.createSetCheckpointMessage((Checkpoint)bootstrapCkpt);
        this._sourcesConn.getBootstrapPuller().enqueueMessage(bootstrapCpMessage);
        try {
            Checkpoint cpForDispatcher = new Checkpoint(bootstrapCkpt.toString());
            cpForDispatcher.setConsumptionMode(DbusClientMode.BOOTSTRAP_SNAPSHOT);
            DbusEventInternalReadable cpEvent = this.getEventFactory().createCheckpointEvent(cpForDispatcher);
            this.writeEventToRelayDispatcher(curState, (DbusEvent)cpEvent, "Control Event to start bootstrap");
            curState.switchToBootstrapRequested();
        }
        catch (InterruptedException ie) {
            this._log.error((Object)"Got interrupted while writing control message to bootstrap !!", (Throwable)ie);
            this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)ie));
        }
        catch (Exception e) {
            this.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)e));
            this._log.error((Object)"Exception occured while switching to bootstrap: ", (Throwable)e);
        }
    }

    private void mergeRelayCallsStats() {
        if (null != this._sourcesConn.getLocalRelayCallsStatsCollector() && null != this._sourcesConn.getRelayCallsStatsCollector() && this._unmergedHttpCallsStats >= 10) {
            this._sourcesConn.getRelayCallsStatsCollector().merge(this._sourcesConn.getLocalRelayCallsStatsCollector());
            this._sourcesConn.getLocalRelayCallsStatsCollector().reset();
            this._unmergedHttpCallsStats = 0;
        } else {
            ++this._unmergedHttpCallsStats;
        }
    }

    private void processSourcesRequestError(ConnectionState state) {
        this._log.info((Object)"Sources Request Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidSourceRequest();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    private void processSourcesResponseError(ConnectionState state) {
        this._log.info((Object)"Sources Response Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidSourceRequest();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    private void processRegisterRequestError(ConnectionState state) {
        this._log.info((Object)"Register Request Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidRegisterCall();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    private void processRegisterResponseError(ConnectionState state) {
        this._log.info((Object)"Register Response Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidRegisterCall();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    private void processStreamRequestError(ConnectionState state) {
        this._log.info((Object)"Stream Request Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidStreamRequest();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    private void processStreamResponseError(ConnectionState state) {
        this._log.info((Object)"Stream Response Error");
        if (null != this._relayCallsStats) {
            this._relayCallsStats.registerInvalidStreamRequest();
        }
        if (this.toTearConnAfterHandlingResponse()) {
            this.tearConnectionAndEnqueuePickServer();
        } else {
            state.switchToPickServer();
            this.enqueueMessage(state);
        }
    }

    @Override
    protected void resetConnection() {
        DatabusRelayConnection conn = this._currentState.getRelayConnection();
        if (null != conn) {
            conn.close();
            this._currentState.setRelayConnection(null);
        }
    }

    private boolean onRelayFellOff(ConnectionState curState, Checkpoint cp, Throwable knownRemoteError) throws InvalidEventException, InterruptedException {
        boolean enqueueMessage = true;
        this._log.error((Object)"Retries on SCNNotFoundException exhausted !!");
        this._retriesOnFallOff.reset();
        if (!this._sourcesConn.isBootstrapEnabled()) {
            this._log.error((Object)"No scn found on relay while no bootstrap services provided:");
            this._log.error((Object)(" bootstrapServices=" + this._sourcesConn.getBootstrapServices() + "; bootstrapRegistrations=" + this._sourcesConn.getBootstrapRegistrations()));
            if (this._isReadLatestScnOnErrorEnabled) {
                this._log.error((Object)"Read Latest SCN Window on SCNNotFoundException is enabled. Will start reading from the lastest SCN Window !!");
                curState.getRelayConnection().enableReadFromLatestScn(true);
                this._currentState.setRelayFellOff(false);
                curState.switchToStreamResponseDone();
            } else {
                this._log.fatal((Object)"Got SCNNotFoundException but Read Latest SCN Window and bootstrap are disabled !!");
                this._remoteExceptionHandler.handleException((Throwable)new BootstrapDatabaseTooOldException(knownRemoteError));
                enqueueMessage = false;
            }
        } else {
            this._log.info((Object)("Requested scn " + cp.getWindowScn() + " not found on relay; switching to bootstrap service"));
            curState.switchToBootstrap(cp);
            UnifiedClientStats unifiedClientStats = this._sourcesConn.getUnifiedClientStats();
            if (unifiedClientStats != null) {
                unifiedClientStats.setBootstrappingState(true);
            }
        }
        return enqueueMessage;
    }

    @Override
    protected void tearConnection() {
        this._currentState.setRelayFellOff(false);
        super.tearConnection();
    }

    public void setNoEventsConnectionResetTimeSec(long noEventsConnectionResetTimeSec) {
        this._noEventsConnectionResetTimeSec = noEventsConnectionResetTimeSec;
    }

    protected BackoffTimer getRetryonFallOff() {
        return this._retriesOnFallOff;
    }

    private void writeEventToRelayDispatcher(ConnectionState curState, DbusEvent event, String message) throws InterruptedException, InvalidEventException {
        boolean success = false;
        BackoffTimerStaticConfig timerConfig = new BackoffTimerStaticConfig(1L, 1000L, 1.0, 1L, -1);
        BackoffTimer timer = new BackoffTimer("EVB More Space Timer", timerConfig);
        timer.reset();
        byte[] eventBytes = new byte[event.size()];
        this._log.info((Object)("Event size: " + eventBytes.length));
        this._log.info((Object)("Event:" + event.toString()));
        event.getRawBytes().get(eventBytes);
        UnifiedClientStats unifiedClientStats = this._sourcesConn.getUnifiedClientStats();
        while (!success && timer.getRemainingRetriesNum() > 0) {
            ByteArrayInputStream cpIs = new ByteArrayInputStream(eventBytes);
            ReadableByteChannel cpRbc = Channels.newChannel(cpIs);
            RelayPullThread.sendHeartbeat(unifiedClientStats);
            int ecnt = curState.getDataEventsBuffer().readEvents(cpRbc);
            if (ecnt <= 0) {
                this._log.error((Object)("Not enough spece in the event buffer to add a control message :" + message));
                boolean interrupted = !timer.backoffAndSleep();
                if (!interrupted) continue;
                throw new InterruptedException("Got interrupted while waiting to write control Message to EVB : " + message);
            }
            this._log.info((Object)("Sent a control message :" + message));
            success = true;
        }
    }

    public DatabusRelayConnection getLastOpenConnection() {
        return this._lastOpenConnection;
    }
}

