/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.flow.Flow;
import cascading.flow.MapReduceFlowStep;
import cascading.flow.StepGraph;
import cascading.scheme.Scheme;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.log4j.Logger;

public class MapReduceFlow
extends Flow {
    private static final Logger LOG = Logger.getLogger(MapReduceFlow.class);
    private boolean deleteSinkOnInit = false;

    @ConstructorProperties(value={"jobConf"})
    public MapReduceFlow(JobConf jobConf) {
        this(jobConf.getJobName(), jobConf, false);
    }

    @ConstructorProperties(value={"jobConf", "deleteSinkOnInit"})
    public MapReduceFlow(JobConf jobConf, boolean deleteSinkOnInit) {
        this(jobConf.getJobName(), jobConf, deleteSinkOnInit);
    }

    @ConstructorProperties(value={"name", "jobConf"})
    public MapReduceFlow(String name, JobConf jobConf) {
        this(name, jobConf, false);
    }

    @ConstructorProperties(value={"name", "jobConf", "deleteSinkOnInit"})
    public MapReduceFlow(String name, JobConf jobConf, boolean deleteSinkOnInit) {
        this(name, jobConf, deleteSinkOnInit, true);
    }

    @ConstructorProperties(value={"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"})
    public MapReduceFlow(String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit) {
        this.deleteSinkOnInit = deleteSinkOnInit;
        this.stopJobsOnExit = stopJobsOnExit;
        this.setName(name);
        this.setSources(this.createSources(jobConf));
        this.setSinks(this.createSinks(jobConf));
        this.setTraps(this.createTraps(jobConf));
        this.setStepGraph(this.makeStepGraph(jobConf));
    }

    private StepGraph makeStepGraph(JobConf jobConf) {
        StepGraph stepGraph = new StepGraph();
        Tap sink = this.getSinks().values().iterator().next();
        MapReduceFlowStep step = new MapReduceFlowStep(sink.toString(), jobConf, sink);
        step.setParentFlowName(this.getName());
        stepGraph.addVertex(step);
        return stepGraph;
    }

    private Map<String, Tap> createSources(JobConf jobConf) {
        Path[] paths = FileInputFormat.getInputPaths((JobConf)jobConf);
        HashMap<String, Tap> taps = new HashMap<String, Tap>();
        for (Path path : paths) {
            taps.put(path.toString(), new Hfs((Scheme)new NullScheme(), path.toString()));
        }
        return taps;
    }

    private Map<String, Tap> createSinks(JobConf jobConf) {
        HashMap<String, Tap> taps = new HashMap<String, Tap>();
        String path = FileOutputFormat.getOutputPath((JobConf)jobConf).toString();
        taps.put(path, new Hfs((Scheme)new NullScheme(), path, this.deleteSinkOnInit));
        return taps;
    }

    private Map<String, Tap> createTraps(JobConf jobConf) {
        return new HashMap<String, Tap>();
    }

    class NullScheme
    extends Scheme {
        NullScheme() {
        }

        @Override
        public void sourceInit(Tap tap, JobConf conf) throws IOException {
        }

        @Override
        public void sinkInit(Tap tap, JobConf conf) throws IOException {
        }

        @Override
        public Tuple source(Object key, Object value) {
            if (value instanceof Comparable) {
                return new Tuple((Comparable)key, (Comparable)value);
            }
            return new Tuple((Comparable)key);
        }

        @Override
        public String toString() {
            return this.getClass().getSimpleName();
        }

        @Override
        public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException {
            throw new UnsupportedOperationException("sinking is not supported in the scheme");
        }
    }
}

