/*
 * Decompiled with CFR 0.152.
 */
package stormy.pythian.component.file;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.log4j.Logger;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import stormy.pythian.model.annotation.ComponentType;
import stormy.pythian.model.annotation.Documentation;
import stormy.pythian.model.annotation.Mapper;
import stormy.pythian.model.annotation.MappingType;
import stormy.pythian.model.annotation.OutputStream;
import stormy.pythian.model.annotation.Property;
import stormy.pythian.model.annotation.Topology;
import stormy.pythian.model.component.Component;
import stormy.pythian.model.instance.Instance;
import stormy.pythian.model.instance.OutputUserSelectionFeaturesMapper;

/**
 * Stream source component that reads a CSV file and emits its lines as
 * {@link Instance}s on the "lines" output stream.
 *
 * <p>The file is read by a single-task Trident batch spout ({@link FileSpout})
 * which remembers its byte offset between batches, so each call to
 * {@code emitBatch} resumes where the previous one stopped.
 */
@Documentation(name="Csv stream source", description="Read and parse a given csv file", type=ComponentType.STREAM_SOURCE)
public class CsvSteamSource
implements Component {
    private static final long serialVersionUID = -1157270927374405269L;
    public static final String LINE_FEATURE = "file line";
    /** Batch size used when the "Max batch size" property is not set. */
    private static final int DEFAULT_MAX_BATCH_SIZE = 500;
    @OutputStream(name="lines", type=MappingType.USER_SELECTION)
    private Stream out;
    @Mapper(stream="lines")
    private OutputUserSelectionFeaturesMapper mapper;
    @Property(name="File", description="The full path of the file to read", mandatory=true)
    private String filename;
    @Property(name="Max batch size")
    private Integer maxBatchSize;
    @Topology
    private TridentTopology topology;

    /**
     * Creates the file spout and registers it on the topology as a new stream.
     *
     * <p>The stream id is the file's base name followed by {@code "-spout-"} and
     * five random alphabetic characters.
     *
     * @throws IllegalArgumentException if the configured max batch size is not
     *         strictly positive
     */
    public void init() {
        // Fall back to the declared default rather than a duplicated literal.
        int batchSize = this.maxBatchSize != null ? this.maxBatchSize.intValue() : DEFAULT_MAX_BATCH_SIZE;
        FileSpout spout = new FileSpout(this.filename, this.mapper, batchSize);
        String streamId = FilenameUtils.getName(this.filename) + "-spout-" + RandomStringUtils.randomAlphabetic(5);
        this.out = this.topology.newStream(streamId, (IBatchSpout)spout);
    }

    /**
     * Batch spout that reads a text file line by line, splitting each line on
     * commas and turning it into an {@link Instance}.
     *
     * <p>The file is re-opened for every call and positioned with
     * {@link RandomAccessFile#seek(long)} at the offset reached by the previous
     * batch.
     */
    private static class FileSpout
    implements IBatchSpout {
        private static final long serialVersionUID = -4793933286470308346L;
        private static final Logger LOGGER = Logger.getLogger(FileSpout.class);
        private final String filename;
        private final OutputUserSelectionFeaturesMapper mapper;
        private final int maxBatchSize;
        /** Byte offset in the file at which the next batch starts. */
        private Long currentPosition = 0L;

        /**
         * @param filename     path of the file to read
         * @param mapper       mapper used to build instances and to bound the number of split fields
         * @param maxBatchSize maximum number of instances per batch; must be strictly positive
         * @throws IllegalArgumentException if {@code maxBatchSize} is not strictly positive
         */
        public FileSpout(String filename, OutputUserSelectionFeaturesMapper mapper, int maxBatchSize) {
            // A zero batch size would never advance the file position (the spout would
            // spin forever emitting nothing); a negative one makes ArrayList allocation fail.
            if (maxBatchSize <= 0) {
                throw new IllegalArgumentException("Max batch size must be strictly positive but was " + maxBatchSize);
            }
            this.filename = filename;
            this.mapper = mapper;
            this.maxBatchSize = maxBatchSize;
        }

        /**
         * Emits one tuple per instance of the next batch, if any line remains.
         *
         * @param batchId   batch identifier (unused)
         * @param collector collector receiving one single-value tuple per instance
         */
        public void emitBatch(long batchId, TridentCollector collector) {
            if (!this.hasNext()) {
                return;
            }
            for (Instance instance : this.nextBatch()) {
                collector.emit(new Values(new Object[]{instance}));
            }
        }

        /**
         * @return a configuration limiting this spout to a single task
         */
        public Map getComponentConfiguration() {
            Config conf = new Config();
            conf.setMaxTaskParallelism(1);
            return conf;
        }

        /** No-op: the file is opened and closed on every read. */
        public void close() {
        }

        /** No-op: acknowledged batches require no bookkeeping here. */
        public void ack(long batchId) {
        }

        /**
         * @return the single output field carrying the emitted instance
         */
        public Fields getOutputFields() {
            return new Fields(new String[]{"NEW_INSTANCE_FIELD"});
        }

        /** No-op: nothing is initialised when the spout is opened. */
        public void open(Map conf, TopologyContext context) {
        }

        /**
         * Tells whether at least one more line can be read from the current position.
         *
         * @return {@code true} if a line is available; {@code false} at end of file
         *         or if the file cannot be opened or read (the error is logged)
         */
        public boolean hasNext() {
            RandomAccessFile file = null;
            try {
                file = new RandomAccessFile(this.filename, "r");
                file.seek(this.currentPosition);
                return file.readLine() != null;
            }
            catch (IOException e) {
                LOGGER.error((Object)("Error while reading " + this.filename), (Throwable)e);
                return false;
            }
            finally {
                this.closeQuietly(file);
            }
        }

        /**
         * Reads up to {@code maxBatchSize} instances starting at the current position
         * and advances the position past the lines consumed.
         *
         * <p>Lines that cannot be converted into an instance are logged and skipped.
         *
         * @return the instances read; an empty list (never {@code null}) if the file
         *         cannot be opened or read, in which case the position is left unchanged
         */
        public List<Instance> nextBatch() {
            List<Instance> instances = new ArrayList<Instance>(this.maxBatchSize);
            RandomAccessFile file = null;
            try {
                String line;
                file = new RandomAccessFile(this.filename, "r");
                file.seek(this.currentPosition);
                // NOTE(review): RandomAccessFile.readLine maps each byte to one char, so
                // multi-byte encodings are not decoded here - confirm the expected file encoding.
                while (instances.size() < this.maxBatchSize && (line = file.readLine()) != null) {
                    try {
                        List<String> features = Lists.newArrayList(Splitter.on(",").limit(this.mapper.size()).trimResults().split(line));
                        // Raw cast kept: the parameter type of Instance.newInstance is not visible from this file.
                        Instance instance = Instance.newInstance(this.mapper, (List)features);
                        instances.add(instance);
                    }
                    catch (Exception ex) {
                        LOGGER.warn((Object)("Skipped instance : " + line));
                    }
                }
                this.currentPosition = file.getFilePointer();
                this.logProgress(file);
                return instances;
            }
            catch (IOException e) {
                LOGGER.error((Object)("Error while reading " + this.filename), (Throwable)e);
                // The position was not advanced, so partially read lines are dropped here
                // and will be read again by the next batch.
                return new ArrayList<Instance>(0);
            }
            finally {
                this.closeQuietly(file);
            }
        }

        /** Logs the percentage of the file consumed so far; never fails the batch. */
        private void logProgress(RandomAccessFile file) {
            if (!LOGGER.isDebugEnabled()) {
                return;
            }
            try {
                double percent = 100.0 * (double)this.currentPosition.longValue() / (double)file.length();
                LOGGER.debug((Object)("Progress : " + new DecimalFormat("##.##").format(percent) + "%"));
            }
            catch (IOException e) {
                LOGGER.debug((Object)("Unable to compute progress for " + this.filename), (Throwable)e);
            }
        }

        /** Closes the file if it was opened, logging (not propagating) any close failure. */
        private void closeQuietly(RandomAccessFile file) {
            if (file == null) {
                // Opening failed, so there is nothing to close.
                return;
            }
            try {
                file.close();
            }
            catch (IOException e) {
                LOGGER.error((Object)("Error while closing " + this.filename), (Throwable)e);
            }
        }
    }
}

