/*
 * Decompiled with CFR 0.152.
 */
package org.pingles.cascading.neo4j.hadoop;

import java.io.IOException;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.neo4j.graphdb.Transaction;
import org.neo4j.rest.graphdb.RestGraphDatabase;
import org.pingles.cascading.neo4j.hadoop.Neo4jWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class Neo4jRecordWriter<K, V extends Neo4jWritable>
implements RecordWriter<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jRecordWriter.class);
    private RestGraphDatabase database;
    private Transaction transaction;
    private String restConnectionString;
    private int recordsInBatch = 0;
    private long numberOfCommittedTransactions = 0L;
    private final int batchSize;

    public Neo4jRecordWriter(String restConnectionString, int batchSize) {
        this.restConnectionString = restConnectionString;
        this.batchSize = batchSize;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Creating Neo4jRecordWriter to connect to {}", (Object)restConnectionString);
            LOGGER.info("Batch size: {}", (Object)this.getBatchSize());
        }
    }

    public void write(K k, V v) throws IOException {
        this.ensureTransaction();
        v.store(this.database);
        ++this.recordsInBatch;
    }

    private void ensureTransaction() {
        if (this.shouldCreateNewTransaction()) {
            if (this.existingTransaction()) {
                this.commitTransaction();
            }
            this.transaction = this.database().beginTx();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("Created new transaction: %s", this.transaction.getClass().toString()));
            }
        }
    }

    public void close(Reporter reporter) throws IOException {
        if (this.existingTransaction()) {
            this.commitTransaction();
        }
        this.database().shutdown();
        reporter.incrCounter("org.pingles.cascading.neo4j", "Batch transactions committed", this.numberOfCommittedTransactions);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("Closing writer. Committed %d transactions.", this.numberOfCommittedTransactions));
        }
    }

    private boolean shouldCreateNewTransaction() {
        return this.transaction == null || this.recordsInBatch % this.getBatchSize() == 0;
    }

    private boolean existingTransaction() {
        return this.transaction != null;
    }

    private void commitTransaction() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("Committing transaction with %d records.", this.recordsInBatch));
        }
        this.transaction.success();
        this.transaction.finish();
        ++this.numberOfCommittedTransactions;
        this.recordsInBatch = 0;
    }

    private RestGraphDatabase database() {
        if (this.database == null) {
            this.database = new RestGraphDatabase(this.restConnectionString);
        }
        return this.database;
    }

    private int getBatchSize() {
        return this.batchSize;
    }
}

