/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.DatabusBootstrapConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractHttpResponseProcessorDecorator;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.handler.codec.http.HttpResponse;

/**
 * Response processor for the bootstrap {@code /startScn} request.
 *
 * <p>When the response has been completely received, {@link #finishResponse()} inspects it,
 * updates the reusable {@link DatabusBootstrapConnectionStateMessage} to either the
 * "start SCN success" or the "start SCN response error" state, and enqueues that message on
 * the {@link ActorMessageQueue} supplied at construction time.
 */
class BootstrapStartScnHttpResponseProcessor
extends AbstractNettyHttpConnection.BaseHttpResponseProcessor {
    public static final String MODULE = BootstrapStartScnHttpResponseProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);

    /** Metadata key under which the response carries the bootstrap server's host/port string. */
    private static final String SERVER_HOSTPORT_METADATA_KEY = "X-DBUS2-BS-HOSTPORT";

    /** Delimiter passed to {@link ServerInfo#buildServerInfoFromHostPort} to split host and port. */
    private static final String HOSTPORT_DELIMITER = ":";

    /** Shared JSON mapper; only {@code readValue} is used, so one instance serves all responses. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /** Queue that receives {@link #_stateReuse} once the response outcome is known. */
    private final ActorMessageQueue _callback;

    /** Reusable state message that is mutated in place and then enqueued on {@link #_callback}. */
    private final DatabusBootstrapConnectionStateMessage _stateReuse;

    /** Checkpoint that is updated with the start SCN and server info on success. */
    private final Checkpoint _checkpoint;

    /** Extracts and handles exceptions reported by the remote side in the response. */
    private final RemoteExceptionHandler _remoteExceptionHandler;

    /**
     * Creates a processor for a single {@code /startScn} response.
     *
     * @param parent                 the connection this processor belongs to; passed to the superclass
     * @param bootstrapPullThread    queue on which the resulting state message is enqueued
     * @param readStartScnState      reusable state message to update with the outcome
     * @param checkpoint             checkpoint to update with the received start SCN
     * @param remoteExceptionHandler handler for exceptions reported by the remote side
     * @param readTimeOutHandler     read-timeout handler; passed to the superclass
     */
    public BootstrapStartScnHttpResponseProcessor(AbstractNettyHttpConnection parent, ActorMessageQueue bootstrapPullThread, DatabusBootstrapConnectionStateMessage readStartScnState, Checkpoint checkpoint, RemoteExceptionHandler remoteExceptionHandler, ExtendedReadTimeoutHandler readTimeOutHandler) {
        super(parent, readTimeOutHandler);
        this._callback = bootstrapPullThread;
        this._stateReuse = readStartScnState;
        this._checkpoint = checkpoint;
        this._remoteExceptionHandler = remoteExceptionHandler;
    }

    /**
     * Interprets the completed {@code /startScn} response and notifies the callback queue.
     *
     * <p>Nothing is enqueued if the superclass has already flagged the error as handled.
     * Otherwise the state message is always enqueued exactly once, after one of:
     * <ul>
     *   <li>a remote {@link BootstrapDatabaseTooOldException} is delegated to the
     *       {@link RemoteExceptionHandler};</li>
     *   <li>any other remote exception switches the state to "response error";</li>
     *   <li>the body is parsed by {@link #processStartScnBody}.</li>
     * </ul>
     * Any exception thrown while doing so is logged and also results in the "response error" state.
     *
     * @throws Exception if {@code super.finishResponse()} or the enqueue call fails
     */
    @Override
    public void finishResponse() throws Exception {
        super.finishResponse();
        if (this._errorHandled) {
            return;
        }
        try {
            ChunkedBodyReadableByteChannel body = (ChunkedBodyReadableByteChannel) this._decorated;
            String exceptionName = RemoteExceptionHandler.getExceptionName(body);
            Throwable remoteException = this._remoteExceptionHandler.getException(body);
            // instanceof is already false for null, so no separate null check is required.
            if (remoteException instanceof BootstrapDatabaseTooOldException) {
                this._remoteExceptionHandler.handleException(remoteException);
            } else if (null != exceptionName) {
                LOG.error("/startScn response error: " + RemoteExceptionHandler.getExceptionMessage(body));
                this._stateReuse.switchToStartScnResponseError();
                LOG.error("Failed to process /startscn response");
            } else {
                this.processStartScnBody(body);
            }
        }
        catch (Exception ex) {
            LOG.error("Failed to process /startscn response", ex);
            this._stateReuse.switchToStartScnResponseError();
        }
        this._callback.enqueueMessage(this._stateReuse);
    }

    /**
     * Parses the start SCN from the response body and updates the checkpoint and state message.
     *
     * <p>The body is read as a single JSON string value and then parsed as a {@code long}.
     * The state is switched to success only when the SCN is non-negative and the checkpoint is in
     * {@link DbusClientMode#BOOTSTRAP_SNAPSHOT} mode; otherwise it is switched to "response error".
     * A failure to build the {@link ServerInfo} is logged but is not fatal: the success state is
     * then published with a {@code null} server info.
     *
     * @param body the received response body and metadata
     * @throws Exception if the body cannot be read or is not a valid number; the caller turns
     *                   this into the "response error" state
     */
    private void processStartScnBody(ChunkedBodyReadableByteChannel body) throws Exception {
        if (null != this.getParent()) {
            LOG.info("initiated bootstrap sesssion to host " + this.getParent().getRemoteHost() + " service " + this.getParent().getRemoteService());
        }

        InputStream bodyStream = Channels.newInputStream((ReadableByteChannel) body);
        String scnString = JSON_MAPPER.readValue(bodyStream, String.class);

        String serverHostPort = body.getMetadata(SERVER_HOSTPORT_METADATA_KEY);
        ServerInfo serverInfo = null;
        try {
            serverInfo = ServerInfo.buildServerInfoFromHostPort(serverHostPort, HOSTPORT_DELIMITER);
        }
        catch (Exception ex) {
            LOG.error("Unable to extract Boostrap Server info from StartSCN response. ServerInfo was :" + serverHostPort, ex);
        }

        LOG.info("Response startScn:" + scnString + ", from bootstrap Server :" + serverHostPort);
        long startScn = Long.parseLong(scnString);

        if (startScn < 0L) {
            LOG.error("unexpected value for startSCN: " + startScn);
            this._stateReuse.switchToStartScnResponseError();
        } else if (this._checkpoint.getConsumptionMode() != DbusClientMode.BOOTSTRAP_SNAPSHOT) {
            LOG.error("StartScnResponseProcessor: expecting in client mode: " + DbusClientMode.BOOTSTRAP_SNAPSHOT + " while in the incorrect mode: " + this._checkpoint.getConsumptionMode());
            // Report the failure like every other error branch does. Without this the state
            // message enqueued by finishResponse() would still carry the state it had before
            // this response arrived, even though an error was just logged.
            this._stateReuse.switchToStartScnResponseError();
        } else {
            LOG.info("Start SCN " + startScn + " received for bootstrap snapshot source " + this._checkpoint.getSnapshotSource());
            this._checkpoint.setBootstrapStartScn(Long.valueOf(startScn));
            this._checkpoint.setBootstrapServerInfo(serverHostPort);
            this._stateReuse.switchToStartScnSuccess(this._checkpoint, null, serverInfo);
        }
    }

    /**
     * Installs a fresh {@link ChunkedBodyReadableByteChannel} as the decorated processor and then
     * delegates to the superclass.
     *
     * @param response the HTTP response whose processing is starting
     * @throws Exception if the superclass fails to start processing the response
     */
    @Override
    public void startResponse(HttpResponse response) throws Exception {
        this._decorated = new ChunkedBodyReadableByteChannel();
        super.startResponse(response);
    }

    /**
     * Logs a channel-level exception and, unless the response status is already
     * {@code CHUNKS_FINISHED}, reports a "response error" state to the callback queue.
     *
     * <p>NOTE(review): the enqueue is presumably skipped after {@code CHUNKS_FINISHED} because
     * {@link #finishResponse()} reports the outcome in that case - confirm against the superclass.
     *
     * @param cause the exception raised on the channel
     */
    @Override
    public void handleChannelException(Throwable cause) {
        DbusPrettyLogUtils.logExceptionAtError("Exception during /startSCN response: ", cause, LOG);
        if (this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_FINISHED) {
            LOG.info("Enqueueing StartSCN Response Error State to Puller Queue");
            this._stateReuse.switchToStartScnResponseError();
            this._callback.enqueueMessage(this._stateReuse);
        } else {
            LOG.info("Skipping Enqueueing StartSCN Response Error State to Puller Queue");
        }
        super.handleChannelException(cause);
    }
}

