/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.monitor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.shaded.api.OffsetRequest;
import kafka.shaded.api.PartitionOffsetRequestInfo;
import kafka.shaded.common.TopicAndPartition;
import kafka.shaded.javaapi.OffsetResponse;
import kafka.shaded.javaapi.consumer.SimpleConsumer;
import org.apache.commons.cli.shaded.CommandLine;
import org.apache.commons.cli.shaded.DefaultParser;
import org.apache.commons.cli.shaded.HelpFormatter;
import org.apache.commons.cli.shaded.Options;
import org.apache.curator.shaded.framework.CuratorFramework;
import org.apache.curator.shaded.framework.CuratorFrameworkFactory;
import org.apache.curator.shaded.retry.RetryOneTime;
import org.apache.kafka.shaded.clients.consumer.KafkaConsumer;
import org.apache.kafka.shaded.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.shaded.common.PartitionInfo;
import org.apache.kafka.shaded.common.TopicPartition;
import org.apache.storm.kafka.monitor.KafkaOffsetLagResult;
import org.apache.storm.kafka.monitor.KafkaPartitionOffsetLag;
import org.apache.storm.kafka.monitor.NewKafkaSpoutOffsetQuery;
import org.apache.storm.kafka.monitor.OldKafkaSpoutOffsetQuery;
import org.json.simple.shaded.JSONArray;
import org.json.simple.shaded.JSONValue;

public class KafkaOffsetLagUtil {
    private static final String OPTION_TOPIC_SHORT = "t";
    private static final String OPTION_TOPIC_LONG = "topics";
    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
    private static final String OPTION_GROUP_ID_SHORT = "g";
    private static final String OPTION_GROUP_ID_LONG = "groupid";
    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
    private static final String OPTION_PARTITIONS_SHORT = "p";
    private static final String OPTION_PARTITIONS_LONG = "partitions";
    private static final String OPTION_LEADERS_SHORT = "l";
    private static final String OPTION_LEADERS_LONG = "leaders";
    private static final String OPTION_ZK_SERVERS_SHORT = "z";
    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
    private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
    private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";

    public static void main(String[] args) {
        try {
            List<KafkaOffsetLagResult> results;
            Options options = KafkaOffsetLagUtil.buildOptions();
            DefaultParser parser = new DefaultParser();
            CommandLine commandLine = parser.parse(options, args);
            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
                KafkaOffsetLagUtil.printUsageAndExit(options, "topics is required");
            }
            String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
                String[] topics;
                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
                    KafkaOffsetLagUtil.printUsageAndExit(options, "groupid or bootstrap-brokers is not accepted with option old-spout");
                }
                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
                    KafkaOffsetLagUtil.printUsageAndExit(options, "zk-servers and zk-node are required  with old-spout");
                }
                if ((topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",")) != null && topics.length > 1) {
                    KafkaOffsetLagUtil.printUsageAndExit(options, "Multiple topics not supported with option old-spout. Either a single topic or a wildcard string for matching topics is supported");
                }
                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
                        KafkaOffsetLagUtil.printUsageAndExit(options, "partitions or leaders is not accepted with zk-brokers-root-node");
                    }
                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG), securityProtocol);
                } else {
                    String[] leaders;
                    String[] partitions;
                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
                        KafkaOffsetLagUtil.printUsageAndExit(options, "wildcard-topic is not supported without zk-brokers-root-node");
                    }
                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
                        KafkaOffsetLagUtil.printUsageAndExit(options, "partitions and leaders are required if zk-brokers-root-node is not provided");
                    }
                    if ((partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",")).length != (leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",")).length) {
                        KafkaOffsetLagUtil.printUsageAndExit(options, "partitions and leaders need to be of same size");
                    }
                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue(OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG), securityProtocol);
                }
                results = KafkaOffsetLagUtil.getOffsetLags(oldKafkaSpoutOffsetQuery);
            } else {
                String[] oldSpoutOptions;
                for (String oldOption : oldSpoutOptions = new String[]{OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG, OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG}) {
                    if (!commandLine.hasOption(oldOption)) continue;
                    KafkaOffsetLagUtil.printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
                }
                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
                    KafkaOffsetLagUtil.printUsageAndExit(options, "groupid and bootstrap-brokers are required if old-spout is not specified");
                }
                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
                results = KafkaOffsetLagUtil.getOffsetLags(newKafkaSpoutOffsetQuery);
            }
            Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult = KafkaOffsetLagUtil.keyByTopicAndPartition(results);
            System.out.print(JSONValue.toJSONString(keyedResult));
        }
        catch (Exception ex) {
            System.out.print("Unable to get offset lags for kafka. Reason: ");
            ex.printStackTrace(System.out);
        }
    }

    private static Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyByTopicAndPartition(List<KafkaOffsetLagResult> results) {
        HashMap<String, Map<Integer, KafkaPartitionOffsetLag>> resultKeyedByTopic = new HashMap<String, Map<Integer, KafkaPartitionOffsetLag>>();
        for (KafkaOffsetLagResult result2 : results) {
            String topic = result2.getTopic();
            HashMap<Integer, KafkaPartitionOffsetLag> topicResultKeyedByPartition = (HashMap<Integer, KafkaPartitionOffsetLag>)resultKeyedByTopic.get(topic);
            if (topicResultKeyedByPartition == null) {
                topicResultKeyedByPartition = new HashMap<Integer, KafkaPartitionOffsetLag>();
                resultKeyedByTopic.put(topic, topicResultKeyedByPartition);
            }
            topicResultKeyedByPartition.put(result2.getPartition(), new KafkaPartitionOffsetLag(result2.getConsumerCommittedOffset(), result2.getLogHeadOffset()));
        }
        return resultKeyedByTopic;
    }

    private static void printUsageAndExit(Options options, String message) {
        System.out.println(message);
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("storm-kafka-monitor ", options);
        System.exit(1);
    }

    private static Options buildOptions() {
        Options options = new Options();
        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed offset");
        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new consumer/spout e.g. hostname1:9092,hostname2:9092");
        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in old spout");
        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to leaders for old spout with StaticHosts");
        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to partitions for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets  and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed offsets without the topic and partition nodes");
        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. /brokers (applicable only for old kafka spout) ");
        options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
        return options;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
        ArrayList<KafkaOffsetLagResult> result2 = new ArrayList<KafkaOffsetLagResult>();
        try (KafkaConsumer consumer = null;){
            Properties props = new Properties();
            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.shaded.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.shaded.common.serialization.StringDeserializer");
            if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
            }
            ArrayList<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
            consumer = new KafkaConsumer(props);
            for (String topic : newKafkaSpoutOffsetQuery.getTopics().split(",")) {
                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
                if (partitionInfoList == null) continue;
                for (PartitionInfo partitionInfo : partitionInfoList) {
                    topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
            }
            consumer.assign(topicPartitionList);
            for (TopicPartition topicPartition : topicPartitionList) {
                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1L;
                consumer.seekToEnd(KafkaOffsetLagUtil.toArrayList(topicPartition));
                result2.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
            }
        }
        return result2;
    }

    private static Collection<TopicPartition> toArrayList(final TopicPartition tp) {
        return new ArrayList<TopicPartition>(1){
            {
                super(x0);
                this.add(tp);
            }
        };
    }

    public static List<KafkaOffsetLagResult> getOffsetLags(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        ArrayList<KafkaOffsetLagResult> result2 = new ArrayList<KafkaOffsetLagResult>();
        Map<String, List<TopicPartition>> leaders = KafkaOffsetLagUtil.getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
        if (leaders != null) {
            String securityProtocol = oldKafkaSpoutOffsetQuery.getSecurityProtocol() != null ? oldKafkaSpoutOffsetQuery.getSecurityProtocol() : "PLAINTEXT";
            Map<String, Map<Integer, Long>> logHeadOffsets = KafkaOffsetLagUtil.getLogHeadOffsets(leaders, securityProtocol);
            HashMap<String, List<Integer>> topicPartitions = new HashMap<String, List<Integer>>();
            for (Map.Entry<String, List<TopicPartition>> entry2 : leaders.entrySet()) {
                for (TopicPartition topicPartition : entry2.getValue()) {
                    if (!topicPartitions.containsKey(topicPartition.topic())) {
                        topicPartitions.put(topicPartition.topic(), new ArrayList());
                    }
                    ((List)topicPartitions.get(topicPartition.topic())).add(topicPartition.partition());
                }
            }
            Map<String, Map<Integer, Long>> oldConsumerOffsets = KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
            for (Map.Entry<String, Map<Integer, Long>> topicOffsets : logHeadOffsets.entrySet()) {
                for (Map.Entry<Integer, Long> partitionOffsets : topicOffsets.getValue().entrySet()) {
                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get(partitionOffsets.getKey()) : -1L;
                    consumerCommittedOffset = consumerCommittedOffset == null ? -1L : consumerCommittedOffset;
                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(), consumerCommittedOffset, partitionOffsets.getValue());
                    result2.add(kafkaOffsetLagResult);
                }
            }
        }
        return result2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        HashMap<String, List<TopicPartition>> result2 = new HashMap<String, List<TopicPartition>>();
        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
            for (int i = 0; i < leaders.length; ++i) {
                if (!result2.containsKey(leaders[i])) {
                    result2.put(leaders[i], new ArrayList());
                }
                ((List)result2.get(leaders[i])).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
            }
        } else {
            try (CuratorFramework curatorFramework = null;){
                String brokersZkNode = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
                if (!brokersZkNode.endsWith("/")) {
                    brokersZkNode = brokersZkNode + "/";
                }
                String topicsZkPath = brokersZkNode + OPTION_TOPIC_LONG;
                curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
                curatorFramework.start();
                ArrayList<String> topics = new ArrayList<String>();
                if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
                    List children = (List)curatorFramework.getChildren().forPath(topicsZkPath);
                    for (String child : children) {
                        if (!child.matches(oldKafkaSpoutOffsetQuery.getTopic())) continue;
                        topics.add(child);
                    }
                } else {
                    topics.add(oldKafkaSpoutOffsetQuery.getTopic());
                }
                for (String topic : topics) {
                    String partitionsPath = topicsZkPath + "/" + topic + "/partitions";
                    List children = (List)curatorFramework.getChildren().forPath(partitionsPath);
                    for (int i = 0; i < children.size(); ++i) {
                        byte[] leaderData = (byte[])curatorFramework.getData().forPath(partitionsPath + "/" + i + "/state");
                        Map value2 = (Map)JSONValue.parse(new String(leaderData, "UTF-8"));
                        Integer leader = ((Number)value2.get("leader")).intValue();
                        byte[] brokerData = (byte[])curatorFramework.getData().forPath(brokersZkNode + "ids/" + leader);
                        Map broker = (Map)JSONValue.parse(new String(brokerData, "UTF-8"));
                        String host = null;
                        Integer port = null;
                        if (broker.containsKey("endpoints")) {
                            String endPoint = (String)((JSONArray)broker.get("endpoints")).get(0);
                            String[] split2 = endPoint.split(".+://")[1].split(":");
                            if (split2.length == 2) {
                                host = split2[0];
                                port = Integer.parseInt(split2[1]);
                            }
                        } else {
                            host = (String)broker.get("host");
                            port = ((Long)broker.get("port")).intValue();
                        }
                        String leaderBroker = host + ":" + port;
                        if (!result2.containsKey(leaderBroker)) {
                            result2.put(leaderBroker, new ArrayList());
                        }
                        ((List)result2.get(leaderBroker)).add(new TopicPartition(topic, i));
                    }
                }
            }
        }
        return result2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Map<String, Map<Integer, Long>> getLogHeadOffsets(Map<String, List<TopicPartition>> leadersAndTopicPartitions, String securityProtocol) {
        HashMap<String, Map<Integer, Long>> result2 = new HashMap<String, Map<Integer, Long>>();
        if (leadersAndTopicPartitions != null) {
            PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
            SimpleConsumer simpleConsumer = null;
            for (Map.Entry<String, List<TopicPartition>> leader : leadersAndTopicPartitions.entrySet()) {
                try {
                    simpleConsumer = new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 65536, "LogHeadOffsetRequest", securityProtocol);
                    HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                    for (TopicPartition topicPartition : leader.getValue()) {
                        requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo);
                        if (result2.containsKey(topicPartition.topic())) continue;
                        result2.put(topicPartition.topic(), new HashMap());
                    }
                    kafka.shaded.javaapi.OffsetRequest request = new kafka.shaded.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), "LogHeadOffsetRequest");
                    OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
                    for (TopicPartition topicPartition : leader.getValue()) {
                        ((Map)result2.get(topicPartition.topic())).put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
                    }
                }
                finally {
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                }
            }
        }
        return result2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk(Map<String, List<Integer>> topicPartitions, OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        HashMap<String, Map<Integer, Long>> result2 = new HashMap<String, Map<Integer, Long>>();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
        curatorFramework.start();
        String partitionPrefix = "partition_";
        String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
        if (!zkPath.endsWith("/")) {
            zkPath = zkPath + "/";
        }
        try {
            if (topicPartitions != null) {
                for (Map.Entry<String, List<Integer>> topicEntry : topicPartitions.entrySet()) {
                    HashMap<Integer, Long> partitionOffsets = new HashMap<Integer, Long>();
                    for (Integer partition2 : topicEntry.getValue()) {
                        String path = zkPath + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + partition2;
                        if (curatorFramework.checkExists().forPath(path) == null) continue;
                        byte[] zkData = (byte[])curatorFramework.getData().forPath(path);
                        Map offsetData = (Map)JSONValue.parse(new String(zkData, "UTF-8"));
                        partitionOffsets.put(partition2, (Long)offsetData.get("offset"));
                    }
                    result2.put(topicEntry.getKey(), partitionOffsets);
                }
            }
        }
        finally {
            if (curatorFramework != null) {
                curatorFramework.close();
            }
        }
        return result2;
    }
}

