/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.common;

import com.google.common.net.HostAndPort;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.message.Message;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin helper around Kafka's {@link SimpleConsumer} used to look up partition leaders,
 * count partitions, and fetch individual messages (the last one in a partition, or the
 * one just below the committed offset count stored in ZooKeeper).
 */
public class KafkaClient {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClient.class);

    /** Socket timeout argument handed to every {@link SimpleConsumer} created here. */
    private static final int CONSUMER_SOCKET_TIMEOUT = 100000;
    /** Buffer size argument handed to every {@link SimpleConsumer} created here. */
    private static final int CONSUMER_BUFFER_SIZE = 65536;

    private final SecorConfig mConfig;
    private final ZookeeperConnector mZookeeperConnector;

    /**
     * Creates a client bound to the given configuration and builds a ZooKeeper connector
     * from that same configuration.
     *
     * @param config configuration supplying the seed broker address and the maximum message size
     */
    public KafkaClient(SecorConfig config) {
        this.mConfig = config;
        this.mZookeeperConnector = new ZookeeperConnector(this.mConfig);
    }

    /**
     * Asks the seed broker for the topic's metadata and returns the leader of the requested
     * partition. The temporary lookup consumer is always closed before returning.
     *
     * @param topicPartition topic and partition whose leader is wanted
     * @return the leader's host and port, or {@code null} when the metadata contains no
     *         matching partition or the matching partition reports no leader
     */
    private HostAndPort findLeader(TopicPartition topicPartition) {
        SimpleConsumer consumer = null;
        try {
            LOG.info("looking up leader for topic {} partition {}",
                    topicPartition.getTopic(), topicPartition.getPartition());
            consumer = new SimpleConsumer(this.mConfig.getKafkaSeedBrokerHost(),
                    this.mConfig.getKafkaSeedBrokerPort(), CONSUMER_SOCKET_TIMEOUT,
                    CONSUMER_BUFFER_SIZE, "leaderLookup");
            List<String> topics = new ArrayList<String>();
            topics.add(topicPartition.getTopic());
            TopicMetadataRequest request = new TopicMetadataRequest(topics);
            TopicMetadataResponse response = consumer.send(request);
            List<TopicMetadata> metaData = response.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() != topicPartition.getPartition()) {
                        continue;
                    }
                    // A partition without an elected leader is reported the same way as a
                    // missing partition instead of failing with a NullPointerException here.
                    if (part.leader() == null) {
                        return null;
                    }
                    return HostAndPort.fromParts(part.leader().host(), part.leader().port());
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return null;
    }

    /**
     * Builds the client id used for requests against a given topic partition.
     *
     * @param topicPartition topic and partition the client id is derived from
     * @return {@code "secorClient_<topic>_<partition>"}
     */
    private static String getClientName(TopicPartition topicPartition) {
        return "secorClient_" + topicPartition.getTopic() + "_" + topicPartition.getPartition();
    }

    /**
     * Requests the single latest offset of the partition and returns that value minus one.
     *
     * @param topicPartition topic and partition to query
     * @param consumer       consumer used to send the offset request; not closed by this method
     * @return the first offset returned by the broker, decremented by one
     * @throws RuntimeException if the offset response reports an error
     */
    private long findLastOffset(TopicPartition topicPartition, SimpleConsumer consumer) {
        TopicAndPartition topicAndPartition =
                new TopicAndPartition(topicPartition.getTopic(), topicPartition.getPartition());
        HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition,
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        String clientName = KafkaClient.getClientName(topicPartition);
        OffsetRequest request = new OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("Error fetching offset data. Reason: "
                    + response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()));
        }
        long[] offsets = response.offsets(topicPartition.getTopic(), topicPartition.getPartition());
        return offsets[0] - 1L;
    }

    /**
     * Fetches the first message returned for the given offset of a topic partition.
     *
     * <p>The consumer is owned by the caller: this method never closes it, so callers must
     * close it themselves whether the fetch succeeds or fails.
     *
     * @param topicPartition topic and partition to read from
     * @param offset         offset to start the fetch at
     * @param consumer       consumer used to issue the fetch
     * @return the fetched message with a copy of its payload bytes
     * @throws RuntimeException if the fetch response reports an error
     */
    private Message getMessage(TopicPartition topicPartition, long offset, SimpleConsumer consumer) {
        LOG.info("fetching message topic {} partition {} offset {}",
                topicPartition.getTopic(), topicPartition.getPartition(), offset);
        int maxMessageSizeBytes = this.mConfig.getMaxMessageSizeBytes();
        String clientName = KafkaClient.getClientName(topicPartition);
        FetchRequest request = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(topicPartition.getTopic(), topicPartition.getPartition(), offset,
                        maxMessageSizeBytes)
                .build();
        FetchResponse response = consumer.fetch(request);
        if (response.hasError()) {
            throw new RuntimeException("Error fetching offset data. Reason: "
                    + response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()));
        }
        MessageAndOffset messageAndOffset = (MessageAndOffset) response
                .messageSet(topicPartition.getTopic(), topicPartition.getPartition())
                .iterator().next();
        ByteBuffer payload = messageAndOffset.message().payload();
        // remaining() is the number of bytes get() can actually deliver; limit() is only
        // equal to it when the buffer's position happens to be zero.
        byte[] payloadBytes = new byte[payload.remaining()];
        payload.get(payloadBytes);
        return new Message(topicPartition.getTopic(), topicPartition.getPartition(),
                messageAndOffset.offset(), payloadBytes);
    }

    /**
     * Creates a consumer connected to the current leader of the given topic partition.
     * The caller is responsible for closing the returned consumer.
     *
     * @param topicPartition topic and partition to connect to
     * @return a new consumer pointed at the partition leader; never {@code null}
     * @throws IllegalStateException if no leader could be found for the partition
     */
    public SimpleConsumer createConsumer(TopicPartition topicPartition) {
        HostAndPort leader = this.findLeader(topicPartition);
        if (leader == null) {
            // Previously this surfaced as a NullPointerException without any context.
            throw new IllegalStateException("no leader found for topic "
                    + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
        }
        LOG.info("leader for topic {} partition {} is {}",
                topicPartition.getTopic(), topicPartition.getPartition(), leader);
        String clientName = KafkaClient.getClientName(topicPartition);
        return new SimpleConsumer(leader.getHostText(), leader.getPort(),
                CONSUMER_SOCKET_TIMEOUT, CONSUMER_BUFFER_SIZE, clientName);
    }

    /**
     * Asks the seed broker for the topic's metadata and returns how many partitions it lists.
     * The temporary lookup consumer is always closed before returning.
     *
     * @param topic name of the topic
     * @return number of partition metadata entries reported for the topic
     * @throws RuntimeException if the response does not contain exactly one topic metadata entry
     */
    public int getNumPartitions(String topic) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(this.mConfig.getKafkaSeedBrokerHost(),
                    this.mConfig.getKafkaSeedBrokerPort(), CONSUMER_SOCKET_TIMEOUT,
                    CONSUMER_BUFFER_SIZE, "partitionLookup");
            List<String> topics = new ArrayList<String>();
            topics.add(topic);
            TopicMetadataRequest request = new TopicMetadataRequest(topics);
            TopicMetadataResponse response = consumer.send(request);
            List<TopicMetadata> topicsMetadata = response.topicsMetadata();
            if (topicsMetadata.size() != 1) {
                throw new RuntimeException("Expected one metadata for topic " + topic
                        + " found " + topicsMetadata.size());
            }
            return topicsMetadata.get(0).partitionsMetadata().size();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Fetches the message at the last offset of the given topic partition.
     * The consumer created for the lookup is closed before returning, on every path.
     *
     * @param topicPartition topic and partition to read from
     * @return the last message, or {@code null} when the computed last offset is below 1
     * @throws TException declared for callers; nothing in this method's visible body throws it directly
     */
    public Message getLastMessage(TopicPartition topicPartition) throws TException {
        SimpleConsumer consumer = this.createConsumer(topicPartition);
        try {
            long lastOffset = this.findLastOffset(topicPartition, consumer);
            // NOTE(review): offset 0 is also treated as "no message" here, so a partition
            // holding exactly one message yields null - confirm this is intended.
            if (lastOffset < 1L) {
                return null;
            }
            return this.getMessage(topicPartition, lastOffset, consumer);
        } finally {
            consumer.close();
        }
    }

    /**
     * Fetches the message located one below the committed offset count that the ZooKeeper
     * connector reports for the given topic partition.
     * The consumer created for the fetch is closed before returning, on every path.
     *
     * @param topicPartition topic and partition to read from
     * @return the message at (committed offset count - 1), or {@code null} when that value is negative
     * @throws Exception if reading the committed offset count or fetching the message fails
     */
    public Message getCommittedMessage(TopicPartition topicPartition) throws Exception {
        long committedOffset = this.mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1L;
        if (committedOffset < 0L) {
            return null;
        }
        SimpleConsumer consumer = this.createConsumer(topicPartition);
        try {
            return this.getMessage(topicPartition, committedOffset, consumer);
        } finally {
            consumer.close();
        }
    }
}

