/*
 * Decompiled with CFR 0.152.
 */
package com.infochimps.elasticsearch.pig;

import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.data.Tuple;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

public class ElasticSearchJsonIndex
extends StoreFunc
implements StoreFuncInterface {
    private static final Log LOG = LogFactory.getLog(ElasticSearchJsonIndex.class);
    protected RecordWriter writer = null;
    protected ObjectMapper mapper = new ObjectMapper();
    protected String idFieldName;
    protected String bulkSize;
    protected String esConfig;
    protected String esPlugins;
    private static final String ES_INDEX_NAME = "elasticsearch.index.name";
    private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
    private static final String ES_IS_JSON = "elasticsearch.is_json";
    private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
    private static final String ES_FIELD_NAMES = "elasticsearch.field.names";
    private static final String ES_ID_FIELD = "elasticsearch.id.field";
    private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
    private static final String SLASH = "/";
    private static final String NO_ID_FIELD = "-1";
    private static final String LOCAL_SCHEME = "file://";
    private static final String DEFAULT_BULK = "1000";
    private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
    private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
    private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
    private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";

    public ElasticSearchJsonIndex() {
        this(NO_ID_FIELD, DEFAULT_BULK);
    }

    public ElasticSearchJsonIndex(String idFieldName, String bulkSize) {
        this(idFieldName, bulkSize, DEFAULT_ES_CONFIG);
    }

    public ElasticSearchJsonIndex(String idFieldName, String bulkSize, String esConfig) {
        this(idFieldName, bulkSize, esConfig, DEFAULT_ES_PLUGINS);
    }

    public ElasticSearchJsonIndex(String idFieldName, String bulkSize, String esConfig, String esPlugins) {
        this.idFieldName = idFieldName;
        this.bulkSize = bulkSize;
        this.esConfig = esConfig;
        this.esPlugins = esPlugins;
    }

    public void checkSchema(ResourceSchema s) throws IOException {
    }

    public void setStoreLocation(String location, Job job) throws IOException {
        String[] es_store = location.substring(5).split(SLASH);
        if (es_store.length != 2) {
            throw new RuntimeException("Please specify a valid elasticsearch index, eg. es://myindex/myobj");
        }
        Configuration conf = job.getConfiguration();
        if (conf.get(ES_INDEX_NAME) == null) {
            try {
                job.getConfiguration().set(ES_INDEX_NAME, es_store[0]);
                job.getConfiguration().set(ES_OBJECT_TYPE, es_store[1]);
            }
            catch (ArrayIndexOutOfBoundsException e) {
                throw new RuntimeException("You must specify both an index and an object type.");
            }
            job.getConfiguration().setBoolean(ES_IS_JSON, true);
            job.getConfiguration().set(ES_BULK_SIZE, this.bulkSize);
            job.getConfiguration().set(ES_ID_FIELD_NAME, this.idFieldName);
            try {
                Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
                Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
                HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME + this.esConfig), hdfsConfigPath, job.getConfiguration());
                HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, job.getConfiguration());
                HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME + this.esPlugins), hdfsPluginsPath, job.getConfiguration());
                HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, job.getConfiguration());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public OutputFormat getOutputFormat() throws IOException {
        return new ElasticSearchOutputFormat();
    }

    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    public void putNext(Tuple t) throws IOException {
        if (!t.isNull(0)) {
            MapWritable record = new MapWritable();
            String jsonData = t.get(0).toString();
            try {
                HashMap data = (HashMap)this.mapper.readValue(jsonData, HashMap.class);
                record = (MapWritable)this.toWritable(data);
            }
            catch (JsonParseException e) {
                e.printStackTrace();
            }
            catch (JsonMappingException e) {
                e.printStackTrace();
            }
            try {
                this.writer.write((Object)NullWritable.get(), (Object)record);
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    private Writable toWritable(Object thing) {
        if (thing instanceof String) {
            return new Text((String)thing);
        }
        if (thing instanceof Long) {
            return new LongWritable(((Long)thing).longValue());
        }
        if (thing instanceof Integer) {
            return new IntWritable(((Integer)thing).intValue());
        }
        if (thing instanceof Double) {
            return new DoubleWritable(((Double)thing).doubleValue());
        }
        if (thing instanceof Float) {
            return new FloatWritable(((Float)thing).floatValue());
        }
        if (thing instanceof Map) {
            MapWritable result = new MapWritable();
            for (Map.Entry entry : ((Map)thing).entrySet()) {
                result.put((Writable)new Text(((String)entry.getKey()).toString()), this.toWritable(entry.getValue()));
            }
            return result;
        }
        if (thing instanceof List && ((List)thing).size() > 0) {
            Object first = ((List)thing).get(0);
            Writable[] listOfThings = new Writable[((List)thing).size()];
            for (int i = 0; i < listOfThings.length; ++i) {
                listOfThings[i] = this.toWritable(((List)thing).get(i));
            }
            return new ArrayWritable(this.toWritable(first).getClass(), listOfThings);
        }
        return NullWritable.get();
    }

    public void cleanupOnFailure(String location, Job job) throws IOException {
    }
}

