/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import net.spy.SpyObject;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public final class MemcachedConnection
extends SpyObject {
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 0x1000000;
    private static final long MAX_DELAY = 30000L;
    private volatile boolean shutDown = false;
    private boolean optimizeGets = true;
    private Selector selector = null;
    private final NodeLocator locator;
    private int emptySelects = 0;
    private final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    private final SortedMap<Long, MemcachedNode> reconnectQueue = new TreeMap<Long, MemcachedNode>();

    public MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a) throws IOException {
        this.addedQueue = new ConcurrentLinkedQueue();
        this.selector = Selector.open();
        ArrayList<MemcachedNode> connections = new ArrayList<MemcachedNode>(a.size());
        for (InetSocketAddress sa : a) {
            SocketChannel ch = SocketChannel.open();
            ch.configureBlocking(false);
            MemcachedNode qa = f.createMemcachedNode(sa, ch, bufSize);
            int ops = 0;
            if (ch.connect(sa)) {
                this.getLogger().info("Connected to %s immediately", qa);
                qa.connected();
                assert (ch.isConnected());
            } else {
                this.getLogger().info("Added %s to connect queue", qa);
                ops = 8;
            }
            qa.setSk(ch.register(this.selector, ops, qa));
            assert (ch.isConnected() || qa.getSk().interestOps() == 8) : "Not connected, and not wanting to connect";
            connections.add(qa);
        }
        this.locator = f.createLocator(connections);
    }

    public void setGetOptimization(boolean to) {
        this.optimizeGets = to;
    }

    private boolean selectorsMakeSense() {
        for (MemcachedNode qa : this.locator.getAll()) {
            int sops;
            if (qa.getSk() == null || !qa.getSk().isValid()) continue;
            if (qa.getChannel().isConnected()) {
                sops = qa.getSk().interestOps();
                int expected = 0;
                if (qa.hasReadOp()) {
                    expected |= 1;
                }
                if (qa.hasWriteOp()) {
                    expected |= 4;
                }
                if (qa.getBytesRemainingToWrite() > 0) {
                    expected |= 4;
                }
                assert (sops == expected) : "Invalid ops:  " + qa + ", expected " + expected + ", got " + sops;
                continue;
            }
            sops = qa.getSk().interestOps();
            assert (sops == 8) : "Not connected, and not watching for connect: " + sops;
        }
        this.getLogger().debug("Checked the selectors.");
        return true;
    }

    public void handleIO() throws IOException {
        if (this.shutDown) {
            throw new IOException("No IO while shut down");
        }
        this.handleInputQueue();
        this.getLogger().debug("Done dealing with queue.");
        long delay = 0L;
        if (!this.reconnectQueue.isEmpty()) {
            long now = System.currentTimeMillis();
            long then = this.reconnectQueue.firstKey();
            delay = Math.max(then - now, 1L);
        }
        this.getLogger().debug("Selecting with delay of %sms", delay);
        assert (this.selectorsMakeSense()) : "Selectors don't make sense.";
        int selected = this.selector.select(delay);
        Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
        if (selectedKeys.isEmpty() && !this.shutDown) {
            this.getLogger().debug("No selectors ready, interrupted: " + Thread.interrupted());
            if (++this.emptySelects > 256) {
                for (SelectionKey sk : this.selector.keys()) {
                    this.getLogger().info("%s has %s, interested in %s", sk, sk.readyOps(), sk.interestOps());
                    if (sk.readyOps() != 0) {
                        this.getLogger().info("%s has a ready op, handling IO", sk);
                        this.handleIO(sk);
                        continue;
                    }
                    this.queueReconnect((MemcachedNode)sk.attachment());
                }
                assert (this.emptySelects < 0x1000000) : "Too many empty selects";
            }
        } else {
            this.getLogger().debug("Selected %d, selected %d keys", selected, selectedKeys.size());
            this.emptySelects = 0;
            for (SelectionKey sk : selectedKeys) {
                this.handleIO(sk);
            }
            selectedKeys.clear();
        }
        if (!this.shutDown && !this.reconnectQueue.isEmpty()) {
            this.attemptReconnects();
        }
    }

    private void handleInputQueue() {
        if (!this.addedQueue.isEmpty()) {
            this.getLogger().debug("Handling queue");
            HashSet<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
            HashSet<MemcachedNode> todo = new HashSet<MemcachedNode>();
            try {
                MemcachedNode qa = null;
                while ((qa = (MemcachedNode)this.addedQueue.remove()) != null) {
                    todo.add(qa);
                }
            }
            catch (NoSuchElementException e) {
                // empty catch block
            }
            for (MemcachedNode qa : todo) {
                boolean readyForIO = false;
                if (qa.isActive()) {
                    if (qa.getCurrentWriteOp() != null) {
                        readyForIO = true;
                        this.getLogger().debug("Handling queued write %s", qa);
                    }
                } else {
                    toAdd.add(qa);
                }
                qa.copyInputQueue();
                if (readyForIO) {
                    try {
                        if (qa.getWbuf().hasRemaining()) {
                            this.handleWrites(qa.getSk(), qa);
                        }
                    }
                    catch (IOException e) {
                        this.getLogger().warn((Object)"Exception handling write", e);
                        this.queueReconnect(qa);
                    }
                }
                qa.fixupOps();
            }
            this.addedQueue.addAll(toAdd);
        }
    }

    private void handleIO(SelectionKey sk) {
        MemcachedNode qa = (MemcachedNode)sk.attachment();
        try {
            this.getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment());
            if (sk.isConnectable()) {
                this.getLogger().info("Connection state changed for %s", sk);
                SocketChannel channel = qa.getChannel();
                if (channel.finishConnect()) {
                    assert (channel.isConnected()) : "Not connected.";
                    qa.connected();
                    this.addedQueue.offer(qa);
                    if (qa.getWbuf().hasRemaining()) {
                        this.handleWrites(sk, qa);
                    }
                } else assert (!channel.isConnected()) : "connected";
            } else {
                if (sk.isWritable()) {
                    this.handleWrites(sk, qa);
                }
                if (sk.isReadable()) {
                    this.handleReads(sk, qa);
                }
            }
        }
        catch (Exception e) {
            this.getLogger().info("Reconnecting due to exception on %s", qa, e);
            this.queueReconnect(qa);
        }
        qa.fixupOps();
    }

    private void handleWrites(SelectionKey sk, MemcachedNode qa) throws IOException {
        boolean canWriteMore;
        qa.fillWriteBuffer(this.optimizeGets);
        boolean bl = canWriteMore = qa.getBytesRemainingToWrite() > 0;
        while (canWriteMore) {
            int wrote = qa.writeSome();
            qa.fillWriteBuffer(this.optimizeGets);
            canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
        }
    }

    private void handleReads(SelectionKey sk, MemcachedNode qa) throws IOException {
        Operation currentOp = qa.getCurrentReadOp();
        ByteBuffer rbuf = qa.getRbuf();
        SocketChannel channel = qa.getChannel();
        int read = channel.read(rbuf);
        while (read > 0) {
            this.getLogger().debug("Read %d bytes", read);
            rbuf.flip();
            while (rbuf.remaining() > 0) {
                assert (currentOp != null) : "No read operation";
                currentOp.readFromBuffer(rbuf);
                if (currentOp.getState() != OperationState.COMPLETE) continue;
                this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
                Operation op = qa.removeCurrentReadOp();
                assert (op == currentOp) : "Expected to pop " + currentOp + " got " + op;
                currentOp = qa.getCurrentReadOp();
            }
            rbuf.clear();
            read = channel.read(rbuf);
        }
    }

    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        byte[] bytes = b.array();
        for (int i = 0; i < size; ++i) {
            char ch = (char)bytes[i];
            if (Character.isWhitespace(ch) || Character.isLetterOrDigit(ch)) {
                sb.append(ch);
                continue;
            }
            sb.append("\\x");
            sb.append(Integer.toHexString(bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    private void queueReconnect(MemcachedNode qa) {
        if (!this.shutDown) {
            this.getLogger().warn("Closing, and reopening %s, attempt %d.", qa, qa.getReconnectCount());
            if (qa.getSk() != null) {
                qa.getSk().cancel();
                assert (!qa.getSk().isValid()) : "Cancelled selection key is valid";
            }
            qa.reconnecting();
            try {
                if (qa.getChannel() != null && qa.getChannel().socket() != null) {
                    qa.getChannel().socket().close();
                } else {
                    this.getLogger().info("The channel or socket was null for %s", qa);
                }
            }
            catch (IOException e) {
                this.getLogger().warn((Object)"IOException trying to close a socket", e);
            }
            qa.setChannel(null);
            long delay = Math.min((long)(100 * qa.getReconnectCount() ^ 2), 30000L);
            this.reconnectQueue.put(System.currentTimeMillis() + delay, qa);
            qa.setupResend();
        }
    }

    private void attemptReconnects() throws IOException {
        long now = System.currentTimeMillis();
        IdentityHashMap<MemcachedNode, Boolean> seen = new IdentityHashMap<MemcachedNode, Boolean>();
        Iterator<MemcachedNode> i = this.reconnectQueue.headMap(now).values().iterator();
        while (i.hasNext()) {
            MemcachedNode qa = i.next();
            i.remove();
            if (!seen.containsKey(qa)) {
                seen.put(qa, Boolean.TRUE);
                this.getLogger().info("Reconnecting %s", qa);
                SocketChannel ch = SocketChannel.open();
                ch.configureBlocking(false);
                int ops = 0;
                if (ch.connect(qa.getSocketAddress())) {
                    this.getLogger().info("Immediately reconnected to %s", qa);
                    assert (ch.isConnected());
                } else {
                    ops = 8;
                }
                qa.registerChannel(ch, ch.register(this.selector, ops, qa));
                assert (qa.getChannel() == ch) : "Channel was lost.";
                continue;
            }
            this.getLogger().debug("Skipping duplicate reconnect request for %s", qa);
        }
    }

    NodeLocator getLocator() {
        return this.locator;
    }

    public void addOperation(String key, Operation o) {
        MemcachedNode placeIn = null;
        MemcachedNode primary = this.locator.getPrimary(key);
        if (primary.isActive()) {
            placeIn = primary;
        } else {
            Iterator<MemcachedNode> i = this.locator.getSequence(key);
            while (placeIn == null && i.hasNext()) {
                MemcachedNode n = i.next();
                if (!n.isActive()) continue;
                placeIn = n;
            }
            if (placeIn == null) {
                placeIn = primary;
            }
        }
        assert (placeIn != null) : "No node found for key " + key;
        this.addOperation(placeIn, o);
    }

    public void addOperation(MemcachedNode node, Operation o) {
        o.initialize();
        node.addOp(o);
        this.addedQueue.offer(node);
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    public void addOperations(Map<MemcachedNode, Operation> ops) {
        for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
            MemcachedNode node = me.getKey();
            Operation o = me.getValue();
            o.initialize();
            node.addOp(o);
            this.addedQueue.offer(node);
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory of) {
        CountDownLatch latch = new CountDownLatch(this.locator.getAll().size());
        for (MemcachedNode node : this.locator.getAll()) {
            Operation op = of.newOp(node, latch);
            op.initialize();
            node.addOp(op);
            this.addedQueue.offer(node);
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        return latch;
    }

    public void shutdown() throws IOException {
        this.shutDown = true;
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        for (MemcachedNode qa : this.locator.getAll()) {
            if (qa.getChannel() == null) continue;
            qa.getChannel().close();
            qa.setSk(null);
            if (qa.getBytesRemainingToWrite() > 0) {
                this.getLogger().warn("Shut down with %d bytes remaining to write", qa.getBytesRemainingToWrite());
            }
            this.getLogger().debug("Shut down channel %s", qa.getChannel());
        }
        this.selector.close();
        this.getLogger().debug("Shut down selector %s", this.selector);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{MemcachedConnection to");
        for (MemcachedNode qa : this.locator.getAll()) {
            sb.append(" ");
            sb.append(qa.getSocketAddress());
        }
        sb.append("}");
        return sb.toString();
    }
}

