/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.cluster;

import com.linkedin.databus.client.registration.ClusterRegistrationStaticConfig;
import com.linkedin.databus.cluster.DatabusClusterDataNotifier;
import com.linkedin.databus.cluster.DatabusClusterNotifier;
import com.linkedin.databus.cluster.DatabusClusterNotifierFactory;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.log4j.Logger;

public class DatabusCluster {
    /** Fully-qualified name of this class; used as the log4j logger name. */
    public static final String MODULE = DatabusCluster.class.getName();
    /** Logger shared by this class and its nested classes. */
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    /** Name of the Helix state model definition registered by create(). */
    public static final String DEFAULT_STATE_MODEL = "OnlineOffline";
    /** Name of the single Helix resource that create() adds to the cluster. */
    public static final String DEFAULT_RESOURCE_NAME = "default-resource";
    /** Wait budget in milliseconds; matches the 1000 ms polling limit applied in create(). */
    public static final int DEFAULT_CLUSTER_CREATE_WAIT_MS = 1000;
    /** System property read and written by updateHelixManagerZkSessionTimeout(). */
    private static final String HELIX_MANAGER_ZK_SESSION_TIMEOUT_KEY = "zk.session.timeout";
    /** ZooKeeper client opened by the public constructor and closed by shutdown(). */
    protected final ZkClient _zkClient;
    /** Helix admin handle built on top of {@link #_zkClient}. */
    protected final ZKHelixAdmin _admin;
    /** Name of the Helix cluster this object creates or joins. */
    protected final String _clusterName;
    /** ZooKeeper connect string taken from the configuration. */
    protected final String _zkAddr;
    /** Minimum number of live instances the watcher requires before starting or resuming the controller. */
    protected final int _quorum;
    /** Number of partitions requested for the default resource. */
    protected final int _numPartitions;
    /** Listeners told about live-instance and partition-mapping changes by the watcher. */
    protected final HashSet<DatabusClusterDataNotifier> _dataNotifiers;
    /** Helix spectator that tracks live instances and partition ownership. */
    protected final DatabusHelixWatcher _watcher;
    /** ZooKeeper connection timeout (ms) passed to the ZkClient constructor. */
    private final int _zkConnectionTimeoutMs;
    /** ZooKeeper session timeout (ms) passed to the ZkClient constructor and the Helix system property. */
    private final int _zkSessionTimeoutMs;

    /**
     * Opens a ZooKeeper connection and creates, or joins, the Helix cluster described by
     * {@code config}.
     *
     * <p>The ZooKeeper client is closed again if any later initialization step fails, so a
     * failed construction does not leave a connection behind.
     *
     * @param config source of the ZooKeeper address, cluster name, quorum, partition count and
     *               ZooKeeper timeouts
     * @throws DatabusClusterException if the cluster's partition count cannot be read, or the
     *                                 existing cluster has a different partition count than requested
     * @throws Exception if the ZooKeeper client or the Helix watcher cannot be created
     */
    public DatabusCluster(ClusterRegistrationStaticConfig config) throws Exception {
        this._zkAddr = config.getZkAddr();
        this._clusterName = config.getClusterName();
        this._quorum = (int)config.getQuorum();
        this._numPartitions = (int)config.getNumPartitions();
        this._zkSessionTimeoutMs = config.getZkSessionTimeoutMs();
        this._zkConnectionTimeoutMs = config.getZkConnectionTimeoutMs();
        // The Helix manager picks its session timeout up from a system property, so publish it first.
        DatabusCluster.updateHelixManagerZkSessionTimeout(this._zkSessionTimeoutMs);
        this._zkClient = new ZkClient(this._zkAddr, this._zkSessionTimeoutMs, this._zkConnectionTimeoutMs, (ZkSerializer)new ZNRecordSerializer());
        boolean initialized = false;
        try {
            this._admin = new ZKHelixAdmin(this._zkClient);
            this._dataNotifiers = new HashSet<DatabusClusterDataNotifier>(5);
            int part = DatabusCluster.create(this._admin, this._zkClient, this._clusterName, this._numPartitions);
            if (part < 0) {
                throw new DatabusClusterException("Cluster " + this._clusterName + " could not be accessed. Num partitions returned -1");
            }
            if (this._numPartitions != part) {
                String msg = "Cannot create DatabusCluster!  Cluster exists with num partitions=" + part + ". Tried to join with " + this._numPartitions + " partitions";
                throw new DatabusClusterException(msg);
            }
            this._watcher = new DatabusHelixWatcher();
            initialized = true;
        } finally {
            if (!initialized) {
                // Release the connection opened above; never let a close failure mask the real error.
                try {
                    this._zkClient.close();
                } catch (RuntimeException closeError) {
                    LOG.warn("Could not close ZooKeeper client after failed cluster initialization: " + closeError);
                }
            }
        }
    }

    /**
     * Raises the {@code zk.session.timeout} system property to {@code timeoutMs} unless it
     * already holds a parseable value that is at least as large.
     *
     * @param timeoutMs session timeout, in milliseconds, that the property must at least cover
     */
    private static void updateHelixManagerZkSessionTimeout(int timeoutMs) {
        final String existing = System.getProperty(HELIX_MANAGER_ZK_SESSION_TIMEOUT_KEY);
        boolean keepExisting = false;
        if (existing != null) {
            try {
                keepExisting = Integer.parseInt(existing) >= timeoutMs;
            } catch (NumberFormatException e) {
                // An unparseable value is overwritten below.
                LOG.warn("invalid existing value for zk.session.timeout: " + existing);
            }
        }
        if (!keepExisting) {
            System.setProperty(HELIX_MANAGER_ZK_SESSION_TIMEOUT_KEY, String.valueOf(timeoutMs));
        }
    }

    /**
     * Looks up how many partitions a resource's ideal state declares.
     *
     * @param admin        Helix admin used for the lookup; may be {@code null}
     * @param clusterName  cluster that owns the resource
     * @param resourceName resource to inspect
     * @return the partition count from the ideal state; {@code 0} when there is no ideal state
     *         or the lookup throws; {@code -1} when {@code admin} is {@code null}
     */
    protected static int getNumPartitionsInResource(ZKHelixAdmin admin, String clusterName, String resourceName) {
        if (admin == null) {
            return -1;
        }
        try {
            final IdealState resourceState = admin.getResourceIdealState(clusterName, resourceName);
            return resourceState == null ? 0 : resourceState.getNumPartitions();
        } catch (Exception e) {
            LOG.warn("Resource " + resourceName + " not found in " + clusterName);
            return 0;
        }
    }

    /**
     * Creates the Helix cluster and its default resource if they do not exist yet, then reports
     * how many partitions the default resource has.
     *
     * <p>When the cluster was created here, the state model definition and the default resource
     * are added and rebalanced. If the resource initially reports zero partitions, the method
     * polls every 100 ms for up to {@link #DEFAULT_CLUSTER_CREATE_WAIT_MS} milliseconds. If the
     * calling thread is interrupted while waiting, the wait ends early and the thread's
     * interrupt status is restored so callers can observe it.
     *
     * @param admin         Helix admin used to create and inspect the cluster
     * @param zkClient      ZooKeeper client used to check whether the cluster path exists
     * @param clusterName   name of the cluster
     * @param numPartitions number of partitions to give the default resource when creating it
     * @return the partition count of the default resource, {@code 0} if it could not be
     *         determined within the wait budget, or {@code -1} if {@code admin} is {@code null}
     */
    public static int create(ZKHelixAdmin admin, ZkClient zkClient, String clusterName, int numPartitions) {
        boolean clusterAdded = false;
        try {
            if (zkClient.exists("/" + clusterName)) {
                // Expected when another instance got here first; not an error.
                LOG.warn("Warn! Cluster might already exist! " + clusterName + " Exception=Cluster already exists !!");
            } else {
                clusterAdded = admin.addCluster(clusterName, false);
                if (!clusterAdded) {
                    LOG.error("Problem creating cluster (" + clusterName + ")");
                }
            }
        } catch (Exception e) {
            LOG.warn("Warn! Cluster might already exist! " + clusterName + " Exception=" + e.getMessage());
        }
        if (clusterAdded) {
            try {
                admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
                admin.addResource(clusterName, DEFAULT_RESOURCE_NAME, numPartitions, DEFAULT_STATE_MODEL, IdealState.IdealStateModeProperty.AUTO_REBALANCE.toString());
                admin.rebalance(clusterName, DEFAULT_RESOURCE_NAME, 1);
            } catch (Exception e) {
                LOG.warn("Resource addition incomplete. May have been completed by another instance: " + e.getMessage());
            }
        }
        int part = DatabusCluster.getNumPartitionsInResource(admin, clusterName, DEFAULT_RESOURCE_NAME);
        if (part == 0) {
            // Another instance may still be adding the resource; poll briefly before giving up.
            final long pollIntervalMs = 100L;
            final long deadlineMs = System.currentTimeMillis() + DEFAULT_CLUSTER_CREATE_WAIT_MS;
            try {
                do {
                    Thread.sleep(pollIntervalMs);
                    part = DatabusCluster.getNumPartitionsInResource(admin, clusterName, DEFAULT_RESOURCE_NAME);
                } while (part == 0 && System.currentTimeMillis() < deadlineMs);
            } catch (InterruptedException e) {
                LOG.warn("Cluster create wait interrupted for cluster=" + clusterName + " exception= " + e.getMessage());
                // Restore the flag so the caller can still see the interruption.
                Thread.currentThread().interrupt();
            }
        }
        return part;
    }

    /**
     * Starts the Helix watcher, if this cluster has one.
     */
    public void start() {
        final DatabusHelixWatcher watcher = this._watcher;
        if (watcher == null) {
            return;
        }
        watcher.start();
    }

    /**
     * Stops the watcher, forgets all registered data notifiers and closes the ZooKeeper client.
     *
     * <p>The notifier set is cleared and the ZooKeeper client is closed even when stopping the
     * watcher throws; the exception from the watcher is then propagated to the caller.
     */
    public void shutdown() {
        try {
            if (this._watcher != null) {
                this._watcher.stop();
            }
        } finally {
            if (this._dataNotifiers != null) {
                // Same monitor as addDataNotifier/removeDataNotifier.
                synchronized (this) {
                    this._dataNotifiers.clear();
                }
            }
            if (this._zkClient != null) {
                this._zkClient.close();
            }
        }
    }

    /**
     * Registers a member without a state-transition notifier.
     *
     * @param id instance id of the member
     * @return the new member, or {@code null} if it could not be created
     */
    public DatabusClusterMember addMember(String id) {
        return this.addMember(id, null);
    }

    /**
     * Registers an instance with the given id in the Helix cluster and returns a member handle
     * for it. An instance config already registered under the same id is dropped and replaced.
     *
     * @param id       instance id of the member
     * @param notifier notifier handed to the member's state model factory; may be {@code null}
     * @return the new member, or {@code null} if there is no Helix admin or registration failed
     */
    public DatabusClusterMember addMember(String id, DatabusClusterNotifier notifier) {
        if (this._admin == null) {
            return null;
        }
        try {
            InstanceConfig existing = null;
            try {
                existing = this._admin.getInstanceConfig(this._clusterName, id);
            } catch (HelixException ignored) {
                // Treated as "no instance registered under this id"; a fresh one is added below.
                LOG.debug("No existing instance config for id=" + id + ": " + ignored.getMessage());
            }
            if (existing != null) {
                LOG.warn("Member id already exists! Overwriting instance for id=" + id);
                this._admin.dropInstance(this._clusterName, existing);
            }
            InstanceConfig config = new InstanceConfig(id);
            config.setHostName(InetAddress.getLocalHost().getCanonicalHostName());
            config.setInstanceEnabled(true);
            this._admin.addInstance(this._clusterName, config);
            return new DatabusClusterMember(id, notifier);
        } catch (Exception e) {
            // Pass the throwable too, so the stack trace is not lost.
            LOG.error("Error creating databus cluster member " + id + " exception:" + e, e);
            return null;
        }
    }

    /**
     * Registers a listener for cluster data changes. A {@code null} listener is rejected with a
     * warning.
     *
     * @param notifier listener to add
     */
    public synchronized void addDataNotifier(DatabusClusterDataNotifier notifier) {
        if (notifier == null) {
            LOG.warn("Add failed. Attempting to add null DatabusClusterDataNotifier!");
            return;
        }
        this._dataNotifiers.add(notifier);
    }

    /**
     * Unregisters a previously added listener. A {@code null} argument is ignored.
     *
     * @param notifier listener to remove
     */
    public synchronized void removeDataNotifier(DatabusClusterDataNotifier notifier) {
        if (notifier == null) {
            return;
        }
        this._dataNotifiers.remove(notifier);
    }

    /**
     * @return the partition count this cluster was configured with
     */
    public int getNumPartitions() {
        return _numPartitions;
    }

    /**
     * @return the live-instance count last reported to the watcher ({@code -1} until the first
     *         report arrives)
     */
    public int getNumActiveMembers() {
        return _watcher.getNumActiveInstances();
    }

    /**
     * @return the number of partitions that currently have an owner recorded by the watcher
     */
    public int getNumActivePartitions() {
        return _watcher.getNumActivePartitions();
    }

    /**
     * @return a copy of the watcher's partition-number to owner-instance map
     */
    public HashMap<Integer, String> getActivePartitions() {
        return _watcher.getPartitions();
    }

    /**
     * @param partition partition number to look up
     * @return the owning instance recorded for the partition, or {@code null} if none is recorded
     */
    public String getPartitionOwner(int partition) {
        return _watcher.getOwnerId(partition);
    }

    /**
     * @return the name of the Helix cluster
     */
    public String getClusterName() {
        return _clusterName;
    }

    /**
     * @return the minimum number of live instances required by the watcher
     */
    public int getQuorum() {
        return _quorum;
    }

    /**
     * Creates a detached instance that opens no ZooKeeper connection: every collaborator is
     * {@code null}, the counts are zero and the timeouts are fixed at 60000 ms (connection) and
     * 30000 ms (session). Presumably intended for subclasses such as test doubles.
     */
    protected DatabusCluster() {
        // Assigned in field-declaration order.
        _zkClient = null;
        _admin = null;
        _clusterName = null;
        _zkAddr = null;
        _quorum = 0;
        _numPartitions = 0;
        _dataNotifiers = null;
        _watcher = null;
        _zkConnectionTimeoutMs = 60000;
        _zkSessionTimeoutMs = 30000;
    }

    /**
     * @return a diagnostic string listing every field of this cluster object
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DatabusCluster [");
        text.append("_zkClient=").append(this._zkClient);
        text.append(", _admin=").append(this._admin);
        text.append(", _clusterName=").append(this._clusterName);
        text.append(", _zkAddr=").append(this._zkAddr);
        text.append(", _quorum=").append(this._quorum);
        text.append(", _numPartitions=").append(this._numPartitions);
        text.append(", _dataNotifiers=").append(this._dataNotifiers);
        text.append(", _watcher=").append(this._watcher);
        text.append(", _zkConnectionTimeoutMs=").append(this._zkConnectionTimeoutMs);
        text.append(", _zkSessionTimeoutMs=").append(this._zkSessionTimeoutMs);
        text.append("]");
        return text.toString();
    }

    private class DatabusHelixWatcher
    implements LiveInstanceChangeListener,
    ExternalViewChangeListener {
        private final HelixManager _manager;
        private int _numActiveInstances = -1;
        private final Random _random = new Random(System.currentTimeMillis());
        private final HashMap<Integer, String> _partitionMap;
        private HelixManager _helixManager = null;
        private boolean _paused = false;
        private final int _id = this._random.nextInt();

        public DatabusHelixWatcher() throws Exception {
            this._manager = HelixManagerFactory.getZKHelixManager((String)DatabusCluster.this._clusterName, (String)("watcher_" + DatabusCluster.this._clusterName + "_" + this._id), (InstanceType)InstanceType.SPECTATOR, (String)DatabusCluster.this._zkAddr);
            this._partitionMap = new HashMap(DatabusCluster.this._numPartitions);
        }

        public void start() {
            if (this._manager != null) {
                try {
                    if (!this._manager.isConnected()) {
                        this._manager.connect();
                        this._manager.addLiveInstanceChangeListener((LiveInstanceChangeListener)this);
                        this._manager.addExternalViewChangeListener((ExternalViewChangeListener)this);
                    }
                }
                catch (Exception e) {
                    LOG.error((Object)("Cannot start HelixWatcher! " + e));
                }
            }
        }

        public void stop() {
            if (this._manager != null) {
                this._manager.disconnect();
            }
            this.stopHelixController();
        }

        public synchronized void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
            boolean quorumReached;
            this._numActiveInstances = liveInstances.size();
            boolean bl = quorumReached = this._numActiveInstances >= DatabusCluster.this._quorum;
            if (quorumReached) {
                if (this._helixManager == null) {
                    LOG.warn((Object)("Quorum Reached! numNodes=" + this._numActiveInstances + " quorum=" + DatabusCluster.this._quorum));
                    this.startHelixController();
                } else if (this._paused) {
                    this.resumeHelixController();
                    this._paused = false;
                }
            } else {
                LOG.warn((Object)("Number of nodes inadequate=" + this._numActiveInstances + " Need at least:" + DatabusCluster.this._quorum));
                if (this._helixManager != null && !this._paused) {
                    this.pauseHelixController();
                    this._paused = true;
                }
            }
            if (!DatabusCluster.this._dataNotifiers.isEmpty()) {
                Vector<String> nodeList = new Vector<String>(liveInstances.size());
                for (LiveInstance i : liveInstances) {
                    nodeList.add(i.getInstanceName());
                }
                for (DatabusClusterDataNotifier notifier : DatabusCluster.this._dataNotifiers) {
                    notifier.onInstanceChange(nodeList);
                }
            }
        }

        private Integer getPartition(String partition) {
            String[] ps = partition.split("_");
            if (ps.length >= 2) {
                return Integer.parseInt(ps[ps.length - 1]);
            }
            return -1;
        }

        public synchronized void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
            this._partitionMap.clear();
            for (ExternalView v : externalViewList) {
                if (!v.getResourceName().equals(DatabusCluster.DEFAULT_RESOURCE_NAME)) continue;
                for (String k : v.getPartitionSet()) {
                    Map map = v.getStateMap(k);
                    if (map == null) continue;
                    for (Map.Entry mkPair : map.entrySet()) {
                        String value = (String)mkPair.getValue();
                        if (value == null) continue;
                        Integer partition = this.getPartition(k);
                        if (!value.equals("ONLINE")) continue;
                        this._partitionMap.put(partition, (String)mkPair.getKey());
                    }
                }
            }
            if (!DatabusCluster.this._dataNotifiers.isEmpty()) {
                HashMap<Integer, String> pmap = this.getPartitions();
                for (DatabusClusterDataNotifier notifier : DatabusCluster.this._dataNotifiers) {
                    notifier.onPartitionMappingChange(pmap);
                }
            }
        }

        public synchronized int getNumActiveInstances() {
            return this._numActiveInstances;
        }

        public synchronized int getNumActivePartitions() {
            return this._partitionMap.size();
        }

        public synchronized String getOwnerId(int numPartition) {
            return this._partitionMap.get(numPartition);
        }

        public synchronized HashMap<Integer, String> getPartitions() {
            HashMap<Integer, String> map = new HashMap<Integer, String>(this._partitionMap.size());
            map.putAll(this._partitionMap);
            return map;
        }

        void stopHelixController() {
            if (this._helixManager != null) {
                LOG.warn((Object)("Shutting down cluster : " + this._helixManager.getClusterName() + " instance:" + this._helixManager.getInstanceName()));
                this._helixManager.disconnect();
            }
        }

        void pauseHelixController() {
            if (DatabusCluster.this._admin != null) {
                LOG.warn((Object)("Pausing  cluster : " + DatabusCluster.this._clusterName));
                DatabusCluster.this._admin.enableCluster(DatabusCluster.this._clusterName, false);
            }
        }

        void resumeHelixController() {
            if (DatabusCluster.this._admin != null) {
                LOG.warn((Object)("Resuming  cluster : " + DatabusCluster.this._clusterName));
                DatabusCluster.this._admin.enableCluster(DatabusCluster.this._clusterName, true);
            }
        }

        void startHelixController() {
            try {
                String controllerId = "controller_" + this._id;
                LOG.info((Object)("Starting cluster controller for cluster=" + DatabusCluster.this._clusterName + " with id = " + controllerId));
                this._helixManager = HelixControllerMain.startHelixController((String)DatabusCluster.this._zkAddr, (String)DatabusCluster.this._clusterName, (String)controllerId, (String)"STANDALONE");
                if (DatabusCluster.this._admin != null) {
                    DatabusCluster.this._admin.enableCluster(this._helixManager.getClusterName(), true);
                }
            }
            catch (Exception e) {
                LOG.error((Object)("Cannot start cluster controller for cluster=" + DatabusCluster.this._clusterName + e));
            }
        }
    }

    /**
     * Checked exception thrown by the DatabusCluster constructor when the cluster cannot be
     * accessed or its partition count differs from the requested one.
     */
    public static class DatabusClusterException
    extends Exception {
        /**
         * @param msg human-readable description of the failure
         */
        public DatabusClusterException(String msg) {
            super(msg);
        }
    }

    public class DatabusClusterMember {
        private final HelixManager _manager;
        private final String _id;
        private DatabusClusterNotifier _notifier = null;

        DatabusClusterMember(String id, DatabusClusterNotifier notifier) throws Exception {
            this._id = id;
            this._manager = HelixManagerFactory.getZKHelixManager((String)DatabusCluster.this._clusterName, (String)this._id, (InstanceType)InstanceType.PARTICIPANT, (String)DatabusCluster.this._zkAddr);
            this._notifier = notifier;
            this.registerNotifier();
        }

        void registerNotifier() {
            StateMachineEngine stateMach = this._manager.getStateMachineEngine();
            DatabusClusterNotifierFactory modelFactory = new DatabusClusterNotifierFactory(this._notifier);
            stateMach.registerStateModelFactory(DatabusCluster.DEFAULT_STATE_MODEL, (StateModelFactory)modelFactory);
        }

        public boolean join() {
            if (this._manager != null) {
                if (!this._manager.isConnected()) {
                    try {
                        this._manager.connect();
                        return true;
                    }
                    catch (Exception e) {
                        LOG.error((Object)("Member " + this._id + " could not connect! " + e));
                    }
                } else {
                    LOG.warn((Object)("Member " + this._id + " cannot join. Already joined! "));
                    return true;
                }
            }
            return false;
        }

        public boolean leave() {
            if (this._manager != null) {
                try {
                    this._manager.disconnect();
                }
                catch (Exception e) {
                    LOG.error((Object)("Member " + this._id + " could not disconnect! " + e));
                }
            }
            if (DatabusCluster.this._admin != null) {
                try {
                    DatabusCluster.this._admin.dropInstance(DatabusCluster.this._clusterName, DatabusCluster.this._admin.getInstanceConfig(DatabusCluster.this._clusterName, this._id));
                }
                catch (HelixException e) {
                    LOG.warn((Object)("Drop instance failed for id= " + this._id + " exception" + (Object)((Object)e)));
                }
            }
            return true;
        }

        protected DatabusClusterMember() {
            this._manager = null;
            this._id = null;
        }
    }
}

