/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.hadoop;

import cascading.CascadingException;
import cascading.tap.hadoop.MultiInputSplit;
import cascading.util.Util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.jets3t.service.S3ServiceException;

/**
 * An {@link InputFormat} that fans a single job out over several underlying input formats.
 *
 * <p>{@link #addInputFormat(JobConf, JobConf...)} records, for every source job, the configuration
 * entries that differ from the target job and stores the serialized list under the
 * {@code cascading.multiinputformats} property. At split time each difference map is merged back
 * over the running job to rebuild a per-source {@link JobConf}, the underlying format is asked for
 * its splits, and every split is wrapped in a {@link MultiInputSplit} carrying the difference map
 * it belongs to.
 */
public class MultiInputFormat
implements InputFormat {
    private static final Logger LOG = Logger.getLogger(MultiInputFormat.class);

    /**
     * Configures {@code toJob} to read through this format from all of the given source jobs.
     *
     * <p>Sets this class as the input format of {@code toJob}, replaces its input paths with the
     * concatenation of every source job's input paths, and stores the per-source configuration
     * differences under {@code cascading.multiinputformats}. If any source job has
     * {@code mapred.job.tracker} set to {@code local} (case-insensitive), {@code toJob} is set to
     * {@code local} as well.
     *
     * @param toJob    the job being configured; it is modified in place
     * @param fromJobs the source jobs whose input formats and paths are combined
     * @throws CascadingException if the configuration differences cannot be serialized
     */
    public static void addInputFormat(JobConf toJob, JobConf ... fromJobs) {
        toJob.setInputFormat(MultiInputFormat.class);
        List<Map<String, String>> configs = new ArrayList<Map<String, String>>();
        List<Path> allPaths = new ArrayList<Path>();
        boolean isLocal = false;
        for (JobConf fromJob : fromJobs) {
            configs.add(MultiInputFormat.getConfig(toJob, fromJob));
            Collections.addAll(allPaths, FileInputFormat.getInputPaths(fromJob));
            if (!isLocal) {
                // Constant-first comparison: a source job without the property must not
                // trigger a NullPointerException, it simply is not "local".
                isLocal = "local".equalsIgnoreCase(fromJob.get("mapred.job.tracker"));
            }
        }
        FileInputFormat.setInputPaths(toJob, allPaths.toArray(new Path[allPaths.size()]));
        try {
            toJob.set("cascading.multiinputformats", Util.serializeBase64(configs));
        }
        catch (IOException exception) {
            throw new CascadingException("unable to pack input formats", exception);
        }
        if (isLocal) {
            toJob.set("mapred.job.tracker", "local");
        }
    }

    /**
     * Computes the entries of {@code fromJob} that differ from {@code toJob}.
     *
     * <p>Starts from a copy of every entry in {@code fromJob} and drops each key whose value is
     * equal to the non-null value {@code toJob} holds for the same key. The
     * {@code mapred.working.dir} key is always dropped.
     *
     * @param toJob   the job used as the baseline
     * @param fromJob the job whose distinguishing entries are wanted
     * @return a new mutable map of the entries that are specific to {@code fromJob}
     */
    public static Map<String, String> getConfig(JobConf toJob, JobConf fromJob) {
        Map<String, String> configs = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : fromJob) {
            configs.put(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, String> entry : toJob) {
            String baseline = entry.getValue();
            // Only a non-null baseline value that matches exactly makes the entry redundant.
            if (baseline != null && baseline.equals(configs.get(entry.getKey()))) {
                configs.remove(entry.getKey());
            }
        }
        // Removed unconditionally, once, rather than on every loop iteration.
        configs.remove("mapred.working.dir");
        return configs;
    }

    /**
     * Rebuilds one {@link JobConf} per difference map by merging each map over a copy of
     * {@code job}.
     *
     * @param job     the base job; it is not modified
     * @param configs the difference maps, one per underlying input
     * @return an array of job configurations in the same order as {@code configs}
     */
    public static JobConf[] getJobConfs(JobConf job, List<Map<String, String>> configs) {
        JobConf[] jobConfs = new JobConf[configs.size()];
        for (int i = 0; i < jobConfs.length; ++i) {
            jobConfs[i] = MultiInputFormat.mergeConf(job, configs.get(i), false);
        }
        return jobConfs;
    }

    /**
     * Applies every entry of {@code config} to a job configuration.
     *
     * @param job      the base job
     * @param config   the entries to set
     * @param directly when {@code true} the entries are set on {@code job} itself; when
     *                 {@code false} they are set on a new {@link JobConf} created from {@code job}
     * @return the configuration that received the entries
     */
    static JobConf mergeConf(JobConf job, Map<String, String> config, boolean directly) {
        JobConf currentConf = directly ? job : new JobConf((Configuration)job);
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("merging key: " + entry.getKey() + " value: " + entry.getValue());
            }
            currentConf.set(entry.getKey(), entry.getValue());
        }
        return currentConf;
    }

    /**
     * Returns the input format configured on each of the given jobs.
     *
     * @param jobConfs the per-source job configurations
     * @return the input formats, in the same order as {@code jobConfs}
     */
    static InputFormat[] getInputFormats(JobConf[] jobConfs) {
        InputFormat[] inputFormats = new InputFormat[jobConfs.length];
        for (int i = 0; i < jobConfs.length; ++i) {
            inputFormats[i] = jobConfs[i].getInputFormat();
        }
        return inputFormats;
    }

    /**
     * Reads back the difference maps stored under {@code cascading.multiinputformats}.
     *
     * @param job the job holding the serialized property
     * @return the deserialized list of difference maps
     * @throws IOException if deserialization fails
     */
    @SuppressWarnings("unchecked") // the property is only ever written by addInputFormat with this type
    private List<Map<String, String>> getConfigs(JobConf job) throws IOException {
        return (List<Map<String, String>>)Util.deserializeBase64(job.get("cascading.multiinputformats"));
    }

    /**
     * Performs no validation.
     *
     * @param job ignored
     * @throws IOException never thrown by this implementation
     */
    public void validateInput(JobConf job) throws IOException {
    }

    /**
     * Produces the splits of every underlying input format, each wrapped in a
     * {@link MultiInputSplit}.
     *
     * <p>A requested count of {@code 0} is treated as {@code 1}. With a single underlying format
     * the requested count is passed straight through. With several formats and a requested count
     * not larger than the number of formats, each format is asked for one split. Otherwise each
     * format is first probed with the full requested count; the number of splits it returns is
     * used as its weight, and the requested count is distributed proportionally to those weights
     * (rounded up, never less than one). If every probe returns no splits, each format is asked
     * for one split.
     *
     * @param job       the running job carrying the serialized difference maps
     * @param numSplits the desired total number of splits
     * @return the wrapped splits of all underlying formats, grouped by format in configuration order
     * @throws IOException if the configuration cannot be read or an underlying format fails
     */
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        numSplits = numSplits == 0 ? 1 : numSplits;
        List<Map<String, String>> configs = this.getConfigs(job);
        JobConf[] jobConfs = MultiInputFormat.getJobConfs(job, configs);
        InputFormat[] inputFormats = MultiInputFormat.getInputFormats(jobConfs);
        if (inputFormats.length == 1) {
            return this.collapse(this.getSplits(inputFormats, jobConfs, new int[]{numSplits}), configs);
        }
        int[] indexedSplits = new int[inputFormats.length];
        if (numSplits <= inputFormats.length) {
            Arrays.fill(indexedSplits, 1);
            return this.collapse(this.getSplits(inputFormats, jobConfs, indexedSplits), configs);
        }
        long[] inputSplitSizes = this.getInputSplitSizes(inputFormats, jobConfs, numSplits);
        long totalSplitSize = this.sum(inputSplitSizes);
        if (totalSplitSize == 0L) {
            // Nothing to weight by; fall back to one split per format.
            Arrays.fill(indexedSplits, 1);
            return this.collapse(this.getSplits(inputFormats, jobConfs, indexedSplits), configs);
        }
        for (int i = 0; i < inputSplitSizes.length; ++i) {
            int useSplits = (int)Math.ceil((double)numSplits * (double)inputSplitSizes[i] / (double)totalSplitSize);
            indexedSplits[i] = useSplits == 0 ? 1 : useSplits;
        }
        return this.collapse(this.getSplits(inputFormats, jobConfs, indexedSplits), configs);
    }

    /**
     * Adds up the given values.
     *
     * @param inputSizes the values to add
     * @return their sum, or {@code 0} for an empty array
     */
    private long sum(long[] inputSizes) {
        long size = 0L;
        for (long inputSize : inputSizes) {
            size += inputSize;
        }
        return size;
    }

    /**
     * Flattens the per-format split arrays into one array of {@link MultiInputSplit}s, pairing
     * every split with the difference map at the same index as its format.
     *
     * @param splits  the splits, indexed by format
     * @param configs the difference maps, indexed by format
     * @return the wrapped splits in format order
     */
    private InputSplit[] collapse(InputSplit[][] splits, List<Map<String, String>> configs) {
        List<InputSplit> splitsList = new ArrayList<InputSplit>();
        for (int i = 0; i < splits.length; ++i) {
            Map<String, String> config = configs.get(i);
            for (InputSplit split : splits[i]) {
                splitsList.add(new MultiInputSplit(split, config));
            }
        }
        return splitsList.toArray(new InputSplit[splitsList.size()]);
    }

    /**
     * Asks each input format for its splits using the split count at the matching index.
     *
     * @param inputFormats the underlying formats
     * @param jobConfs     the job configuration for each format
     * @param numSplits    the split count to request from each format
     * @return the splits returned by each format, indexed by format
     * @throws IOException if an underlying format fails
     */
    private InputSplit[][] getSplits(InputFormat[] inputFormats, JobConf[] jobConfs, int[] numSplits) throws IOException {
        InputSplit[][] inputSplits = new InputSplit[inputFormats.length][];
        for (int i = 0; i < inputFormats.length; ++i) {
            inputSplits[i] = inputFormats[i].getSplits(jobConfs[i], numSplits[i]);
        }
        return inputSplits;
    }

    /**
     * Probes each input format with the same requested split count and records how many splits
     * it returns.
     *
     * @param inputFormats the underlying formats
     * @param jobConfs     the job configuration for each format
     * @param numSplits    the split count requested from every format
     * @return the number of splits each format returned, indexed by format
     * @throws IOException if an underlying format fails
     */
    private long[] getInputSplitSizes(InputFormat[] inputFormats, JobConf[] jobConfs, int numSplits) throws IOException {
        long[] inputSizes = new long[inputFormats.length];
        for (int i = 0; i < inputFormats.length; ++i) {
            inputSizes[i] = inputFormats[i].getSplits(jobConfs[i], numSplits).length;
        }
        return inputSizes;
    }

    /**
     * Opens a record reader for a split produced by {@link #getSplits(JobConf, int)}.
     *
     * <p>The split's difference map is merged directly into {@code job} (the passed configuration
     * is modified), and the resulting configuration's input format is asked for a reader over the
     * wrapped split. The call goes through {@code Util.retry}; failures whose cause is an
     * {@link S3ServiceException} are not flagged for rethrow by the operator.
     *
     * @param split    a {@link MultiInputSplit}
     * @param job      the task's job configuration; modified in place
     * @param reporter passed through to the underlying format
     * @return the record reader of the underlying input format
     * @throws IOException if the reader cannot be created; a checked failure that is not itself
     *                     an {@code IOException} is wrapped in one with the cause preserved
     */
    public RecordReader getRecordReader(InputSplit split, JobConf job, final Reporter reporter) throws IOException {
        final MultiInputSplit multiSplit = (MultiInputSplit)split;
        final JobConf currentConf = MultiInputFormat.mergeConf(job, multiSplit.config, true);
        try {
            // NOTE(review): 3 and 20 are presumably the retry count and a delay; confirm against Util.retry.
            return Util.retry(LOG, 3, 20, "unable to get record reader", new Util.RetryOperator<RecordReader>(){

                @Override
                public RecordReader operate() throws Exception {
                    return currentConf.getInputFormat().getRecordReader(multiSplit.inputSplit, currentConf, reporter);
                }

                @Override
                public boolean rethrow(Exception exception) {
                    return !(exception.getCause() instanceof S3ServiceException);
                }
            });
        }
        catch (RuntimeException exception) {
            throw exception;
        }
        catch (IOException exception) {
            throw exception;
        }
        catch (Exception exception) {
            // A blind cast to IOException here would replace the real failure with a
            // ClassCastException; wrap instead so the cause survives.
            throw new IOException("unable to get record reader", exception);
        }
    }
}

