/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.BootstrapResultMessage;
import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.DispatcherState;
import com.linkedin.databus.client.GenericDispatcher;
import com.linkedin.databus.client.RelayPullThread;
import com.linkedin.databus.client.consumer.MultiConsumerCallback;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.DatabusCombinedConsumer;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

/**
 * Dispatcher specialization that tracks the checkpoint carried by checkpoint
 * control events and notifies the relay puller once the received checkpoint
 * reports {@link DbusClientMode#ONLINE_CONSUMPTION}.
 *
 * <p>The most recently received checkpoint is kept in {@code _lastCkpt}; it is
 * reset whenever event dispatching is (re)started and is the object returned by
 * {@link #createCheckpoint(DispatcherState, DbusEvent)}.
 */
public class BootstrapDispatcher
        extends GenericDispatcher<DatabusCombinedConsumer> {
    public static final String MODULE = BootstrapDispatcher.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);

    /** Receives the "bootstrap complete" message once online consumption may start. */
    private final RelayPullThread _relayPuller;
    /** Last checkpoint parsed from a checkpoint control event; {@code null} until one arrives. */
    private Checkpoint _lastCkpt;
    /** Consumption mode taken from the last received checkpoint. */
    private DbusClientMode _bootstrapMode;

    /**
     * Creates the dispatcher. All arguments except {@code relayPuller} are handed to the
     * {@link GenericDispatcher} constructor unchanged; the retry configuration passed to the
     * superclass is {@code connConfig.getBstDispatcherRetries()}.
     *
     * @param relayPuller the relay puller that is notified when a checkpoint in
     *                    {@link DbusClientMode#ONLINE_CONSUMPTION} mode is received
     */
    public BootstrapDispatcher(String name, DatabusSourcesConnection.StaticConfig connConfig, List<DatabusSubscription> subs, CheckpointPersistenceProvider checkpointPersistor, DbusEventBuffer dataEventsBuffer, MultiConsumerCallback asyncCallback, RelayPullThread relayPuller, MBeanServer mbeanServer, DatabusHttpClientImpl serverHandle, RegistrationId registrationId, Logger log) {
        super(name, connConfig, subs, checkpointPersistor, dataEventsBuffer, asyncCallback, mbeanServer, serverHandle, registrationId, connConfig.getBstDispatcherRetries(), log);
        this._relayPuller = relayPuller;
    }

    /**
     * Resets the tracked state (mode back to {@link DbusClientMode#BOOTSTRAP_SNAPSHOT}, no last
     * checkpoint) and then delegates to the superclass.
     */
    @Override
    protected void doStartDispatchEvents() {
        this._bootstrapMode = DbusClientMode.BOOTSTRAP_SNAPSHOT;
        this._lastCkpt = null;
        super.doStartDispatchEvents();
    }

    /**
     * Handles a system (control) event.
     *
     * <p>Checkpoint messages are decoded from the event payload (UTF-8 text handed to
     * {@link Checkpoint#Checkpoint(String)}), remembered as the last checkpoint and, when the
     * checkpoint's consumption mode is {@link DbusClientMode#ONLINE_CONSUMPTION}, a clone of it is
     * enqueued to the relay puller as a bootstrap-complete message. Every other event is
     * delegated to the superclass.
     *
     * @param curState the current dispatcher state; marked as having seen events when a
     *                 checkpoint is parsed successfully
     * @param event    the control event to process
     * @return {@code true} on success; {@code false} if the checkpoint payload is empty or
     *         processing it failed (the failure is logged)
     */
    @Override
    protected boolean processSysEvent(DispatcherState curState, DbusEvent event) {
        if (!event.isCheckpointMessage()) {
            if (this.getLog().isDebugEnabled()) {
                this.getLog().debug(this.getName() + ": control srcid:" + event.getSourceId());
            }
            return super.processSysEvent(curState, event);
        }

        ByteBuffer eventValue = event.value();
        if (!eventValue.hasRemaining()) {
            this.getLog().error("Missing checkpoint in control message");
            return false;
        }

        try {
            // Size the array by remaining(), not limit(): get(byte[]) reads exactly
            // array-length bytes starting at the current position and would throw
            // BufferUnderflowException if the position were not 0.
            byte[] eventBytes = new byte[eventValue.remaining()];
            eventValue.get(eventBytes);

            String cpString = new String(eventBytes, "UTF-8");
            Checkpoint ckptInEvent = new Checkpoint(cpString);
            this._lastCkpt = ckptInEvent;
            this.getLog().info("Bootstrap checkpoint received from the bootstrap server: " + ckptInEvent);
            this._bootstrapMode = this._lastCkpt.getConsumptionMode();
            curState.setEventsSeen(true);

            if (this._bootstrapMode == DbusClientMode.ONLINE_CONSUMPTION) {
                this.getLog().info("Bootstrap is complete. Switching to relay consumption");
                // Hand the puller its own copy; _lastCkpt keeps being updated via onEvent().
                Checkpoint restartCkpt = this._lastCkpt.clone();
                this._relayPuller.enqueueMessage(BootstrapResultMessage.createBootstrapCompleteMessage(restartCkpt));
            }
            return true;
        }
        catch (RuntimeException e) {
            this.getLog().error("Error while processing internal databus event", e);
            return false;
        }
        catch (IOException e) {
            this.getLog().error("Error while processing internal databus event", e);
            return false;
        }
    }

    /**
     * Returns the last received checkpoint, first letting it observe {@code event} when the
     * dispatcher state has a current source.
     *
     * @param curState the current dispatcher state
     * @param event    the event passed to {@link Checkpoint#onEvent} on the last checkpoint
     * @return the last received checkpoint; may be {@code null} when there is no current source
     *         and no checkpoint has been received yet
     * @throws DatabusRuntimeException if there is a current source but no checkpoint has been
     *                                 received
     */
    @Override
    protected Checkpoint createCheckpoint(DispatcherState curState, DbusEvent event) {
        if (null == curState.getCurrentSource()) {
            return this._lastCkpt;
        }
        if (null == this._lastCkpt) {
            throw new DatabusRuntimeException("Unable to create a checkpoint");
        }
        this._lastCkpt.onEvent(event);
        return this._lastCkpt;
    }
}

