/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.DatabusBootstrapConnection;
import com.linkedin.databus.client.DatabusBootstrapConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.BootstrapStartScnHttpResponseProcessor;
import com.linkedin.databus.client.netty.BootstrapTargetScnHttpResponseProcessor;
import com.linkedin.databus.client.netty.GenericHttpResponseHandler;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.client.netty.StreamHttpResponseProcessor;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import com.linkedin.databus2.core.container.monitoring.mbean.ContainerStatisticsCollector;
import com.linkedin.databus2.core.filter.DbusKeyFilter;
import java.util.Formatter;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.util.Timer;

public class NettyHttpDatabusBootstrapConnection
extends AbstractNettyHttpConnection
implements DatabusBootstrapConnection {
    public static final String MODULE = NettyHttpDatabusBootstrapConnection.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final ActorMessageQueue _callback;
    private DatabusBootstrapConnectionStateMessage _callbackStateReuse;
    private ExtendedReadTimeoutHandler _readTimeOutHandler;
    private State _curState;
    private Checkpoint _checkpoint;
    private String _sourcesIdList;
    private String _sourcesNameList;
    private int _freeBufferSpace;
    private final RemoteExceptionHandler _remoteExceptionHandler;
    private DbusKeyFilter _filter;
    private GenericHttpResponseHandler _handler;

    public NettyHttpDatabusBootstrapConnection(ServerInfo server, ActorMessageQueue callback, ClientBootstrap bootstrap, ContainerStatisticsCollector containerStatsCollector, RemoteExceptionHandler remoteExceptionHandler, Timer timeoutTimer, long writeTimeoutMs, long readTimeoutMs, int protocolVersion, ChannelGroup channelGroup) {
        super(server, bootstrap, containerStatsCollector, timeoutTimer, writeTimeoutMs, readTimeoutMs, channelGroup, protocolVersion, LOG);
        this._callback = callback;
        this._remoteExceptionHandler = remoteExceptionHandler;
    }

    public NettyHttpDatabusBootstrapConnection(ServerInfo relay, ActorMessageQueue callback, ChannelFactory channelFactory, ContainerStatisticsCollector containerStatsCollector, RemoteExceptionHandler remoteExceptionHandler, Timer timeoutTimer, long writeTimeoutMs, long readTimeoutMs, int protocolVersion, ChannelGroup channelGroup) {
        this(relay, callback, new ClientBootstrap(channelFactory), containerStatsCollector, remoteExceptionHandler, timeoutTimer, writeTimeoutMs, readTimeoutMs, protocolVersion, channelGroup);
    }

    @Override
    public void requestTargetScn(Checkpoint checkpoint, DatabusBootstrapConnectionStateMessage stateReuse) {
        this._checkpoint = checkpoint;
        this._callbackStateReuse = stateReuse;
        this._handler = null;
        if (!this.hasConnection()) {
            this.connect(State.TARGET_SCN_REQUEST_CONNECT);
        } else {
            this.onTargetScnConnectSuccess();
        }
    }

    void onTargetScnConnectSuccess() {
        this._curState = State.TARGET_SCN_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        BootstrapTargetScnHttpResponseProcessor targetResponseProcessor = new BootstrapTargetScnHttpResponseProcessor(this, this._callback, this._callbackStateReuse, this._checkpoint, this._remoteExceptionHandler, this._readTimeOutHandler);
        String url = this.createTargetScnRequestUrl();
        LOG.info((Object)("Sending " + url));
        HttpRequest request = this.createEmptyRequest(url);
        this.sendRequest(request, new TargetScnRequestResultListener(), targetResponseProcessor);
    }

    private String createTargetScnRequestUrl() {
        return String.format("/targetSCN?source=%s&checkPoint=%s", this._checkpoint.getSnapshotSource(), this._checkpoint.toString());
    }

    @Override
    public void requestStartScn(Checkpoint checkpoint, DatabusBootstrapConnectionStateMessage stateReuse, String sourceNamesList) {
        this._checkpoint = checkpoint;
        this._callbackStateReuse = stateReuse;
        this._sourcesNameList = sourceNamesList;
        this._handler = null;
        if (!this.hasConnection()) {
            this.connect(State.START_SCN_REQUEST_CONNECT);
        } else {
            this.onStartScnConnectSuccess();
        }
    }

    void onStartScnConnectSuccess() {
        this._curState = State.START_SCN_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        BootstrapStartScnHttpResponseProcessor sourcesResponseProcessor = new BootstrapStartScnHttpResponseProcessor(this, this._callback, this._callbackStateReuse, this._checkpoint, this._remoteExceptionHandler, this._readTimeOutHandler);
        String url = this.createStartScnRequestUrl();
        LOG.info((Object)("Sending " + url));
        HttpRequest request = this.createEmptyRequest(url);
        this.sendRequest(request, new StartScnRequestResultListener(), sourcesResponseProcessor);
    }

    private String createStartScnRequestUrl() {
        return String.format("/startSCN?sources=%s&checkPoint=%s", this._sourcesNameList, this._checkpoint.toString());
    }

    @Override
    public void requestStream(String sourcesIdList, DbusKeyFilter filter, int freeBufferSpace, Checkpoint cp, DatabusBootstrapConnectionStateMessage stateReuse) {
        this._checkpoint = cp;
        this._callbackStateReuse = stateReuse;
        this._sourcesIdList = sourcesIdList;
        this._freeBufferSpace = freeBufferSpace;
        this._filter = filter;
        this._handler = null;
        if (!this.hasConnection()) {
            this.connect(State.STREAM_REQUEST_CONNECT);
        } else {
            this.onStreamConnectSuccess();
        }
    }

    void onStreamConnectSuccess() {
        this._curState = State.STREAM_REQUEST_WRITE;
        ChannelPipeline channelPipeline = this._channel.getPipeline();
        this._readTimeOutHandler = (ExtendedReadTimeoutHandler)channelPipeline.get("client request read timeout handler");
        this._readTimeOutHandler.start(channelPipeline.getContext((ChannelHandler)this._readTimeOutHandler));
        StreamHttpResponseProcessor streamResponseProcessor = new StreamHttpResponseProcessor(this, this._callback, this._callbackStateReuse, this._readTimeOutHandler);
        StringBuilder uriString = new StringBuilder(10240);
        boolean error = this.populateBootstrapRequestUrl(uriString);
        if (error) {
            return;
        }
        String url = uriString.toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Sending " + url));
        }
        HttpRequest request = this.createEmptyRequest(url);
        this.sendRequest(request, new BootstrapRequestResultListener(), streamResponseProcessor);
    }

    private boolean populateBootstrapRequestUrl(StringBuilder uriString) {
        boolean error = false;
        ObjectMapper objMapper = new ObjectMapper();
        String filterStr = null;
        if (null != this._filter) {
            try {
                filterStr = objMapper.writeValueAsString((Object)this._filter);
            }
            catch (Exception ex) {
                LOG.error((Object)("Got exception while serializing filter. Filter was : " + this._filter), (Throwable)ex);
                error = true;
                this.onRequestFailure(uriString.toString(), (Throwable)ex);
            }
        }
        Formatter uriFmt = new Formatter(uriString);
        if (null != filterStr) {
            uriFmt.format("/bootstrap?sources=%s&checkPoint=%s&output=binary&batchSize=%d&filter=%s", this._sourcesIdList, this._checkpoint.toString(), this._freeBufferSpace, filterStr);
        } else {
            uriFmt.format("/bootstrap?sources=%s&checkPoint=%s&output=binary&batchSize=%d", this._sourcesIdList, this._checkpoint.toString(), this._freeBufferSpace);
        }
        uriFmt.close();
        return error;
    }

    private void connect(State connectState) {
        this._curState = connectState;
        this.connectWithListener(new MyConnectListener());
    }

    private void onConnectSuccess(Channel channel) {
        switch (this._curState) {
            case START_SCN_REQUEST_CONNECT: {
                this.onStartScnConnectSuccess();
                break;
            }
            case TARGET_SCN_REQUEST_CONNECT: {
                this.onTargetScnConnectSuccess();
                break;
            }
            case STREAM_REQUEST_CONNECT: {
                this.onStreamConnectSuccess();
                break;
            }
            default: {
                throw new RuntimeException("don't know what to do in state:" + (Object)((Object)this._curState));
            }
        }
    }

    @Override
    protected GenericHttpResponseHandler getHandler() {
        return this._handler;
    }

    private void onRequestFailure(HttpRequest req, Throwable cause) {
        this.onRequestFailure(null == req ? (String)null : req.getUri(), cause);
    }

    private void onRequestFailure(String req, Throwable cause) {
        LOG.info((Object)("request failure: req=" + req + " cause=" + cause));
        if (this.shouldIgnoreWriteTimeoutException(cause)) {
            LOG.error((Object)"got RequestFailure because of WriteTimeoutException");
            return;
        }
        switch (this._curState) {
            case START_SCN_REQUEST_CONNECT: 
            case START_SCN_REQUEST_WRITE: {
                this._callbackStateReuse.switchToStartScnRequestError();
                break;
            }
            case TARGET_SCN_REQUEST_CONNECT: 
            case TARGET_SCN_REQUEST_WRITE: {
                this._callbackStateReuse.switchToTargetScnRequestError();
                break;
            }
            case STREAM_REQUEST_CONNECT: 
            case STREAM_REQUEST_WRITE: {
                this._callbackStateReuse.switchToStreamRequestError();
                break;
            }
            default: {
                throw new RuntimeException("don't know what to do in state:" + (Object)((Object)this._curState));
            }
        }
        this._callback.enqueueMessage((Object)this._callbackStateReuse);
    }

    private class BootstrapRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private BootstrapRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusBootstrapConnection.this.onRequestFailure(req, cause);
        }
    }

    private class TargetScnRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private TargetScnRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusBootstrapConnection.this.onRequestFailure(req, cause);
        }
    }

    private class StartScnRequestResultListener
    implements AbstractNettyHttpConnection.SendRequestResultListener {
        private StartScnRequestResultListener() {
        }

        @Override
        public void onSendRequestSuccess(HttpRequest req) {
        }

        @Override
        public void onSendRequestFailure(HttpRequest req, Throwable cause) {
            NettyHttpDatabusBootstrapConnection.this.onRequestFailure(req, cause);
        }
    }

    private class MyConnectListener
    implements AbstractNettyHttpConnection.ConnectResultListener {
        private MyConnectListener() {
        }

        @Override
        public void onConnectSuccess(Channel channel) {
            NettyHttpDatabusBootstrapConnection.this.onConnectSuccess(channel);
        }

        @Override
        public void onConnectFailure(Throwable cause) {
            NettyHttpDatabusBootstrapConnection.this.onRequestFailure(null, cause);
        }
    }

    private static enum State {
        TARGET_SCN_REQUEST_CONNECT,
        TARGET_SCN_REQUEST_WRITE,
        START_SCN_REQUEST_CONNECT,
        START_SCN_REQUEST_WRITE,
        STREAM_REQUEST_CONNECT,
        STREAM_REQUEST_WRITE;

    }
}

