/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.syslog;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.productivity.java.syslog4j.server.SyslogServer;
import org.productivity.java.syslog4j.server.SyslogServerConfigIF;
import org.productivity.java.syslog4j.server.SyslogServerEventHandlerIF;
import org.productivity.java.syslog4j.server.SyslogServerEventIF;
import org.productivity.java.syslog4j.server.SyslogServerIF;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class SyslogSpout
implements IBatchSpout,
SyslogServerEventHandlerIF {
    private static final long serialVersionUID = -1326400481240203319L;
    public static final int BATCH_SIZE = 10;
    private String protocol;
    private int port;
    private transient BlockingQueue<String> syslog;
    private transient SyslogServerIF server;

    public SyslogSpout(String protocol, int port) {
        this.protocol = protocol;
        this.port = port;
    }

    public void event(SyslogServerIF server, SyslogServerEventIF event) {
        boolean interrupted = false;
        do {
            if (this.syslog.offer(event.getMessage())) continue;
            try {
                this.syslog.take();
                this.syslog.offer(event.getMessage());
                interrupted = false;
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
        } while (interrupted);
    }

    public void open(Map conf, TopologyContext context) {
        this.syslog = new ArrayBlockingQueue<String>(100000);
        this.server = SyslogServer.getThreadedInstance((String)this.protocol.toLowerCase());
        SyslogServerConfigIF config = this.server.getConfig();
        config.setPort(this.port);
        config.addEventHandler((SyslogServerEventHandlerIF)this);
    }

    public void emitBatch(long batchId, TridentCollector collector) {
        boolean interrupted = false;
        do {
            try {
                for (int i = 0; i < 10; ++i) {
                    String message = this.syslog.take();
                    collector.emit((List)new Values(new Object[]{message}));
                }
                interrupted = false;
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
        } while (interrupted);
    }

    public void ack(long batchId) {
    }

    public void close() {
        this.server.getThread().stop();
    }

    public Map getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    public Fields getOutputFields() {
        return new Fields(new String[]{"message"});
    }
}

