/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.coprocessor;

import com.google.common.collect.Maps;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.GroupByCache;
import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TupleUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupedAggregateRegionObserver
extends BaseScannerRegionObserver {
    private static final Logger logger = LoggerFactory.getLogger(GroupedAggregateRegionObserver.class);
    public static final int MIN_DISTINCT_VALUES = 100;

    @Override
    protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, RegionScanner s) throws IOException {
        boolean keyOrdered = false;
        byte[] expressionBytes = scan.getAttribute("_UnorderedGroupByExpressions");
        if (expressionBytes == null) {
            expressionBytes = scan.getAttribute("_OrderedGroupByExpressions");
            keyOrdered = true;
        }
        int offset = 0;
        if (ScanUtil.isLocalIndex(scan)) {
            Region region = c.getEnvironment().getRegion();
            offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : region.getRegionInfo().getEndKey().length;
            ScanUtil.setRowKeyOffset(scan, offset);
        }
        List<Expression> expressions = this.deserializeGroupByExpressions(expressionBytes, 0);
        ServerAggregators aggregators = ServerAggregators.deserialize(scan.getAttribute("_Aggs"), c.getEnvironment().getConfiguration());
        RegionScanner innerScanner = s;
        byte[] localIndexBytes = scan.getAttribute("_LocalIndexBuild");
        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
        TupleProjector tupleProjector = null;
        byte[][] viewConstants = null;
        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
        TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
        HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
        if (ScanUtil.isLocalIndex(scan) || j == null && p != null) {
            if (dataColumns != null) {
                tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
            }
            ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
            innerScanner = this.getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
        }
        if (j != null) {
            innerScanner = new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
        }
        long limit = Long.MAX_VALUE;
        byte[] limitBytes = scan.getAttribute("_GroupByLimit");
        if (limitBytes != null) {
            limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
        }
        if (keyOrdered) {
            return this.scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
        }
        return this.scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
    }

    public static long sizeOfUnorderedGroupByMap(int nRows, int valueSize) {
        return SizedUtil.sizeOfMap(nRows, 48, valueSize);
    }

    public static void serializeIntoScan(Scan scan, String attribName, List<Expression> groupByExpressions) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream(Math.max(1, groupByExpressions.size() * 10));
        try {
            if (groupByExpressions.isEmpty()) {
                stream.write(QueryConstants.TRUE);
            } else {
                DataOutputStream output = new DataOutputStream(stream);
                for (Expression expression : groupByExpressions) {
                    WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
                    expression.write(output);
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        finally {
            try {
                stream.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        scan.setAttribute(attribName, stream.toByteArray());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<Expression> deserializeGroupByExpressions(byte[] expressionBytes, int offset) throws IOException {
        ArrayList<Expression> expressions = new ArrayList<Expression>(3);
        ByteArrayInputStream stream = new ByteArrayInputStream(expressionBytes);
        try {
            DataInputStream input = new DataInputStream(stream);
            try {
                while (true) {
                    int expressionOrdinal = WritableUtils.readVInt(input);
                    Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
                    expression.readFields(input);
                    if (offset != 0) {
                        IndexUtil.setRowKeyExpressionOffset(expression, offset);
                    }
                    expressions.add(expression);
                }
            }
            catch (EOFException e) {
                stream.close();
            }
        }
        catch (Throwable throwable) {
            stream.close();
            throw throwable;
        }
        return expressions;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, RegionScanner scanner, List<Expression> expressions, ServerAggregators aggregators, long limit) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
        }
        RegionCoprocessorEnvironment env = c.getEnvironment();
        Configuration conf = env.getConfiguration();
        int estDistVals = conf.getInt("phoenix.groupby.estimatedDistinctValues", 1000);
        byte[] estDistValsBytes = scan.getAttribute("_EstDistinctValues");
        if (estDistValsBytes != null) {
            estDistVals = Math.max(100, (int)((float)Bytes.toInt(estDistValsBytes) * 1.5f));
        }
        boolean spillableEnabled = conf.getBoolean("phoenix.groupby.spillable", QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE);
        GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache(env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals);
        boolean success = false;
        try {
            MultiKeyValueTuple result = new MultiKeyValueTuple();
            if (logger.isDebugEnabled()) {
                logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
            }
            Region region = c.getEnvironment().getRegion();
            region.startRegionOperation();
            try {
                RegionScanner regionScanner = scanner;
                synchronized (regionScanner) {
                    boolean hasMore;
                    do {
                        ArrayList<Cell> results = new ArrayList<Cell>();
                        hasMore = scanner.nextRaw(results);
                        if (results.isEmpty()) continue;
                        result.setKeyValues(results);
                        ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, expressions);
                        Aggregator[] rowAggregators = groupByCache.cache(key);
                        aggregators.aggregate(rowAggregators, result);
                    } while (hasMore && groupByCache.size() < limit);
                }
            }
            finally {
                region.closeRegionOperation();
            }
            RegionScanner regionScanner = groupByCache.getScanner(scanner);
            success = true;
            RegionScanner regionScanner2 = regionScanner;
            return regionScanner2;
        }
        finally {
            if (!success) {
                Closeables.closeQuietly(groupByCache);
            }
        }
    }

    private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, final ServerAggregators aggregators, final long limit) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
        }
        return new BaseRegionScanner(){
            private long rowCount = 0L;
            private ImmutableBytesWritable currentKey = null;

            @Override
            public HRegionInfo getRegionInfo() {
                return scanner.getRegionInfo();
            }

            @Override
            public void close() throws IOException {
                scanner.close();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean next(List<Cell> results) throws IOException {
                boolean atLimit;
                boolean hasMore;
                boolean aggBoundary = false;
                MultiKeyValueTuple result = new MultiKeyValueTuple();
                ImmutableBytesPtr key = null;
                Aggregator[] rowAggregators = aggregators.getAggregators();
                int countOffset = rowAggregators.length == 0 ? 1 : 0;
                Region region = ((RegionCoprocessorEnvironment)c.getEnvironment()).getRegion();
                region.startRegionOperation();
                try {
                    RegionScanner regionScanner = scanner;
                    synchronized (regionScanner) {
                        do {
                            ArrayList<Cell> kvs = new ArrayList<Cell>();
                            hasMore = scanner.nextRaw(kvs);
                            if (!kvs.isEmpty()) {
                                result.setKeyValues(kvs);
                                key = TupleUtil.getConcatenatedValue(result, expressions);
                                boolean bl = aggBoundary = this.currentKey != null && this.currentKey.compareTo(key) != 0;
                                if (!aggBoundary) {
                                    aggregators.aggregate(rowAggregators, result);
                                    if (logger.isDebugEnabled()) {
                                        logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs + ", aggregated values: " + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan)));
                                    }
                                    this.currentKey = key;
                                }
                            }
                            boolean bl = atLimit = this.rowCount + (long)countOffset >= limit;
                        } while (hasMore && !aggBoundary && !atLimit);
                    }
                }
                finally {
                    region.closeRegionOperation();
                }
                if (this.currentKey != null) {
                    byte[] value = aggregators.toBytes(rowAggregators);
                    KeyValue keyValue = KeyValueUtil.newKeyValue(this.currentKey.get(), this.currentKey.getOffset(), this.currentKey.getLength(), QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN, Long.MAX_VALUE, value, 0, value.length);
                    results.add(keyValue);
                    if (logger.isDebugEnabled()) {
                        logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: " + keyValue + ",for current key " + Bytes.toStringBinary(this.currentKey.get(), this.currentKey.getOffset(), this.currentKey.getLength()) + ", aggregated values: " + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan)));
                    }
                    if (aggBoundary) {
                        aggregators.reset(rowAggregators);
                        aggregators.aggregate(rowAggregators, result);
                        this.currentKey = key;
                        ++this.rowCount;
                        atLimit |= this.rowCount >= limit;
                    }
                }
                if (!atLimit && (hasMore || aggBoundary)) {
                    return true;
                }
                this.currentKey = null;
                return false;
            }

            @Override
            public long getMaxResultSize() {
                return scanner.getMaxResultSize();
            }

            @Override
            public int getBatch() {
                return scanner.getBatch();
            }
        };
    }

    @Override
    protected boolean isRegionObserverFor(Scan scan) {
        return scan.getAttribute("_UnorderedGroupByExpressions") != null || scan.getAttribute("_OrderedGroupByExpressions") != null;
    }

    private static final class GroupByCacheFactory {
        public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory();

        private GroupByCacheFactory() {
        }

        GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
            Configuration conf = env.getConfiguration();
            boolean spillableEnabled = conf.getBoolean("phoenix.groupby.spillable", QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE);
            if (spillableEnabled) {
                return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals);
            }
            return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals);
        }
    }

    private static final class InMemoryGroupByCache
    implements GroupByCache {
        private final MemoryManager.MemoryChunk chunk;
        private final Map<ImmutableBytesPtr, Aggregator[]> aggregateMap;
        private final ServerAggregators aggregators;
        private final RegionCoprocessorEnvironment env;
        private final byte[] customAnnotations;
        private int estDistVals;

        InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) {
            int estValueSize = aggregators.getEstimatedByteSize();
            long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(estDistVals, estValueSize);
            TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
            this.env = env;
            this.estDistVals = estDistVals;
            this.aggregators = aggregators;
            this.aggregateMap = Maps.newHashMapWithExpectedSize(estDistVals);
            this.chunk = tenantCache.getMemoryManager().allocate(estSize);
            this.customAnnotations = customAnnotations;
        }

        @Override
        public void close() throws IOException {
            this.chunk.close();
        }

        @Override
        public Aggregator[] cache(ImmutableBytesWritable cacheKey) {
            ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
            Aggregator[] rowAggregators = this.aggregateMap.get(key);
            if (rowAggregators == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate bucket for row key " + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()), this.customAnnotations));
                }
                rowAggregators = this.aggregators.newAggregators(this.env.getConfiguration());
                this.aggregateMap.put(key, rowAggregators);
                if (this.aggregateMap.size() > this.estDistVals) {
                    this.estDistVals = (int)((float)this.estDistVals * 1.5f);
                    long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(this.estDistVals, this.aggregators.getEstimatedByteSize());
                    this.chunk.resize(estSize);
                }
            }
            return rowAggregators;
        }

        @Override
        public RegionScanner getScanner(final RegionScanner s) {
            long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(this.aggregateMap.size(), this.aggregators.getEstimatedByteSize());
            this.chunk.resize(estSize);
            final ArrayList<KeyValue> aggResults = new ArrayList<KeyValue>(this.aggregateMap.size());
            for (Map.Entry<ImmutableBytesPtr, Aggregator[]> entry : this.aggregateMap.entrySet()) {
                ImmutableBytesPtr key = entry.getKey();
                Aggregator[] rowAggregators = entry.getValue();
                byte[] value = this.aggregators.toBytes(rowAggregators);
                if (logger.isDebugEnabled()) {
                    logger.debug(LogUtil.addCustomAnnotations("Adding new distinct group: " + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators " + Arrays.asList(rowAggregators).toString() + " value = " + Bytes.toStringBinary(value), this.customAnnotations));
                }
                KeyValue keyValue = KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN, Long.MAX_VALUE, value, 0, value.length);
                aggResults.add(keyValue);
            }
            return new BaseRegionScanner(){
                private int index = 0;

                @Override
                public HRegionInfo getRegionInfo() {
                    return s.getRegionInfo();
                }

                @Override
                public void close() throws IOException {
                    try {
                        s.close();
                    }
                    finally {
                        InMemoryGroupByCache.this.close();
                    }
                }

                @Override
                public boolean next(List<Cell> results) throws IOException {
                    if (this.index >= aggResults.size()) {
                        return false;
                    }
                    results.add((Cell)aggResults.get(this.index));
                    ++this.index;
                    return this.index < aggResults.size();
                }

                @Override
                public long getMaxResultSize() {
                    return s.getMaxResultSize();
                }

                @Override
                public int getBatch() {
                    return s.getBatch();
                }
            };
        }

        @Override
        public long size() {
            return this.aggregateMap.size();
        }
    }
}

