/*
 * Decompiled with CFR 0.152.
 */
package com.thinkaurelius.titan.hadoop;

import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreUtil;
import com.thinkaurelius.titan.diskstorage.KeyValueStoreUtil;
import com.thinkaurelius.titan.diskstorage.SimpleScanJob;
import com.thinkaurelius.titan.diskstorage.SimpleScanJobRunner;
import com.thinkaurelius.titan.diskstorage.cassandra.thrift.CassandraThriftStoreManager;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
import com.thinkaurelius.titan.diskstorage.util.StandardBaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProviders;
import com.thinkaurelius.titan.graphdb.TitanGraphBaseTest;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.hadoop.config.TitanHadoopConfiguration;
import com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat;
import com.thinkaurelius.titan.hadoop.scan.CassandraHadoopScanRunner;
import com.thinkaurelius.titan.hadoop.scan.HadoopScanMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraScanJobIT
extends TitanGraphBaseTest {
    private static final Logger log = LoggerFactory.getLogger(CassandraScanJobIT.class);

    @Test
    public void testSimpleScan() throws InterruptedException, ExecutionException, IOException, BackendException {
        int keys = 1000;
        int cols = 40;
        String[][] values = KeyValueStoreUtil.generateData((int)keys, (int)cols);
        for (int i = 0; i < values.length; ++i) {
            if (i % 2 != 0) continue;
            values[i] = Arrays.copyOf(values[i], cols / 2);
        }
        log.debug("Loading values: " + keys + "x" + cols);
        CassandraThriftStoreManager mgr = new CassandraThriftStoreManager((Configuration)GraphDatabaseConfiguration.buildGraphConfiguration());
        KeyColumnValueStore store = mgr.openDatabase("edgestore");
        StoreTransaction tx = mgr.beginTransaction((BaseTransactionConfig)StandardBaseTransactionConfig.of((TimestampProvider)TimestampProviders.MICRO));
        KeyColumnValueStoreUtil.loadValues((KeyColumnValueStore)store, (StoreTransaction)tx, (String[][])values);
        tx.commit();
        SimpleScanJobRunner runner = (job, jobConf, rootNSName) -> {
            try {
                return ((CassandraHadoopScanRunner)((CassandraHadoopScanRunner)new CassandraHadoopScanRunner(job).scanJobConf(jobConf)).scanJobConfRoot(rootNSName)).partitionerOverride("org.apache.cassandra.dht.Murmur3Partitioner").run();
            }
            catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        };
        SimpleScanJob.runBasicTests((int)keys, (int)cols, (SimpleScanJobRunner)runner);
    }

    @Test
    public void testPartitionedVertexScan() throws Exception {
        this.tearDown();
        CassandraScanJobIT.clearGraph((WriteConfiguration)this.getConfiguration());
        WriteConfiguration partConf = this.getConfiguration();
        this.open(partConf);
        this.mgmt.makeVertexLabel("part").partition().make();
        this.finishSchema();
        TitanVertex supernode = this.graph.addVertex("part");
        for (int i = 0; i < 128; ++i) {
            TitanVertex v = this.graph.addVertex("part");
            v.addEdge("default", (Vertex)supernode, new Object[0]);
            if (0 >= i || 0 != i % 4) continue;
            this.graph.tx().commit();
        }
        this.graph.tx().commit();
        org.apache.hadoop.conf.Configuration c = new org.apache.hadoop.conf.Configuration();
        c.set(ConfigElement.getPath((ConfigElement)TitanHadoopConfiguration.GRAPH_CONFIG_KEYS, (boolean)true, (String[])new String[0]) + ".storage.cassandra.keyspace", ((Object)((Object)this)).getClass().getSimpleName());
        c.set(ConfigElement.getPath((ConfigElement)TitanHadoopConfiguration.GRAPH_CONFIG_KEYS, (boolean)true, (String[])new String[0]) + ".storage.backend", "cassandrathrift");
        c.set("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner");
        Job job = this.getVertexJobWithDefaultMapper(c);
        Assert.assertFalse((boolean)job.waitForCompletion(true));
    }

    @Test
    public void testPartitionedVertexFilteredScan() throws Exception {
        this.tearDown();
        CassandraScanJobIT.clearGraph((WriteConfiguration)this.getConfiguration());
        WriteConfiguration partConf = this.getConfiguration();
        this.open(partConf);
        this.mgmt.makeVertexLabel("part").partition().make();
        this.finishSchema();
        TitanVertex supernode = this.graph.addVertex("part");
        for (int i = 0; i < 128; ++i) {
            TitanVertex v = this.graph.addVertex("part");
            v.addEdge("default", (Vertex)supernode, new Object[0]);
            if (0 >= i || 0 != i % 4) continue;
            this.graph.tx().commit();
        }
        this.graph.tx().commit();
        org.apache.hadoop.conf.Configuration c = new org.apache.hadoop.conf.Configuration();
        c.set(ConfigElement.getPath((ConfigElement)TitanHadoopConfiguration.GRAPH_CONFIG_KEYS, (boolean)true, (String[])new String[0]) + ".storage.cassandra.keyspace", ((Object)((Object)this)).getClass().getSimpleName());
        c.set(ConfigElement.getPath((ConfigElement)TitanHadoopConfiguration.GRAPH_CONFIG_KEYS, (boolean)true, (String[])new String[0]) + ".storage.backend", "cassandrathrift");
        c.set(ConfigElement.getPath((ConfigElement)TitanHadoopConfiguration.FILTER_PARTITIONED_VERTICES, (String[])new String[0]), "true");
        c.set("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner");
        Job job = this.getVertexJobWithDefaultMapper(c);
        Assert.assertTrue((boolean)job.waitForCompletion(true));
    }

    private Job getVertexJobWithDefaultMapper(org.apache.hadoop.conf.Configuration c) throws IOException {
        Job job = Job.getInstance((org.apache.hadoop.conf.Configuration)c);
        job.setJarByClass(HadoopScanMapper.class);
        job.setJobName("testPartitionedVertexScan");
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setInputFormatClass(CassandraInputFormat.class);
        return job;
    }

    public WriteConfiguration getConfiguration() {
        String className = ((Object)((Object)this)).getClass().getSimpleName();
        ModifiableConfiguration mc = CassandraStorageSetup.getEmbeddedConfiguration((String)className);
        return mc.getConfiguration();
    }
}

