/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.DatabusStreamConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractHttpResponseProcessorDecorator;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import org.apache.log4j.Logger;
import org.jboss.netty.handler.codec.http.HttpResponse;

class StreamHttpResponseProcessor
extends AbstractNettyHttpConnection.BaseHttpResponseProcessor {
    public static final String MODULE = StreamHttpResponseProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final DatabusStreamConnectionStateMessage _stateReuse;
    private final ActorMessageQueue _callback;

    public StreamHttpResponseProcessor(AbstractNettyHttpConnection parent, ActorMessageQueue callback, DatabusStreamConnectionStateMessage stateReuse, ExtendedReadTimeoutHandler readTimeOutHandler) {
        super(parent, readTimeOutHandler);
        this._stateReuse = stateReuse;
        this._callback = callback;
    }

    @Override
    public void finishResponse() throws Exception {
        super.finishResponse();
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)"finished response for /stream");
        }
    }

    @Override
    public void startResponse(HttpResponse response) throws Exception {
        block4: {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)"started response for /stream");
                }
                this._decorated = new ChunkedBodyReadableByteChannel();
                super.startResponse(response);
                if (!this._errorHandled) {
                    this._stateReuse.switchToStreamSuccess((ChunkedBodyReadableByteChannel)this._decorated);
                    this._callback.enqueueMessage((Object)this._stateReuse);
                }
            }
            catch (Exception e) {
                LOG.error((Object)"Error reading events from server", (Throwable)e);
                if (this._errorHandled) break block4;
                this._stateReuse.switchToStreamResponseError();
                this._callback.enqueueMessage((Object)this._stateReuse);
            }
        }
    }

    @Override
    public void handleChannelException(Throwable cause) {
        DbusPrettyLogUtils.logExceptionAtError((String)"Exception during /stream response: ", (Throwable)cause, (Logger)LOG);
        if (this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_SEEN && this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_FINISHED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Enqueueing /stream response error state to puller queue");
            }
            this._stateReuse.switchToStreamResponseError();
            this._callback.enqueueMessage((Object)this._stateReuse);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"Skipping enqueueing /stream response error state to puller queue");
        }
        super.handleChannelException(cause);
    }
}

