/*
 * Decompiled with CFR 0.152.
 */
package storm.kafka;

import backtype.storm.task.IMetricsContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.DynamicBrokersReader;
import storm.kafka.DynamicPartitionConnections;
import storm.kafka.Partition;
import storm.kafka.PartitionCoordinator;
import storm.kafka.PartitionManager;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.ZkState;
import storm.kafka.trident.GlobalPartitionInformation;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * {@link PartitionCoordinator} that keeps one {@link PartitionManager} for every partition
 * assigned to this task, re-reading the partition layout through a
 * {@link DynamicBrokersReader} at most once per refresh interval.
 *
 * <p>Ownership of a partition is decided by hashing its host and partition number and taking the
 * result modulo the total number of tasks (see {@code myOwnership}).
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): presumably it is only ever driven from a single spout thread; confirm.
 */
public class ZkCoordinator implements PartitionCoordinator {
    public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);

    /** Spout configuration handed to every {@link PartitionManager} created here. */
    SpoutConfig _spoutConfig;
    /** Index of this task; compared against the partition hash modulo {@link #_totalTasks}. */
    int _taskIndex;
    /** Number of tasks the partitions are distributed over. */
    int _totalTasks;
    /** Identifier passed through to each {@link PartitionManager}. */
    String _topologyInstanceId;
    /** Managers for the partitions currently owned by this task, keyed by partition. */
    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
    /** Snapshot of {@code _managers.values()} taken at the end of the last successful refresh. */
    List<PartitionManager> _cachedList;
    /** Wall-clock time (ms) of the last successful refresh; {@code null} until the first one. */
    Long _lastRefreshTime = null;
    /** Minimum time between refreshes, in milliseconds. */
    int _refreshFreqMs;
    DynamicPartitionConnections _connections;
    DynamicBrokersReader _reader;
    ZkState _state;
    Map _stormConf;
    /** Never assigned or read inside this class. */
    IMetricsContext _metricsContext;

    /**
     * Creates a coordinator for one task.
     *
     * @param connections        connections object handed to each created {@link PartitionManager}
     * @param stormConf          configuration map, forwarded to the brokers reader and managers
     * @param spoutConfig        spout configuration; its {@code hosts} field must be a
     *                           {@link ZkHosts}, otherwise a {@link ClassCastException} is thrown
     * @param state              state object handed to each created {@link PartitionManager}
     * @param taskIndex          index of this task
     * @param totalTasks         total number of tasks sharing the partitions
     * @param topologyInstanceId identifier handed to each created {@link PartitionManager}
     */
    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
        this._spoutConfig = spoutConfig;
        this._connections = connections;
        this._taskIndex = taskIndex;
        this._totalTasks = totalTasks;
        this._topologyInstanceId = topologyInstanceId;
        this._stormConf = stormConf;
        this._state = state;
        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
        this._refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
        this._reader = new DynamicBrokersReader(stormConf, brokerConf.brokerZkStr, brokerConf.brokerZkPath, spoutConfig.topic);
    }

    /**
     * Returns the managers for the partitions owned by this task, refreshing the assignment first
     * if no refresh has happened yet or the refresh interval has elapsed.
     *
     * @return the list built by the most recent successful {@link #refresh()}
     * @throws RuntimeException if the refresh fails
     */
    @Override
    public List<PartitionManager> getMyManagedPartitions() {
        if (this._lastRefreshTime == null || System.currentTimeMillis() - this._lastRefreshTime > (long) this._refreshFreqMs) {
            this.refresh();
            // Only reached when refresh() succeeded, so a failed refresh is retried on the next call.
            this._lastRefreshTime = System.currentTimeMillis();
        }
        return this._cachedList;
    }

    /**
     * Re-reads the partition layout, closes managers for partitions this task no longer owns,
     * creates managers for newly owned partitions and rebuilds the cached manager list.
     *
     * @throws RuntimeException wrapping any exception raised while refreshing
     */
    void refresh() {
        try {
            LOG.info("Refreshing partition manager connections");
            GlobalPartitionInformation brokerInfo = this._reader.getBrokerInfo();

            // Partitions that hash to this task.
            Set<Partition> mine = new HashSet<Partition>();
            for (Partition partitionId : brokerInfo) {
                if (this.myOwnership(partitionId)) {
                    mine.add(partitionId);
                }
            }

            // keySet() is a live view of _managers; both sets below are independent copies, so
            // removing from _managers while iterating them is safe.
            Set<Partition> curr = this._managers.keySet();
            Set<Partition> newPartitions = new HashSet<Partition>(mine);
            newPartitions.removeAll(curr);
            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
            deletedPartitions.removeAll(mine);

            LOG.info("Deleted partition managers: " + deletedPartitions.toString());
            for (Partition id : deletedPartitions) {
                PartitionManager man = this._managers.remove(id);
                man.close();
            }

            LOG.info("New partition managers: " + newPartitions.toString());
            for (Partition id : newPartitions) {
                PartitionManager man = new PartitionManager(this._connections, this._topologyInstanceId, this._state, this._stormConf, this._spoutConfig, id);
                this._managers.put(id, man);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        this._cachedList = new ArrayList<PartitionManager>(this._managers.values());
        LOG.info("Finished refreshing");
    }

    /**
     * Looks up the manager for a partition.
     *
     * @param partition the partition to look up
     * @return the manager currently registered for {@code partition}, or {@code null} if none is
     */
    @Override
    public PartitionManager getManager(Partition partition) {
        return this._managers.get(partition);
    }

    /**
     * Decides whether this task owns the given partition.
     *
     * @param id the partition to test
     * @return {@code true} if the partition's hash modulo the task count equals this task's index
     */
    private boolean myOwnership(Partition id) {
        // The int sum may overflow; that wrap-around is kept so existing assignments are unchanged.
        int hash = id.host.hashCode() + 23 * id.partition;
        // Math.abs(int) returns Integer.MIN_VALUE for Integer.MIN_VALUE, which would make the
        // modulo negative and leave the partition owned by no task. Widening first avoids that.
        long val = Math.abs((long) hash);
        return val % this._totalTasks == this._taskIndex;
    }
}

