/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.DatabusRelayConnection;
import com.linkedin.databus.client.DatabusRelayConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.RegisterHttpResponseProcessor;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.client.netty.SourcesHttpResponseProcessor;
import com.linkedin.databus.client.netty.StreamHttpResponseProcessor;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.core.CheckpointMult;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.util.Range;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import com.linkedin.databus2.core.container.monitoring.mbean.ContainerStatisticsCollector;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilter;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Formatter;
import java.util.Map;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.util.Timer;

public class NettyHttpDatabusRelayConnection
extends AbstractNettyHttpConnection
implements DatabusRelayConnection {
    public static final String MODULE = NettyHttpDatabusRelayConnection.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final ActorMessageQueue _callback;
    private DatabusRelayConnectionStateMessage _callbackStateReuse;
    private ExtendedReadTimeoutHandler _readTimeOutHandler;
    private CheckpointMult _checkpoint;
    private State _curState;
    private String _sourcesSubsList;
    private int _freeBufferSpace;
    private DbusKeyCompositeFilter _filter;
    private boolean _enableReadFromLatestSCN = false;
    final int _maxEventVersion;

    public NettyHttpDatabusRelayConnection(ServerInfo relay, ActorMessageQueue callback, ClientBootstrap bootstrap, ContainerStatisticsCollector containerStatsCollector, RemoteExceptionHandler remoteExceptionHandler, Timer timeoutTimer, long writeTimeoutMs, long readTimeoutMs, int protocolVersion, int maxEventVersion, ChannelGroup channelGroup) {
        super(relay, bootstrap, containerStatsCollector, timeoutTimer, writeTimeoutMs, readTimeoutMs, channelGroup, protocolVersion, LOG);
        this._callback = callback;
        this._maxEventVersion = maxEventVersion;
    }

    public NettyHttpDatabusRelayConnection(ServerInfo relay, ActorMessageQueue callback, ChannelFactory channelFactory, ContainerStatisticsCollector containerStatsCollector, RemoteExceptionHandler remoteExceptionHandler, Timer timeoutTimer, long writeTimeoutMs, long readTimeoutMs, int protocolVersion, int maxEventVersion, ChannelGroup channelGroup) {
        this(relay, callback, new ClientBootstrap(channelFactory), containerStatsCollector, remoteExceptionHandler, timeoutTimer, writeTimeoutMs, readTimeoutMs, protocolVersion, maxEventVersion, channelGroup);
    }

    public boolean isEnableReadFromLatestSCN() {
        return this._enableReadFromLatestSCN;
    }

    @Override
    public void requestSources(DatabusRelayConnectionStateMessage stateReuse) {
        this._callbackStateReuse = stateReuse;
        if (!this.hasConnection()) {
            this.connect(State.SOURCES_REQUEST_CONNECT);
        } else {
            this.onSourcesConnectSuccess();
        }
    }

    void onSourcesConnectSuccess() {
        this._curState = State.SOURCES_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        SourcesHttpResponseProcessor<DatabusRelayConnectionStateMessage> sourcesResponseProcessor = new SourcesHttpResponseProcessor<DatabusRelayConnectionStateMessage>(this, this._callback, this._callbackStateReuse, this._readTimeOutHandler);
        String uriString = "/sources?protocolVersion=" + this.getProtocolVersion();
        LOG.info((Object)("Sending " + uriString));
        HttpRequest request = this.createEmptyRequest(uriString);
        this.sendRequest(request, new SourcesRequestResultListener(), sourcesResponseProcessor);
    }

    @Override
    public void requestRegister(String sourcesIdList, DatabusRelayConnectionStateMessage stateReuse) {
        this._sourcesSubsList = sourcesIdList;
        this._callbackStateReuse = stateReuse;
        if (!this.hasConnection()) {
            this._curState = State.REGISTER_REQUEST_CONNECT;
            this.onRequestFailure((String)null, (Throwable)new ClosedChannelException());
        } else {
            this.onRegisterConnectSuccess();
        }
    }

    void onRegisterConnectSuccess() {
        this._curState = State.REGISTER_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        RegisterHttpResponseProcessor registerResponseProcessor = new RegisterHttpResponseProcessor(this, this._callback, this._callbackStateReuse, this._readTimeOutHandler);
        String url = this.createRegisterUrl();
        LOG.info((Object)("Sending " + url));
        HttpRequest request = this.createEmptyRequest(url);
        this.sendRequest(request, new RegisterRequestResultListener(), registerResponseProcessor);
    }

    private String createRegisterUrl() {
        StringBuilder uriString = new StringBuilder(1024);
        uriString.append("/register?").append("protocolVersion").append("=").append(this.getProtocolVersion());
        if (this.getProtocolVersion() < 3) {
            uriString.append("&sources=").append(this._sourcesSubsList);
        }
        String url = uriString.toString();
        return url;
    }

    @Override
    public void requestStream(String sourcesSubsList, DbusKeyCompositeFilter filter, int freeBufferSpace, CheckpointMult cp, Range keyRange, DatabusRelayConnectionStateMessage stateReuse) {
        this._checkpoint = cp;
        this._callbackStateReuse = stateReuse;
        this._sourcesSubsList = sourcesSubsList;
        this._freeBufferSpace = freeBufferSpace;
        this._filter = filter;
        if (!this.hasConnection()) {
            this._curState = State.STREAM_REQUEST_CONNECT;
            this.onRequestFailure("/stream", (Throwable)new ClosedChannelException());
        } else {
            this.onStreamConnectSuccess();
        }
    }

    void formRequest(Formatter formatter, String filtersStr) {
        StringBuilder fmtString = new StringBuilder(1024);
        fmtString.append("/stream?").append("protocolVersion").append("=").append(this.getProtocolVersion()).append(this.getProtocolVersion() >= 3 ? "&subs" : "&sources").append("=%s&streamFromLatestScn=%s&").append(this.getProtocolVersion() >= 3 ? "checkPointMult" : "checkPoint").append("=%s&output=binary&size=%d");
        if (filtersStr != null) {
            fmtString.append("&filters=").append(filtersStr);
        }
        if (this._maxEventVersion > 0) {
            fmtString.append("&").append("maxev").append("=").append(this._maxEventVersion);
        }
        formatter.format(fmtString.toString(), this._sourcesSubsList, Boolean.toString(this._enableReadFromLatestSCN), this.getProtocolVersion() >= 3 ? this._checkpoint.toString() : this._checkpoint.getCheckpoint(PhysicalPartition.ANY_PHYSICAL_PARTITION), this._freeBufferSpace);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("request string for stream (protocolVersion=" + this.getProtocolVersion() + "):" + formatter.toString()));
        }
    }

    void onStreamConnectSuccess() {
        boolean debugEnabled = LOG.isDebugEnabled();
        this._curState = State.STREAM_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        StreamHttpResponseProcessor streamResponseProcessor = new StreamHttpResponseProcessor(this, this._callback, this._callbackStateReuse, this._readTimeOutHandler);
        StringBuilder uriString = new StringBuilder(1024);
        boolean error = this.populateStreamRequestUrl(uriString);
        if (error) {
            return;
        }
        String url = uriString.toString();
        if (debugEnabled) {
            LOG.debug((Object)("Sending " + url));
        }
        HttpRequest request = this.createEmptyRequest(url);
        this.sendRequest(request, new StreamRequestResultListener(), streamResponseProcessor);
    }

    private boolean populateStreamRequestUrl(StringBuilder uriString) {
        ObjectMapper objMapper = new ObjectMapper();
        Formatter uriFmt = new Formatter(uriString);
        String filtersStr = null;
        boolean error = false;
        if (null != this._filter) {
            try {
                Map fMap = this._filter.getFilterMap();
                if (null != fMap && fMap.size() > 0) {
                    filtersStr = objMapper.writeValueAsString((Object)fMap);
                }
            }
            catch (IOException ex) {
                LOG.error((Object)("Got exception while serializing Filters. Filter Map was : " + this._filter), (Throwable)ex);
                error = true;
                this.onRequestFailure(uriString.toString(), (Throwable)ex);
            }
            catch (RuntimeException ex) {
                LOG.error((Object)("Got exception while serializing Filters. Filter Map was : " + this._filter), (Throwable)ex);
                error = true;
                this.onRequestFailure(uriString.toString(), (Throwable)ex);
            }
        }
        this.formRequest(uriFmt, filtersStr);
        return error;
    }

    private void connect(State connectState) {
        this._curState = connectState;
        this.connectWithListener(new MyConnectListener());
    }

    @Override
    public void enableReadFromLatestScn(boolean enable) {
        this._enableReadFromLatestSCN = enable;
    }

    private void onConnectSuccess(Channel channel) {
        switch (this._curState) {
            case SOURCES_REQUEST_CONNECT: {
                this.onSourcesConnectSuccess();
                break;
            }
            case REGISTER_REQUEST_CONNECT: {
                this.onRegisterConnectSuccess();
                break;
            }
            case STREAM_REQUEST_CONNECT: {
                this.onStreamConnectSuccess();
                break;
            }
            default: {
                throw new RuntimeException("don't know what to do in state:" + (Object)((Object)this._curState));
            }
        }
    }

    private void onRequestFailure(HttpRequest req, Throwable cause) {
        this.onRequestFailure(null == req ? (String)null : req.getUri(), cause);
    }

    private void onRequestFailure(String req, Throwable cause) {
        LOG.info((Object)("request failure: req=" + req + "  cause=" + cause));
        if (this.shouldIgnoreWriteTimeoutException(cause)) {
            LOG.error((Object)"got RequestFailure because of WriteTimeoutException");
            return;
        }
        switch (this._curState) {
            case SOURCES_REQUEST_CONNECT: 
            case SOURCES_REQUEST_WRITE: {
                this._callbackStateReuse.switchToSourcesRequestError();
                break;
            }
            case REGISTER_REQUEST_CONNECT: 
            case REGISTER_REQUEST_WRITE: {
                this._callbackStateReuse.switchToRegisterRequestError();
                break;
            }
            case STREAM_REQUEST_CONNECT: 
            case STREAM_REQUEST_WRITE: {
                this._callbackStateReuse.switchToStreamRequestError();
                break;
            }
            default: {
                throw new RuntimeException("don't know what to do in state:" + (Object)((Object)this._curState));
            }
        }
        this._callback.enqueueMessage((Object)this._callbackStateReuse);
    }

    @Override
    public int getMaxEventVersion() {
        return this._maxEventVersion;
    }

    private class StreamRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private StreamRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
            NettyHttpDatabusRelayConnection.this._enableReadFromLatestSCN = false;
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusRelayConnection.this.onRequestFailure(req, cause);
        }
    }

    private class RegisterRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private RegisterRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
            NettyHttpDatabusRelayConnection.this._enableReadFromLatestSCN = false;
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusRelayConnection.this.onRequestFailure(req, cause);
        }
    }

    private class SourcesRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private SourcesRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
            NettyHttpDatabusRelayConnection.this._enableReadFromLatestSCN = false;
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusRelayConnection.this.onRequestFailure(req, cause);
        }
    }

    private class MyConnectListener
    implements AbstractNettyHttpConnection.ConnectResultListener {
        private MyConnectListener() {
        }

        @Override
        public void onConnectSuccess(Channel channel) {
            NettyHttpDatabusRelayConnection.this.onConnectSuccess(channel);
        }

        @Override
        public void onConnectFailure(Throwable cause) {
            NettyHttpDatabusRelayConnection.this.onRequestFailure(null, cause);
        }
    }

    private static enum State {
        SOURCES_REQUEST_CONNECT,
        SOURCES_REQUEST_WRITE,
        REGISTER_REQUEST_CONNECT,
        REGISTER_REQUEST_WRITE,
        STREAM_REQUEST_CONNECT,
        STREAM_REQUEST_WRITE;

    }
}

