/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer.hbase;

import com.flipkart.aesop.runtime.producer.AbstractEventProducer;
import com.flipkart.aesop.runtime.producer.hbase.SepEventMapper;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import com.ngdata.sep.EventListener;
import com.ngdata.sep.SepEvent;
import com.ngdata.sep.impl.SepConsumer;
import com.ngdata.sep.impl.SepModelImpl;
import com.ngdata.sep.util.zookeeper.ZkUtil;
import com.ngdata.sep.util.zookeeper.ZooKeeperItf;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.trpr.platform.core.PlatformException;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class HBaseEventProducer<T extends GenericRecord>
extends AbstractEventProducer
implements InitializingBean {
    private static final Logger LOGGER = LogFactory.getLogger(HBaseEventProducer.class);
    private static final String HBASE_REPLICATION_CONFIG = "hbase.replication";
    private static final String ZK_QUORUM_CONFIG = "hbase.zookeeper.quorum";
    private static final String ZK_CLIENT_PORT_CONFIG = "hbase.zookeeper.property.clientPort";
    private static final String LOCAL_HOST_NAME = "localhost";
    private static final int ZK_CLIENT_PORT = 2181;
    private static final int ZK_SESSION_TIMEOUT = 20000;
    private static final int WORKER_THREADS = 1;
    protected SepConsumer sepConsumer;
    private String localHost;
    protected String zkQuorum;
    protected int zkClientPort = 2181;
    protected Integer zkSessionTimeout = 20000;
    protected int workerThreads = 1;
    protected SepEventMapper<T> sepEventMapper;
    private volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    public void afterPropertiesSet() throws Exception {
        Assert.notNull((Object)this.zkQuorum, (String)"'zkQuorum' cannot be null. Zookeeper quorum list must be specified. This HBase Events producer will not be initialized");
        if (this.zkQuorum.contains(":")) {
            throw new IllegalStateException("'zkQuorum' is comma separated list of only hosts. Specify port using 'zkClientPort' : " + this.zkQuorum);
        }
        Assert.notNull(this.sepEventMapper, (String)"'sepEventMapper' cannot be null. No WAL edits event mapper found. This HBase Events producer will not be initialized");
        this.localHost = this.zkQuorum.contains(LOCAL_HOST_NAME) ? LOCAL_HOST_NAME : InetAddress.getLocalHost().getHostName();
    }

    public void start(long sinceSCN) {
        this.shutdownRequested.set(false);
        this.sinceSCN.set(sinceSCN);
        LOGGER.info("Starting SEP subscription : " + this.getName());
        LOGGER.info("ZK quorum hosts : " + this.zkQuorum);
        LOGGER.info("ZK client port : " + this.zkClientPort);
        LOGGER.info("Using hostname to bind to : " + this.localHost);
        LOGGER.info("Using worker threads : " + this.workerThreads);
        LOGGER.info("Listening to WAL edits from : " + this.sinceSCN);
        try {
            String[] zkHostsList;
            Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.setBoolean(HBASE_REPLICATION_CONFIG, true);
            hbaseConf.set(ZK_QUORUM_CONFIG, this.zkQuorum);
            hbaseConf.setInt(ZK_CLIENT_PORT_CONFIG, this.zkClientPort);
            StringBuilder zkQuorumWithPort = new StringBuilder();
            for (String zkHost : zkHostsList = this.zkQuorum.split(",")) {
                zkQuorumWithPort.append(zkHost);
                zkQuorumWithPort.append(":");
                zkQuorumWithPort.append(this.zkClientPort);
                zkQuorumWithPort.append(",");
            }
            LOGGER.info("ZK util connect string (host:port) : " + zkQuorumWithPort.toString());
            ZooKeeperItf zk = ZkUtil.connect((String)zkQuorumWithPort.toString(), (int)this.zkSessionTimeout);
            StringBuilder hbaseConfBuilder = new StringBuilder();
            for (Map.Entry entry : hbaseConf) {
                if (!((String)entry.getKey()).equalsIgnoreCase(HBASE_REPLICATION_CONFIG) && !((String)entry.getKey()).equalsIgnoreCase(ZK_QUORUM_CONFIG) && !((String)entry.getKey()).equalsIgnoreCase(ZK_CLIENT_PORT_CONFIG)) continue;
                hbaseConfBuilder.append((String)entry.getKey());
                hbaseConfBuilder.append(":");
                hbaseConfBuilder.append((String)entry.getValue());
                hbaseConfBuilder.append(",");
            }
            LOGGER.info("SEP Model Hbase configuration = " + hbaseConfBuilder.toString());
            SepModelImpl sepModel = new SepModelImpl(zk, hbaseConf);
            String subscriptionName = this.getName();
            if (!sepModel.hasSubscription(subscriptionName)) {
                sepModel.addSubscriptionSilent(subscriptionName);
            }
            this.sepConsumer = new SepConsumer(subscriptionName, this.sinceSCN.get(), (EventListener)new RelayAppender(), this.workerThreads, this.localHost, zk, hbaseConf);
            this.sepConsumer.start();
        }
        catch (Exception e) {
            LOGGER.error("Error starting WAL edits consumer. Producer not started!. Error message : " + e.getMessage(), (Throwable)e);
        }
    }

    public void shutdown() {
        LOGGER.info("Shutdown has been requested. HBaseEventProducer shutting down");
        this.shutdownRequested.set(true);
        this.sepConsumer.stop();
        super.shutdown();
        LOGGER.info("HBaseEventProducer shut down complete");
    }

    public boolean isPaused() {
        return !this.isRunning();
    }

    public boolean isRunning() {
        return this.sepConsumer.isRunning();
    }

    public void pause() {
        throw new UnsupportedOperationException("'pause' is not supported on this event producer");
    }

    public void unpause() {
        throw new UnsupportedOperationException("'unpause' is not supported on this event producer");
    }

    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown' is not supported on this event producer");
    }

    public void waitForShutdown(long time) throws InterruptedException, IllegalStateException {
        throw new UnsupportedOperationException("'waitForShutdown(long time)' is not supported on this event producer");
    }

    public String getZkQuorum() {
        return this.zkQuorum;
    }

    public void setZkQuorum(String zkQuorum) {
        this.zkQuorum = zkQuorum;
    }

    public Integer getZkClientPort() {
        return this.zkClientPort;
    }

    public void setZkClientPort(int zkClientPort) {
        this.zkClientPort = zkClientPort;
    }

    public Integer getZkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    public void setZkSessionTimeout(Integer zkSessionTimeout) {
        this.zkSessionTimeout = zkSessionTimeout;
    }

    public int getWorkerThreads() {
        return this.workerThreads;
    }

    public void setWorkerThreads(int workerThreads) {
        this.workerThreads = workerThreads;
    }

    public SepEventMapper<T> getSepEventMapper() {
        return this.sepEventMapper;
    }

    public void setSepEventMapper(SepEventMapper<T> sepEventMapper) {
        this.sepEventMapper = sepEventMapper;
    }

    class RelayAppender
    implements EventListener {
        RelayAppender() {
        }

        public void processEvents(List<SepEvent> sepEvents) {
            if (HBaseEventProducer.this.shutdownRequested.get()) {
                return;
            }
            long lastSavedSCN = HBaseEventProducer.this.sinceSCN.get();
            HBaseEventProducer.this.eventBuffer.startEvents();
            for (SepEvent sepEvent : sepEvents) {
                Object changeEvent = HBaseEventProducer.this.sepEventMapper.mapSepEvent(sepEvent);
                byte[] schemaId = SchemaHelper.getSchemaId((String)changeEvent.getSchema().toString());
                byte[] serializedEvent = HBaseEventProducer.this.serializeEvent(changeEvent);
                long earliestKVTimestamp = Long.MAX_VALUE;
                for (KeyValue kv : sepEvent.getKeyValues()) {
                    earliestKVTimestamp = Math.min(earliestKVTimestamp, kv.getTimestamp());
                }
                DbusEventKey eventKey = new DbusEventKey(sepEvent.getRow());
                DbusEventInfo eventInfo = new DbusEventInfo(DbusOpcode.UPSERT, earliestKVTimestamp, (short)HBaseEventProducer.this.physicalSourceStaticConfig.getId(), (short)HBaseEventProducer.this.physicalSourceStaticConfig.getId(), System.nanoTime(), HBaseEventProducer.this.physicalSourceStaticConfig.getSources()[0].getId(), schemaId, serializedEvent, false, true);
                HBaseEventProducer.this.eventBuffer.appendEvent(eventKey, eventInfo, HBaseEventProducer.this.dbusEventsStatisticsCollector);
                HBaseEventProducer.this.sinceSCN.set(Math.max(lastSavedSCN, earliestKVTimestamp));
            }
            HBaseEventProducer.this.eventBuffer.endEvents(HBaseEventProducer.this.sinceSCN.get(), HBaseEventProducer.this.dbusEventsStatisticsCollector);
            try {
                HBaseEventProducer.this.maxScnReaderWriter.saveMaxScn(HBaseEventProducer.this.sinceSCN.get());
            }
            catch (DatabusException e) {
                LOGGER.error("Unable to persist last processed SCN. SCN value is stale. Error is : " + e.getMessage(), (Throwable)e);
                throw new PlatformException("Unable to write last processed SCN to log. Signalling for re-delivery of WAL edits from : " + lastSavedSCN);
            }
            LOGGER.debug("Processed SEP event count : " + sepEvents.size());
        }
    }
}

