/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.contrib.cassandra.bolt;

import backtype.storm.contrib.cassandra.bolt.CassandraBolt;
import backtype.storm.contrib.cassandra.bolt.mapper.ColumnsMapper;
import backtype.storm.contrib.cassandra.bolt.mapper.TupleMapper;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bolt that looks up one Cassandra row per input tuple and emits the values
 * that the configured {@link ColumnsMapper} derives from that row.
 *
 * <p>The column family and row key are obtained from the inherited
 * {@code tupleMapper}; the row itself is read through the inherited
 * {@code cassandraClient}. Both fields are declared in a superclass, not here.
 */
public class CassandraLookupBolt extends CassandraBolt implements IBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraLookupBolt.class);

    /** Converts a looked-up row into the values to emit and declares the output fields. */
    private final ColumnsMapper columnsMapper;

    /**
     * Creates a lookup bolt.
     *
     * @param tupleMapper   passed to the superclass; used in {@link #execute} to derive
     *                      the column family and row key from each input tuple
     * @param columnsMapper maps the looked-up columns to the values to emit and
     *                      declares this bolt's output fields
     */
    public CassandraLookupBolt(TupleMapper tupleMapper, ColumnsMapper columnsMapper) {
        super(tupleMapper);
        this.columnsMapper = columnsMapper;
    }

    /**
     * Delegates preparation entirely to the superclass.
     *
     * @param stormConf topology configuration, forwarded unchanged
     *                  (raw {@code Map} kept so the signature matches the supertype
     *                  declaration - presumably raw there as well; confirm before
     *                  parameterizing)
     * @param context   topology context, forwarded unchanged
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    /**
     * Looks up the row addressed by {@code input} and emits every {@link Values}
     * produced by the columns mapper for that row.
     *
     * <p>Any {@link Exception} raised by the lookup, the mapping, or an emit is
     * logged at WARN level and not rethrown; values emitted before the failure
     * stay emitted. Exceptions thrown while deriving the column family or row key
     * happen outside the {@code try} and therefore propagate to the caller.
     *
     * @param input     tuple identifying the column family and row key to read
     * @param collector collector that receives the mapped values
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String columnFamily = this.tupleMapper.mapToColumnFamily(input);
        String rowKey = this.tupleMapper.mapToRowKey(input);
        try {
            Map<String, String> columns = this.cassandraClient.lookup(columnFamily, rowKey);
            List<Values> valuesToEmit = this.columnsMapper.mapToValues(rowKey, columns, input);
            for (Values values : valuesToEmit) {
                collector.emit(values);
            }
        } catch (Exception e) {
            // Deliberate best-effort: a failed lookup must not take the bolt down.
            // NOTE(review): because the exception is swallowed, the framework cannot
            // see the failure - presumably the tuple is still acked; confirm whether
            // it should be failed/replayed instead.
            LOG.warn("Could not emit for row [" + rowKey + "] from Cassandra.", e);
        }
    }

    /** Delegates cleanup entirely to the superclass. */
    @Override
    public void cleanup() {
        super.cleanup();
    }

    /**
     * Declares this bolt's output fields by delegating to the columns mapper.
     *
     * @param declarer declarer handed to the columns mapper
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.columnsMapper.declareOutputFields(declarer);
    }
}

