/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.tools;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.pinterest.secor.common.KafkaClient;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.parser.MessageParser;
import com.pinterest.secor.parser.TimestampedMessageParser;
import com.pinterest.secor.util.ReflectionUtil;
import com.timgroup.statsd.NonBlockingStatsDClient;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.minidev.json.JSONArray;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProgressMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(ProgressMonitor.class);
    private static final String PERIOD = ".";
    private SecorConfig mConfig;
    private ZookeeperConnector mZookeeperConnector;
    private KafkaClient mKafkaClient;
    private MessageParser mMessageParser;
    private String mPrefix;

    public ProgressMonitor(SecorConfig config) throws Exception {
        this.mConfig = config;
        this.mZookeeperConnector = new ZookeeperConnector(this.mConfig);
        this.mKafkaClient = new KafkaClient(this.mConfig);
        this.mMessageParser = ReflectionUtil.createMessageParser(this.mConfig.getMessageParserClass(), this.mConfig);
        this.mPrefix = this.mConfig.getMonitoringPrefix();
        if (Strings.isNullOrEmpty((String)this.mPrefix)) {
            this.mPrefix = "secor";
        }
    }

    private void makeRequest(String body) throws IOException {
        URL url = new URL("http://" + this.mConfig.getTsdbHostport() + "/api/put?details");
        HttpURLConnection connection = null;
        try {
            InputStream inputStream;
            BufferedReader reader;
            Map response;
            connection = (HttpURLConnection)url.openConnection();
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Accepts", "application/json");
            connection.setRequestProperty("Accept", "*/*");
            if (body != null) {
                connection.setRequestMethod("POST");
                connection.setRequestProperty("Content-Length", Integer.toString(body.getBytes().length));
            }
            connection.setUseCaches(false);
            connection.setDoInput(true);
            connection.setDoOutput(true);
            if (body != null) {
                DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
                dataOutputStream.writeBytes(body);
                dataOutputStream.flush();
                dataOutputStream.close();
            }
            if (!(response = (Map)JSONValue.parse((Reader)(reader = new BufferedReader(new InputStreamReader(inputStream = connection.getInputStream()))))).get("failed").equals(0)) {
                throw new RuntimeException("url " + url + " with body " + body + " failed " + JSONObject.toJSONString((Map)response));
            }
        }
        catch (IOException exception) {
            if (connection != null) {
                connection.disconnect();
            }
            throw exception;
        }
    }

    private void exportToTsdb(Stat stat) throws IOException {
        LOG.info("exporting metric to tsdb {}", (Object)stat);
        this.makeRequest(stat.toString());
    }

    public void exportStats() throws Exception {
        List<Stat> stats = this.getStats();
        System.out.println(JSONArray.toJSONString(stats));
        if (this.mConfig.getTsdbHostport() != null && !this.mConfig.getTsdbHostport().isEmpty()) {
            for (Stat stat : stats) {
                this.exportToTsdb(stat);
            }
        }
        if (this.mConfig.getStatsDHostPort() != null && !this.mConfig.getStatsDHostPort().isEmpty()) {
            this.exportToStatsD(stats);
        }
    }

    private void exportToStatsD(List<Stat> stats) {
        HostAndPort hostPort = HostAndPort.fromString((String)this.mConfig.getStatsDHostPort());
        NonBlockingStatsDClient client = new NonBlockingStatsDClient(this.mConfig.getKafkaGroup(), hostPort.getHostText(), hostPort.getPort());
        for (Stat stat : stats) {
            Map tags = (Map)stat.get(Stat.STAT_KEYS.TAGS.getName());
            String aspect = (String)stat.get(Stat.STAT_KEYS.METRIC.getName()) + PERIOD + (String)tags.get(Stat.STAT_KEYS.TOPIC.getName()) + PERIOD + (String)tags.get(Stat.STAT_KEYS.PARTITION.getName());
            client.recordGaugeValue(aspect, Long.parseLong((String)stat.get(Stat.STAT_KEYS.VALUE.getName())));
        }
    }

    private List<Stat> getStats() throws Exception {
        List<String> topics = this.mZookeeperConnector.getCommittedOffsetTopics();
        ArrayList stats = Lists.newArrayList();
        for (String topic : topics) {
            if (topic.matches(this.mConfig.getMonitoringBlacklistTopics()) || !topic.matches(this.mConfig.getKafkaTopicFilter())) {
                LOG.info("skipping topic {}", (Object)topic);
                continue;
            }
            List<Integer> partitions = this.mZookeeperConnector.getCommittedOffsetPartitions(topic);
            for (Integer partition : partitions) {
                TopicPartition topicPartition = new TopicPartition(topic, partition);
                Message committedMessage = this.mKafkaClient.getCommittedMessage(topicPartition);
                long committedOffset = -1L;
                long committedTimestampMillis = -1L;
                if (committedMessage == null) {
                    LOG.warn("no committed message found in topic {} partition {}", (Object)topic, (Object)partition);
                } else {
                    committedOffset = committedMessage.getOffset();
                    committedTimestampMillis = this.getTimestamp(committedMessage);
                }
                Message lastMessage = this.mKafkaClient.getLastMessage(topicPartition);
                if (lastMessage == null) {
                    LOG.warn("no message found in topic {} partition {}", (Object)topic, (Object)partition);
                    continue;
                }
                long lastOffset = lastMessage.getOffset();
                long lastTimestampMillis = this.getTimestamp(lastMessage);
                assert (committedOffset <= lastOffset) : Long.toString(committedOffset) + " <= " + lastOffset;
                long offsetLag = lastOffset - committedOffset;
                long timestampMillisLag = lastTimestampMillis - committedTimestampMillis;
                ImmutableMap tags = ImmutableMap.of((Object)Stat.STAT_KEYS.TOPIC.getName(), (Object)topic, (Object)Stat.STAT_KEYS.PARTITION.getName(), (Object)Integer.toString(partition));
                long timestamp = System.currentTimeMillis() / 1000L;
                stats.add(Stat.createInstance(this.metricName("lag.offsets"), (Map<String, String>)tags, Long.toString(offsetLag), timestamp));
                stats.add(Stat.createInstance(this.metricName("lag.seconds"), (Map<String, String>)tags, Long.toString(timestampMillisLag / 1000L), timestamp));
                LOG.debug("topic {} partition {} committed offset {} last offset {} committed timestamp {} last timestamp {}", new Object[]{topic, partition, committedOffset, lastOffset, committedTimestampMillis / 1000L, lastTimestampMillis / 1000L});
            }
        }
        return stats;
    }

    private String metricName(String key) {
        return Joiner.on((String)PERIOD).join((Object)this.mPrefix, (Object)key, new Object[0]);
    }

    private long getTimestamp(Message message) throws Exception {
        if (this.mMessageParser instanceof TimestampedMessageParser) {
            return ((TimestampedMessageParser)this.mMessageParser).extractTimestampMillis(message);
        }
        return -1L;
    }

    private static class Stat
    extends JSONObject {
        public static Stat createInstance(String metric, Map<String, String> tags, String value, long timestamp) {
            return new Stat((Map<String, Object>)ImmutableMap.of((Object)STAT_KEYS.METRIC.getName(), (Object)metric, (Object)STAT_KEYS.TAGS.getName(), tags, (Object)STAT_KEYS.VALUE.getName(), (Object)value, (Object)STAT_KEYS.TIMESTAMP.getName(), (Object)timestamp));
        }

        public Stat(Map<String, Object> map) {
            super(map);
        }

        public static enum STAT_KEYS {
            METRIC("metric"),
            TAGS("tags"),
            VALUE("value"),
            TIMESTAMP("timestamp"),
            TOPIC("topic"),
            PARTITION("partition");

            private final String mName;

            private STAT_KEYS(String name) {
                this.mName = name;
            }

            public String getName() {
                return this.mName;
            }
        }
    }
}

