/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.CouchbaseConnectionFactory;
import net.spy.memcached.CouchbaseNode;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.couch.AsyncConnectionManager;
import net.spy.memcached.protocol.couch.HttpOperation;
import net.spy.memcached.vbucket.Reconfigurable;
import net.spy.memcached.vbucket.config.Bucket;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.protocol.EventListener;
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.DirectByteBufferAllocator;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

public final class CouchbaseConnection
extends SpyThread
implements Reconfigurable {
    private static final int NUM_CONNS = 1;
    private volatile boolean shutDown = false;
    protected volatile boolean reconfiguring = false;
    protected volatile boolean running = true;
    private final CouchbaseConnectionFactory connFactory;
    private final ConcurrentLinkedQueue<CouchbaseNode> nodesToShutdown;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue<ConnectionObserver>();
    private List<CouchbaseNode> nodes;
    private int nextNode;

    public CouchbaseConnection(CouchbaseConnectionFactory cf, List<InetSocketAddress> addrs, Collection<ConnectionObserver> obs) throws IOException {
        this.connFactory = cf;
        this.nodesToShutdown = new ConcurrentLinkedQueue();
        this.connObservers.addAll(obs);
        this.nodes = this.createConnections(addrs);
        this.nextNode = 0;
        this.start();
    }

    private List<CouchbaseNode> createConnections(List<InetSocketAddress> addrs) throws IOException {
        LinkedList<CouchbaseNode> nodeList = new LinkedList<CouchbaseNode>();
        for (InetSocketAddress a : addrs) {
            SyncBasicHttpParams params = new SyncBasicHttpParams();
            params.setIntParameter("http.socket.timeout", 5000).setIntParameter("http.connection.timeout", 5000).setIntParameter("http.socket.buffer-size", 8192).setBooleanParameter("http.connection.stalecheck", false).setBooleanParameter("http.tcp.nodelay", true).setParameter("http.useragent", (Object)"Spymemcached Client/1.1");
            ImmutableHttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[]{new RequestContent(), new RequestTargetHost(), new RequestConnControl(), new RequestUserAgent(), new RequestExpectContinue()});
            AsyncNHttpClientHandler protocolHandler = new AsyncNHttpClientHandler((HttpProcessor)httpproc, (NHttpRequestExecutionHandler)new CouchbaseNode.MyHttpRequestExecutionHandler(), (ConnectionReuseStrategy)new DefaultConnectionReuseStrategy(), (ByteBufferAllocator)new DirectByteBufferAllocator(), (HttpParams)params);
            protocolHandler.setEventListener((EventListener)new CouchbaseNode.EventLogger());
            AsyncConnectionManager connMgr = new AsyncConnectionManager(new HttpHost(a.getHostName(), a.getPort()), 1, (NHttpClientHandler)protocolHandler, (HttpParams)params);
            this.getLogger().info("Added %s to connect queue", a);
            CouchbaseNode node = this.connFactory.createCouchDBNode(a, connMgr);
            node.init();
            nodeList.add(node);
        }
        return nodeList;
    }

    public void addOp(HttpOperation op) {
        this.nodes.get(this.getNextNode()).addOp(op);
    }

    public void handleIO() {
        for (CouchbaseNode node : this.nodes) {
            node.doWrites();
        }
        for (CouchbaseNode qa : this.nodesToShutdown) {
            this.nodesToShutdown.remove(qa);
            Collection<HttpOperation> notCompletedOperations = qa.destroyWriteQueue();
            try {
                qa.shutdown();
            }
            catch (IOException e) {
                this.getLogger().error("Error shutting down connection to " + qa.getSocketAddress());
            }
            this.redistributeOperations(notCompletedOperations);
        }
    }

    private void redistributeOperations(Collection<HttpOperation> ops) {
        int added = 0;
        for (HttpOperation op : ops) {
            this.addOp(op);
            ++added;
        }
        assert (added > 0) : "Didn't add any new operations when redistributing";
    }

    private int getNextNode() {
        ++this.nextNode;
        return this.nextNode %= this.nodes.size();
    }

    protected void checkState() {
        if (this.shutDown) {
            throw new IllegalStateException("Shutting down");
        }
        assert (this.isAlive()) : "IO Thread is not running.";
    }

    public boolean shutdown() throws IOException {
        if (this.shutDown) {
            this.getLogger().info("Suppressing duplicate attempt to shut down");
            return false;
        }
        this.shutDown = true;
        this.running = false;
        for (CouchbaseNode n : this.nodes) {
            if (n == null) continue;
            n.shutdown();
            if (!n.hasWriteOps()) continue;
            this.getLogger().warn("Shutting down with ops waiting to be written");
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reconfigure(Bucket bucket) {
        this.reconfiguring = true;
        try {
            List<String> servers = bucket.getConfig().getServers();
            HashSet<InetSocketAddress> newServerAddresses = new HashSet<InetSocketAddress>();
            ArrayList<InetSocketAddress> newServers = new ArrayList<InetSocketAddress>();
            for (String server : servers) {
                int finalColon = server.lastIndexOf(58);
                if (finalColon < 1) {
                    throw new IllegalArgumentException("Invalid server ``" + server + "'' in vbucket's server list");
                }
                String hostPart = server.substring(0, finalColon);
                InetSocketAddress address = new InetSocketAddress(hostPart, Integer.parseInt("5984"));
                newServerAddresses.add(address);
                newServers.add(address);
            }
            ArrayList<CouchbaseNode> oddNodes = new ArrayList<CouchbaseNode>();
            ArrayList<CouchbaseNode> stayNodes = new ArrayList<CouchbaseNode>();
            ArrayList<InetSocketAddress> stayServers = new ArrayList<InetSocketAddress>();
            for (CouchbaseNode current : this.nodes) {
                if (newServerAddresses.contains(current.getSocketAddress())) {
                    stayNodes.add(current);
                    stayServers.add(current.getSocketAddress());
                    continue;
                }
                oddNodes.add(current);
            }
            newServers.removeAll(stayServers);
            List<CouchbaseNode> newNodes = this.createConnections(newServers);
            ArrayList<CouchbaseNode> mergedNodes = new ArrayList<CouchbaseNode>();
            mergedNodes.addAll(stayNodes);
            mergedNodes.addAll(newNodes);
            this.nodes = mergedNodes;
            this.nodesToShutdown.addAll(oddNodes);
        }
        catch (IOException e) {
            this.getLogger().error((Object)"Connection reconfiguration failed", e);
        }
        finally {
            this.reconfiguring = false;
        }
    }

    @Override
    public void run() {
        while (this.running) {
            if (this.reconfiguring) continue;
            try {
                this.handleIO();
            }
            catch (Exception e) {
                this.logRunException(e);
            }
        }
        this.getLogger().info("Shut down memcached client");
    }

    private void logRunException(Exception e) {
        if (this.shutDown) {
            this.getLogger().debug((Object)"Exception occurred during shutdown", e);
        } else {
            this.getLogger().warn((Object)"Problem handling memcached IO", e);
        }
    }
}

