/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hbase.index.builder;

import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;

public class IndexBuildManager
implements Stoppable {
    private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
    private final IndexBuilder delegate;
    private QuickFailingTaskRunner pool;
    private boolean stopped;
    public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
    private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
    private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY = "index.builder.threads.keepalivetime";

    public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
        this(IndexBuildManager.getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
    }

    private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
        Configuration conf = e.getConfiguration();
        Class<IndexBuilder> builderClass = conf.getClass("index.builder", null, IndexBuilder.class);
        try {
            IndexBuilder builder = builderClass.newInstance();
            builder.setup(e);
            return builder;
        }
        catch (InstantiationException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
        }
        catch (IllegalAccessException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
        }
    }

    private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
        String serverName = env.getRegionServerServices().getServerName().getServerName();
        return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY, 10);
    }

    public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
        this.delegate = builder;
        this.pool = pool;
    }

    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(MiniBatchOperationInProgress<Mutation> miniBatchOp, Collection<? extends Mutation> mutations) throws Throwable {
        this.delegate.batchStarted(miniBatchOp);
        TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks = new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
        for (final Mutation mutation : mutations) {
            tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>(){

                @Override
                public Collection<Pair<Mutation, byte[]>> call() throws IOException {
                    return IndexBuildManager.this.delegate.getIndexUpdate(mutation);
                }
            });
        }
        List allResults = null;
        try {
            allResults = this.pool.submitUninterruptible(tasks);
        }
        catch (CancellationException cancellationException) {
            throw cancellationException;
        }
        catch (ExecutionException executionException) {
            LOG.error("Found a failed index update!");
            throw executionException.getCause();
        }
        ArrayList<Pair<Mutation, byte[]>> arrayList = new ArrayList<Pair<Mutation, byte[]>>();
        for (Collection result : allResults) {
            assert (result != null) : "Found an unsuccessful result, but didn't propagate a failure earlier";
            arrayList.addAll(result);
        }
        return arrayList;
    }

    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
        if (!this.delegate.isEnabled(delete)) {
            return null;
        }
        return this.delegate.getIndexUpdate(delete);
    }

    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered) throws IOException {
        return this.delegate.getIndexUpdateForFilteredRows(filtered);
    }

    public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
        this.delegate.batchCompleted(miniBatchOp);
    }

    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        this.delegate.batchStarted(miniBatchOp);
    }

    public boolean isEnabled(Mutation m) throws IOException {
        return this.delegate.isEnabled(m);
    }

    public byte[] getBatchId(Mutation m) {
        return this.delegate.getBatchId(m);
    }

    @Override
    public void stop(String why) {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        this.delegate.stop(why);
        this.pool.stop(why);
    }

    @Override
    public boolean isStopped() {
        return this.stopped;
    }

    public IndexBuilder getBuilderForTesting() {
        return this.delegate;
    }
}

