/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.contrib.cassandra.bolt;

import backtype.storm.contrib.cassandra.bolt.CassandraConstants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBatchingBolt
implements IRichBolt,
CassandraConstants {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingBolt.class);
    private boolean ackOnReceive = false;
    private OutputCollector collector;
    private LinkedBlockingQueue<Tuple> queue;
    private BatchThread batchThread;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        int batchMaxSize = Utils.getInt((Object)Utils.get((Map)stormConf, (Object)"cassandra.batch.max_size", (Object)0));
        this.collector = collector;
        this.queue = new LinkedBlockingQueue();
        this.batchThread = new BatchThread(batchMaxSize);
        this.batchThread.start();
    }

    public void setAckOnReceive(boolean ackOnReceive) {
        this.ackOnReceive = ackOnReceive;
    }

    public final void execute(Tuple input) {
        if (this.ackOnReceive) {
            this.collector.ack(input);
        }
        this.queue.offer(input);
    }

    public void cleanup() {
        this.batchThread.stopRunning();
    }

    public abstract void executeBatch(List<Tuple> var1);

    private class BatchThread
    extends Thread {
        int batchMaxSize;
        boolean stopRequested;

        BatchThread() {
            this(0);
        }

        BatchThread(int batchMaxSize) {
            super("batch-bolt-thread");
            this.stopRequested = false;
            super.setDaemon(true);
            this.batchMaxSize = batchMaxSize;
        }

        @Override
        public void run() {
            while (!this.stopRequested) {
                try {
                    ArrayList<Tuple> batch = new ArrayList<Tuple>();
                    Tuple t = (Tuple)AbstractBatchingBolt.this.queue.take();
                    batch.add(t);
                    if (this.batchMaxSize > 0) {
                        AbstractBatchingBolt.this.queue.drainTo(batch, this.batchMaxSize);
                    } else {
                        AbstractBatchingBolt.this.queue.drainTo(batch);
                    }
                    AbstractBatchingBolt.this.executeBatch(batch);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        synchronized void stopRunning() {
            this.stopRequested = true;
        }
    }
}

