/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.DatabusBootstrapConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractHttpResponseProcessorDecorator;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusClientMode;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.InvalidCheckpointException;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import com.linkedin.databus2.core.container.request.BootstrapDatabaseTooOldException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.handler.codec.http.HttpResponse;

class BootstrapTargetScnHttpResponseProcessor
extends AbstractNettyHttpConnection.BaseHttpResponseProcessor {
    public static final String MODULE = BootstrapTargetScnHttpResponseProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final ActorMessageQueue _callback;
    private final DatabusBootstrapConnectionStateMessage _stateReuse;
    private final Checkpoint _checkpoint;
    private final RemoteExceptionHandler _remoteExceptionHandler;

    public BootstrapTargetScnHttpResponseProcessor(AbstractNettyHttpConnection parent, ActorMessageQueue bootstrapPullThread, DatabusBootstrapConnectionStateMessage readStartScnState, Checkpoint checkpoint, RemoteExceptionHandler remoteExceptionHandler, ExtendedReadTimeoutHandler readTimeoutHandler) {
        super(parent, readTimeoutHandler);
        this._callback = bootstrapPullThread;
        this._stateReuse = readStartScnState;
        this._checkpoint = checkpoint;
        this._remoteExceptionHandler = remoteExceptionHandler;
    }

    @Override
    public void finishResponse() throws Exception {
        super.finishResponse();
        if (this._errorHandled) {
            return;
        }
        try {
            String exceptionName = RemoteExceptionHandler.getExceptionName((ChunkedBodyReadableByteChannel)this._decorated);
            Throwable remoteException = this._remoteExceptionHandler.getException((ChunkedBodyReadableByteChannel)this._decorated);
            if (null != remoteException && remoteException instanceof BootstrapDatabaseTooOldException) {
                this._remoteExceptionHandler.handleException(remoteException);
            } else if (null != exceptionName) {
                LOG.error((Object)("/targetScn response error: " + RemoteExceptionHandler.getExceptionMessage((ChunkedBodyReadableByteChannel)this._decorated)));
                this._stateReuse.switchToTargetScnResponseError();
            } else {
                InputStream bodyStream = Channels.newInputStream((ReadableByteChannel)((Object)this._decorated));
                ObjectMapper mapper = new ObjectMapper();
                String scnString = (String)mapper.readValue(bodyStream, String.class);
                LOG.info((Object)("targetScn:" + scnString));
                long targetScn = Long.parseLong(scnString);
                this._stateReuse.switchToTargetScnSuccess();
                Checkpoint ckpt = this._checkpoint;
                if (ckpt.getConsumptionMode() != DbusClientMode.BOOTSTRAP_SNAPSHOT) {
                    throw new InvalidCheckpointException("TargetScnResponseProcessor: expecting in client mode: " + DbusClientMode.BOOTSTRAP_SNAPSHOT, ckpt);
                }
                if (!ckpt.isSnapShotSourceCompleted()) {
                    throw new InvalidCheckpointException("TargetScnResponseProcessor: current snapshot source not completed", ckpt);
                }
                LOG.info((Object)("Target SCN " + targetScn + " received for bootstrap catchup source " + ckpt.getCatchupSource() + " after completion of snapshot source " + ckpt.getSnapshotSource()));
                ckpt.setBootstrapTargetScn(Long.valueOf(targetScn));
            }
        }
        catch (Exception ex) {
            LOG.error((Object)("/targetScn response error:" + ex.getMessage()), (Throwable)ex);
            this._stateReuse.switchToTargetScnResponseError();
        }
        this._callback.enqueueMessage((Object)this._stateReuse);
    }

    @Override
    public void startResponse(HttpResponse response) throws Exception {
        this._decorated = new ChunkedBodyReadableByteChannel();
        super.startResponse(response);
    }

    @Override
    public void handleChannelException(Throwable cause) {
        DbusPrettyLogUtils.logExceptionAtError((String)"Exception during /targetSCN response: ", (Throwable)cause, (Logger)LOG);
        if (this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_FINISHED) {
            LOG.info((Object)"Enqueueing TargetSCN Response Error State to Puller Queue");
            this._stateReuse.switchToTargetScnResponseError();
            this._callback.enqueueMessage((Object)this._stateReuse);
        } else {
            LOG.info((Object)"Skipping Enqueueing TargetSCN Response Error State to Puller Queue");
        }
        super.handleChannelException(cause);
    }
}

