/*
 * Decompiled with CFR 0.152.
 */
package nl.minvenj.nfi.storm.kafka.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import nl.minvenj.nfi.storm.kafka.fail.FailHandler;
import nl.minvenj.nfi.storm.kafka.fail.ReliableFailHandler;
import nl.minvenj.nfi.storm.kafka.fail.UnreliableFailHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that derive a kafka consumer configuration and a handful of spout settings from a
 * storm configuration map.
 */
public class ConfigUtils {
    /** Prefix of storm config keys that {@link #createKafkaConfig(Map)} copies into the kafka configuration. */
    public static final String CONFIG_PREFIX = "kafka.";
    /** Storm config key naming a classpath resource to load the kafka configuration from. */
    public static final String CONFIG_FILE = "kafka.config";
    /** Storm config key read by {@link #getTopic(Map)}. */
    public static final String CONFIG_TOPIC = "kafka.spout.topic";
    /** Topic returned by {@link #getTopic(Map)} when none (or an empty one) is configured. */
    public static final String DEFAULT_TOPIC = "storm";
    /** Storm config key for the fail handler; not read in this class (presumably consumed by the spout). */
    public static final String CONFIG_FAIL_HANDLER = "kafka.spout.fail.handler";
    /** Shared default fail handler instance; not used in this class (presumably consumed by the spout). */
    public static final FailHandler DEFAULT_FAIL_HANDLER = new ReliableFailHandler();
    /** Storm config key used as fallback source for the kafka {@code group.id}. */
    public static final String CONFIG_GROUP = "kafka.spout.consumer.group";
    /** Consumer group id used when neither the kafka nor the storm configuration provides one. */
    public static final String DEFAULT_GROUP = "kafka_spout";
    /** Storm config key read by {@link #getMaxBufSize(Map)}. */
    public static final String CONFIG_BUFFER_MAX_MESSAGES = "kafka.spout.buffer.size.max";
    /** Value returned by {@link #getMaxBufSize(Map)} when the key is missing or not a valid integer. */
    public static final int DEFAULT_BUFFER_MAX_MESSAGES = 1024;
    private static final Logger LOG = LoggerFactory.getLogger(ConfigUtils.class);

    /**
     * Loads a properties file from the classpath using the current thread's context class loader.
     *
     * @param resource name of the classpath resource to load
     * @return the properties read from the resource
     * @throws IllegalArgumentException if the resource cannot be found or reading it fails
     */
    public static Properties configFromResource(String resource) {
        InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        if (input == null) {
            throw new IllegalArgumentException("configuration file '" + resource + "' not found on classpath");
        }
        Properties config = new Properties();
        try {
            config.load(input);
        } catch (IOException e) {
            throw new IllegalArgumentException("reading configuration from '" + resource + "' failed", e);
        } finally {
            // Properties.load leaves the stream open; close it here so the resource is not leaked.
            try {
                input.close();
            } catch (IOException e) {
                // The content has been read (or a failure is already propagating); a failing close is not fatal.
                LOG.warn("failed to close configuration resource " + resource, e);
            }
        }
        return config;
    }

    /**
     * Builds the kafka consumer configuration from a storm configuration map.
     * <p>
     * If {@link #CONFIG_FILE} is set, the configuration is loaded from that classpath resource; otherwise
     * all entries prefixed with {@link #CONFIG_PREFIX} are used. Missing values are then filled in:
     * {@code zookeeper.connect} falls back to storm's zookeeper servers, {@code group.id} falls back to
     * {@link #CONFIG_GROUP} and then {@link #DEFAULT_GROUP}, and {@code auto.commit.enable} defaults to
     * {@code false}. The result is validated with {@link #checkConfigSanity(Properties)}.
     *
     * @param config the storm configuration
     * @return the kafka consumer configuration
     * @throws IllegalArgumentException if no zookeeper connect string can be determined, the configured
     *                                  resource cannot be loaded or the resulting configuration is not sane
     */
    public static Properties createKafkaConfig(Map<String, Object> config) {
        Properties consumerConfig;
        Object configFileValue = config.get(CONFIG_FILE);
        if (configFileValue != null) {
            String configFile = String.valueOf(configFileValue);
            LOG.info("loading kafka configuration from {}", configFile);
            consumerConfig = configFromResource(configFile);
        } else {
            LOG.info("reading kafka configuration from storm config using prefix '{}'", CONFIG_PREFIX);
            consumerConfig = configFromPrefix(config, CONFIG_PREFIX);
        }

        if (!consumerConfig.containsKey("zookeeper.connect")) {
            String zookeepers = getStormZookeepers(config);
            // An empty connect string is as useless as a missing one, reject both.
            if (zookeepers == null || zookeepers.isEmpty()) {
                throw new IllegalArgumentException("required kafka configuration key 'zookeeper.connect' not found");
            }
            consumerConfig.setProperty("zookeeper.connect", zookeepers);
            LOG.info("no explicit zookeeper configured for kafka, falling back on storm's zookeeper ({})", zookeepers);
        }

        if (!consumerConfig.containsKey("group.id") || String.valueOf(consumerConfig.get("group.id")).isEmpty()) {
            Object groupId = config.get(CONFIG_GROUP);
            if (groupId != null && !String.valueOf(groupId).isEmpty()) {
                consumerConfig.setProperty("group.id", String.valueOf(groupId));
            } else {
                consumerConfig.setProperty("group.id", DEFAULT_GROUP);
                LOG.info("kafka consumer group id not configured or empty, using default ({})", DEFAULT_GROUP);
            }
        }

        if (!consumerConfig.containsKey("auto.commit.enable")) {
            consumerConfig.setProperty("auto.commit.enable", "false");
        }

        checkConfigSanity(consumerConfig);
        return consumerConfig;
    }

    /**
     * Copies all entries of {@code base} whose key starts with {@code prefix} into a new
     * {@link Properties} instance, stripping the prefix from the key and converting the value to a string.
     * Entries with a {@code null} value are skipped, since {@link Properties} cannot represent them and
     * storing the literal text "null" would silently corrupt the configuration.
     *
     * @param base   the map to read entries from
     * @param prefix the key prefix to select and strip
     * @return the selected entries, keyed without the prefix
     */
    public static Properties configFromPrefix(Map<String, Object> base, String prefix) {
        Properties config = new Properties();
        for (Map.Entry<String, Object> entry : base.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value == null || !key.startsWith(prefix)) {
                continue;
            }
            config.setProperty(key.substring(prefix.length()), String.valueOf(value));
        }
        return config;
    }

    /**
     * Builds a comma separated {@code host:port} list from the {@code storm.zookeeper.servers} and
     * {@code storm.zookeeper.port} entries of a storm configuration.
     *
     * @param stormConfig the storm configuration
     * @return the connect string, or {@code null} if the servers entry is not a non-empty {@link List} or
     *         the port entry is not a {@link Number}
     */
    public static String getStormZookeepers(Map<String, Object> stormConfig) {
        Object stormZookeepers = stormConfig.get("storm.zookeeper.servers");
        Object stormZookeepersPort = stormConfig.get("storm.zookeeper.port");
        if (!(stormZookeepers instanceof List) || !(stormZookeepersPort instanceof Number)) {
            return null;
        }

        List<?> servers = (List<?>) stormZookeepers;
        if (servers.isEmpty()) {
            // No servers means there is nothing to connect to; report it like a missing configuration.
            return null;
        }

        int port = ((Number) stormZookeepersPort).intValue();
        StringBuilder zookeepers = new StringBuilder();
        for (Iterator<?> iterator = servers.iterator(); iterator.hasNext();) {
            zookeepers.append(iterator.next()).append(':').append(port);
            if (iterator.hasNext()) {
                zookeepers.append(',');
            }
        }
        return zookeepers.toString();
    }

    /**
     * Creates a {@link FailHandler} from its textual identifier: {@code "reliable"} or
     * {@code "unreliable"} (case insensitive), or the fully qualified name of a class implementing
     * {@link FailHandler} that has an accessible no-argument constructor.
     *
     * @param failHandler identifier or class name of the fail handler
     * @return a new fail handler instance
     * @throws IllegalArgumentException if the argument is {@code null}, the class cannot be found or
     *                                  instantiated, or it does not implement {@link FailHandler}
     */
    public static FailHandler createFailHandlerFromString(String failHandler) {
        if (failHandler == null) {
            throw new IllegalArgumentException("failed to instantiate FailHandler instance from argument " + failHandler);
        }
        if (failHandler.equalsIgnoreCase("reliable")) {
            return new ReliableFailHandler();
        }
        if (failHandler.equalsIgnoreCase("unreliable")) {
            return new UnreliableFailHandler();
        }
        try {
            // asSubclass verifies the type before any constructor runs, so classes that are not
            // FailHandlers are rejected without being instantiated.
            return Class.forName(failHandler).asSubclass(FailHandler.class).newInstance();
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("failed to instantiate FailHandler instance from argument " + failHandler, e);
        } catch (InstantiationException e) {
            throw new IllegalArgumentException("failed to instantiate FailHandler instance from argument " + failHandler, e);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("failed to instantiate FailHandler instance from argument " + failHandler, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException("instance from argument " + failHandler + " does not implement FailHandler", e);
        }
    }

    /**
     * Reads the maximum buffer size from {@link #CONFIG_BUFFER_MAX_MESSAGES}.
     *
     * @param stormConfig the storm configuration
     * @return the configured value, or {@link #DEFAULT_BUFFER_MAX_MESSAGES} if the key is absent or its
     *         value cannot be parsed as an integer
     */
    public static int getMaxBufSize(Map<String, Object> stormConfig) {
        Object value = stormConfig.get(CONFIG_BUFFER_MAX_MESSAGES);
        if (value == null) {
            return DEFAULT_BUFFER_MAX_MESSAGES;
        }
        try {
            return Integer.parseInt(String.valueOf(value).trim());
        } catch (NumberFormatException e) {
            LOG.warn("invalid value for '{}' in storm config ({}); falling back to default ({})",
                new Object[]{CONFIG_BUFFER_MAX_MESSAGES, value, DEFAULT_BUFFER_MAX_MESSAGES});
            return DEFAULT_BUFFER_MAX_MESSAGES;
        }
    }

    /**
     * Reads the topic from {@link #CONFIG_TOPIC}.
     *
     * @param stormConfig the storm configuration
     * @return the trimmed configured topic, or {@link #DEFAULT_TOPIC} if the key is absent, mapped to
     *         {@code null} or mapped to an empty / whitespace-only value
     */
    public static String getTopic(Map<String, Object> stormConfig) {
        Object configured = stormConfig.get(CONFIG_TOPIC);
        if (configured == null) {
            // Covers both a missing key and a key mapped to null (which must not become topic "null").
            LOG.warn("no configured topic found in storm config, defaulting to topic '{}'", DEFAULT_TOPIC);
            return DEFAULT_TOPIC;
        }
        String topic = String.valueOf(configured).trim();
        if (topic.isEmpty()) {
            LOG.warn("configured topic found in storm config is empty, defaulting to topic '{}'", DEFAULT_TOPIC);
            return DEFAULT_TOPIC;
        }
        return topic;
    }

    /**
     * Verifies that a kafka consumer configuration is usable from within storm: {@code auto.commit.enable}
     * must be present and not {@code true}, and {@code consumer.timeout.ms} must be present and a
     * non-negative integer.
     *
     * @param config the kafka consumer configuration to check
     * @throws IllegalArgumentException if one of the checks fails
     */
    public static void checkConfigSanity(Properties config) {
        String autoCommit = config.getProperty("auto.commit.enable");
        if (autoCommit == null || Boolean.parseBoolean(autoCommit)) {
            throw new IllegalArgumentException("kafka configuration 'auto.commit.enable' should be set to false for operation in storm");
        }

        String consumerTimeout = config.getProperty("consumer.timeout.ms");
        if (consumerTimeout == null) {
            throw new IllegalArgumentException("kafka configuration value for 'consumer.timeout.ms' is not suitable for operation in storm");
        }
        int timeout;
        try {
            timeout = Integer.parseInt(consumerTimeout);
        } catch (NumberFormatException e) {
            // Name the offending key instead of leaking a bare NumberFormatException to the caller.
            throw new IllegalArgumentException("kafka configuration value for 'consumer.timeout.ms' is not a valid integer: '" + consumerTimeout + "'", e);
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("kafka configuration value for 'consumer.timeout.ms' is not suitable for operation in storm");
        }
    }
}

