/*
 * Decompiled with CFR 0.152.
 */
package com.hmsonline.storm.osgi.impl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import clojure.osgi.IClojureLoader;
import com.hmsonline.storm.osgi.bolt.BoltDefinition;
import com.hmsonline.storm.osgi.spout.SpoutDefinition;
import com.hmsonline.storm.osgi.subscription.DistributionPolicy;
import com.hmsonline.storm.osgi.topology.ITopologyDefinition;
import com.hmsonline.storm.osgi.topology.ITopologyProperties;
import com.hmsonline.storm.osgi.topology.Subscription;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopologyManager.class);
    private IClojureLoader clojureLoader;
    private ConcurrentMap<String, ITopologyDefinition> topologyMap = new ConcurrentHashMap<String, ITopologyDefinition>();
    private LocalCluster cluster;

    public void init() {
    }

    public void destroy() {
        if (this.cluster != null) {
            try {
                this.cluster.shutdown();
            }
            catch (Exception ex) {
                LOGGER.warn("Error attempting to shutdown cluster.", (Throwable)ex);
            }
        }
        this.topologyMap.clear();
    }

    public void configRegistered(String pid, Dictionary<String, Object> properties) {
        LOGGER.error("Received properties for pid: " + pid + " " + properties);
    }

    public void configDeregistered(String pid, Dictionary properties) {
        LOGGER.error("Received removed properties for pid: " + pid);
    }

    public void onBindService(ITopologyDefinition definition) {
        try {
            this.submitTopology(definition);
        }
        catch (Exception ex) {
            LOGGER.error("Error binding topology service.", (Throwable)ex);
        }
    }

    public void onUnbindService(ITopologyDefinition definition) {
        if (definition == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Null topology definition was provided for unbind service, ignoring.");
            }
            return;
        }
        this.shutdownTopology(definition);
    }

    public void submitTopology(ITopologyDefinition definition) throws Exception {
        ITopologyDefinition previousDefinition = this.topologyMap.putIfAbsent(definition.getName(), definition);
        if (previousDefinition != null) {
            throw new IllegalStateException("A topology with the name, " + definition.getName() + " has already been deployed, request for deployment will be ignored.");
        }
        TopologyBuilder builder = new TopologyBuilder();
        try {
            if (this.cluster == null) {
                this.cluster = (LocalCluster)this.clojureLoader.createInstance(this.getClass().getClassLoader(), "backtype.storm.LocalCluster");
            }
            List<SpoutDefinition> spouts = definition.getSpouts();
            for (SpoutDefinition spoutDef : spouts) {
                builder.setSpout(spoutDef.getName(), (IRichSpout)spoutDef, (Number)spoutDef.getParallelismHint());
            }
            List<BoltDefinition> bolts = definition.getBolts();
            for (int i = 0; i < bolts.size(); ++i) {
                BoltDefinition boltDef = bolts.get(i);
                BoltDeclarer declarer = builder.setBolt(boltDef.getName(), (IRichBolt)boltDef, (Number)boltDef.getParallelismHint());
                Subscription subscription = boltDef.getSubscription();
                if (subscription.getStream() == null) {
                    int j;
                    String to = subscription.getTo();
                    for (j = 0; j < spouts.size(); ++j) {
                        SpoutDefinition sd = spouts.get(j);
                        if (!to.equals(sd.getName())) continue;
                        subscription.setStream(sd.getStreams()[0]);
                        break;
                    }
                    if (subscription.getStream() == null) {
                        for (j = 0; j < bolts.size(); ++j) {
                            BoltDefinition bd = bolts.get(j);
                            if (!to.equals(bd.getName())) continue;
                            subscription.setStream(bd.getStreams()[0]);
                            break;
                        }
                    }
                }
                DistributionPolicy distribution = subscription.getDistribution();
                distribution.setup(declarer, subscription);
            }
        }
        catch (Exception ex) {
            this.topologyMap.remove(definition.getName());
            throw new IllegalStateException("Error building topology, " + definition.getName() + ", due to an error, topology won't be deployed.", ex);
        }
        try {
            Config conf = new Config();
            ITopologyProperties properties = definition.getConfiguration();
            if (properties != null) {
                conf.putAll(properties.getConfiguration());
            }
            this.cluster.submitTopology(definition.getName(), (Map)conf, builder.createTopology());
        }
        catch (Exception ex) {
            this.shutdownTopology(definition);
            LOGGER.warn("Not deploying topology, " + definition.getName() + ", due to an error.", (Throwable)ex);
        }
    }

    public IClojureLoader getClojureLoader() {
        return this.clojureLoader;
    }

    public void setClojureLoader(IClojureLoader clojureLoader) {
        this.clojureLoader = clojureLoader;
    }

    private void shutdownTopology(final ITopologyDefinition definition) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Topology service, " + definition.getName() + " is being shutdown.");
        }
        this.topologyMap.remove(definition.getName());
        try {
            this.clojureLoader.invoke(this.getClass().getClassLoader(), (Callable)new Callable<Object>(){

                @Override
                public Object call() throws Exception {
                    TopologyManager.this.cluster.killTopology(definition.getName());
                    return true;
                }
            });
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Topology service, " + definition.getName() + " was shutdown successfully.");
            }
        }
        catch (Exception ex) {
            LOGGER.debug(definition.getName() + " topology failed during shutdown request (probably not active).", (Throwable)ex);
        }
    }
}

