/*
 * Decompiled with CFR 0.152.
 */
package cascading.avro;

import cascading.avro.AvroTuple;
import cascading.avro.CascadingToAvro;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.CompositeTap;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroSerialization;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

public class AvroScheme
extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    protected Schema schema;
    private static final String DEFAULT_RECORD_NAME = "CascadingAvroRecord";
    private static final PathFilter filter = new PathFilter(){

        public boolean accept(Path path) {
            return !path.getName().startsWith("_");
        }
    };

    public AvroScheme() {
        this(null);
    }

    public AvroScheme(Fields fields, Class<?>[] types) {
        this(CascadingToAvro.generateAvroSchemaFromFieldsAndTypes(DEFAULT_RECORD_NAME, fields, types));
    }

    public AvroScheme(Schema schema) {
        this.schema = schema;
        if (schema == null) {
            this.setSinkFields(Fields.ALL);
            this.setSourceFields(Fields.UNKNOWN);
        } else {
            Fields cascadingFields = new Fields(new Comparable[0]);
            for (Schema.Field avroField : schema.getFields()) {
                cascadingFields = cascadingFields.append(new Fields(new Comparable[]{avroField.name()}));
            }
            this.setSinkFields(cascadingFields);
            this.setSourceFields(cascadingFields);
        }
    }

    protected String getJsonSchema() {
        if (this.schema == null) {
            return "";
        }
        return this.schema.toString();
    }

    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
        GenericData.Record record = new GenericData.Record((Schema)((Object[])sinkCall.getContext())[0]);
        Object[] objectArray = CascadingToAvro.parseTupleEntry(tupleEntry, (Schema)((Object[])sinkCall.getContext())[0]);
        for (int i = 0; i < objectArray.length; ++i) {
            record.put(i, objectArray[i]);
        }
        ((OutputCollector)sinkCall.getOutput()).collect((Object)new AvroWrapper((Object)record), (Object)NullWritable.get());
    }

    public void sinkPrepare(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        sinkCall.setContext((Object)new Object[]{this.schema});
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        if (this.schema == null) {
            throw new RuntimeException("Must provide sink schema");
        }
        conf.set("avro.output.schema", this.schema.toString());
        conf.setOutputFormat(AvroOutputFormat.class);
        Collection serializations = conf.getStringCollection("io.serializations");
        if (!serializations.contains(AvroSerialization.class.getName())) {
            serializations.add(AvroSerialization.class.getName());
            conf.setStrings("io.serializations", serializations.toArray(new String[serializations.size()]));
        }
    }

    public Fields retrieveSourceFields(FlowProcess<JobConf> flowProcess, Tap tap) {
        if (this.schema == null) {
            try {
                this.schema = this.getSourceSchema(flowProcess, tap);
            }
            catch (IOException e) {
                throw new RuntimeException("Can't get schema from data source");
            }
        }
        Fields cascadingFields = new Fields(new Comparable[0]);
        if (this.schema.getType().equals((Object)Schema.Type.NULL)) {
            cascadingFields = Fields.NONE;
        } else {
            for (Schema.Field avroField : this.schema.getFields()) {
                cascadingFields = cascadingFields.append(new Fields(new Comparable[]{avroField.name()}));
            }
        }
        this.setSourceFields(cascadingFields);
        return this.getSourceFields();
    }

    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        AvroWrapper wrapper;
        RecordReader input = (RecordReader)sourceCall.getInput();
        if (!input.next((Object)(wrapper = (AvroWrapper)input.createKey()), input.createValue())) {
            return false;
        }
        GenericData.Record record = (GenericData.Record)wrapper.datum();
        TupleEntry tupleEntry = sourceCall.getIncomingEntry();
        tupleEntry.setTuple((Tuple)new AvroTuple(record));
        return true;
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        this.retrieveSourceFields(flowProcess, tap);
        conf.set("avro.input.schema", this.schema.toString());
        conf.setInputFormat(AvroInputFormat.class);
        Collection serializations = conf.getStringCollection("io.serializations");
        if (!serializations.contains(AvroSerialization.class.getName())) {
            serializations.add(AvroSerialization.class.getName());
            conf.setStrings("io.serializations", serializations.toArray(new String[serializations.size()]));
        }
    }

    private Schema getSourceSchema(FlowProcess<JobConf> flowProcess, Tap tap) throws IOException {
        if (tap instanceof CompositeTap) {
            tap = (Tap)((CompositeTap)tap).getChildTaps().next();
        }
        String path = tap.getIdentifier();
        Path p = new Path(path);
        FileSystem fs = p.getFileSystem((Configuration)flowProcess.getConfigCopy());
        LinkedList<FileStatus> statuses = new LinkedList<FileStatus>(Arrays.asList(fs.globStatus(p, filter)));
        for (FileStatus status : new LinkedList<FileStatus>(statuses)) {
            if (!status.isDir()) continue;
            for (FileStatus child : Arrays.asList(fs.listStatus(status.getPath(), filter))) {
                if (!child.isDir()) continue;
                statuses.addAll(Arrays.asList(fs.listStatus(child.getPath(), filter)));
            }
        }
        for (FileStatus status : statuses) {
            Path statusPath = status.getPath();
            if (!fs.isFile(statusPath)) continue;
            BufferedInputStream stream = new BufferedInputStream((InputStream)fs.open(statusPath));
            DataFileStream reader = new DataFileStream((InputStream)stream, (DatumReader)new GenericDatumReader());
            return reader.getSchema();
        }
        return Schema.create((Schema.Type)Schema.Type.NULL);
    }

    private static Schema readSchema(ObjectInputStream in) throws IOException {
        Schema.Parser parser = new Schema.Parser();
        return parser.parse(in.readUTF());
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeUTF(this.schema.toString());
    }

    private void readObject(ObjectInputStream in) throws IOException {
        this.schema = AvroScheme.readSchema(in);
    }
}

