/*
 * Decompiled with CFR 0.152.
 */
package stormy.pythian.sandbox;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.MapGet;
import storm.trident.state.QueryFunction;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;
import stormy.pythian.model.annotation.ComponentType;
import stormy.pythian.model.annotation.Documentation;
import stormy.pythian.model.annotation.ExpectedFeature;
import stormy.pythian.model.annotation.InputStream;
import stormy.pythian.model.annotation.Mapper;
import stormy.pythian.model.annotation.MappingType;
import stormy.pythian.model.annotation.OutputStream;
import stormy.pythian.model.annotation.State;
import stormy.pythian.model.component.Component;
import stormy.pythian.model.instance.InputFixedFeaturesMapper;
import stormy.pythian.model.instance.Instance;
import stormy.pythian.model.instance.OutputFixedFeaturesMapper;

@Documentation(name="Word count", type=ComponentType.ANALYTICS)
public class WordCount
implements Component {
    private static final long serialVersionUID = 1822765078810762926L;
    public static final String WORD_FEATURE = "word";
    public static final String COUNT_FEATURE = "count";
    @InputStream(name="in", type=MappingType.FIXED_FEATURES, expectedFeatures={@ExpectedFeature(name="word", type=String.class)})
    private Stream in;
    @OutputStream(name="out", from="in", newFeatures={@ExpectedFeature(name="count", type=Integer.class)})
    private Stream out;
    @Mapper(stream="in")
    private InputFixedFeaturesMapper inputMapper;
    @Mapper(stream="out")
    private OutputFixedFeaturesMapper outputMapper;
    @State(name="count state")
    private StateFactory stateFactory;

    public void init() {
        TridentState wordCounts = this.in.each(new Fields(new String[]{"INSTANCE_FIELD"}), (Function)new ExtractFeature(WORD_FEATURE, this.inputMapper), new Fields(new String[]{WORD_FEATURE})).groupBy(new Fields(new String[]{WORD_FEATURE})).persistentAggregate(this.stateFactory, new Fields(new String[]{WORD_FEATURE}), (CombinerAggregator)new Count(), new Fields(new String[]{COUNT_FEATURE}));
        this.out = this.in.each(new Fields(new String[]{"INSTANCE_FIELD"}), (Function)new ExtractFeature(WORD_FEATURE, this.inputMapper), new Fields(new String[]{WORD_FEATURE})).stateQuery(wordCounts, new Fields(new String[]{WORD_FEATURE}), (QueryFunction)new MapGet(), new Fields(new String[]{COUNT_FEATURE})).each(new Fields(new String[]{"INSTANCE_FIELD", COUNT_FEATURE}), (Function)new AddCountFeature(this.outputMapper), new Fields(new String[]{"NEW_INSTANCE_FIELD"}));
    }

    private static class AddCountFeature
    extends BaseFunction {
        private final OutputFixedFeaturesMapper outMapper;

        public AddCountFeature(OutputFixedFeaturesMapper outMapper) {
            this.outMapper = outMapper;
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            Instance original = Instance.from((TridentTuple)tuple);
            Long count = tuple.getLongByField(WordCount.COUNT_FEATURE);
            Instance updated = original.withFeature(this.outMapper, WordCount.COUNT_FEATURE, (Object)count);
            collector.emit((List)new Values(new Object[]{updated}));
        }
    }

    private static class ExtractFeature
    extends BaseFunction {
        private final String featureName;
        private final InputFixedFeaturesMapper inputMapper;

        public ExtractFeature(String featureName, InputFixedFeaturesMapper mapper) {
            this.featureName = featureName;
            this.inputMapper = mapper;
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            Instance instance = Instance.from((TridentTuple)tuple);
            Object feature = instance.getInputFeature(this.inputMapper, this.featureName);
            collector.emit((List)new Values(new Object[]{feature}));
        }
    }
}

