/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.firehose;

import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApi;
import com.ircclouds.irc.api.IRCApiImpl;
import com.ircclouds.irc.api.IServerParameters;
import com.ircclouds.irc.api.domain.IRCServer;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.IMessageListener;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseFactory;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.segment.realtime.firehose.IrcInputRowParser;
import org.joda.time.DateTime;

/**
 * {@link FirehoseFactory} that connects to an IRC server, joins the configured
 * channels and exposes every channel message as a candidate {@link InputRow}.
 *
 * <p>Incoming messages are timestamped on arrival and buffered in an unbounded
 * in-memory queue until the firehose consumer parses them.
 *
 * <p>NOTE: the {@code closed} flag lives on the factory, not on the individual
 * firehose. It is therefore shared by every firehose this factory creates and is
 * reset to {@code false} by each call to {@link #connect}.
 */
public class IrcFirehoseFactory
implements FirehoseFactory<IrcInputRowParser> {
    private static final Logger log = new Logger(IrcFirehoseFactory.class);
    private final String nick;
    private final String host;
    private final List<String> channels;
    /** Set by {@code Firehose.close()}; polled by {@code Firehose.hasMore()} to stop waiting for messages. */
    private volatile boolean closed = false;

    /**
     * @param nick     nickname (also used as the real name) presented to the IRC server
     * @param host     hostname of the IRC server to connect to
     * @param channels channels to join once the connection is established
     */
    @JsonCreator
    public IrcFirehoseFactory(@JsonProperty(value="nick") String nick, @JsonProperty(value="host") String host, @JsonProperty(value="channels") List<String> channels) {
        this.nick = nick;
        this.host = host;
        this.channels = channels;
    }

    /** @return the configured IRC nickname */
    @JsonProperty
    public String getNick() {
        return this.nick;
    }

    /** @return the configured IRC server hostname */
    @JsonProperty
    public String getHost() {
        return this.host;
    }

    /** @return the channels joined after a successful connection */
    @JsonProperty
    public List<String> getChannels() {
        return this.channels;
    }

    /**
     * Starts an IRC connection and returns a firehose over the messages received
     * on the configured channels.
     *
     * @param firehoseParser     parser turning a (receive time, message) pair into an {@link InputRow}
     * @param temporaryDirectory not used by this implementation
     * @return a firehose whose {@code close()} disconnects from the IRC server
     * @throws IOException declared by the {@link FirehoseFactory} contract
     */
    @Override
    public Firehose connect(final IrcInputRowParser firehoseParser, File temporaryDirectory) throws IOException {
        final IRCApi irc = new IRCApiImpl(Boolean.FALSE);
        final LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>> queue =
            new LinkedBlockingQueue<Pair<DateTime, ChannelPrivMsg>>();

        irc.addListener(new VariousMessageListenerAdapter(){

            @Override
            public void onChannelMessage(ChannelPrivMsg aMsg) {
                try {
                    // Stamp the message with its arrival time; the parser receives both.
                    queue.put(Pair.of(DateTime.now(), aMsg));
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted adding message to queue", e);
                }
            }
        });

        log.info("connecting to irc server [%s]", this.host);
        irc.connect(new IServerParameters(){

            @Override
            public String getNickname() {
                return IrcFirehoseFactory.this.nick;
            }

            @Override
            public List<String> getAlternativeNicknames() {
                // Three random fallbacks derived from the configured nick.
                return Lists.newArrayList(IrcFirehoseFactory.this.nick + UUID.randomUUID(), IrcFirehoseFactory.this.nick + UUID.randomUUID(), IrcFirehoseFactory.this.nick + UUID.randomUUID());
            }

            @Override
            public String getIdent() {
                return "druid";
            }

            @Override
            public String getRealname() {
                return IrcFirehoseFactory.this.nick;
            }

            @Override
            public IRCServer getServer() {
                return new IRCServer(IrcFirehoseFactory.this.host, Boolean.FALSE);
            }
        }, new Callback<IIRCState>(){

            @Override
            public void onSuccess(IIRCState aObject) {
                log.info("irc connection to server [%s] established", IrcFirehoseFactory.this.host);
                for (String chan : IrcFirehoseFactory.this.channels) {
                    log.info("Joining channel %s", chan);
                    irc.joinChannel(chan);
                }
            }

            @Override
            public void onFailure(Exception e) {
                log.error(e, "Unable to connect to irc server [%s]", IrcFirehoseFactory.this.host);
                throw new RuntimeException("Unable to connect to server", e);
            }
        });

        this.closed = false;
        return new Firehose(){
            /** Row produced by the most recent successful {@link #hasMore()} call. */
            InputRow nextRow = null;

            /**
             * Waits until a message parses into a non-null row, or until the
             * firehose is closed.
             *
             * @return {@code true} when a row is ready for {@link #nextRow()},
             *         {@code false} once the closed flag has been set
             */
            @Override
            public boolean hasMore() {
                try {
                    while (true) {
                        // Bounded wait so the closed flag is re-checked at least every 100 ms.
                        Pair<DateTime, ChannelPrivMsg> nextMsg = queue.poll(100L, TimeUnit.MILLISECONDS);
                        if (IrcFirehoseFactory.this.closed) {
                            return false;
                        }
                        if (nextMsg == null) {
                            continue;
                        }
                        try {
                            this.nextRow = firehoseParser.parse(nextMsg);
                            if (this.nextRow != null) {
                                return true;
                            }
                            // Parser yielded no row for this message; keep polling.
                        }
                        catch (IllegalArgumentException iae) {
                            log.debug("ignoring invalid message in channel [%s]", nextMsg.rhs.getChannelName());
                        }
                    }
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status (the exception cleared it) before propagating.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("interrupted retrieving elements from queue", e);
                }
            }

            /** @return the row found by the last {@link #hasMore()} call; the field is not cleared on read */
            @Override
            public InputRow nextRow() {
                return this.nextRow;
            }

            /** @return a no-op {@link Runnable}; this firehose has nothing to commit */
            @Override
            public Runnable commit() {
                return new Runnable(){

                    @Override
                    public void run() {
                    }
                };
            }

            /**
             * Disconnects from the IRC server and sets the closed flag, even if
             * the disconnect throws.
             */
            @Override
            public void close() throws IOException {
                try {
                    log.info("disconnecting from irc server [%s]", IrcFirehoseFactory.this.host);
                    irc.disconnect("");
                }
                finally {
                    IrcFirehoseFactory.this.closed = true;
                }
            }
        };
    }
}

