/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.iterate;

import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.iterate.BaseResultIterators;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.monitoring.PhoenixMetrics;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelIterators
extends BaseResultIterators {
    private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
    private static final String NAME = "PARALLEL";
    private final ParallelIteratorFactory iteratorFactory;

    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper) throws SQLException {
        super(plan, perScanLimit, scanGrouper);
        this.iteratorFactory = iteratorFactory;
    }

    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) throws SQLException {
        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance());
    }

    @Override
    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan, Future<PeekingResultIterator>>>> nestedFutures, final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
        ThreadPoolExecutor executor = this.context.getConnection().getQueryServices().getExecutor();
        ArrayList<BaseResultIterators.ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
        for (int i = 0; i < nestedScans.size(); ++i) {
            List<Scan> scans = nestedScans.get(i);
            ArrayList<Object> futures = Lists.newArrayListWithExpectedSize(scans.size());
            nestedFutures.add(futures);
            for (int j = 0; j < scans.size(); ++j) {
                Scan scan = nestedScans.get(i).get(j);
                scanLocations.add(new BaseResultIterators.ScanLocator(scan, i, j));
                futures.add(null);
            }
        }
        Collections.shuffle(scanLocations);
        PhoenixMetrics.SizeMetric.PARALLEL_SCANS.update(scanLocations.size());
        for (BaseResultIterators.ScanLocator scanLocation : scanLocations) {
            final Scan scan = scanLocation.getScan();
            Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobManager.JobCallable<PeekingResultIterator>(){

                @Override
                public PeekingResultIterator call() throws Exception {
                    long startTime = System.currentTimeMillis();
                    TableResultIterator scanner = new TableResultIterator(ParallelIterators.this.context, ParallelIterators.this.tableRef, scan);
                    if (logger.isDebugEnabled()) {
                        logger.debug(LogUtil.addCustomAnnotations("Id: " + ParallelIterators.this.scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                    }
                    PeekingResultIterator iterator = ParallelIterators.this.iteratorFactory.newIterator(ParallelIterators.this.context, scanner, scan);
                    iterator.peek();
                    allIterators.add(iterator);
                    return iterator;
                }

                @Override
                public Object getJobId() {
                    return ParallelIterators.this;
                }
            }, "Parallel scanner for table: " + this.tableRef.getTable().getName().getString()));
            nestedFutures.get(scanLocation.getOuterListIndex()).set(scanLocation.getInnerListIndex(), new Pair<Scan, Future<PeekingResultIterator>>(scan, future));
        }
    }

    @Override
    protected String getName() {
        return NAME;
    }
}

