/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.DatabusClientDSCUpdater;
import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.DispatcherState;
import com.linkedin.databus.client.SingleSourceSCN;
import com.linkedin.databus.client.SourcesMessage;
import com.linkedin.databus.client.consumer.MultiConsumerCallback;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.client.pub.SCN;
import com.linkedin.databus.client.pub.SharedCheckpointPersistenceProvider;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusErrorEvent;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventInternalReadable;
import com.linkedin.databus.core.DbusEventSerializable;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.DispatcherRetriesExhaustedException;
import com.linkedin.databus.core.async.AbstractActorMessageQueue;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.util.IdNamePair;
import com.linkedin.databus2.core.BackoffTimerStaticConfig;
import com.linkedin.databus2.core.mbean.DatabusReadOnlyStatus;
import com.linkedin.databus2.schemas.SchemaId;
import com.linkedin.databus2.schemas.VersionedSchema;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

public abstract class GenericDispatcher<C>
extends AbstractActorMessageQueue {
    /** Fully-qualified class name of this dispatcher. */
    public static final String MODULE = GenericDispatcher.class.getName();
    /** Set to true by doStopDispatch() and to false by doStartDispatchEvents(); the dispatch loop does not iterate while it is true. */
    private final AtomicBoolean _stopDispatch = new AtomicBoolean(false);
    /** Subscriptions supplied at construction time. */
    private final List<DatabusSubscription> _subsList;
    /** Checkpoint persistence provider supplied at construction time. */
    private final CheckpointPersistenceProvider _checkpointPersistor;
    /** Event buffer the dispatcher reads from; also the basis of the checkpoint threshold computation. */
    private final DbusEventBuffer _dataEventsBuffer;
    /** Consumer callback that receives the start/stop consumption notifications. */
    private final MultiConsumerCallback _asyncCallback;
    /** The dispatcher's own state object; it is enqueued as a message by the constructor. */
    private final DispatcherState _internalState;
    /** Read-only status MBean created and registered by the constructor. */
    private final DatabusReadOnlyStatus _statusMbean;
    /** MBean server used to register the status MBean; null when the short constructor is used. */
    private final MBeanServer _mbeanServer;
    /** Connection configuration; provides the checkpoint threshold percentage. */
    private final DatabusSourcesConnection.StaticConfig _connConfig;
    /** Cleared by the constructor and whenever executeAndChangeState() handles the internal state message. NOTE(review): other uses are not visible in this section. */
    private boolean _inInternalLoop;
    /** Optional client handle (null when the short constructor is used); used for cluster-mode shutdown and to reach the DSC updater. */
    private DatabusHttpClientImpl _serverHandle = null;
    /** Running total of the sizes of events taken from the buffer; reset to 0 when a rollback switches to replay. */
    private long _currentWindowSizeInBytes = 0L;
    /** Number of checkpoints written through the checkpoint persistor by storeCheckpoint(). */
    private long _numCheckPoints = 0L;
    /** Sequence of the last end-of-period marker processed by doDispatchEvents(); -1 until one is seen. */
    protected long _lastWindowScn = -1L;
    /** Timestamp in nanoseconds of the last end-of-period marker processed by doDispatchEvents(); -1 until one is seen. */
    protected long _lastEowTsNsecs = -1L;
    /** Registration id passed to the checkpoint persistor when storing checkpoints; null when the short constructor is used. */
    private RegistrationId _registrationId;
    /** When true, doCheckStartSource() additionally requires the schema set to contain the event's exact schema id. */
    protected boolean _schemaIdCheck = true;

    /**
     * Convenience constructor: builds a dispatcher without an MBean server, client handle or
     * registration id, taking the retry configuration from {@code connConfig.getDispatcherRetries()}.
     */
    public GenericDispatcher(String name, DatabusSourcesConnection.StaticConfig connConfig, List<DatabusSubscription> subsList, CheckpointPersistenceProvider checkpointPersistor, DbusEventBuffer dataEventsBuffer, MultiConsumerCallback asyncCallback, Logger log) {
        this(name,
             connConfig,
             subsList,
             checkpointPersistor,
             dataEventsBuffer,
             asyncCallback,
             null,   // no MBean server
             null,   // no client handle
             null,   // no registration id
             connConfig.getDispatcherRetries(),
             log);
    }

    /**
     * Full constructor. Wires the collaborators, creates the internal dispatcher state over
     * {@code dataEventsBuffer}, registers the status MBean, and finally switches the internal
     * state to "start dispatch events" and enqueues it as the first message.
     *
     * @param name               dispatcher name handed to the superclass
     * @param connConfig         connection configuration
     * @param subsList           subscriptions served by this dispatcher
     * @param checkpointPersistor checkpoint persistence provider
     * @param dataEventsBuffer   buffer the events are read from
     * @param asyncCallback      consumer callback
     * @param mbeanServer        server used to register the status MBean
     * @param serverHandle       client handle
     * @param registrationId     registration id used when persisting checkpoints
     * @param dispatcherRetries  retry configuration handed to the superclass
     * @param log                logger handed to the superclass
     */
    public GenericDispatcher(String name, DatabusSourcesConnection.StaticConfig connConfig, List<DatabusSubscription> subsList, CheckpointPersistenceProvider checkpointPersistor, DbusEventBuffer dataEventsBuffer, MultiConsumerCallback asyncCallback, MBeanServer mbeanServer, DatabusHttpClientImpl serverHandle, RegistrationId registrationId, BackoffTimerStaticConfig dispatcherRetries, Logger log) {
        super(name, dispatcherRetries, false, log);

        // Plain wiring of the constructor arguments.
        _connConfig = connConfig;
        _subsList = subsList;
        _checkpointPersistor = checkpointPersistor;
        _dataEventsBuffer = dataEventsBuffer;
        _asyncCallback = asyncCallback;
        _mbeanServer = mbeanServer;
        _serverHandle = serverHandle;
        _registrationId = registrationId;
        _inInternalLoop = false;
        _currentWindowSizeInBytes = 0L;

        // Objects derived from the arguments, created in the same order as before.
        _internalState = DispatcherState.create(dataEventsBuffer, getName() + ".iter");
        _statusMbean = new DatabusReadOnlyStatus(getName(), getStatus(), -1L);
        _statusMbean.registerAsMbean(_mbeanServer);

        // Kick off the state machine.
        _internalState.switchToStartDispatchEvents();
        enqueueMessage(_internalState);
    }

    /**
     * @return the logger this dispatcher writes to
     */
    public Logger getLog() {
        return _log;
    }

    /**
     * Enables or disables the exact schema-id lookup performed by {@code doCheckStartSource}.
     *
     * @param schemaIdCheck true to require the event's schema id to be present in the schema set
     */
    protected void setSchemaIdCheck(boolean schemaIdCheck) {
        _schemaIdCheck = schemaIdCheck;
    }

    /**
     * Processes one queued message.
     * <ul>
     *   <li>A {@link DispatcherState} that is NOT the dispatcher's own internal state object is
     *       treated as an external request; only CLOSED and STOP_DISPATCH_EVENTS are accepted.</li>
     *   <li>The internal state object itself drives the state machine: stop, start or dispatch
     *       events depending on its current state id.</li>
     *   <li>A {@link SourcesMessage} adds sources or schemas to the internal state; an unknown
     *       sources message type is logged but still reported as handled.</li>
     *   <li>Anything else is delegated to the superclass.</li>
     * </ul>
     *
     * @param message the message taken from the queue
     * @return false when a dispatcher state id was not recognised, otherwise true (or the
     *         superclass result for delegated messages)
     */
    protected boolean executeAndChangeState(Object message) {
        boolean success = true;
        if (message instanceof DispatcherState) {
            DispatcherState newState = (DispatcherState)message;
            if (newState != _internalState) {
                // External request carried by a foreign state object.
                switch (newState.getStateId()) {
                    case CLOSED: {
                        _internalState.switchToClosed();
                        shutdown();
                        break;
                    }
                    case STOP_DISPATCH_EVENTS: {
                        _internalState.switchToStopDispatch();
                        break;
                    }
                    default: {
                        // Report the state id of the rejected message, not the internal state,
                        // so the log shows what was actually requested.
                        _log.error("Unknown dispatcher message: " + newState.getStateId());
                        success = false;
                        break;
                    }
                }
            } else {
                // Our own state object came back through the queue: run the state machine.
                _inInternalLoop = false;
                switch (_internalState.getStateId()) {
                    case INITIAL: {
                        break;
                    }
                    case CLOSED: {
                        doStopDispatch(_internalState);
                        shutdown();
                        break;
                    }
                    case STOP_DISPATCH_EVENTS: {
                        doStopDispatch(_internalState);
                        break;
                    }
                    case START_DISPATCH_EVENTS: {
                        doStartDispatchEvents();
                        break;
                    }
                    case EXPECT_EVENT_WINDOW:
                    case REPLAY_DATA_EVENTS:
                    case EXPECT_STREAM_DATA_EVENTS: {
                        doDispatchEvents();
                        break;
                    }
                    default: {
                        _log.error("Unknown internal state id: " + _internalState.getStateId());
                        success = false;
                    }
                }
            }
        } else if (message instanceof SourcesMessage) {
            SourcesMessage srcMsg = (SourcesMessage)message;
            switch (srcMsg.getTypeId()) {
                case SET_SOURCES_IDS: {
                    _internalState.addSources(srcMsg.getSources());
                    break;
                }
                case SET_SOURCES_SCHEMAS: {
                    _internalState.addSchemas(srcMsg.getSourcesSchemas(), srcMsg.getMetadataSchemas());
                    break;
                }
                default: {
                    _log.error("Unknown sources message type: " + srcMsg);
                    break;
                }
            }
        } else {
            success = super.executeAndChangeState(message);
        }
        return success;
    }

    /**
     * Stops event dispatching: closes the state's iterators (when present), notifies the consumer
     * callback via {@code onStopConsumption}, suspends the component status and raises the
     * stop-dispatch flag. When a client handle is present and cluster mode is enabled, an
     * asynchronous shutdown of the client is requested as well.
     *
     * @param curState the dispatcher state whose iterators are to be closed
     */
    protected void doStopDispatch(DispatcherState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Entered stopConsumption");
        }

        if (curState.getEventsIterator() != null) {
            curState.getEventsIterator().close();
        }
        if (curState.getLastSuccessfulIterator() != null) {
            curState.getLastSuccessfulIterator().close();
        }

        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult stopResult = ConsumerCallbackResult.ERROR;
        try {
            stopResult = _asyncCallback.onStopConsumption();
        }
        catch (RuntimeException e) {
            DbusPrettyLogUtils.logExceptionAtError("Internal stopConsumption error", e, _log);
        }

        final boolean stopped = ConsumerCallbackResult.SUCCESS == stopResult
                || ConsumerCallbackResult.CHECKPOINT == stopResult;
        if (!stopped) {
            getStatus().suspendOnError(new RuntimeException("Stop dispatcher failed"));
            _log.error("stopConsumption failed.");
        } else if (debugOn) {
            _log.debug("stopConsumption succeeded.");
        }

        // The status is suspended regardless of the callback outcome.
        getStatus().suspendOnError(new RuntimeException("Processing of events stopped"));
        _stopDispatch.set(true);

        if (_serverHandle != null && _serverHandle.isClusterEnabled()) {
            _log.error("Suspend while in clusterMode: shutting down");
            _serverHandle.shutdownAsynchronously();
        }
    }

    /**
     * Rolls back to the last successful SCN recorded in {@code curState}, checking the remaining
     * retries and regressing the iterator.
     *
     * @param curState the current dispatcher state
     */
    protected void doRollback(DispatcherState curState) {
        final SCN lastGoodScn = curState.getLastSuccessfulScn();
        doRollback(curState, lastGoodScn, true, true);
    }

    /**
     * Performs a rollback.
     *
     * <p>The rollback is abandoned (consumer {@code onError} is invoked with a
     * {@link DispatcherRetriesExhaustedException} and the state is closed) when the state is not
     * ROLLBACK or, with {@code checkRetries}, when no retries are left. Otherwise the consumer's
     * {@code onRollback} is invoked if a last successful checkpoint exists or the iterator is not
     * being regressed; with {@code regressItr} the state then switches to replaying data events,
     * or is closed when no last successful iterator is available.
     *
     * @param curState     the current dispatcher state (expected to be in ROLLBACK)
     * @param rollbackScn  the SCN reported to the consumer's {@code onRollback}
     * @param checkRetries whether the remaining retry budget is consulted and consumed
     * @param regressItr   whether the dispatcher should switch to replay from the last successful iterator
     */
    protected void doRollback(DispatcherState curState, SCN rollbackScn, boolean checkRetries, boolean regressItr) {
        final boolean debugOn = _log.isDebugEnabled();

        boolean proceed = true;
        if (!curState.getStateId().equals(DispatcherState.StateId.ROLLBACK)) {
            proceed = false;
            _log.error("ROLLBACK state is expected by the dispatcher, but the current state is:" + curState.getStateId());
        }

        if (checkRetries) {
            final int retriesLeft = getStatus().getRetriesLeft();
            _log.info("Rolling back the dispatcher to last successful checkpoint. Number of remaining retries for dispatcher to replay events = " + retriesLeft);
            proceed = proceed && retriesLeft > 0;
            if (proceed) {
                // First retry registers a new error; later ones repeat the last one.
                if (0 == getStatus().getRetriesNum()) {
                    getStatus().retryOnError("rollback");
                } else {
                    getStatus().retryOnLastError();
                }
            }
        }

        if (!proceed) {
            DispatcherRetriesExhaustedException exhausted = new DispatcherRetriesExhaustedException();
            _log.info("Invoke onError callback as dispatcher retries have exhausted");
            getAsyncCallback().onError(exhausted);
            curState.switchToClosed();
            return;
        }

        final Checkpoint lastCp = curState.getLastSuccessfulCheckpoint();
        if (lastCp != null || !regressItr) {
            // A callback that throws is treated the same as one that returns ERROR.
            ConsumerCallbackResult rollbackResult = ConsumerCallbackResult.ERROR;
            try {
                _log.info("Rolling back to SCN : " + rollbackScn);
                rollbackResult = getAsyncCallback().onRollback(rollbackScn);
            }
            catch (RuntimeException e) {
                _log.error("Internal onRollback error: " + e.getMessage(), e);
            }
            if (!ConsumerCallbackResult.isSuccess(rollbackResult)) {
                _log.error("Rollback consumer callback failed");
            } else if (debugOn) {
                _log.debug("Rollback consumer callback succeeded");
            }
        }

        if (!regressItr) {
            return;
        }
        if (curState.getLastSuccessfulIterator() == null) {
            _log.fatal("Unable to rollback, this usually means that the events belonging to the last checkpoint are no longer to be found in the buffer. Please checkpoint more frequently to avoid this. Restarting the client will fix this problem, last checkpoint found: \n" + lastCp);
            curState.switchToClosed();
            return;
        }
        _log.info("Rolled back to last successful checkpoint position in the buffer: " + curState.getLastSuccessfulIterator());
        _currentWindowSizeInBytes = 0L;
        curState.switchToReplayDataEvents();
    }

    /**
     * Notifies the consumer that a new event window starts, via {@code onStartDataEventSequence}
     * with the state's start-window SCN.
     *
     * @param curState the current dispatcher state (expected to be in START_STREAM_EVENT_WINDOW)
     * @return true when the state was as expected and the callback reported success
     */
    protected boolean doStartStreamEventWindow(DispatcherState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Entered startDataEventSequence");
        }

        if (!curState.getStateId().equals(DispatcherState.StateId.START_STREAM_EVENT_WINDOW)) {
            _log.error("START_STREAM_EVENT_WINDOW state expected but found : " + curState.getStateId());
            return false;
        }

        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult result = ConsumerCallbackResult.ERROR;
        try {
            result = getAsyncCallback().onStartDataEventSequence(curState.getStartWinScn());
        }
        catch (RuntimeException e) {
            _log.error("Internal onStartDataEventSequence error: " + e.getMessage(), e);
        }

        final boolean ok = ConsumerCallbackResult.isSuccess(result);
        if (!ok) {
            _log.error("startDataEventSequence failed:" + curState.getStartWinScn());
        } else if (debugOn) {
            _log.debug("startDataEventSequence succeeded:" + curState.getStartWinScn());
        }
        return ok;
    }

    /**
     * Notifies the consumer that the current event window ends, via {@code onEndDataEventSequence}
     * with the state's end-window SCN.
     *
     * @param curState the current dispatcher state (expected to be in END_STREAM_EVENT_WINDOW)
     * @return true when the state was as expected and the callback reported success
     */
    protected boolean doEndStreamEventWindow(DispatcherState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Entered endDataEventSequence");
        }

        if (!curState.getStateId().equals(DispatcherState.StateId.END_STREAM_EVENT_WINDOW)) {
            _log.error("END_STREAM_EVENT_WINDOW state expected but found :" + curState.getStateId());
            return false;
        }

        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult result = ConsumerCallbackResult.ERROR;
        try {
            result = getAsyncCallback().onEndDataEventSequence(curState.getEndWinScn());
        }
        catch (RuntimeException e) {
            _log.error("Internal onEndDataEventSequence error: " + e.getMessage(), e);
        }

        final boolean ok = ConsumerCallbackResult.isSuccess(result);
        if (!ok) {
            _log.error("endDataEventSequence callback failed, the end window scn is: " + curState.getEndWinScn());
        } else if (debugOn) {
            _log.debug("endDataEventSequence callback succeeded:" + curState.getEndWinScn());
        }
        return ok;
    }

    /**
     * Notifies the consumer that events for the state's current source are about to start, via
     * {@code onStartSource}; on success the state switches to expecting stream data events.
     *
     * @param curState the current dispatcher state (expected to be in START_STREAM_SOURCE)
     * @return true when the state was as expected and the callback reported success
     */
    protected boolean doStartStreamSource(DispatcherState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Entered startSource");
        }

        if (!curState.getStateId().equals(DispatcherState.StateId.START_STREAM_SOURCE)) {
            _log.error("START_STREAM_SOURCE state expected but found:" + curState.getStateId());
            return false;
        }

        final String srcName = curState.getCurrentSource().getName();
        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult result = ConsumerCallbackResult.ERROR;
        try {
            result = getAsyncCallback().onStartSource(srcName, curState.getCurrentSourceSchema());
        }
        catch (RuntimeException e) {
            _log.error("Internal onStartSource error: " + e.getMessage(), e);
        }

        if (!ConsumerCallbackResult.isSuccess(result)) {
            _log.error("startSource failed for the source: " + srcName);
            return false;
        }
        if (debugOn) {
            _log.debug("startSource succeeded for the source: " + srcName);
        }
        curState.switchToExpectStreamDataEvents();
        return true;
    }

    /**
     * Notifies the consumer that events for the state's current source have ended, via
     * {@code onEndSource}; on success the source info is reset and the state switches to
     * expecting stream data events. A current source with a negative id is left untouched and
     * reported as success.
     *
     * @param curState the current dispatcher state (expected to be in END_STREAM_SOURCE)
     * @return false on an unexpected state, missing source information or a failed callback;
     *         true otherwise
     */
    protected boolean doEndStreamSource(DispatcherState curState) {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("Entered endSource");
        }

        if (!curState.getStateId().equals(DispatcherState.StateId.END_STREAM_SOURCE)) {
            _log.error("END_STREAM_SOURCE state expected but found :" + curState.getStateId());
            return false;
        }
        if (curState.getCurrentSource() == null) {
            _log.error("Missing source information in the current state");
            return false;
        }
        if (curState.getCurrentSource().getId() < 0L) {
            // Nothing to report to the consumer for negative source ids.
            return true;
        }

        final String srcName = curState.getCurrentSource().getName();
        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult result = ConsumerCallbackResult.ERROR;
        try {
            result = getAsyncCallback().onEndSource(srcName, curState.getCurrentSourceSchema());
        }
        catch (RuntimeException e) {
            _log.error("Internal onEndSource error:" + e.getMessage(), e);
        }

        if (!ConsumerCallbackResult.isSuccess(result)) {
            _log.error("Method endSource() failed for the source : " + srcName);
            return false;
        }
        if (debugOn) {
            _log.debug("endSource succeeded:" + srcName);
        }
        curState.resetSourceInfo();
        curState.switchToExpectStreamDataEvents();
        return true;
    }

    /**
     * Makes sure the consumer has been told about the source of the event being processed.
     * Negative source ids are accepted without any check. Otherwise the source and its latest
     * schema must be known (and, when the schema-id check is enabled, the exact schema id too);
     * if the latest schema differs from the state's current source schema, the state switches to
     * START_STREAM_SOURCE and the consumer is notified via {@code doStartStreamSource}.
     *
     * @param curState   the current dispatcher state
     * @param eventSrcId source id of the event
     * @param schemaId   schema id of the event
     * @return false when the source or a required schema is unknown or the start-source
     *         notification failed; true otherwise
     */
    protected boolean doCheckStartSource(DispatcherState curState, Long eventSrcId, SchemaId schemaId) {
        if (eventSrcId < 0L) {
            return true;
        }

        final IdNamePair source = curState.getSources().get(eventSrcId);
        if (source == null) {
            _log.error("Unable to find source: srcid=" + eventSrcId);
            return false;
        }

        final VersionedSchema latestSchema = curState.getSchemaSet().getLatestVersionByName(source.getName());
        // The exact-id lookup is only performed when the schema-id check is switched on.
        final VersionedSchema exactSchema = _schemaIdCheck ? curState.getSchemaSet().getById(schemaId) : null;

        if (latestSchema == null) {
            _log.error("Unable to find schema: srcid=" + source.getId() + " name=" + source.getName());
            return false;
        }
        if (_schemaIdCheck && exactSchema == null) {
            _log.error("Unable to find schema: srcid=" + source.getId() + " name=" + source.getName() + " schemaId=" + schemaId);
            return false;
        }
        if (latestSchema.getSchema() != curState.getCurrentSourceSchema()) {
            curState.switchToStartStreamSource(source, latestSchema.getSchema());
            return doStartStreamSource(curState);
        }
        return true;
    }

    /**
     * Asks the consumer via {@code onCheckpoint} whether the checkpoint may be stored and, if the
     * callback succeeded without requesting a skip, persists it through the checkpoint persistor
     * (when one is configured), records it in {@code curState} and removes the covered events.
     *
     * @param curState the current dispatcher state
     * @param cp       the checkpoint to store
     * @param winScn   the SCN passed to the consumer's {@code onCheckpoint}
     * @return always true
     * @throws IOException declared for the persistence call
     */
    protected boolean storeCheckpoint(DispatcherState curState, Checkpoint cp, SCN winScn) throws IOException {
        final boolean debugOn = _log.isDebugEnabled();
        if (debugOn) {
            _log.debug("About to store checkpoint");
        }

        final ConsumerCallbackResult result = getAsyncCallback().onCheckpoint(winScn);
        final boolean persist = !ConsumerCallbackResult.isSkipCheckpoint(result)
                && ConsumerCallbackResult.isSuccess(result);

        if (!persist) {
            if (debugOn) {
                _log.debug("Checkpoint " + cp + " not saved as callback returned " + result);
            }
            return true;
        }

        if (getCheckpointPersistor() != null) {
            getCheckpointPersistor().storeCheckpointV3(getSubsciptionsList(), cp, _registrationId);
            ++_numCheckPoints;
        }
        curState.storeCheckpoint(cp, winScn);
        removeEvents(curState);
        if (debugOn) {
            _log.debug("Checkpoint saved: " + cp.toString());
        }
        return true;
    }

    /**
     * Default handling of a system event: logs a warning with the event's source id and reports
     * the event as processed.
     *
     * @param curState the current dispatcher state (not used here)
     * @param event    the system event
     * @return always true
     */
    protected boolean processSysEvent(DispatcherState curState, DbusEvent event) {
        final String warning = "Unknown system event: srcid=" + event.srcId();
        _log.warn(warning);
        return true;
    }

    /**
     * Hands one data event to the consumer callback via {@code onDataEvent}, using the state's
     * event decoder.
     *
     * @param curState the current dispatcher state, source of the event decoder
     * @param event    the data event to dispatch
     * @return true when the callback reported success
     */
    protected boolean processDataEvent(DispatcherState curState, DbusEvent event) {
        ConsumerCallbackResult callbackResult = getAsyncCallback().onDataEvent(event, (DbusEventDecoder)curState.getEventDecoder());
        boolean success = ConsumerCallbackResult.isSuccess(callbackResult);
        if (!success) {
            _log.error("Method onDataEvent failed on consumer callback returned an error.");
        } else if (_log.isDebugEnabled()) {
            // This runs once per event: only render the event when debug logging is on,
            // like every other debug statement in this class.
            _log.debug("Event queued: " + event.toString());
        }
        return success;
    }

    /**
     * Flushes the consumer callback's call queue ({@code flushCallQueue(-1)}).
     *
     * @param curState the current dispatcher state (not used here)
     * @return true when the flush reported success
     */
    protected boolean processDataEventsBatch(DispatcherState curState) {
        DbusPrettyLogUtils.logExceptionAtDebug("Flushing batched events", null, _log);
        final ConsumerCallbackResult flushResult = getAsyncCallback().flushCallQueue(-1L);
        if (ConsumerCallbackResult.isSuccess(flushResult)) {
            return true;
        }
        _log.error("Error dispatching events, the consumer callback returned an error");
        return false;
    }

    /**
     * Writes the given timestamp through the client's DSC updater; does nothing when there is no
     * client handle or the handle has no DSC updater.
     *
     * @param timestampInMillis the timestamp handed to the updater
     */
    protected void updateDSCTimestamp(long timestampInMillis) {
        if (_serverHandle == null || _serverHandle.getDSCUpdater() == null) {
            return;
        }
        final DatabusClientDSCUpdater dscUpdater = _serverHandle.getDSCUpdater();
        dscUpdater.writeTimestamp(timestampInMillis);
    }

    /**
     * Starts dispatching: publishes the known sources to the consumer callback, starts the
     * component status and invokes {@code onStartConsumption}. On success the stop flag is
     * cleared, the state switches to expecting an event window and dispatching begins; on failure
     * the state switches to stop-dispatch and dispatching is stopped.
     */
    protected void doStartDispatchEvents() {
        if (_log.isDebugEnabled()) {
            _log.debug("Entered startDispatch");
        }

        _asyncCallback.setSourceMap(_internalState.getSources());
        getStatus().start();

        // A callback that throws is treated the same as one that returns ERROR.
        ConsumerCallbackResult startResult = ConsumerCallbackResult.ERROR;
        try {
            startResult = _asyncCallback.onStartConsumption();
        }
        catch (RuntimeException e) {
            _log.error("Internal startConsumption error: " + e.getMessage(), e);
        }

        if (ConsumerCallbackResult.isSuccess(startResult)) {
            _stopDispatch.set(false);
            _internalState.switchToExpectEventWindow();
            doDispatchEvents();
        } else {
            _log.error("StartConsumption failed.");
            _internalState.switchToStopDispatch();
            doStopDispatch(_internalState);
        }
    }

    /**
     * Tells whether the current window has grown beyond the checkpoint threshold, i.e. beyond
     * {@code checkpointThresholdPct} percent of the event buffer's allocated size.
     *
     * @return true when the current window size is strictly greater than the threshold
     */
    boolean hasCheckpointThresholdBeenExceeded() {
        final long maxWindowBytes = (long)((double)_dataEventsBuffer.getAllocatedSize() * _connConfig.getCheckpointThresholdPct() / 100.0);
        final boolean overLimit = getCurrentWindowSizeInBytes() > maxWindowBytes;

        if (_log.isDebugEnabled()) {
            _log.debug("Threshold check : CurrentWindowSize=" + getCurrentWindowSizeInBytes() + " MaxWindowSize=" + maxWindowBytes + " bufferFreeSpace=" + _dataEventsBuffer.getBufferFreeSpace());
        }
        if (!overLimit) {
            return false;
        }
        _log.info("The checkpoint threshold has exceeded. The CurrentWindowSize=" + getCurrentWindowSizeInBytes() + " MaxWindowSize=" + maxWindowBytes + " bufferFreeSpace=" + _dataEventsBuffer.getBufferFreeSpace());
        return true;
    }

    /*
     * Unable to fully structure code
     */
    protected void doDispatchEvents() {
        debugEnabled = this._log.isDebugEnabled();
        traceEnabled = this._log.isTraceEnabled();
        curState = this._internalState;
        if (!(this._stopDispatch.get() || curState.getEventsIterator().hasNext() || this.checkForShutdownRequest())) {
            if (debugEnabled) {
                this._log.debug((Object)"Waiting for events");
            }
            curState.getEventsIterator().await(50L, TimeUnit.MILLISECONDS);
        }
        success = true;
        hasQueuedEvents = false;
        while (success && !this._stopDispatch.get() && curState.getStateId() != DispatcherState.StateId.STOP_DISPATCH_EVENTS && null != curState.getEventsIterator() && curState.getEventsIterator().hasNext() && !this.checkForShutdownRequest() && !this.hasMessages()) {
            nextEvent = curState.getEventsIterator().next();
            this._currentWindowSizeInBytes += (long)nextEvent.size();
            if (traceEnabled) {
                this._log.trace((Object)("Got event:" + nextEvent));
            }
            eventSrcId = nextEvent.srcId();
            if (curState.isSCNRegress()) {
                scn = new SingleSourceSCN((int)nextEvent.physicalPartitionId(), nextEvent.sequence());
                this._log.info((Object)("We are regressing to SCN: " + scn));
                curState.switchToRollback();
                this.doRollback(curState, (SCN)scn, false, false);
                curState.setSCNRegress(false);
                curState.switchToExpectEventWindow();
            }
            if (null != this.getAsyncCallback().getStats()) {
                this.getAsyncCallback().getStats().registerWindowSeen(nextEvent.timestampInNanos(), nextEvent.sequence());
            }
            if (nextEvent.isControlMessage()) {
                if (nextEvent.isEndOfPeriodMarker()) {
                    if (curState.isEventsSeen()) {
                        if (null != curState.getCurrentSource()) {
                            curState.switchToEndStreamSource();
                            success = this.doEndStreamSource(curState);
                        }
                        endWinScn = null;
                        if (success) {
                            this._lastWindowScn = nextEvent.sequence();
                            this._lastEowTsNsecs = nextEvent.timestampInNanos();
                            endWinScn = new SingleSourceSCN((int)nextEvent.physicalPartitionId(), this._lastWindowScn);
                            curState.switchToEndStreamEventWindow((SCN)endWinScn);
                            success = this.doEndStreamEventWindow(curState);
                        }
                        if (success) {
                            try {
                                cp = this.createCheckpoint(curState, (DbusEvent)nextEvent);
                                success = this.doStoreCheckpoint(curState, (DbusEvent)nextEvent, cp, (SCN)endWinScn);
                            }
                            catch (SharedCheckpointException e) {
                                return;
                            }
                        }
                    } else {
                        success = true;
                        if (this._log.isDebugEnabled()) {
                            this._log.debug((Object)("skipping empty window: " + nextEvent.sequence()));
                        }
                        if (nextEvent.sequence() > 0L) {
                            this._lastWindowScn = nextEvent.sequence();
                            if (nextEvent.timestampInNanos() > 0L) {
                                this._lastEowTsNsecs = nextEvent.timestampInNanos();
                            }
                            ckpt = this.createCheckpoint(curState, (DbusEvent)nextEvent);
                            try {
                                success = this.doStoreCheckpoint(curState, (DbusEvent)nextEvent, ckpt, (SCN)new SingleSourceSCN((int)nextEvent.physicalPartitionId(), nextEvent.sequence()));
                            }
                            catch (SharedCheckpointException e) {
                                return;
                            }
                        } else {
                            this._log.warn((Object)("EOP with scn=" + nextEvent.sequence()));
                        }
                    }
                    if (success) {
                        curState.switchToExpectEventWindow();
                        if (nextEvent.sequence() > 0L && !this.getStatus().isRunningStatus()) {
                            this.getStatus().resume();
                        }
                    }
                } else if (nextEvent.isErrorEvent()) {
                    this._log.info((Object)("Error event: " + nextEvent.sequence()));
                    success = this.processErrorEvent(curState, (DbusEventInternalReadable)nextEvent);
                } else {
                    success = this.processSysEvent(curState, (DbusEvent)nextEvent);
                    if (success && nextEvent.isCheckpointMessage()) {
                        sysCheckpt = this.createCheckpoint(curState, (DbusEvent)nextEvent);
                        try {
                            v0 = scn = sysCheckpt.getConsumptionMode() == DbusClientMode.ONLINE_CONSUMPTION ? nextEvent.sequence() : sysCheckpt.getBootstrapSinceScn().longValue();
                            if (scn <= 0L && sysCheckpt.getConsumptionMode() != DbusClientMode.BOOTSTRAP_SNAPSHOT) ** GOTO lbl111
                            success = this.doStoreCheckpoint(curState, (DbusEvent)nextEvent, sysCheckpt, (SCN)new SingleSourceSCN((int)nextEvent.physicalPartitionId(), scn));
                        }
                        catch (SharedCheckpointException e) {
                            return;
                        }
                    }
                }
            } else {
                curState.setEventsSeen(true);
                if (curState.getStateId().equals((Object)DispatcherState.StateId.EXPECT_EVENT_WINDOW) || curState.getStateId().equals((Object)DispatcherState.StateId.REPLAY_DATA_EVENTS)) {
                    startScn = new SingleSourceSCN((int)nextEvent.physicalPartitionId(), nextEvent.sequence());
                    curState.switchToStartStreamEventWindow((SCN)startScn);
                    success = this.doStartStreamEventWindow(curState);
                    if (success && eventSrcId >= 0L) {
                        success = this.doCheckStartSource(curState, eventSrcId, new SchemaId(nextEvent.schemaId()));
                    }
                } else {
                    if (null != curState.getCurrentSource() && !eventSrcId.equals(curState.getCurrentSource().getId())) {
                        curState.switchToEndStreamSource();
                        success = this.doEndStreamSource(curState);
                    }
                    if (success) {
                        success = this.doCheckStartSource(curState, eventSrcId, new SchemaId(nextEvent.schemaId()));
                    }
                }
                if (success && (success = this.processDataEvent(curState, (DbusEvent)nextEvent))) {
                    hasQueuedEvents = true;
                    if (this.hasCheckpointThresholdBeenExceeded()) {
                        this._log.info((Object)("Attempting to checkpoint (only if the consumer callback for onCheckpoint returns SUCCESS), because " + this.getCurrentWindowSizeInBytes() + " bytes reached without checkpoint "));
                        success = this.processDataEventsBatch(curState);
                        if (success) {
                            hasQueuedEvents = false;
                            cp = this.createCheckpoint(curState, (DbusEvent)nextEvent);
                            lastScn = cp.getConsumptionMode() == DbusClientMode.ONLINE_CONSUMPTION ? curState.getStartWinScn() : new SingleSourceSCN((int)nextEvent.physicalPartitionId(), cp.getBootstrapSinceScn().longValue());
                            try {
                                success = this.doStoreCheckpoint(curState, (DbusEvent)nextEvent, cp, lastScn);
                            }
                            catch (SharedCheckpointException e) {
                                return;
                            }
                            curState.switchToExpectStreamDataEvents();
                            if (!this.getStatus().isRunningStatus()) {
                                this.getStatus().resume();
                            }
                        }
                    }
                }
            }
lbl111:
            // 14 sources

            if (!success || !this.hasCheckpointThresholdBeenExceeded() || !(success = this.processDataEventsBatch(curState))) continue;
            this._log.warn((Object)("Checkpoint not stored, but removing older events from buffer to guarantee progress (checkpoint threshold has exceeded), consider checkpointing more frequently. Triggered on control-event=" + nextEvent.isControlMessage()));
            this.removeEvents(curState);
        }
        if (!this._stopDispatch.get() && !this.checkForShutdownRequest()) {
            if (success) {
                if (hasQueuedEvents && !(success = this.processDataEventsBatch(curState))) {
                    this._log.error((Object)"Unable to flush partial window");
                }
                if (debugEnabled) {
                    this._log.debug((Object)("doDispatchEvents to " + curState.toString()));
                }
            }
            if (!success) {
                curState.switchToRollback();
                this.doRollback(curState);
            }
            this.enqueueMessage(curState);
        }
    }

    /**
     * Flushes the pending consumer callbacks and, when they succeed, persists the checkpoint.
     *
     * <p>A failure to persist the checkpoint does not change the return value: the method still
     * reports the outcome of {@code processDataEventsBatch}. The only exception is a persistence
     * failure while {@link #isSharedCheckpoint()} is true, which is escalated to the caller.
     *
     * @param curState  dispatcher state passed to {@code processDataEventsBatch} and {@code storeCheckpoint}
     * @param nextEvent event that triggered the checkpoint; only used to build the error log message
     * @param cp        checkpoint to persist
     * @param endWinScn SCN forwarded to {@code storeCheckpoint}
     * @return the result of {@code processDataEventsBatch(curState)}
     * @throws SharedCheckpointException if storing the checkpoint fails with an {@link IOException}
     *         and the checkpoint persistence provider is a shared one
     */
    protected boolean doStoreCheckpoint(DispatcherState curState, DbusEvent nextEvent, Checkpoint cp, SCN endWinScn) throws SharedCheckpointException {
        boolean success = this.processDataEventsBatch(curState);
        if (!success) {
            this._log.error((Object)("Consumers did not process callback successfully (callback did not return success). The current checkpoint= " + nextEvent.isCheckpointMessage() + " end of period marker=" + nextEvent.isEndOfPeriodMarker()));
            return success;
        }
        try {
            this.storeCheckpoint(curState, cp, endWinScn);
        }
        catch (IOException e) {
            // Pass the exception to the logger: in the non-shared case it is not rethrown,
            // so this log entry is the only place its cause and stack trace are recorded.
            this._log.error((Object)("Checkpoint persisting failed, the checkpoint is : " + cp), (Throwable)e);
            if (this.isSharedCheckpoint()) {
                this.handleErrStoringSharedCheckpoint();
                throw new SharedCheckpointException(e);
            }
        }
        return success;
    }

    /**
     * Tells whether the configured checkpoint persistence provider is a shared one.
     *
     * @return {@code true} if {@link #getCheckpointPersistor()} returns an instance of
     *         {@code SharedCheckpointPersistenceProvider}
     */
    protected boolean isSharedCheckpoint() {
        CheckpointPersistenceProvider persistor = getCheckpointPersistor();
        return persistor instanceof SharedCheckpointPersistenceProvider;
    }

    /**
     * Reacts to a failure to store a shared checkpoint by requesting an asynchronous shutdown
     * of the server handle. Does nothing when no server handle is set.
     */
    protected void handleErrStoringSharedCheckpoint() {
        if (null == _serverHandle) {
            return;
        }
        _log.info("Server should be shutdown! \n");
        _serverHandle.shutdownAsynchronously();
    }

    /**
     * Decodes an error event and delivers the exception it carries to the consumers'
     * {@code onError} callback.
     *
     * @param curState  current dispatcher state (not read by this method)
     * @param nextEvent event that is expected to be an error event
     * @return {@code true} only when the event is an error event, it could be decoded, and the
     *         callback result counts as success; {@code false} otherwise, including when the
     *         callback throws a {@link RuntimeException}
     */
    private boolean processErrorEvent(DispatcherState curState, DbusEventInternalReadable nextEvent) {
        if (!nextEvent.isErrorEvent()) {
            _log.error("Unexcpected event received while DbusErrorEvent is expected! " + nextEvent);
            return false;
        }

        DbusErrorEvent decoded = DbusEventSerializable.getErrorEventFromDbusEvent(nextEvent);
        if (decoded == null) {
            _log.error("Null error event received at dispatcher");
            return false;
        }

        _log.info("Delivering error event to consumers: " + decoded);
        ConsumerCallbackResult outcome;
        try {
            outcome = _asyncCallback.onError(decoded.returnActualException());
        }
        catch (RuntimeException callbackFailure) {
            // A throwing callback is treated the same as a callback that reported an error.
            _log.error("Internal onError error: " + callbackFailure.getMessage(), callbackFailure);
            outcome = ConsumerCallbackResult.ERROR;
        }
        return ConsumerCallbackResult.isSuccess(outcome);
    }

    /**
     * Returns the subscription list held by this dispatcher.
     *
     * @return the {@code _subsList} field, without copying
     */
    public List<DatabusSubscription> getSubsciptionsList() {
        return _subsList;
    }

    /**
     * Returns the checkpoint persistence provider used by this dispatcher.
     *
     * @return the {@code _checkpointPersistor} field
     */
    public CheckpointPersistenceProvider getCheckpointPersistor() {
        return _checkpointPersistor;
    }

    /**
     * Returns the consumer callback this dispatcher delivers to.
     *
     * @return the {@code _asyncCallback} field
     */
    public MultiConsumerCallback getAsyncCallback() {
        return _asyncCallback;
    }

    /**
     * Returns the component status of this dispatcher.
     *
     * @return whatever {@code getComponentStatus()} returns
     */
    public DatabusComponentStatus getStatus() {
        return getComponentStatus();
    }

    /**
     * Returns the tracked size of the current window.
     *
     * @return the {@code _currentWindowSizeInBytes} counter
     */
    public long getCurrentWindowSizeInBytes() {
        return _currentWindowSizeInBytes;
    }

    /**
     * Overwrites the tracked size of the current window.
     *
     * @param currentWindowSizeInBytes new value for the {@code _currentWindowSizeInBytes} counter
     */
    public void setCurrentWindowSizeInBytes(long currentWindowSizeInBytes) {
        _currentWindowSizeInBytes = currentWindowSizeInBytes;
    }

    /**
     * Returns the checkpoint counter (package-private accessor).
     *
     * @return the {@code _numCheckPoints} field
     */
    long getNumCheckPoints() {
        return _numCheckPoints;
    }

    /**
     * Overwrites the checkpoint counter.
     *
     * @param numCheckPoints new value for the {@code _numCheckPoints} field
     */
    public void setNumCheckPoints(long numCheckPoints) {
        _numCheckPoints = numCheckPoints;
    }

    /**
     * Creates the checkpoint for the given dispatcher state and event.
     *
     * <p>Implemented by subclasses. Within this class the result is handed to
     * {@code doStoreCheckpoint} by the event-dispatch loop.
     *
     * @param var1 the current dispatcher state
     * @param var2 the event for which the checkpoint is being created
     * @return the checkpoint to persist
     */
    protected abstract Checkpoint createCheckpoint(DispatcherState var1, DbusEvent var2);

    /**
     * Builds an online-consumption checkpoint, deriving the window SCN from the event when no
     * completed window SCN is available.
     *
     * <p>When {@code lastCompleteWindowScn} is negative the window SCN is chosen as follows:
     * the event's sequence if the event is a checkpoint message; otherwise the event's sequence
     * minus one when that sequence is positive; otherwise zero.
     *
     * @param lastCompleteWindowScn SCN of the last complete window, or a negative value if none
     * @param lastEowTsNsecs        timestamp forwarded unchanged to the checkpoint factory
     * @param curState              dispatcher state (not read by this method)
     * @param event                 event used to derive the SCN when {@code lastCompleteWindowScn} is negative
     * @return the checkpoint produced by {@code Checkpoint.createOnlineConsumptionCheckpoint}
     */
    public static Checkpoint createOnlineConsumptionCheckpoint(long lastCompleteWindowScn, long lastEowTsNsecs, DispatcherState curState, DbusEvent event) {
        if (lastCompleteWindowScn >= 0L) {
            return Checkpoint.createOnlineConsumptionCheckpoint(lastCompleteWindowScn, lastEowTsNsecs);
        }

        long derivedScn;
        if (event.isCheckpointMessage()) {
            derivedScn = event.sequence();
        } else if (event.sequence() > 0L) {
            derivedScn = event.sequence() - 1L;
        } else {
            derivedScn = 0L;
        }
        return Checkpoint.createOnlineConsumptionCheckpoint(derivedScn, lastEowTsNsecs);
    }

    /**
     * Removes events through the dispatcher state and resets the current window byte counter.
     *
     * <p>When debug logging is enabled, the buffer's free space is logged before and after the
     * removal. The debug flag is sampled once so both log statements are emitted or neither is.
     *
     * @param state dispatcher state whose events are removed
     */
    protected void removeEvents(DispatcherState state) {
        final boolean logFreeSpace = _log.isDebugEnabled();

        if (logFreeSpace) {
            _log.debug("Removing events after checkpoint");
            _log.debug("Buffer space available before remove: " + state.getEventsIterator().getEventBuffer().getBufferFreeSpace());
        }

        state.removeEvents();
        _currentWindowSizeInBytes = 0L;

        if (logFreeSpace) {
            _log.debug("Buffer space available after remove: " + state.getEventsIterator().getEventBuffer().getBufferFreeSpace());
        }
    }

    /**
     * Unregisters the status MBean from the MBean server, when one is present, and then
     * delegates to the superclass shutdown.
     */
    public void shutdown() {
        if (null != _statusMbean) {
            _statusMbean.unregisterMbean(_mbeanServer);
        }
        super.shutdown();
    }

    /**
     * Moves the internal dispatcher state to closed, unless its state id is already
     * {@code CLOSED}.
     */
    protected void onShutdown() {
        if (_internalState.getStateId() == DispatcherState.StateId.CLOSED) {
            return;
        }
        _internalState.switchToClosed();
    }

    /**
     * Appends a description of the queued messages to the builder: the superclass output,
     * then a {@code '+'} separator, then the internal dispatcher state if it is not null.
     *
     * @param sb builder that receives the description
     */
    public void getQueueListString(StringBuilder sb) {
        super.getQueueListString(sb);
        sb.append('+');
        if (_internalState == null) {
            return;
        }
        sb.append(_internalState.toString());
    }

    /**
     * Returns the internal dispatcher state object.
     *
     * @return the {@code _internalState} field
     */
    public DispatcherState getDispatcherState() {
        return _internalState;
    }

    /**
     * Enqueues a message through the superclass, guarding against scheduling the internal
     * dispatch loop twice.
     *
     * <p>When the message is the internal state object itself (identity comparison), the
     * internal-loop flag is set before enqueueing. If the flag is already set, the message is
     * rejected.
     *
     * @param message message to enqueue
     * @throws RuntimeException if the internal state is enqueued while the internal-loop flag is
     *         already set
     */
    public void enqueueMessage(Object message) {
        final boolean schedulesInternalLoop = message == _internalState;
        if (schedulesInternalLoop && _inInternalLoop) {
            throw new RuntimeException("Loop already scheduled; queued messages: " + getQueueListString() + "; new message: " + message);
        }
        if (schedulesInternalLoop) {
            _inInternalLoop = true;
        }
        super.enqueueMessage(message);
    }

    /**
     * Checked exception raised when a checkpoint cannot be stored and the checkpoint
     * persistence provider is a shared one. It always wraps the underlying failure.
     */
    public static class SharedCheckpointException extends Exception {

        private static final long serialVersionUID = 1L;

        /**
         * Wraps the failure that prevented the checkpoint from being stored.
         *
         * @param cause the underlying exception, retrievable through {@link #getCause()}
         */
        public SharedCheckpointException(Exception cause) {
            super(cause);
        }
    }
}

