/*
 * Decompiled with CFR 0.152.
 */
package starkiller;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import starkiller.Junction;
import starkiller.MsgPack;
import starkiller.TimeoutException;
import starkiller.Util;

public class RemoteJunction<K, V>
implements Junction<K, V>,
Closeable {
    private static final Logger logger = LoggerFactory.getLogger(RemoteJunction.class);
    private final InetSocketAddress address;
    private final AtomicLong messageIdGen = new AtomicLong();
    private final AtomicBoolean running = new AtomicBoolean();
    private final NonBlockingHashMapLong<CompletableFuture> methodCalls = new NonBlockingHashMapLong();
    private final AtomicReference<AsynchronousSocketChannel> socket = new AtomicReference();
    private final MsgPack<K, V> msgpack;
    private final Util.AsyncLock connectLock = new Util.AsyncLock("connect");
    private final Util.AsyncLock writeLock = new Util.AsyncLock("write");
    private CompletableFuture<Void> runningFuture = CompletableFuture.failedFuture(new IllegalStateException("not running"));

    @Override
    public void close() throws IOException {
        this.stop();
        AsynchronousSocketChannel s = this.socket.get();
        if (s != null) {
            s.close();
        }
        this.methodCalls.forEach((id, action) -> action.completeExceptionally(new ClosedChannelException()));
    }

    public RemoteJunction(InetSocketAddress address, Class<? extends V> valueClass, Consumer<ObjectMapper> objectMapperConfigurator) {
        this.address = address;
        ObjectMapper objectMapper = new ObjectMapper((JsonFactory)new MessagePackFactory());
        objectMapperConfigurator.accept(objectMapper);
        this.msgpack = new MsgPack(objectMapper, valueClass);
    }

    public RemoteJunction(InetSocketAddress address, Class<? extends V> valueClass) {
        this(address, valueClass, om -> {});
    }

    public CompletableFuture<Void> start() {
        if (this.running.compareAndSet(false, true)) {
            this.runningFuture = this.readLoop();
            return this.runningFuture;
        }
        return this.runningFuture;
    }

    public void stop() {
        this.running.set(false);
    }

    public boolean isRunning() {
        return !this.runningFuture.isDone();
    }

    CompletableFuture<AsynchronousSocketChannel> ensureSocket() {
        AsynchronousSocketChannel s = this.socket.get();
        if (s != null && s.isOpen()) {
            return CompletableFuture.completedFuture(s);
        }
        return this.connectLock.lock().thenCompose(l -> {
            if (this.socket.get() != null && this.socket.get().isOpen()) {
                return CompletableFuture.completedFuture(this.socket.get());
            }
            try {
                final AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
                final CompletableFuture future = new CompletableFuture();
                socket.connect(this.address, null, new CompletionHandler<Void, Object>(){

                    @Override
                    public void completed(Void result, Object attachment) {
                        RemoteJunction.this.socket.set(socket);
                        future.complete(socket);
                        l.unlock();
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        future.completeExceptionally(exc);
                        l.unlock();
                    }
                });
                return future;
            }
            catch (IOException e) {
                return CompletableFuture.failedFuture(e);
            }
        });
    }

    CompletableFuture<Void> readLoop() {
        logger.trace("readLoop begin");
        return this.ensureSocket().thenCompose(socket -> {
            CompletableFuture<ByteBuffer> readFuture = Util.readMessage(socket);
            CompletionStage dispatchFuture = ((CompletableFuture)readFuture.thenApply(buffer -> {
                try {
                    logger.trace("read message buffer: {}", Util.hex(buffer));
                    Response<V> resp = this.msgpack.unpackResponse((ByteBuffer)buffer);
                    logger.trace("read response {}", resp);
                    CompletableFuture future = (CompletableFuture)this.methodCalls.remove(resp.messageId);
                    if (future != null) {
                        logger.trace("completing call {} {} with {}", new Object[]{resp.messageId, future, resp});
                        if (resp instanceof RecvResponse) {
                            future.complete(((RecvResponse)resp).value);
                        } else if (resp instanceof SendResponse) {
                            future.complete(Boolean.TRUE);
                        } else if (resp instanceof TokenResponse) {
                            future.complete(((TokenResponse)resp).tokens);
                        } else if (resp instanceof TimeoutResponse) {
                            future.completeExceptionally(new TimeoutException());
                        } else {
                            future.completeExceptionally(new IllegalStateException("should never have decoded a " + resp));
                        }
                    } else {
                        logger.warn("no method call resolved for {}, response {}", (Object)resp.messageId, resp);
                    }
                    return this.running.get();
                }
                catch (IOException e) {
                    logger.warn("exception parsing message", (Throwable)e);
                    return false;
                }
            })).exceptionally(exc -> {
                logger.warn("exception reading message", exc);
                if (exc instanceof ClosedChannelException || exc.getCause() instanceof ClosedChannelException) {
                    this.socket.set(null);
                    return true;
                }
                try {
                    this.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                return false;
            });
            return ((CompletableFuture)dispatchFuture).thenComposeAsync(again -> {
                CompletableFuture<Void> nextFuture = again != false ? this.readLoop() : CompletableFuture.completedFuture(null);
                return nextFuture;
            });
        });
    }

    @Override
    public CompletableFuture<Boolean> send(K id, V value, long timeout, TimeUnit timeoutUnit) {
        SendRequest request = new SendRequest();
        request.id = id;
        request.value = value;
        request.timeout = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnit);
        request.messageId = this.messageIdGen.incrementAndGet();
        return this.sendRequest(request);
    }

    @Override
    public CompletableFuture<V> recv(K id, long timeout, TimeUnit timeoutUnit) {
        RecvRequest request = new RecvRequest();
        request.id = id;
        request.timeout = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnit);
        request.messageId = this.messageIdGen.incrementAndGet();
        return this.sendRequest(request);
    }

    public CompletableFuture<List<Long>> tokens() {
        TokenRequest request = new TokenRequest();
        request.messageId = this.messageIdGen.incrementAndGet();
        return this.sendRequest(request);
    }

    <T> CompletableFuture<T> sendRequest(Request<K, V> r) {
        byte[] encoded;
        CompletableFuture future = new CompletableFuture();
        try {
            encoded = this.msgpack.packRequest(r);
        }
        catch (IOException e) {
            future.completeExceptionally(e);
            return future;
        }
        ByteBuffer buffer = ByteBuffer.allocate(encoded.length + 2);
        buffer.putShort((short)encoded.length);
        buffer.put(encoded);
        buffer.flip();
        logger.trace("encoded message {}", Util.hex(buffer));
        this.methodCalls.put(r.messageId, future);
        AtomicReference locked = new AtomicReference();
        CompletionStage writeFuture = this.ensureSocket().thenCompose(socket -> this.writeLock.lock().thenCompose(l -> {
            locked.set(l);
            return Util.writeFully(socket, buffer);
        }));
        ((CompletableFuture)writeFuture).whenComplete((res, exc) -> {
            if (locked.get() != null) {
                ((Util.AsyncLock.Locked)locked.get()).unlock();
            }
        });
        return ((CompletableFuture)writeFuture).thenCompose(x -> future);
    }

    static class TimeoutResponse<V>
    extends Response<V> {
        public TimeoutResponse(long messageId) {
            this.messageId = messageId;
        }
    }

    static class TokenResponse<V>
    extends Response<V> {
        public List<Long> tokens;

        public TokenResponse(long messageId, List<Long> tokens) {
            this.messageId = messageId;
            this.tokens = tokens;
        }
    }

    static class SendResponse<V>
    extends Response<V> {
        public boolean success;

        public SendResponse(long messageId, boolean success) {
            this.messageId = messageId;
            this.success = success;
        }
    }

    static class RecvResponse<V>
    extends Response<V> {
        public V value;

        public RecvResponse(long messageId, V value) {
            this.messageId = messageId;
            this.value = value;
        }
    }

    static class Response<V> {
        public long messageId;

        Response() {
        }
    }

    static class TokenRequest<K, V>
    extends Request<K, V> {
        TokenRequest() {
        }
    }

    static class SendRequest<K, V>
    extends Request<K, V> {
        public Long timeout;
        public K id;
        public V value;

        SendRequest() {
        }
    }

    static class RecvRequest<K, V>
    extends Request<K, V> {
        public K id;
        public Long timeout;

        RecvRequest() {
        }
    }

    static class Request<K, V> {
        public long messageId;

        Request() {
        }
    }
}

