/*
 * Decompiled with CFR 0.152.
 */
package com.hbase.haxwell;

import com.google.common.collect.ImmutableMap;
import com.hbase.haxwell.KafkaMessageProducer;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HaxwellRegionObserver
extends BaseRegionObserver {
    private static final Logger log = LoggerFactory.getLogger(HaxwellRegionObserver.class);
    private KafkaMessageProducer messageProducer;
    private String topic;
    private boolean enableDelete;

    public void start(CoprocessorEnvironment e) throws IOException {
        String brokers = e.getConfiguration().get("haxwell.kafka.bootstrap.servers");
        String acks = e.getConfiguration().get("haxwell.kafka.acks", "all");
        int retries = e.getConfiguration().getInt("haxwell.kafka.retries", 3);
        int batchSize = e.getConfiguration().getInt("haxwell.kafka.batch.size", 1);
        int autoCommitInterval = e.getConfiguration().getInt("haxwell.kafka.autocommit.interval.ms", 30000);
        this.topic = e.getConfiguration().get("haxwell.kafka.topic");
        this.enableDelete = e.getConfiguration().getBoolean("haxwell.delete.enable", false);
        this.messageProducer = new KafkaMessageProducer(brokers, acks, retries, batchSize, autoCommitInterval);
    }

    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        this.sendData(e, put.getId(), put.toMap(Integer.MAX_VALUE), "PUT");
    }

    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        if (this.enableDelete) {
            this.sendData(e, delete.getId(), delete.toMap(Integer.MAX_VALUE), "DELETE");
        }
    }

    private void sendData(ObserverContext<RegionCoprocessorEnvironment> e, String id, Map<String, Object> data, String operation) throws IOException {
        String tableName = ((RegionCoprocessorEnvironment)e.getEnvironment()).getRegion().getTableDesc().getNameAsString();
        if (log.isDebugEnabled()) {
            log.debug("Table Name: {} | Data: {}", (Object)tableName, (Object)JsonMapper.writeMapAsString(data));
        }
        try {
            this.messageProducer.send(this.topic, id, JsonMapper.writeMapAsString((Map)ImmutableMap.builder().put((Object)"table", (Object)tableName).put((Object)"operation", (Object)operation).put((Object)"id", (Object)id).put((Object)"data", data).build()));
        }
        catch (Exception ex) {
            log.error("Error sending event to Kafka", (Throwable)ex);
            throw new IOException(ex);
        }
    }
}

