/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.reader;

import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.RateLimitUtil;
import com.pinterest.secor.util.StatsUtil;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Blacklist;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads messages from a single Kafka stream selected by a topic whitelist or blacklist.
 *
 * <p>Each call to {@link #read()} pulls one message from the underlying consumer iterator,
 * periodically acquires rate-limiter permits, tracks when each topic partition was last seen,
 * and filters out messages whose offset precedes the committed offset count reported by the
 * {@link OffsetTracker}.
 */
public class MessageReader {
    private static final Logger LOG = LoggerFactory.getLogger(MessageReader.class);
    private final SecorConfig mConfig;
    private final OffsetTracker mOffsetTracker;
    private final ConsumerConnector mConsumerConnector;
    private final ConsumerIterator<byte[], byte[]> mIterator;
    /** Last time (epoch seconds) a message was read from each topic partition. */
    private final HashMap<TopicPartition, Long> mLastAccessTime;
    private final int mTopicPartitionForgetSeconds;
    /** Number of messages read between rate-limiter acquisitions; always at least 1. */
    private final int mCheckMessagesPerSecond;
    private int mNMessages;

    /**
     * Creates a reader and connects it to Kafka.
     *
     * @param config configuration supplying Kafka/ZooKeeper settings and the topic filter
     * @param offsetTracker source of committed offset counts used to skip stale messages
     * @throws UnknownHostException propagated from building the consumer configuration
     * @throws RuntimeException if both a topic blacklist and a topic filter are configured
     */
    public MessageReader(SecorConfig config, OffsetTracker offsetTracker) throws UnknownHostException {
        mConfig = config;
        mOffsetTracker = offsetTracker;

        // Validate the configuration before opening the consumer connector so that a
        // misconfiguration does not leave a connected consumer behind.
        boolean hasBlacklist = !mConfig.getKafkaTopicBlacklist().isEmpty();
        boolean hasWhitelist = !mConfig.getKafkaTopicFilter().isEmpty();
        if (hasBlacklist && hasWhitelist) {
            throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
        }

        mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());

        // Declared as the common supertype: the value is either a Blacklist or a Whitelist.
        TopicFilter topicFilter = hasBlacklist
                ? new Blacklist(mConfig.getKafkaTopicBlacklist())
                : new Whitelist(mConfig.getKafkaTopicFilter());
        LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);

        List<KafkaStream<byte[], byte[]>> streams =
                mConsumerConnector.createMessageStreamsByFilter(topicFilter);
        KafkaStream<byte[], byte[]> stream = streams.get(0);
        mIterator = stream.iterator();

        mLastAccessTime = new HashMap<TopicPartition, Long>();
        StatsUtil.setLabel("secor.kafka.consumer.id", IdUtil.getConsumerId());
        mTopicPartitionForgetSeconds = mConfig.getTopicPartitionForgetSeconds();

        // Integer division truncates to 0 when the configured rate is lower than the number
        // of consumer threads; read() uses this value as a modulus, so clamp it to 1.
        mCheckMessagesPerSecond =
                Math.max(1, mConfig.getMessagesPerSecond() / mConfig.getConsumerThreads());
    }

    /**
     * Records that {@code topicPartition} was just read and drops every partition that has
     * not been read for more than the configured forget interval.
     */
    private void updateAccessTime(TopicPartition topicPartition) {
        long now = System.currentTimeMillis() / 1000L;
        mLastAccessTime.put(topicPartition, now);
        Iterator<Map.Entry<TopicPartition, Long>> iterator = mLastAccessTime.entrySet().iterator();
        while (iterator.hasNext()) {
            long lastAccessTime = iterator.next().getValue();
            if (now - lastAccessTime > mTopicPartitionForgetSeconds) {
                iterator.remove();
            }
        }
    }

    /** Publishes the space-separated list of tracked {@code topic/partition} pairs as a stats label. */
    private void exportStats() {
        StringBuilder topicPartitions = new StringBuilder();
        for (TopicPartition topicPartition : mLastAccessTime.keySet()) {
            if (topicPartitions.length() > 0) {
                topicPartitions.append(' ');
            }
            topicPartitions.append(topicPartition.getTopic())
                    .append('/')
                    .append(topicPartition.getPartition());
        }
        StatsUtil.setLabel("secor.topic_partitions", topicPartitions.toString());
    }

    /**
     * Builds the Kafka consumer configuration from the Secor configuration.
     *
     * <p>Auto-commit is disabled and the offset reset policy is {@code smallest}. Optional
     * tuning properties are only set when the corresponding configuration value is non-empty.
     *
     * @throws UnknownHostException declared for callers; presumably raised while deriving the
     *         consumer id (verify against {@code IdUtil})
     */
    private ConsumerConfig createConsumerConfig() throws UnknownHostException {
        Properties props = new Properties();
        props.put("zookeeper.connect", mConfig.getZookeeperQuorum() + mConfig.getKafkaZookeeperPath());
        props.put("group.id", mConfig.getKafkaGroup());
        props.put("zookeeper.session.timeout.ms", Integer.toString(mConfig.getZookeeperSessionTimeoutMs()));
        props.put("zookeeper.sync.time.ms", Integer.toString(mConfig.getZookeeperSyncTimeMs()));
        props.put("auto.commit.enable", "false");
        props.put("auto.offset.reset", "smallest");
        props.put("consumer.timeout.ms", Integer.toString(mConfig.getConsumerTimeoutMs()));
        props.put("consumer.id", IdUtil.getConsumerId());
        props.put("dual.commit.enabled", mConfig.getDualCommitEnabled());
        props.put("offsets.storage", mConfig.getOffsetsStorage());
        props.put("partition.assignment.strategy", mConfig.getPartitionAssignmentStrategy());

        putIfNotEmpty(props, "rebalance.max.retries", mConfig.getRebalanceMaxRetries());
        putIfNotEmpty(props, "rebalance.backoff.ms", mConfig.getRebalanceBackoffMs());
        putIfNotEmpty(props, "socket.receive.buffer.bytes", mConfig.getSocketReceiveBufferBytes());
        putIfNotEmpty(props, "fetch.message.max.bytes", mConfig.getFetchMessageMaxBytes());
        putIfNotEmpty(props, "fetch.min.bytes", mConfig.getFetchMinBytes());
        putIfNotEmpty(props, "fetch.wait.max.ms", mConfig.getFetchWaitMaxMs());

        return new ConsumerConfig(props);
    }

    /** Sets {@code key} in {@code props} only when {@code value} is neither null nor empty. */
    private static void putIfNotEmpty(Properties props, String key, String value) {
        if (value != null && !value.isEmpty()) {
            props.put(key, value);
        }
    }

    /**
     * Reports whether another message is available by delegating to the consumer iterator.
     *
     * @return the result of the underlying iterator's {@code hasNext()}
     */
    public boolean hasNext() {
        return mIterator.hasNext();
    }

    /**
     * Reads the next message from the stream.
     *
     * @return the message, or {@code null} if its offset precedes the committed offset count
     *         for its topic partition and it should therefore be skipped
     */
    public Message read() {
        assert (hasNext());

        // The counter wraps to 0 once every mCheckMessagesPerSecond messages; that is the
        // point at which rate-limiter permits are acquired and stats are exported.
        mNMessages = (mNMessages + 1) % mCheckMessagesPerSecond;
        boolean atCheckpoint = mNMessages == 0;
        if (atCheckpoint) {
            RateLimitUtil.acquire(mCheckMessagesPerSecond);
        }

        MessageAndMetadata<byte[], byte[]> kafkaMessage = mIterator.next();
        Message message = new Message(kafkaMessage.topic(), kafkaMessage.partition(),
                kafkaMessage.offset(), kafkaMessage.key(), kafkaMessage.message());
        TopicPartition topicPartition =
                new TopicPartition(message.getTopic(), message.getKafkaPartition());
        updateAccessTime(topicPartition);

        long committedOffsetCount = mOffsetTracker.getTrueCommittedOffsetCount(topicPartition);
        LOG.debug("read message {}", message);
        if (atCheckpoint) {
            exportStats();
        }

        if (message.getOffset() < committedOffsetCount) {
            LOG.debug("skipping message {} because its offset precedes committed offset count {}",
                    message, committedOffsetCount);
            return null;
        }
        return message;
    }
}

