/*
 * Decompiled with CFR 0.152.
 */
package cascading.pipe.cogroup;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.pipe.cogroup.GroupClosure;
import cascading.pipe.cogroup.Joiner;
import cascading.tuple.Fields;
import cascading.tuple.IndexTuple;
import cascading.tuple.SpillableTupleList;
import cascading.tuple.Tuple;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

public class CoGroupClosure
extends GroupClosure {
    public static final String SPILL_THRESHOLD = "cascading.cogroup.spill.threshold";
    private static final int defaultThreshold = 10000;
    public static final String SPILL_COMPRESS = "cascading.cogroup.spill.compress";
    public static final String SPILL_CODECS = "cascading.cogroup.spill.codecs";
    private static final String defaultCodecs = "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec";
    private static final Logger LOG = Logger.getLogger(CoGroupClosure.class);
    SpillableTupleList[] groups;
    private int numSelfJoins;
    private CompressionCodec codec;
    private long threshold;
    private JobConf conf;

    public CoGroupClosure(FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields) {
        super(groupingFields, valueFields);
        this.numSelfJoins = numSelfJoins;
        this.codec = this.getCompressionCodec(flowProcess);
        this.threshold = this.getLong(flowProcess, SPILL_THRESHOLD, 10000L);
        this.conf = ((HadoopFlowProcess)flowProcess).getJobConf();
        this.initLists(flowProcess);
    }

    @Override
    public int size() {
        return this.groups.length;
    }

    @Override
    public Iterator<Tuple> getIterator(int pos) {
        if (pos < 0 || pos >= this.groups.length) {
            throw new IllegalArgumentException("invalid group position: " + pos);
        }
        return this.makeIterator(pos, this.groups[pos].iterator());
    }

    public boolean isEmpty(int pos) {
        return this.groups[pos].isEmpty();
    }

    @Override
    public void reset(Joiner joiner, Tuple grouping, Iterator values) {
        super.reset(joiner, grouping, values);
        this.build();
    }

    private void build() {
        this.clearGroups();
        while (this.values.hasNext()) {
            IndexTuple current = (IndexTuple)this.values.next();
            int pos = current.getIndex();
            if (this.numSelfJoins == 0 && pos == 0) {
                this.groups[pos].setIterator(current, this.values);
                break;
            }
            boolean spilled = this.groups[pos].add(current.getTuple());
            if (!spilled || (this.groups[pos].getNumFiles() - 1) % 10 != 0) continue;
            LOG.info((Object)("spilled group: " + this.groupingFields[pos].printVerbose() + ", on grouping: " + this.getGrouping().print()));
            Runtime runtime = Runtime.getRuntime();
            long freeMem = runtime.freeMemory() / 1024L / 1024L;
            long maxMem = runtime.maxMemory() / 1024L / 1024L;
            long totalMem = runtime.totalMemory() / 1024L / 1024L;
            LOG.info((Object)("mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem));
        }
    }

    private void clearGroups() {
        for (SpillableTupleList group : this.groups) {
            group.clear();
        }
    }

    private void initLists(FlowProcess flowProcess) {
        int i;
        int numPipes = this.groupingFields.length;
        this.groups = new SpillableTupleList[Math.max(numPipes, this.numSelfJoins + 1)];
        for (i = 0; i < numPipes; ++i) {
            this.groups[i] = new SpillableTupleList(this.threshold, this.conf, this.codec, flowProcess);
        }
        for (i = 1; i < this.numSelfJoins + 1; ++i) {
            this.groups[i] = this.groups[0];
        }
    }

    private long getLong(FlowProcess flowProcess, String key, long defaultValue) {
        String value = (String)flowProcess.getProperty(key);
        if (value == null || value.length() == 0) {
            return defaultValue;
        }
        return Long.parseLong(value);
    }

    public CompressionCodec getCompressionCodec(FlowProcess flowProcess) {
        String compress = (String)flowProcess.getProperty(SPILL_COMPRESS);
        if (compress != null && !Boolean.parseBoolean(compress)) {
            return null;
        }
        String codecs = (String)flowProcess.getProperty(SPILL_CODECS);
        if (codecs == null || codecs.length() == 0) {
            codecs = defaultCodecs;
        }
        Class<CompressionCodec> codecClass = null;
        for (String codec : codecs.split("[,\\s]*")) {
            try {
                codecClass = Thread.currentThread().getContextClassLoader().loadClass(codec).asSubclass(CompressionCodec.class);
            }
            catch (ClassNotFoundException exception) {
                // empty catch block
            }
        }
        if (codecClass == null) {
            LOG.warn((Object)("codecs set, but unable to load any: " + codecs));
            return null;
        }
        return (CompressionCodec)ReflectionUtils.newInstance(codecClass, (Configuration)((HadoopFlowProcess)flowProcess).getJobConf());
    }
}

