/*
 * Decompiled with CFR 0.152.
 */
package com.hbase.haxwell;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.hbase.haxwell.HaxwellEventExecutor;
import com.hbase.haxwell.HaxwellMetrics;
import com.hbase.haxwell.HaxwellRegionServer;
import com.hbase.haxwell.HaxwellSubscriptionImpl;
import com.hbase.haxwell.api.HaxwellEvent;
import com.hbase.haxwell.api.HaxwellEventDataExtractor;
import com.hbase.haxwell.api.HaxwellEventListener;
import com.hbase.haxwell.api.WaitPolicy;
import com.hbase.haxwell.util.CloseHelper;
import com.hbase.haxwell.util.ZookeeperHelper;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

/**
 * Receives replicated WAL entries over the HBase {@code AdminService} RPC interface and
 * turns them into {@link HaxwellEvent}s that are handed to a {@link HaxwellEventExecutor}
 * together with the configured {@link HaxwellEventListener}.
 *
 * <p>On construction the consumer creates its own {@link RpcServer} bound to the given host
 * name and a set of single-threaded executors. {@link #start()} starts the RPC server and
 * publishes an ephemeral ZooKeeper node for it; {@link #stop()} releases those resources
 * again.
 */
public class HaxwellConsumer extends HaxwellRegionServer {
    private final String subscriptionId;
    private final long subscriptionTimestamp;
    private final HaxwellEventListener listener;
    private final ZookeeperHelper zk;
    private final Configuration hbaseConf;
    private final RpcServer rpcServer;
    private final ServerName serverName;
    private final ZooKeeperWatcher zkWatcher;
    private final HaxwellMetrics sepMetrics;
    // May be null, in which case no payload is extracted from the incoming cells.
    private final HaxwellEventDataExtractor payloadExtractor;
    // Assigned in start(); null until then.
    private String zkNodePath;
    private final List<ThreadPoolExecutor> executors;
    // NOTE(review): written by start()/stop() and read by isRunning() without synchronization;
    // confirm whether callers access it from multiple threads.
    boolean running = false;
    private final Log log = LogFactory.getLog(this.getClass());

    /**
     * Creates a consumer without a payload extractor.
     *
     * @param subscriptionId        external subscription name
     * @param subscriptionTimestamp WAL entries written before this timestamp are skipped
     * @param listener              listener passed to the event executor
     * @param threadCnt             number of single-threaded executors to create; must be &gt; 0
     * @param hostName              host name the RPC server binds to
     * @param zk                    helper used to create and delete this consumer's ZooKeeper node
     * @param hbaseConf             HBase configuration
     * @throws IOException          declared for failures while setting up the RPC server,
     *                              ZooKeeper watcher or logins
     * @throws InterruptedException declared by the delegated constructor
     */
    public HaxwellConsumer(String subscriptionId, long subscriptionTimestamp, HaxwellEventListener listener, int threadCnt, String hostName, ZookeeperHelper zk, Configuration hbaseConf) throws IOException, InterruptedException {
        this(subscriptionId, subscriptionTimestamp, listener, threadCnt, hostName, zk, hbaseConf, null);
    }

    /**
     * Creates a consumer, its RPC server, its ZooKeeper watcher and its worker executors.
     * The RPC server is created here but only started by {@link #start()}.
     *
     * @param subscriptionId        external subscription name; converted with
     *                              {@link HaxwellSubscriptionImpl#toInternalSubscriptionName}
     * @param subscriptionTimestamp WAL entries written before this timestamp are skipped
     * @param listener              listener passed to the event executor
     * @param threadCnt             number of single-threaded executors to create; must be &gt; 0
     * @param hostName              host name the RPC server binds to (port 0 is requested)
     * @param zk                    helper used to create and delete this consumer's ZooKeeper node
     * @param hbaseConf             HBase configuration
     * @param payloadExtractor      optional extractor for per-row payloads; may be {@code null}
     * @throws IllegalArgumentException if {@code threadCnt <= 0} or {@code hostName} cannot be resolved
     * @throws IOException              declared for failures while setting up the RPC server,
     *                                  ZooKeeper watcher or logins
     * @throws InterruptedException     declared by this constructor; the visible code does not
     *                                  show which call raises it
     */
    public HaxwellConsumer(String subscriptionId, long subscriptionTimestamp, HaxwellEventListener listener, int threadCnt, String hostName, ZookeeperHelper zk, Configuration hbaseConf, HaxwellEventDataExtractor payloadExtractor) throws IOException, InterruptedException {
        Preconditions.checkArgument(threadCnt > 0, "Thread count must be > 0");
        this.subscriptionId = HaxwellSubscriptionImpl.toInternalSubscriptionName(subscriptionId);
        this.subscriptionTimestamp = subscriptionTimestamp;
        this.listener = listener;
        this.zk = zk;
        this.hbaseConf = hbaseConf;
        // Metrics are keyed on the external (unconverted) subscription id.
        this.sepMetrics = new HaxwellMetrics(subscriptionId);
        this.payloadExtractor = payloadExtractor;
        this.executors = Lists.newArrayListWithCapacity(threadCnt);

        InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0);
        if (initialIsa.getAddress() == null) {
            throw new IllegalArgumentException("Failed resolve of " + initialIsa);
        }
        String name = "regionserver/" + initialIsa.toString();
        this.rpcServer = new RpcServer(this, name, this.getServices(), initialIsa, hbaseConf,
                new FifoRpcScheduler(hbaseConf, hbaseConf.getInt("hbase.regionserver.handler.count", 10)));
        // The server name uses the port the RPC server actually listens on.
        this.serverName = ServerName.valueOf(hostName, this.rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
        this.zkWatcher = new ZooKeeperWatcher(hbaseConf, this.serverName.toString(), null);

        ZKUtil.loginClient(hbaseConf, "hbase.zookeeper.client.keytab.file", "hbase.zookeeper.client.kerberos.principal", hostName);
        User.login(hbaseConf, "hbase.regionserver.keytab.file", "hbase.regionserver.kerberos.principal", hostName);

        for (int i = 0; i < threadCnt; ++i) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
            executor.setRejectedExecutionHandler(new WaitPolicy());
            this.executors.add(executor);
        }
    }

    /**
     * Starts the RPC server and creates an ephemeral ZooKeeper node at
     * {@code <haxwell.zookeeper.znode.parent>/<subscriptionId>/rs/<serverName>}, then marks the
     * consumer as running.
     *
     * <p>NOTE(review): if creating the node fails, the RPC server has already been started but
     * {@code running} stays {@code false}, so {@link #stop()} will not close the RPC server.
     *
     * @throws IOException          declared for RPC server or ZooKeeper failures
     * @throws InterruptedException declared for interruption while creating the node
     * @throws KeeperException      declared for ZooKeeper failures while creating the node
     */
    public void start() throws IOException, InterruptedException, KeeperException {
        this.rpcServer.start();
        this.zkNodePath = this.hbaseConf.get("haxwell.zookeeper.znode.parent", "/haxwell/hbase-slave")
                + "/" + this.subscriptionId + "/rs/" + this.serverName.getServerName();
        this.zk.create(this.zkNodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        this.running = true;
    }

    /**
     * Builds the list of RPC services exposed by this consumer: the HBase {@code AdminService},
     * implemented by this instance.
     */
    private List<RpcServer.BlockingServiceAndInterface> getServices() {
        List<RpcServer.BlockingServiceAndInterface> services = new ArrayList<RpcServer.BlockingServiceAndInterface>(1);
        services.add(new RpcServer.BlockingServiceAndInterface(
                AdminProtos.AdminService.newReflectiveBlockingService(this),
                AdminProtos.AdminService.BlockingInterface.class));
        return services;
    }

    /**
     * Stops the consumer. The ZooKeeper watcher, metrics and executors are always shut down;
     * the RPC server is closed and the ZooKeeper node deleted only if the consumer was running.
     * Failure to delete the node is logged at debug level and otherwise ignored.
     */
    public void stop() {
        // Cast kept from the original call site in case CloseHelper.close is overloaded.
        CloseHelper.close((Closeable) this.zkWatcher);
        if (this.running) {
            this.running = false;
            CloseHelper.close(this.rpcServer);
            try {
                this.zk.delete(this.zkNodePath, -1);
            } catch (Exception e) {
                this.log.debug("Exception while removing zookeeper node", e);
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.sepMetrics.shutdown();
        for (ThreadPoolExecutor executor : this.executors) {
            executor.shutdown();
        }
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Handles a replication call: groups the cells of every WAL entry by row key, creates one
     * {@link HaxwellEvent} per row and schedules it on a {@link HaxwellEventExecutor}, then waits
     * for all scheduled work to complete before responding.
     *
     * <p>Entries whose write time is earlier than the subscription timestamp are skipped, but
     * their cells are still consumed from the cell scanner so later entries stay aligned.
     *
     * @param controller must be a {@link PayloadCarryingRpcController}; its cell scanner supplies
     *                   the cells belonging to the entries in {@code request}
     * @param request    the WAL entries to replicate
     * @return an empty response
     * @throws ServiceException               wrapping any {@link IOException} raised while processing
     * @throws ArrayIndexOutOfBoundsException if the scanner holds fewer cells than an entry announces
     * @throws RuntimeException               if processing of any scheduled batch failed
     */
    @Override
    public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(RpcController controller, AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
        try {
            long lastProcessedTimestamp = -1L;
            // The literal 100 is presumably a batch size - confirm against HaxwellEventExecutor.
            HaxwellEventExecutor eventExecutor = new HaxwellEventExecutor(this.listener, this.executors, 100, this.sepMetrics);
            List<AdminProtos.WALEntry> entries = request.getEntryList();
            CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();

            for (AdminProtos.WALEntry entry : entries) {
                // A null table name marks an entry that predates the subscription and must be skipped.
                TableName tableName = entry.getKey().getWriteTime() < this.subscriptionTimestamp
                        ? null
                        : TableName.valueOf(entry.getKey().getTableName().toByteArray());
                ArrayListMultimap<ByteBuffer, Cell> keyValuesPerRowKey = ArrayListMultimap.create();
                HashMap<ByteBuffer, byte[]> payloadPerRowKey = Maps.newHashMap();

                int count = entry.getAssociatedCellCount();
                for (int i = 0; i < count; ++i) {
                    if (!cells.advance()) {
                        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
                    }
                    if (tableName == null) {
                        // Skipped entry: the cell has been consumed, nothing else to do.
                        continue;
                    }
                    Cell cell = cells.current();
                    ByteBuffer rowKey = ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                    if (this.payloadExtractor != null) {
                        byte[] payload = this.payloadExtractor.extract(tableName.toBytes(), kv);
                        if (payload != null) {
                            if (payloadPerRowKey.containsKey(rowKey)) {
                                // The first payload seen for a row wins; later ones are only reported.
                                this.log.error("Multiple payloads encountered for row " + Bytes.toStringBinary(rowKey)
                                        + ", choosing " + Bytes.toStringBinary(payloadPerRowKey.get(rowKey)));
                            } else {
                                payloadPerRowKey.put(rowKey, payload);
                            }
                        }
                    }
                    keyValuesPerRowKey.put(rowKey, kv);
                }

                for (ByteBuffer rowKeyBuffer : keyValuesPerRowKey.keySet()) {
                    List<Cell> keyValues = keyValuesPerRowKey.get(rowKeyBuffer);
                    HaxwellEvent sepEvent = new HaxwellEvent(tableName.toBytes(), CellUtil.cloneRow(keyValues.get(0)),
                            keyValues, payloadPerRowKey.get(rowKeyBuffer));
                    eventExecutor.scheduleHaxwellEvent(sepEvent);
                    lastProcessedTimestamp = Math.max(lastProcessedTimestamp, entry.getKey().getWriteTime());
                }
            }

            List<Future<?>> futures = eventExecutor.flush();
            this.waitOnHaxwellSubscriptionCompletion(futures);
            // Only report a timestamp if at least one event was scheduled.
            if (lastProcessedTimestamp > 0L) {
                this.sepMetrics.reportSepTimestamp(lastProcessedTimestamp);
            }
            return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
        } catch (IOException ie) {
            throw new ServiceException(ie);
        }
    }

    /**
     * Waits for every future to complete. Failures are logged and collected so that all
     * futures are awaited before the first failure is rethrown; an interrupt aborts the wait
     * immediately.
     *
     * @param futures the pending batches returned by the event executor
     * @throws IOException      if the calling thread is interrupted while waiting
     * @throws RuntimeException wrapping the first failure, if any batch failed
     */
    private void waitOnHaxwellSubscriptionCompletion(List<Future<?>> futures) throws IOException {
        List<Exception> exceptionsThrown = Lists.newArrayList();
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted in processing events.", e);
            } catch (Exception e) {
                this.log.warn("Error processing a batch of SEP events, the error will be forwarded to HBase for retry", e);
                exceptionsThrown.add(e);
            }
        }
        if (!exceptionsThrown.isEmpty()) {
            this.log.error("Encountered exceptions on " + exceptionsThrown.size() + " batches (out of " + futures.size() + " total batches)");
            throw new RuntimeException(exceptionsThrown.get(0));
        }
    }

    /** @return the HBase configuration this consumer was created with */
    @Override
    public Configuration getConfiguration() {
        return this.hbaseConf;
    }

    /** @return the server name derived from the host name and the RPC server's listening port */
    @Override
    public ServerName getServerName() {
        return this.serverName;
    }

    /** @return the ZooKeeper watcher created for this consumer */
    @Override
    public ZooKeeperWatcher getZooKeeper() {
        return this.zkWatcher;
    }
}

