/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.DatabusRelayConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractHttpResponseProcessorDecorator;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus.core.util.IdNamePair;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.jboss.netty.handler.codec.http.HttpResponse;

class SourcesHttpResponseProcessor<M extends DatabusRelayConnectionStateMessage>
extends AbstractNettyHttpConnection.BaseHttpResponseProcessor {
    public static final String MODULE = SourcesHttpResponseProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final ActorMessageQueue _callback;
    private final DatabusRelayConnectionStateMessage _stateReuse;

    public SourcesHttpResponseProcessor(AbstractNettyHttpConnection parent, ActorMessageQueue callback, M stateReuse, ExtendedReadTimeoutHandler readTimeOutHandler) {
        super(parent, readTimeOutHandler);
        this._callback = callback;
        this._stateReuse = stateReuse;
    }

    @Override
    public void finishResponse() throws Exception {
        super.finishResponse();
        if (this._errorHandled) {
            return;
        }
        String sourcesResponseError = "/sources response error: ";
        try {
            String exceptionName = RemoteExceptionHandler.getExceptionName((ChunkedBodyReadableByteChannel)this._decorated);
            if (null != exceptionName) {
                LOG.error((Object)("/sources response error: " + RemoteExceptionHandler.getExceptionMessage((ChunkedBodyReadableByteChannel)this._decorated)));
                this._stateReuse.switchToSourcesResponseError();
            } else {
                String hostHdr = "UNKNOWN";
                String svcHdr = "UNKNOWN";
                if (null != this.getParent()) {
                    hostHdr = this.getParent().getRemoteHost();
                    svcHdr = this.getParent().getRemoteService();
                    LOG.info((Object)("initiated sesssion to host " + hostHdr + " service " + svcHdr));
                }
                InputStream bodyStream = Channels.newInputStream((ReadableByteChannel)((Object)this._decorated));
                ObjectMapper mapper = new ObjectMapper();
                List sources = (List)mapper.readValue(bodyStream, (TypeReference)new TypeReference<List<IdNamePair>>(){});
                this._stateReuse.switchToSourcesSuccess(sources, hostHdr, svcHdr);
            }
        }
        catch (IOException ex) {
            LOG.error((Object)"/sources response error: ", (Throwable)ex);
            this._stateReuse.switchToSourcesResponseError();
        }
        catch (RuntimeException ex) {
            LOG.error((Object)"/sources response error: ", (Throwable)ex);
            this._stateReuse.switchToSourcesResponseError();
        }
        this._callback.enqueueMessage((Object)this._stateReuse);
    }

    @Override
    public void startResponse(HttpResponse response) throws Exception {
        this._decorated = new ChunkedBodyReadableByteChannel();
        super.startResponse(response);
    }

    @Override
    public void handleChannelException(Throwable cause) {
        DbusPrettyLogUtils.logExceptionAtError((String)"Exception during /sources response: ", (Throwable)cause, (Logger)LOG);
        if (this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_FINISHED) {
            LOG.info((Object)"Enqueueing /sources response error state to puller queue");
            this._stateReuse.switchToSourcesResponseError();
            this._callback.enqueueMessage((Object)this._stateReuse);
        } else {
            LOG.info((Object)"Skipping enqueueing /sources response error state to puller queue");
        }
        super.handleChannelException(cause);
    }
}

