/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.flow.ElementGraph;
import cascading.flow.ElementGraphException;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowElement;
import cascading.flow.FlowPlanner;
import cascading.flow.PlannerException;
import cascading.flow.Scope;
import cascading.flow.StepGraph;
import cascading.flow.hadoop.HadoopUtil;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.Group;
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.TempHfs;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;

public class MultiMapReducePlanner
extends FlowPlanner {
    /** Class-wide log4j logger. */
    private static final Logger LOG = Logger.getLogger(MultiMapReducePlanner.class);
    /** Job configuration built in the constructor; handed to every Flow this planner builds and used for Hfs URI scheme lookups. */
    private JobConf jobConf;
    /** Scheme class passed to every TempHfs created by this planner; read from the FlowConnector properties in the constructor. */
    private final Class intermediateSchemeClass;

    /**
     * Stores the given JobConf in the properties map under the
     * {@code cascading.hadoop.jobconf} key.
     *
     * @param properties the map to write into
     * @param jobConf    the value to store
     */
    public static void setJobConf(Map<Object, Object> properties, JobConf jobConf) {
        final String key = "cascading.hadoop.jobconf";
        properties.put(key, jobConf);
    }

    /**
     * Looks up the {@code cascading.hadoop.jobconf} property via
     * {@code Util.getProperty}, using {@code null} as the default value.
     *
     * @param properties the map to read from
     * @return whatever {@code Util.getProperty} yields for that key
     */
    public static JobConf getJobConf(Map<Object, Object> properties) {
        JobConf configured = Util.getProperty(properties, "cascading.hadoop.jobconf", (JobConf)null);
        return configured;
    }

    /**
     * Records, as the string {@code "true"} or {@code "false"}, whether the
     * planner should normalize heterogeneous sources. The value is stored under
     * the {@code cascading.multimapreduceplanner.normalizesources} key.
     *
     * @param properties  the map to write into
     * @param doNormalize the flag to store
     */
    public static void setNormalizeHeterogeneousSources(Map<Object, Object> properties, boolean doNormalize) {
        String flagText = String.valueOf(doNormalize);
        properties.put("cascading.multimapreduceplanner.normalizesources", flagText);
    }

    /**
     * Reads the {@code cascading.multimapreduceplanner.normalizesources}
     * property (default {@code "false"}) and parses it as a boolean.
     *
     * @param properties the map to read from
     * @return the parsed flag
     */
    public static boolean getNormalizeHeterogeneousSources(Map<Object, Object> properties) {
        String flagText = Util.getProperty(properties, "cascading.multimapreduceplanner.normalizesources", "false");
        return Boolean.parseBoolean(flagText);
    }

    /**
     * Creates a planner from the given properties.
     *
     * Builds the JobConf, captures the intermediate scheme class, and then
     * makes sure the JobConf has a jar set. The jar is resolved by trying, in
     * order and only while none is set yet: the application jar class, the
     * application jar path, and finally the class found by
     * {@code Util.findMainClass}.
     *
     * @param properties planner and connector properties
     */
    protected MultiMapReducePlanner(Map<Object, Object> properties) {
        super(properties);
        JobConf parentConf = MultiMapReducePlanner.getJobConf(properties);
        this.jobConf = HadoopUtil.createJobConf(properties, parentConf);
        this.intermediateSchemeClass = FlowConnector.getIntermediateSchemeClass(properties);

        // first choice: the configured application jar class
        Class jarClass = FlowConnector.getApplicationJarClass(properties);
        if (this.jobConf.getJar() == null && jarClass != null) {
            this.jobConf.setJarByClass(jarClass);
        }

        // second choice: the configured application jar path
        String jarPath = FlowConnector.getApplicationJarPath(properties);
        if (this.jobConf.getJar() == null && jarPath != null) {
            this.jobConf.setJar(jarPath);
        }

        // last resort: whatever Util.findMainClass resolves
        if (this.jobConf.getJar() == null) {
            Class mainClass = Util.findMainClass(MultiMapReducePlanner.class);
            this.jobConf.setJarByClass(mainClass);
        }

        LOG.info("using application jar: " + this.jobConf.getJar());
    }

    /**
     * Plans a Flow from the given pipe assembly and taps.
     *
     * The assembly is verified, turned into an element graph, checked by the
     * {@code failOn*} assertions, rewritten by the {@code handle*} passes
     * (heterogeneous-source normalization only when enabled in the
     * properties), and finally wrapped, together with a StepGraph, in a new
     * Flow. The tap maps handed to the Flow are copies of the arguments.
     *
     * @param flowName name of the resulting flow
     * @param pipes    tails of the pipe assembly
     * @param sources  source taps by name
     * @param sinks    sink taps by name
     * @param traps    trap taps by name
     * @return the planned Flow
     * @throws PlannerException on any planning failure; the element graph built
     *                          so far (possibly {@code null}) is attached
     */
    public Flow buildFlow(String flowName, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps) {
        ElementGraph elementGraph = null;
        try {
            this.verifyAssembly(pipes, sources, sinks, traps);
            elementGraph = this.createElementGraph(pipes, sources, sinks, traps);

            // structural assertions
            this.failOnLoneGroupAssertion(elementGraph);
            this.failOnMissingGroup(elementGraph);
            this.failOnMisusedBuffer(elementGraph);

            // graph rewriting passes
            this.handleSplit(elementGraph);
            this.handleGroupPartitioning(elementGraph);
            this.handleNonSafeOperations(elementGraph);
            boolean normalizeSources = MultiMapReducePlanner.getNormalizeHeterogeneousSources(this.properties);
            if (normalizeSources) {
                this.handleHeterogeneousSources(elementGraph);
            }

            elementGraph.removeUnnecessaryPipes();
            elementGraph.resolveFields();
            this.handleAdjacentTaps(elementGraph);

            StepGraph stepGraph = new StepGraph(flowName, elementGraph, traps);

            // the Flow receives its own copies of the tap maps
            Map<String, Tap> sourcesCopy = new HashMap<String, Tap>(sources);
            Map<String, Tap> sinksCopy = new HashMap<String, Tap>(sinks);
            Map<String, Tap> trapsCopy = new HashMap<String, Tap>(traps);
            return new Flow(this.properties, this.jobConf, flowName, elementGraph, stepGraph, sourcesCopy, sinksCopy, trapsCopy);
        }
        catch (PlannerException plannerFailure) {
            // attach whatever graph exists so far, then rethrow unchanged
            plannerFailure.elementGraph = elementGraph;
            throw plannerFailure;
        }
        catch (ElementGraphException graphFailure) {
            Throwable nested = graphFailure.getCause();
            Throwable cause = nested == null ? graphFailure : nested;
            String message = String.format("could not build flow from assembly: [%s]", cause.getMessage());
            // operator and tap failures are reported without the pipe
            if (cause instanceof OperatorException || cause instanceof TapException) {
                throw new PlannerException(message, cause, elementGraph);
            }
            throw new PlannerException(graphFailure.getPipe(), message, cause, elementGraph);
        }
        catch (Exception unexpected) {
            String message = String.format("could not build flow from assembly: [%s]", unexpected.getMessage());
            throw new PlannerException(message, unexpected, elementGraph);
        }
    }

    /**
     * Runs {@code internalSplit} repeatedly until it reports that no further
     * temp tap was inserted.
     */
    private void handleSplit(ElementGraph elementGraph) {
        boolean settled;
        do {
            settled = this.internalSplit(elementGraph);
        } while (!settled);
    }

    /**
     * Performs one pass of split handling.
     *
     * Walks each shortest path between the graph extents and collects every
     * non-Tap element with more than one outgoing edge that needs a temp tap
     * behind it. As soon as one path yields insertions, they are applied and
     * the pass ends.
     *
     * @return {@code true} when no path required an insertion, {@code false}
     *         when the graph was modified and another pass is needed
     */
    private boolean internalSplit(ElementGraph elementGraph) {
        for (GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents()) {
            List<FlowElement> elements = Graphs.getPathVertexList(path);
            Set<Pipe> splitPoints = new HashSet<Pipe>();
            FlowElement lastInsertable = null;

            for (int index = 0; index < elements.size(); index++) {
                FlowElement current = elements.get(index);
                if (current instanceof ElementGraph.Extent) {
                    continue;
                }

                boolean isTap = current instanceof Tap;
                if (isTap || current instanceof Group || current instanceof Every) {
                    lastInsertable = current;
                }

                // a plain Pipe directly behind a Tap is never a split point
                if (current.getClass() == Pipe.class && elements.get(index - 1) instanceof Tap) {
                    continue;
                }
                if (isTap) {
                    continue;
                }
                // not a split at all
                if (elementGraph.outDegreeOf(current) <= 1) {
                    continue;
                }

                int convergingPaths = elementGraph.getMaxNumPathsBetweenElementAndMergJoin(current);
                if (convergingPaths <= 1 && lastInsertable instanceof Tap) {
                    continue;
                }

                splitPoints.add((Pipe)current);
            }

            if (splitPoints.isEmpty()) {
                continue;
            }
            for (Pipe splitPoint : splitPoints) {
                this.insertTempTapAfter(elementGraph, splitPoint);
            }
            return false;
        }
        return true;
    }

    /**
     * Runs {@code internalNonSafeOperations} repeatedly until it reports that
     * no temp tap had to be inserted.
     */
    private void handleNonSafeOperations(ElementGraph elementGraph) {
        boolean settled;
        do {
            settled = this.internalNonSafeOperations(elementGraph);
        } while (!settled);
    }

    /**
     * Performs one pass of non-safe operation handling.
     *
     * For every Each split, each shortest path leading to it is walked
     * backwards from the split while the elements are Each instances. If one of
     * those carries an operation that is not safe, the split is marked. Every
     * marked split then gets a temp tap inserted after it.
     *
     * @return {@code true} when no split was marked, {@code false} when temp
     *         taps were inserted
     */
    private boolean internalNonSafeOperations(ElementGraph elementGraph) {
        Set<Each> unsafeSplits = new HashSet<Each>();

        for (Each split : elementGraph.findAllEachSplits()) {
            for (GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo(split)) {
                List<FlowElement> backwards = Graphs.getPathVertexList(path);
                Collections.reverse(backwards);

                for (FlowElement candidate : backwards) {
                    // only the unbroken run of Each elements ending at the split is inspected
                    if (!(candidate instanceof Each)) {
                        break;
                    }
                    if (!((Each)candidate).getOperation().isSafe()) {
                        unsafeSplits.add(split);
                        break;
                    }
                }
            }
        }

        for (Pipe unsafeSplit : unsafeSplits) {
            this.insertTempTapAfter(elementGraph, unsafeSplit);
        }
        return unsafeSplits.isEmpty();
    }

    /**
     * Runs {@code internalAdjacentTaps} repeatedly until it reports that no
     * temp tap was replaced.
     */
    private void handleAdjacentTaps(ElementGraph elementGraph) {
        boolean settled;
        do {
            settled = this.internalAdjacentTaps(elementGraph);
        } while (!settled);
    }

    /**
     * Performs one pass of adjacent tap handling.
     *
     * A TempHfs is replaced by one of its successors when that successor is an
     * Hfs with a symmetrical scheme, the temp tap's default URI scheme equals
     * the successor's URI scheme, and both schemes declare equal source fields.
     * The pass stops after the first replacement.
     *
     * @return {@code true} when nothing was replaced, {@code false} when one
     *         temp tap was replaced and another pass is needed
     */
    private boolean internalAdjacentTaps(ElementGraph elementGraph) {
        for (Tap tap : elementGraph.findAllTaps()) {
            if (!(tap instanceof TempHfs)) {
                continue;
            }

            for (FlowElement successor : elementGraph.getAllSuccessors(tap)) {
                if (!(successor instanceof Hfs)) {
                    continue;
                }
                Hfs successorTap = (Hfs)successor;
                if (!successorTap.getScheme().isSymetrical()) {
                    continue;
                }

                URI tempURIScheme = this.getDefaultURIScheme(tap);
                URI successorURIScheme = this.getURIScheme(successorTap);
                if (!tempURIScheme.equals(successorURIScheme)) {
                    continue;
                }

                if (!tap.getScheme().getSourceFields().equals(successorTap.getScheme().getSourceFields())) {
                    continue;
                }

                elementGraph.replaceElementWith(tap, successor);
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the default file system URI scheme of the given tap, resolved
     * against this planner's JobConf.
     *
     * @param tap the tap to inspect; it is cast to Hfs, so it must be one
     * @return the default file system URI scheme reported by the tap
     * @throws PlannerException if the lookup fails with an IOException; the
     *                          IOException is kept as the cause
     */
    private URI getDefaultURIScheme(Tap tap) {
        try {
            return ((Hfs)tap).getDefaultFileSystemURIScheme(this.jobConf);
        }
        catch (IOException exception) {
            // keep the IOException as the cause so the underlying failure is not lost;
            // no element graph is available at this point, hence the null
            throw new PlannerException("unable to get default URI scheme from tap: " + tap, exception, (ElementGraph)null);
        }
    }

    /**
     * Returns the URI scheme of the given tap, resolved against this planner's
     * JobConf.
     *
     * @param tap the tap to inspect; it is cast to Hfs, so it must be one
     * @return the URI scheme reported by the tap
     * @throws PlannerException if the lookup fails with an IOException; the
     *                          IOException is kept as the cause
     */
    private URI getURIScheme(Tap tap) {
        try {
            return ((Hfs)tap).getURIScheme(this.jobConf);
        }
        catch (IOException exception) {
            // keep the IOException as the cause so the underlying failure is not lost;
            // no element graph is available at this point, hence the null
            throw new PlannerException("unable to get URI scheme from tap: " + tap, exception, (ElementGraph)null);
        }
    }

    /**
     * Runs {@code internalHeterogeneousSources} repeatedly until it returns
     * {@code true}.
     */
    private void handleHeterogeneousSources(ElementGraph elementGraph) {
        boolean settled;
        do {
            settled = this.internalHeterogeneousSources(elementGraph);
        } while (!settled);
    }

    /**
     * Performs one pass of heterogeneous source normalization.
     *
     * For every merge/join Group, the nearest Tap on each shortest path leading
     * to the group is collected. A group whose nearest taps do not all share
     * one scheme class is marked for normalization. For a marked group, the
     * first tap that is neither a TempHfs nor already using the intermediate
     * scheme class, and whose path into the group does not already end in a
     * TempHfs, gets a temp tap inserted right before the group. The pass ends
     * after that single insertion.
     *
     * @return {@code false} when a temp tap was inserted and another pass is
     *         needed, {@code true} when this pass left the graph unchanged
     */
    private boolean internalHeterogeneousSources(ElementGraph elementGraph) {
        List<Group> groups = elementGraph.findAllMergeJoinGroups();
        Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>();

        for (Group group : groups) {
            Set<Tap> taps = new HashSet<Tap>();

            // find the tap closest to the group on every path leading to it
            for (GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo(group)) {
                List<FlowElement> flowElements = Graphs.getPathVertexList(path);
                Collections.reverse(flowElements); // group first, walking towards the sources

                for (FlowElement previousElement : flowElements) {
                    if (previousElement instanceof Tap) {
                        taps.add((Tap)previousElement);
                        break; // only the nearest tap of this path matters
                    }
                }
            }

            // with zero or one tap there is nothing to compare
            if (taps.size() <= 1) {
                continue;
            }

            Iterator<Tap> iterator = taps.iterator();
            Tap commonTap = iterator.next();
            while (iterator.hasNext()) {
                Tap tap = iterator.next();
                if (this.getSchemeClass(tap) != this.getSchemeClass(commonTap)) {
                    normalizeGroups.put(group, taps);
                    break;
                }
            }
        }

        for (Map.Entry<Group, Set<Tap>> entry : normalizeGroups.entrySet()) {
            Group group = entry.getKey();

            for (Tap tap : entry.getValue()) {
                // temp taps and taps already using the intermediate scheme need no normalizing
                if (tap instanceof TempHfs || this.getSchemeClass(tap).equals(this.intermediateSchemeClass)) {
                    continue;
                }

                for (GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetween(tap, group)) {
                    List<FlowElement> flowElements = Graphs.getPathVertexList(path);
                    Collections.reverse(flowElements); // group first

                    // index 0 is the group itself, index 1 the element feeding it
                    FlowElement flowElement = flowElements.get(1);
                    if (flowElement instanceof TempHfs) {
                        continue;
                    }

                    LOG.warn("inserting step to normalize incompatible sources: " + tap);
                    this.insertTempTapAfter(elementGraph, (Pipe)flowElement);
                    return false;
                }
            }
        }

        // Reaching this point means the graph was not modified in this pass.
        // Reporting "not done" here would make the caller repeat the identical
        // pass forever, so the pass is always reported as settled.
        return true;
    }

    /**
     * Runs {@code internalGroupPartitioning} repeatedly until it reports that
     * no temp tap was inserted.
     */
    private void handleGroupPartitioning(ElementGraph elementGraph) {
        boolean settled;
        do {
            settled = this.internalGroupPartitioning(elementGraph);
        } while (!settled);
    }

    /**
     * Performs one pass of group partitioning.
     *
     * On each shortest path between the graph extents, a Group that follows an
     * earlier Group with no Tap in between causes the element directly before
     * it to be scheduled for a temp tap. As soon as one path yields insertions,
     * they are applied and the pass ends.
     *
     * @return {@code true} when no path required an insertion, {@code false}
     *         when the graph was modified and another pass is needed
     */
    private boolean internalGroupPartitioning(ElementGraph elementGraph) {
        List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();
        for (GraphPath<FlowElement, Scope> path : paths) {
            List<FlowElement> elements = Graphs.getPathVertexList(path);
            List<Pipe> pending = new ArrayList<Pipe>();
            boolean groupSeen = false;

            for (int index = 0; index < elements.size(); index++) {
                FlowElement current = elements.get(index);
                if (current instanceof ElementGraph.Extent) {
                    continue;
                }

                boolean isTap = current instanceof Tap;
                // a tap sitting directly behind an extent leaves the group state untouched
                if (isTap && elements.get(index - 1) instanceof ElementGraph.Extent) {
                    continue;
                }

                if (current instanceof Group) {
                    if (groupSeen) {
                        // second group without a tap in between: break the path before it
                        pending.add((Pipe)elements.get(index - 1));
                    } else {
                        groupSeen = true;
                    }
                } else if (isTap) {
                    groupSeen = false;
                }
            }

            if (!pending.isEmpty()) {
                for (Pipe target : pending) {
                    this.insertTempTapAfter(elementGraph, target);
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Creates a temp tap for the given pipe and inserts it into the graph
     * directly after that pipe. The insertion is logged at debug level.
     *
     * @param graph the graph to modify
     * @param pipe  the element the temp tap is placed after
     */
    void insertTempTapAfter(ElementGraph graph, Pipe pipe) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("inserting tap after: " + pipe);
        }
        TempHfs tempTap = this.makeTemp(pipe);
        graph.insertFlowElementAfter(pipe, tempTap);
    }

    /**
     * Creates a TempHfs for the given pipe, using the planner's intermediate
     * scheme class.
     *
     * The tap name is built from the pipe name, cut to at most 25 characters,
     * with whitespace runs, {@code *} and {@code +} replaced by {@code _},
     * followed by {@code /}, a random integer below 100000 and a closing
     * {@code /}.
     *
     * @param pipe the pipe whose name seeds the temp tap name
     * @return the new temp tap
     */
    private TempHfs makeTemp(Pipe pipe) {
        // read the name first; the decompiled form used the local before it was assigned
        String name = pipe.getName();
        int prefixLength = Math.min(name.length(), 25);
        name = name.substring(0, prefixLength).replaceAll("\\s+|\\*|\\+", "_");
        return new TempHfs(name + "/" + (int)(Math.random() * 100000.0) + "/", this.intermediateSchemeClass);
    }

    /**
     * Returns the scheme class of a tap: for a TempHfs the value of its
     * {@code getSchemeClass()}, for any other tap the runtime class of its
     * scheme object.
     *
     * @param tap the tap to inspect
     * @return the scheme class as described above
     */
    private Class getSchemeClass(Tap tap) {
        if (!(tap instanceof TempHfs)) {
            return tap.getScheme().getClass();
        }
        TempHfs tempTap = (TempHfs)tap;
        return tempTap.getSchemeClass();
    }
}

