/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.coprocessor;

import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;

/**
 * Region observer that periodically looks for disabled/inactive indexes and
 * triggers a partial rebuild of them.
 *
 * <p>When the observed region is opened, a {@link BuildIndexScheduleTask} is
 * scheduled on a single-threaded executor. When the region is about to close,
 * the executor is shut down and the metadata cache is invalidated.
 */
public class MetaDataRegionObserver extends BaseRegionObserver {
    public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class);

    /** Delay, in milliseconds, before the first rebuild check runs after the region opens. */
    private static final long INITIAL_REBUILD_DELAY_MS = 10000L;

    /** Runs the periodic rebuild task; shut down in {@link #preClose}. */
    protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    /** Whether the rebuild task is scheduled at all; read from configuration in {@link #start}. */
    private boolean enableRebuildIndex = true;
    /** Period, in milliseconds, between rebuild checks; read from configuration in {@link #start}. */
    private long rebuildIndexTimeInterval = 10000L;

    /**
     * Stops the rebuild executor (interrupting any running task) and drops every
     * entry from the metadata cache obtained from {@link GlobalCache}.
     */
    @Override
    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
        this.executor.shutdownNow();
        GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
    }

    /**
     * Sleeps for the configured {@code phoenix.clock.skew.interval} (default 2000 ms,
     * skipped when not positive) and then reads the rebuild settings.
     *
     * <p>NOTE(review): the sleep is presumably there to ride out clock skew between
     * servers, as the configuration key suggests - confirm.
     *
     * @param env coprocessor environment supplying the configuration
     * @throws IOException declared by the overridden method; not thrown by this body
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        long sleepTime = env.getConfiguration().getLong("phoenix.clock.skew.interval", 2000L);
        try {
            if (sleepTime > 0L) {
                Thread.sleep(sleepTime);
            }
        }
        catch (InterruptedException ie) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        this.enableRebuildIndex = env.getConfiguration().getBoolean("phoenix.index.failure.handling.rebuild", true);
        this.rebuildIndexTimeInterval = env.getConfiguration().getLong("phoenix.index.failure.handling.rebuild.interval", 10000L);
    }

    /**
     * Schedules the periodic {@link BuildIndexScheduleTask} unless rebuilding has
     * been disabled by configuration.
     */
    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
        if (!this.enableRebuildIndex) {
            LOG.info("Failure Index Rebuild is skipped by configuration.");
            return;
        }
        // Raise the threshold of the Hadoop configuration-deprecation logger to WARN.
        Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation");
        if (deprecationLogger != null) {
            deprecationLogger.setLevel(Level.WARN);
        }
        try {
            // Force the Phoenix driver class to load before the task asks DriverManager for a connection.
            Class.forName(PhoenixDriver.class.getName());
            BuildIndexScheduleTask task = new BuildIndexScheduleTask(e.getEnvironment());
            this.executor.scheduleAtFixedRate(task, INITIAL_REBUILD_DELAY_MS, this.rebuildIndexTimeInterval, TimeUnit.MILLISECONDS);
        }
        catch (ClassNotFoundException ex) {
            LOG.error("BuildIndexScheduleTask cannot start!", ex);
        }
    }

    /**
     * Task that scans the observed region for rows carrying a non-zero index
     * disable timestamp and asks {@link MetaDataClient} to partially rebuild each
     * index that is in the DISABLE or INACTIVE state.
     *
     * <p>At most one run is active at a time across all instances in this JVM;
     * overlapping invocations return immediately.
     */
    public static class BuildIndexScheduleTask
    extends TimerTask {
        /** 0 when idle, 1 while a run is active. Shared by every task instance. */
        private static final AtomicInteger inProgress = new AtomicInteger(0);
        RegionCoprocessorEnvironment env;

        /**
         * @param env environment of the region this task scans
         */
        public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
            this.env = env;
        }

        /**
         * Builds the Phoenix JDBC URL from the ZooKeeper quorum, client port
         * (default 2181) and parent znode (default {@code /hbase}) found in the
         * region's configuration.
         */
        private String getJdbcUrl() {
            String zkQuorum = this.env.getConfiguration().get("hbase.zookeeper.quorum");
            String zkClientPort = this.env.getConfiguration().get("hbase.zookeeper.property.clientPort", Integer.toString(2181));
            String zkParentNode = this.env.getConfiguration().get("zookeeper.znode.parent", "/hbase");
            return "jdbc:phoenix:" + zkQuorum + ':' + zkClientPort + ':' + zkParentNode;
        }

        /**
         * Creates the scan that selects rows whose index disable timestamp column
         * is present and not equal to 0, fetching only the columns the task reads.
         */
        private static Scan newDisabledIndexScan() {
            Scan scan = new Scan();
            SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
            // Rows that lack the column entirely are excluded as well.
            filter.setFilterIfMissing(true);
            scan.setFilter(filter);
            scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_NAME_BYTES);
            scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
            scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
            scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
            return scan;
        }

        /** Closes the scanner if one was opened; an {@link IOException} is logged, not propagated. */
        private static void closeScanner(InternalScanner scanner) {
            if (scanner == null) {
                return;
            }
            try {
                scanner.close();
            }
            catch (IOException ignored) {
                LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
            }
        }

        /** Closes the connection if one was opened; a {@link SQLException} is logged, not propagated. */
        private static void closeConnection(PhoenixConnection conn) {
            if (conn == null) {
                return;
            }
            try {
                conn.close();
            }
            catch (SQLException ignored) {
                LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
            }
        }

        /**
         * Performs one rebuild pass. Failures are logged and swallowed so that a
         * periodic schedule is not cancelled by an exception escaping this method.
         */
        @Override
        public void run() {
            // Claim the single run slot atomically; a separate get()/increment pair
            // would let two tasks slip through at the same time.
            if (!inProgress.compareAndSet(0, 1)) {
                LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
                return;
            }
            InternalScanner scanner = null;
            PhoenixConnection conn = null;
            try {
                boolean hasMore = false;
                ArrayList<Cell> results = new ArrayList<Cell>();
                scanner = this.env.getRegion().getScanner(newDisabledIndexScan());
                do {
                    results.clear();
                    hasMore = scanner.next(results);
                    if (results.isEmpty()) {
                        break;
                    }
                    Result r = Result.create(results);

                    // Skip rows without a positive disable timestamp.
                    byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                        continue;
                    }
                    long disabledTimeStampVal = (Long)PLong.INSTANCE.toObject(disabledTimeStamp);
                    if (disabledTimeStampVal <= 0L) {
                        continue;
                    }

                    // Only indexes with a data table name and a DISABLE or INACTIVE state are rebuilt.
                    byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                    byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                    if (dataTable == null || dataTable.length == 0 || indexStat == null || indexStat.length == 0) {
                        continue;
                    }
                    boolean disabled = Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0;
                    boolean inactive = Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) == 0;
                    if (!disabled && !inactive) {
                        continue;
                    }

                    // The row key is split into three var-char parts; parts 1 and 2
                    // are used as the schema name and the index table name.
                    byte[][] rowKeyMetaData = new byte[3][];
                    SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
                    byte[] schemaName = rowKeyMetaData[1];
                    byte[] indexTable = rowKeyMetaData[2];
                    if (indexTable == null || indexTable.length == 0) {
                        LOG.debug("Index rebuild has been skipped for row=" + r);
                        continue;
                    }

                    // Open the connection lazily: only once a candidate row is found.
                    if (conn == null) {
                        conn = DriverManager.getConnection(this.getJdbcUrl()).unwrap(PhoenixConnection.class);
                    }
                    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
                    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
                    PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
                    PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
                    if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                        LOG.debug("Index rebuild has been skipped because not all regions of index table=" + indexPTable.getName() + " are online.");
                        continue;
                    }

                    // Rebuild from slightly before the disable time, never from a negative timestamp.
                    MetaDataClient client = new MetaDataClient(conn);
                    long overlapTime = this.env.getConfiguration().getLong("phoenix.index.failure.handling.rebuild.overlap.time", 300000L);
                    long timeStamp = Math.max(0L, disabledTimeStampVal - overlapTime);
                    LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
                    client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp));
                } while (hasMore);
            }
            catch (Throwable t) {
                // Deliberately broad: an exception escaping a fixed-rate task
                // suppresses all of its subsequent executions.
                LOG.warn("ScheduledBuildIndexTask failed!", t);
            }
            finally {
                // Release the run slot last, and always, so that the next run
                // cannot start while this one still holds its resources.
                try {
                    closeScanner(scanner);
                }
                finally {
                    try {
                        closeConnection(conn);
                    }
                    finally {
                        inProgress.set(0);
                    }
                }
            }
        }
    }
}

