/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.contrib.cassandra.bolt;

import backtype.storm.contrib.cassandra.bolt.AbstractBatchingBolt;
import backtype.storm.contrib.cassandra.bolt.CassandraConstants;
import backtype.storm.contrib.cassandra.bolt.determinable.ColumnFamilyDeterminable;
import backtype.storm.contrib.cassandra.bolt.determinable.DefaultColumnFamilyDeterminable;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract batching bolt that obtains a Hector {@link Cluster} and
 * {@link Keyspace} while it is being prepared and exposes them to subclasses.
 *
 * <p>The Cassandra host and keyspace name are looked up in the Storm
 * configuration under the keys {@code "cassandra.host"} and
 * {@code "cassandra.keyspace"}.
 */
public abstract class BatchingCassandraBolt extends AbstractBatchingBolt implements CassandraConstants {
    private static final Logger LOG = LoggerFactory.getLogger(BatchingCassandraBolt.class);

    /** Selected acknowledgement mode; {@link AckStrategy#ACK_IGNORE} unless changed via {@link #setAckStrategy}. */
    protected AckStrategy ackStrategy = AckStrategy.ACK_IGNORE;

    /** Collector handed to {@link #prepare(Map, TopologyContext, OutputCollector)}. */
    protected OutputCollector collector;

    /** Output fields to declare; never assigned inside this class, so it is normally {@code null}. */
    private Fields declaredFields;

    /** Value of the {@code "cassandra.host"} configuration entry, captured in {@code prepare}. */
    private String cassandraHost;

    /** Value of the {@code "cassandra.keyspace"} configuration entry, captured in {@code prepare}. */
    private String cassandraKeyspace;

    /** Hector cluster handle, created in {@code prepare}. */
    protected Cluster cluster;

    /** Hector keyspace handle, created in {@code prepare}. */
    protected Keyspace keyspace;

    /** Strategy object supplied at construction time. */
    protected ColumnFamilyDeterminable cfDeterminable;

    /**
     * Creates a bolt whose column family strategy is a
     * {@link DefaultColumnFamilyDeterminable} built from the given name.
     *
     * @param columnFamily column family name passed to the default strategy
     */
    public BatchingCassandraBolt(String columnFamily) {
        this(new DefaultColumnFamilyDeterminable(columnFamily));
    }

    /**
     * Creates a bolt that uses the given column family strategy.
     *
     * @param cfDeterminable strategy stored in {@link #cfDeterminable}
     */
    public BatchingCassandraBolt(ColumnFamilyDeterminable cfDeterminable) {
        this.cfDeterminable = cfDeterminable;
    }

    /**
     * Replaces the acknowledgement mode. The value is only inspected during
     * {@link #prepare(Map, TopologyContext, OutputCollector)}.
     *
     * @param strategy new acknowledgement mode
     */
    public void setAckStrategy(AckStrategy strategy) {
        this.ackStrategy = strategy;
    }

    /**
     * Prepares the parent bolt, reads the Cassandra settings from the Storm
     * configuration, connects to Cassandra and, when the strategy is
     * {@link AckStrategy#ACK_ON_RECEIVE}, enables ack-on-receive on the parent.
     *
     * @param stormConf Storm configuration holding {@code "cassandra.host"} and
     *                  {@code "cassandra.keyspace"}
     * @param context   topology context, forwarded to the parent
     * @param collector output collector, forwarded to the parent and kept in {@link #collector}
     * @throws IllegalStateException if the Cassandra cluster or keyspace cannot be created
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        super.prepare(stormConf, context, collector);
        LOG.debug("Preparing...");

        final Object hostSetting = stormConf.get("cassandra.host");
        this.cassandraHost = (String) hostSetting;
        final Object keyspaceSetting = stormConf.get("cassandra.keyspace");
        this.cassandraKeyspace = (String) keyspaceSetting;
        this.collector = collector;

        this.connectToCassandra();

        final boolean ackOnReceive = AckStrategy.ACK_ON_RECEIVE == this.ackStrategy;
        if (ackOnReceive) {
            // Explicit super call: dispatch must stay non-virtual.
            super.setAckOnReceive(true);
        }
    }

    /**
     * Obtains the cluster named {@code "cassandra-bolt"} for the configured
     * host and creates the configured keyspace on it. Any failure is logged and
     * rethrown as an {@link IllegalStateException} carrying the original cause.
     */
    private void connectToCassandra() {
        try {
            final CassandraHostConfigurator hostConfigurator = new CassandraHostConfigurator(this.cassandraHost);
            this.cluster = HFactory.getOrCreateCluster("cassandra-bolt", hostConfigurator);
            this.keyspace = HFactory.createKeyspace(this.cassandraKeyspace, this.cluster);
        } catch (Throwable failure) {
            LOG.warn("Preparation failed.", failure);
            throw new IllegalStateException("Failed to prepare CassandraBolt", failure);
        }
    }

    /** Delegates to the parent's cleanup; nothing else is released here. */
    @Override
    public void cleanup() {
        super.cleanup();
    }

    /**
     * Declares {@link #declaredFields} on the given declarer, or declares
     * nothing when no fields are set.
     *
     * @param declarer declarer that receives the output fields
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        if (this.declaredFields == null) {
            return;
        }
        declarer.declare(this.declaredFields);
    }

    /**
     * Acknowledgement modes. Within this class only {@link #ACK_ON_RECEIVE} is
     * acted upon (it switches the parent to ack-on-receive during prepare).
     */
    public enum AckStrategy {
        /** Default mode of a newly constructed bolt. */
        ACK_IGNORE,
        /** Makes {@code prepare} call {@code setAckOnReceive(true)} on the parent. */
        ACK_ON_RECEIVE,
        /** Not interpreted in this class; presumably handled by subclasses — TODO confirm. */
        ACK_ON_WRITE;
    }
}

