/*
 * Decompiled with CFR 0.152.
 */
package com.aphyr.riemann.client;

import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.Promise;
import com.aphyr.riemann.client.Write;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;

public class TcpHandler
extends SimpleChannelHandler {
    public final LinkedBlockingQueue<Promise<Proto.Msg>> queue = new LinkedBlockingQueue();
    public volatile Channel channel;
    public volatile IOException lastError = new IOException("Channel closed.");
    public final ChannelGroup channelGroup;

    public TcpHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    public void channelOpen(ChannelHandlerContext c, ChannelStateEvent e) {
        this.channelGroup.add((Object)e.getChannel());
    }

    public void channelConnected(ChannelHandlerContext c, ChannelStateEvent e) {
        this.channel = e.getChannel();
    }

    public void channelClosed(ChannelHandlerContext c, ChannelStateEvent e) {
        Promise<Proto.Msg> promise;
        this.channel = null;
        this.channelGroup.remove((Object)e.getChannel());
        IOException ex = new IOException("Connection closed.");
        while ((promise = this.queue.poll()) != null) {
            promise.deliver(ex);
        }
    }

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (!(e instanceof MessageEvent)) {
            ctx.sendDownstream(e);
            return;
        }
        MessageEvent me = (MessageEvent)e;
        if (!(me.getMessage() instanceof Write)) {
            ctx.sendUpstream((ChannelEvent)me);
            return;
        }
        Write write = (Write)me.getMessage();
        Proto.Msg message = write.message;
        final Promise<Proto.Msg> promise = write.promise;
        me.getFuture().addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    TcpHandler.this.queue.put(promise);
                } else if (future.getCause() != null) {
                    promise.deliver(new IOException("Write failed.", future.getCause()));
                } else {
                    promise.deliver(new IOException("Write failed."));
                }
            }
        });
        Channels.write((ChannelHandlerContext)ctx, (ChannelFuture)me.getFuture(), (Object)message);
    }

    public void messageReceived(ChannelHandlerContext c, MessageEvent e) {
        Proto.Msg message = (Proto.Msg)e.getMessage();
        this.queue.poll().deliver(message);
    }

    public void exceptionCaught(ChannelHandlerContext c, ExceptionEvent e) {
        e.getChannel().close();
    }
}

