/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.HttpResponseProcessor;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus2.core.DatabusException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.RejectedExecutionException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;

public class GenericHttpResponseHandler
extends SimpleChannelHandler {
    public static final String MODULE = GenericHttpResponseHandler.class.getName();
    private HttpResponseProcessor _responseProcessor;
    private final KeepAliveType _keepAlive;
    MessageState _messageState;
    private ChannelState _channelState;
    private HttpResponse _httpResponse;
    private HttpChunkTrailer _httpTrailer;
    private HttpRequest _httpRequest;
    private AbstractNettyHttpConnection.ConnectResultListener _connectListener = null;
    private AbstractNettyHttpConnection.SendRequestResultListener _requestListener = null;
    private AbstractNettyHttpConnection.ChannelCloseListener _closeListener = null;
    private Throwable _lastError;
    private final StateLogger _log;

    GenericHttpResponseHandler(HttpResponseProcessor responseProcessor, KeepAliveType keepAlive) {
        this(keepAlive, null);
        this._responseProcessor = responseProcessor;
    }

    public GenericHttpResponseHandler(KeepAliveType keepAlive) {
        this(keepAlive, null);
    }

    public GenericHttpResponseHandler(KeepAliveType keepAlive, Logger log) {
        this._keepAlive = keepAlive;
        if (log == null) {
            log = Logger.getLogger((String)MODULE);
        }
        this._log = new StateLogger(log);
        this._messageState = MessageState.INIT;
        this.reset();
        this._log.info("Created new Handler");
    }

    public synchronized void setConnectionListener(AbstractNettyHttpConnection.ConnectResultListener listener) {
        this._connectListener = listener;
    }

    public synchronized void setRequestListener(AbstractNettyHttpConnection.SendRequestResultListener listener) {
        this._requestListener = listener;
    }

    public synchronized void setCloseListener(AbstractNettyHttpConnection.ChannelCloseListener listener) {
        this._closeListener = listener;
    }

    public Throwable getLastError() {
        return this._lastError;
    }

    public synchronized void setResponseProcessor(HttpResponseProcessor responseProcessor) throws DatabusException {
        if (responseProcessor == null) {
            throw new RuntimeException("GenericHttpResponseHandler cannot have null responseProcessor");
        }
        if (this._messageState != MessageState.REQUEST_WAIT) {
            String msg = "replacing responseProcessor while in state=" + (Object)((Object)this._messageState);
            this._log.error(msg);
            this._messageState = MessageState.CLOSED;
            throw new DatabusException((Throwable)new IllegalStateException(msg));
        }
        this._log.debug("setting processor " + responseProcessor);
        this._responseProcessor = responseProcessor;
    }

    private boolean validateCurrentState(Channel c, MessageState expectedState) {
        if (this._messageState != expectedState) {
            String msg = "unexpected state: expectedState=" + (Object)((Object)expectedState) + "; actual State" + (Object)((Object)this._messageState);
            IllegalStateException cause = new IllegalStateException(msg);
            this._log.error(msg, cause);
            if (this._messageState == MessageState.INIT) {
                this.informConnectListener(c, cause);
            } else if (this._messageState == MessageState.REQUEST_START || this._messageState == MessageState.REQUEST_WAIT) {
                this.informRequestListener(this._httpRequest, cause);
            } else if (this._messageState.waitForResponse()) {
                if (this._responseProcessor != null) {
                    this._responseProcessor.channelException(cause);
                } else {
                    this._log.error("waiting for response but responseProcessor is null", cause);
                }
            }
            if (c != null) {
                this._log.debug("closing the channel because state validate failed");
                c.close();
            }
            this._messageState = MessageState.CLOSED;
            return false;
        }
        return true;
    }

    synchronized void reset() {
        if (this._messageState != MessageState.INIT && this._messageState != MessageState.CLOSED) {
            String msg = "calling reset in wrong state " + (Object)((Object)this._messageState);
            this._log.error(msg);
            throw new IllegalStateException(msg);
        }
        this._messageState = MessageState.INIT;
        this._channelState = ChannelState.CHANNEL_ACTIVE;
        this._requestListener = null;
        this._connectListener = null;
        this._closeListener = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void informConnectListener(Channel channel, Throwable cause) {
        boolean success = cause == null;
        this._log.debug("informRequestListener: success=" + success + ";ch=" + channel, cause);
        AbstractNettyHttpConnection.ConnectResultListener tempListener = null;
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (cause != null) {
                this._lastError = cause;
            }
            if (this._connectListener != null) {
                tempListener = this._connectListener;
                this._connectListener = null;
            }
        }
        if (tempListener != null) {
            this._log.info("Notify about connection completed. success=" + success);
            if (success) {
                tempListener.onConnectSuccess(channel);
            } else {
                tempListener.onConnectFailure(cause);
            }
        } else {
            this._log.warn("informConnectListener called with listener==null; ch=" + channel, cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void informRequestListener(HttpRequest req, Throwable cause) {
        boolean success = cause == null;
        boolean debug = this._log.isDebugEnabled();
        AbstractNettyHttpConnection.SendRequestResultListener tempListener = null;
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (cause != null) {
                this._lastError = cause;
            }
            if (this._requestListener != null) {
                tempListener = this._requestListener;
                this._requestListener = null;
            }
        }
        if (debug) {
            this._log.debug("informRequestListener: success=" + success + ";req=" + req, cause);
        }
        if (tempListener != null) {
            this._log.debug("Notify about requestSent completed. success=" + success);
            if (success) {
                tempListener.onSendRequestSuccess(req);
            } else {
                tempListener.onSendRequestFailure(req, cause);
            }
        } else {
            this._log.warn("informRequestListener called with listener==null; req=" + req, cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void informCloseListener() {
        this._log.info("Calling channelCloseListener");
        AbstractNettyHttpConnection.ChannelCloseListener tempListener = null;
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (this._closeListener != null) {
                tempListener = this._closeListener;
                this._closeListener = null;
            }
        }
        if (tempListener != null) {
            tempListener.onChannelClose();
        }
    }

    public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this._log.info("channel to peer bound: " + e.getChannel().getRemoteAddress());
        super.channelBound(ctx, e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this._log.debug("channelConnected");
        super.channelConnected(ctx, e);
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (!this.validateCurrentState(e.getChannel(), MessageState.INIT)) {
                return;
            }
            this._messageState = MessageState.CONNECTED;
            this._log.info("channel to peer connected: " + e.getChannel().getRemoteAddress());
            this._messageState = MessageState.REQUEST_WAIT;
        }
        this.informConnectListener(e.getChannel(), null);
    }

    MessageState getMessageState() {
        return this._messageState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        boolean debugEnabled = this._log.isDebugEnabled();
        if (debugEnabled) {
            this._log.debug("WriteRequested: chConnected=" + e.getChannel().isConnected() + "; msg=" + e.getMessage());
        }
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (e.getMessage() instanceof HttpRequest) {
                this._httpRequest = (HttpRequest)e.getMessage();
                if (!this.validateCurrentState(e.getChannel(), MessageState.REQUEST_WAIT)) {
                    return;
                }
                this._messageState = MessageState.REQUEST_START;
                if (debugEnabled) {
                    this._log.debug("Write Requested  :" + e);
                }
            }
        }
        super.writeRequested(ctx, e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        IllegalStateException cause = null;
        if (this._httpRequest == null) {
            super.writeComplete(ctx, e);
        }
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            this._log.debug("WriteComplete");
            if (!this.validateCurrentState(e.getChannel(), MessageState.REQUEST_START)) {
                this._httpRequest = null;
                return;
            }
            ChannelFuture future = e.getFuture();
            boolean success = future.isSuccess();
            if (!success) {
                String msg = "Write request failed with cause :" + future.getCause();
                this._log.error(msg);
                this._messageState = MessageState.REQUEST_FAILURE;
                cause = new IllegalStateException(msg);
                this._messageState = MessageState.CLOSED;
            } else {
                this._messageState = MessageState.REQUEST_SENT;
                this._log.debug("Write Completed successfully :" + e);
                this._messageState = MessageState.RESPONSE_START;
            }
            this._httpRequest = null;
        }
        this.informRequestListener(this._httpRequest, cause);
        super.writeComplete(ctx, e);
        if (cause != null) {
            e.getChannel().close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        boolean debug = this._log.isDebugEnabled();
        Object message = e.getMessage();
        if (!(message instanceof HttpResponse || message instanceof HttpChunkTrailer || message instanceof HttpChunk)) {
            this._log.debug("Uknown object type:" + message.getClass().getName());
            super.messageReceived(ctx, e);
            return;
        }
        if (null == this._responseProcessor) {
            this._log.error("No response processor set");
            this._messageState = MessageState.CLOSED;
            e.getChannel().close();
            throw new RuntimeException("No response processor set in messageReceived.state=" + (Object)((Object)this._messageState) + ";msg=" + message);
        }
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            if (message instanceof HttpResponse) {
                if (!this.validateCurrentState(e.getChannel(), MessageState.RESPONSE_START)) {
                    this._log.error("MessageReceived(HttpResponse) failed for message: " + message);
                    return;
                }
                if (debug) {
                    this._log.debug("msgRecived. HttpResponse");
                }
                this._httpResponse = (HttpResponse)message;
                this._responseProcessor.startResponse(this._httpResponse);
                if (!this._httpResponse.isChunked()) {
                    this.finishResponse(e);
                } else {
                    this._messageState = MessageState.WAIT_FOR_CHUNK;
                }
            } else if (message instanceof HttpChunkTrailer) {
                if (!this.validateCurrentState(e.getChannel(), MessageState.WAIT_FOR_CHUNK)) {
                    this._log.error("MessageReceived(HttpChunkTrailer) failed for message: " + message);
                    return;
                }
                if (debug) {
                    this._log.debug("msgRecived. HttpChunkTrailer");
                }
                this._httpTrailer = (HttpChunkTrailer)message;
                this._responseProcessor.addTrailer(this._httpTrailer);
                this.finishResponse(e);
            } else if (message instanceof HttpChunk) {
                if (!this.validateCurrentState(e.getChannel(), MessageState.WAIT_FOR_CHUNK)) {
                    this._log.error("MessageReceived(HttpChunk) failed for message: " + message);
                    return;
                }
                if (debug) {
                    this._log.debug("msgRecived. HttpChunk");
                }
                this._messageState = MessageState.WAIT_FOR_CHUNK;
                this._responseProcessor.addChunk((HttpChunk)message);
            }
        }
    }

    private void finishResponse(MessageEvent e) throws Exception {
        this._messageState = MessageState.RESPONSE_FINISH;
        this._log.debug("FINISH_RESPONSE");
        this._responseProcessor.finishResponse();
        this._responseProcessor = null;
        if (this._keepAlive == KeepAliveType.NO_KEEP_ALIVE) {
            e.getChannel().close();
        }
        this._messageState = MessageState.REQUEST_WAIT;
    }

    private void logExceptionMessage(Throwable cause) {
        if (!this._messageState.hasSentRequest()) {
            if (!(cause instanceof ConnectException)) {
                this._log.info("Skipping exception message even before request has been sent. State=" + (Object)((Object)this._messageState), cause);
            } else {
                this._log.info("Got connection Exception", cause);
            }
        } else if (cause instanceof RejectedExecutionException) {
            this._log.info("shutdown in progress");
        } else if (cause instanceof IOException && null != cause.getMessage() && cause.getMessage().contains("Connection reset by peer")) {
            this._log.warn("connection reset by peer");
        } else if (!(cause instanceof ClosedChannelException)) {
            this._log.error("http client exception(" + cause.getClass().getSimpleName() + "):" + cause.getMessage(), cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        boolean debug = this._log.isDebugEnabled();
        Throwable cause = e.getCause();
        if (cause == null) {
            cause = new RuntimeException("exceptionCaught is invoked with empty exception");
        }
        this.logExceptionMessage(cause);
        if (debug) {
            this._log.debug("exceptionCaught.rp=" + this._responseProcessor, cause);
        }
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            switch (this._messageState) {
                case INIT: {
                    this._messageState = MessageState.CONNECT_FAIL;
                    this.informConnectListener(e.getChannel(), cause);
                    break;
                }
                case REQUEST_START: {
                    this._messageState = MessageState.REQUEST_FAILURE;
                    this.informRequestListener(this._httpRequest, cause);
                    break;
                }
                case REQUEST_SENT: 
                case RESPONSE_START: 
                case WAIT_FOR_CHUNK: {
                    this._messageState = MessageState.RESPONSE_FAILURE;
                    if (null == this._responseProcessor) break;
                    this._responseProcessor.channelException(cause);
                    break;
                }
                default: {
                    this._log.warn("exceptionCaught is called", cause);
                }
            }
            this._messageState = MessageState.CLOSED;
            this._channelState = ChannelState.CHANNEL_EXCEPTION;
        }
        super.exceptionCaught(ctx, e);
        e.getChannel().close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel channel = e.getChannel();
        SocketAddress a = null != channel ? channel.getRemoteAddress() : null;
        this._log.info("channel to peer closed: " + a);
        GenericHttpResponseHandler genericHttpResponseHandler = this;
        synchronized (genericHttpResponseHandler) {
            this._channelState = ChannelState.CHANNEL_CLOSED;
            switch (this._messageState) {
                case INIT: {
                    this._log.warn("got closed channel before connecting");
                    this._messageState = MessageState.CONNECT_FAIL;
                    this.informConnectListener(e.getChannel(), new ClosedChannelException());
                    break;
                }
                case REQUEST_WAIT: {
                    this._messageState = MessageState.CLOSED;
                    break;
                }
                case REQUEST_START: {
                    this._log.warn("got closed channel before sending request");
                    this._messageState = MessageState.REQUEST_FAILURE;
                    this.informRequestListener(this._httpRequest, new ClosedChannelException());
                    break;
                }
                case REQUEST_SENT: 
                case RESPONSE_START: 
                case WAIT_FOR_CHUNK: {
                    this._log.error("got closed channel while waiting for response");
                    this._messageState = MessageState.RESPONSE_FAILURE;
                    if (this._responseProcessor == null) break;
                    this._responseProcessor.channelException(new ClosedChannelException());
                    break;
                }
                default: {
                    this._log.warn("closeChannel is called in unexpected state:" + (Object)((Object)this._messageState));
                }
            }
            this._messageState = MessageState.CLOSED;
            this._channelState = ChannelState.CHANNEL_EXCEPTION;
        }
        this.informCloseListener();
        super.channelClosed(ctx, e);
    }

    public String toString() {
        return "GenericHttpResponseHandler [_keepAlive=" + (Object)((Object)this._keepAlive) + ", _messageState=" + (Object)((Object)this._messageState) + ", _channelState=" + (Object)((Object)this._channelState) + "]";
    }

    public synchronized String getHeader(String headerName) {
        String result = null;
        if (null != this._httpResponse && null == (result = this._httpResponse.getHeader(headerName)) && null != this._httpTrailer) {
            result = this._httpTrailer.getHeader(headerName);
        }
        return result;
    }

    public StateLogger getLog() {
        return this._log;
    }

    public class StateLogger {
        private final Logger _log;

        public StateLogger(Logger l) {
            this._log = l;
        }

        protected StringBuilder setPrefix() {
            StringBuilder sb = new StringBuilder();
            sb.append("<").append(((Object)((Object)GenericHttpResponseHandler.this)).hashCode());
            sb.append("_").append((Object)GenericHttpResponseHandler.this._messageState).append(">");
            return sb;
        }

        public void info(String msg) {
            this.info(msg, null);
        }

        public void debug(String msg) {
            this.debug(msg, null);
        }

        public void warn(String msg) {
            this.warn(msg, null);
        }

        public void error(String msg) {
            this.error(msg, null);
        }

        public void info(String msg, Throwable e) {
            if (this.isDebugEnabled()) {
                msg = this.setPrefix().append(msg).toString();
            }
            DbusPrettyLogUtils.logExceptionAtInfo((String)msg, (Throwable)e, (Logger)this._log);
        }

        public void debug(String msg, Throwable e) {
            msg = this.setPrefix().append(msg).toString();
            DbusPrettyLogUtils.logExceptionAtDebug((String)msg, (Throwable)e, (Logger)this._log);
        }

        public void warn(String msg, Throwable e) {
            msg = this.setPrefix().append(msg).toString();
            DbusPrettyLogUtils.logExceptionAtWarn((String)msg, (Throwable)e, (Logger)this._log);
        }

        public void error(String msg, Throwable e) {
            msg = this.setPrefix().append(msg).toString();
            DbusPrettyLogUtils.logExceptionAtError((String)msg, (Throwable)e, (Logger)this._log);
        }

        public boolean isDebugEnabled() {
            return this._log.isDebugEnabled();
        }

        public void setLevel(Level l) {
            this._log.setLevel(l);
        }
    }

    public static enum ChannelState {
        CHANNEL_ACTIVE,
        CHANNEL_EXCEPTION,
        CHANNEL_CLOSED;

    }

    public static enum MessageState {
        INIT,
        CONNECTED,
        CONNECT_FAIL,
        REQUEST_WAIT,
        REQUEST_START,
        REQUEST_SENT,
        REQUEST_FAILURE,
        RESPONSE_START,
        RESPONSE_FINISH,
        RESPONSE_FAILURE,
        WAIT_FOR_CHUNK,
        CLOSED;


        public boolean hasSentRequest() {
            return !this.equals((Object)INIT) && !this.equals((Object)CONNECTED) && !this.equals((Object)REQUEST_WAIT) && !this.equals((Object)REQUEST_START) && !this.equals((Object)REQUEST_FAILURE);
        }

        public boolean waitForResponse() {
            return this.equals((Object)REQUEST_SENT) || this.equals((Object)RESPONSE_START) || this.equals((Object)WAIT_FOR_CHUNK);
        }
    }

    public static enum KeepAliveType {
        KEEP_ALIVE,
        NO_KEEP_ALIVE;

    }
}

