/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.registration;

import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DbusPartitionInfoImpl;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.ClusterCheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.DatabusClientException;
import com.linkedin.databus.client.pub.DatabusRegistration;
import com.linkedin.databus.client.pub.DbusClusterConsumerFactory;
import com.linkedin.databus.client.pub.DbusClusterInfo;
import com.linkedin.databus.client.pub.DbusPartitionInfo;
import com.linkedin.databus.client.pub.DbusPartitionListener;
import com.linkedin.databus.client.pub.DbusServerSideFilterFactory;
import com.linkedin.databus.client.pub.FetchMaxSCNRequest;
import com.linkedin.databus.client.pub.FlushRequest;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.client.pub.RelayFindMaxSCNResult;
import com.linkedin.databus.client.pub.RelayFlushMaxSCNResult;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStats;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStatsMBean;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStatsMBean;
import com.linkedin.databus.client.pub.monitoring.events.ConsumerCallbackStatsEvent;
import com.linkedin.databus.client.pub.monitoring.events.UnifiedClientStatsEvent;
import com.linkedin.databus.client.registration.ClusterRegistrationStaticConfig;
import com.linkedin.databus.client.registration.DatabusMultiPartitionRegistration;
import com.linkedin.databus.client.registration.DatabusV2RegistrationImpl;
import com.linkedin.databus.client.registration.RegistrationIdGenerator;
import com.linkedin.databus.cluster.DatabusCluster;
import com.linkedin.databus.cluster.DatabusClusterDataNotifier;
import com.linkedin.databus.cluster.DatabusClusterNotifier;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.monitoring.mbean.AggregatedDbusEventsStatisticsCollector;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollectorMBean;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectorMergeable;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectors;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

public class DatabusV2ClusterRegistrationImpl
implements DatabusMultiPartitionRegistration,
DatabusClusterNotifier,
DatabusClusterDataNotifier {
    private DatabusRegistration.RegistrationState _state;
    private RegistrationId _id;
    private final DatabusHttpClientImpl _client;
    private Status _status = new Status();
    public final Map<DbusPartitionInfo, DatabusRegistration> regMap = new HashMap<DbusPartitionInfo, DatabusRegistration>();
    private final ClusterCheckpointPersistenceProvider.StaticConfig _ckptPersistenceProviderConfig;
    private StatsCollectors<ConsumerCallbackStats> _relayCallbackStatsMerger;
    private StatsCollectors<ConsumerCallbackStats> _bootstrapCallbackStatsMerger;
    private StatsCollectors<UnifiedClientStats> _unifiedClientStatsMerger;
    private StatsCollectors<DbusEventsStatisticsCollector> _relayEventStatsMerger;
    private StatsCollectors<DbusEventsStatisticsCollector> _bootstrapEventStatsMerger;
    private Logger _log;
    private final DbusServerSideFilterFactory _serverSideFilterFactory;
    private final DbusClusterConsumerFactory _consumerFactory;
    private final DbusPartitionListener _partitionListener;
    private final Set<DbusPartitionInfo> _partitionSet;
    private final DbusClusterInfo _clusterInfo;
    private final List<String> _sources;
    private List<String> _currentActiveNodes;
    private Map<Integer, String> _activePartitionMap;
    private final ClusterRegistrationStaticConfig _clientClusterConfig;
    private DatabusCluster _cluster;
    private DatabusCluster.DatabusClusterMember _clusterMember;

    public DatabusV2ClusterRegistrationImpl(RegistrationId id, DatabusHttpClientImpl client, ClusterCheckpointPersistenceProvider.StaticConfig ckptPersistenceProviderConfig, DbusClusterInfo clusterInfo, DbusClusterConsumerFactory consumerFactory, DbusServerSideFilterFactory filterFactory, DbusPartitionListener partitionListener, String[] sources) {
        this._client = client;
        this._id = id;
        this._ckptPersistenceProviderConfig = ckptPersistenceProviderConfig;
        this._state = DatabusRegistration.RegistrationState.INIT;
        this._log = Logger.getLogger((String)(this.getClass().getName() + (null == this._id ? "" : "." + this._id.getId())));
        this._consumerFactory = consumerFactory;
        this._serverSideFilterFactory = filterFactory;
        this._partitionListener = partitionListener;
        this._partitionSet = new HashSet<DbusPartitionInfo>();
        this._sources = new ArrayList<String>();
        this._clusterInfo = clusterInfo;
        this._clientClusterConfig = client.getClientStaticConfig().getClientCluster(clusterInfo.getName());
        if (null != sources) {
            this._sources.addAll(Arrays.asList(sources));
        }
    }

    public synchronized void setLogger(Logger log) {
        this._log = log;
    }

    public synchronized boolean start() throws IllegalStateException, DatabusClientException {
        if (this._state == DatabusRegistration.RegistrationState.INIT || this._state == DatabusRegistration.RegistrationState.DEREGISTERED) {
            String errMsg = "Registration (" + this._id + ") cannot be started from its current state, which is " + this._state + " .It may only be started from REGISTERED or SHUTDOWN state";
            this._log.error((Object)errMsg);
            throw new IllegalStateException(errMsg);
        }
        if (this._state.isRunning()) {
            this._log.info((Object)("Registration (" + this._id + ") already started !!"));
            return false;
        }
        String host = null;
        try {
            host = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            this._log.error((Object)"Unable to fetch the local hostname !! Trying to fetch IP Address !!", (Throwable)e);
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            }
            catch (UnknownHostException e1) {
                this._log.error((Object)"Unable to fetch the local IP Address too !! Giving up", (Throwable)e1);
                host = "UNKNOWN_HOST";
            }
        }
        String id = host + "-" + this._client.getContainerStaticConfig().getHttpPort() + "-" + this._client.getContainerStaticConfig().getId();
        try {
            this._cluster = this.createCluster();
            this._cluster.start();
        }
        catch (Exception e) {
            this._log.fatal((Object)("Got exception while trying to create the cluster with id (" + id + ")"), (Throwable)e);
            throw new DatabusClientException((Throwable)e);
        }
        this.initializeStatsCollectors();
        this._log.info((Object)("Dabatus cluster object created : " + this._cluster + " with id :" + id));
        this._clusterMember = this._cluster.addMember(id, (DatabusClusterNotifier)this);
        boolean joined = this._clusterMember.join();
        if (!joined) {
            this._log.fatal((Object)("Unable to join the cluster " + this._clusterInfo));
            throw new DatabusClientException("Unable to join the cluster :" + this._clusterInfo);
        }
        this._state = DatabusRegistration.RegistrationState.STARTED;
        this.activatePartitions();
        return true;
    }

    public synchronized void shutdown() throws IllegalStateException {
        boolean left = true;
        if (!this._state.isRunning()) {
            this._log.warn((Object)("Registration (" + this._id + ") is not in running state to be shutdown. Current state :" + this._state));
            return;
        }
        for (Map.Entry<DbusPartitionInfo, DatabusRegistration> e : this.regMap.entrySet()) {
            this._log.info((Object)("Shutting registration for Partition :" + e.getKey()));
            try {
                if (!e.getValue().getState().isRunning()) continue;
                e.getValue().shutdown();
            }
            catch (Exception ex) {
                this._log.error((Object)("Unable to shutdown registration for partition :" + e.getKey()), (Throwable)ex);
            }
        }
        if (null != this._clusterMember) {
            left = this._clusterMember.leave();
        }
        if (!left) {
            this._log.error((Object)("Unable to leave the cluster cleanly !!" + this._clusterInfo));
        }
        if (null != this._cluster) {
            this._cluster.shutdown();
            ClusterCheckpointPersistenceProvider.close((String)this._cluster.getClusterName());
        }
        this._state = DatabusRegistration.RegistrationState.SHUTDOWN;
    }

    public synchronized void pause() throws IllegalStateException {
        if (this._state == DatabusRegistration.RegistrationState.PAUSED) {
            return;
        }
        if (this._state != DatabusRegistration.RegistrationState.STARTED && this._state != DatabusRegistration.RegistrationState.RESUMED) {
            throw new IllegalStateException("Registration (" + this._id + ") is not in correct state to be paused. Current state :" + this._state);
        }
        for (Map.Entry<DbusPartitionInfo, DatabusRegistration> e : this.regMap.entrySet()) {
            this._log.info((Object)("Shutting registration for Partition :" + e.getKey()));
            try {
                e.getValue().pause();
            }
            catch (Exception ex) {
                this._log.error((Object)("Unable to pause registration for partition :" + e.getKey()), (Throwable)ex);
            }
        }
        this._state = DatabusRegistration.RegistrationState.PAUSED;
    }

    public synchronized void suspendOnError(Throwable ex) throws IllegalStateException {
        if (this._state == DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR) {
            return;
        }
        if (!this._state.isRunning()) {
            throw new IllegalStateException("Registration (" + this._id + ") is not in correct state to be suspended. Current state :" + this._state);
        }
        for (Map.Entry<DbusPartitionInfo, DatabusRegistration> e : this.regMap.entrySet()) {
            this._log.info((Object)("Shutting registration for Partition :" + e.getKey()));
            try {
                e.getValue().suspendOnError(ex);
            }
            catch (Exception e1) {
                this._log.error((Object)("Unable to suspend registration for partition :" + e.getKey()), (Throwable)e1);
            }
        }
        this._state = DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR;
    }

    public synchronized void resume() throws IllegalStateException {
        if (this._state == DatabusRegistration.RegistrationState.RESUMED) {
            return;
        }
        if (this._state != DatabusRegistration.RegistrationState.PAUSED && this._state != DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR) {
            throw new IllegalStateException("Registration (" + this._id + ") is not in correct state to be resumed. Current state :" + this._state);
        }
        for (Map.Entry<DbusPartitionInfo, DatabusRegistration> e : this.regMap.entrySet()) {
            this._log.info((Object)("Shutting registration for Partition :" + e.getKey()));
            try {
                e.getValue().resume();
            }
            catch (Exception e1) {
                this._log.error((Object)("Unable to resume registration for partition :" + e.getKey()), (Throwable)e1);
            }
        }
        this._state = DatabusRegistration.RegistrationState.RESUMED;
    }

    public DatabusRegistration.RegistrationState getState() {
        return this._state;
    }

    public synchronized boolean deregister() throws IllegalStateException {
        if (this._state == DatabusRegistration.RegistrationState.DEREGISTERED || this._state == DatabusRegistration.RegistrationState.INIT) {
            return false;
        }
        if (this._state.isRunning()) {
            this.shutdown();
        }
        this._client.deregister(this);
        this.regMap.clear();
        this._state = DatabusRegistration.RegistrationState.DEREGISTERED;
        return true;
    }

    public Collection<DatabusSubscription> getSubscriptions() {
        HashSet<DatabusSubscription> subscriptions = new HashSet<DatabusSubscription>();
        for (Map.Entry<DbusPartitionInfo, DatabusRegistration> e : this.regMap.entrySet()) {
            subscriptions.addAll(e.getValue().getSubscriptions());
        }
        return subscriptions;
    }

    public DatabusComponentStatus getStatus() {
        return this._status;
    }

    public synchronized Logger getLogger() {
        return this._log;
    }

    public DatabusRegistration getParent() {
        return null;
    }

    public synchronized DatabusRegistration withRegId(RegistrationId regId) throws DatabusClientException, IllegalStateException {
        if (this._id != null && this._id.equals((Object)regId)) {
            return this;
        }
        if (!RegistrationIdGenerator.isIdValid(regId)) {
            throw new DatabusClientException("Another registration with the same regId (" + regId + ") already present !!");
        }
        if (this._state.isRunning()) {
            throw new IllegalStateException("Cannot update regId when registration is in running state. RegId :" + this._id + ", State :" + this._state);
        }
        this._id = regId;
        this._status = new Status();
        return this;
    }

    public synchronized DatabusRegistration withServerSideFilter(DbusKeyCompositeFilterConfig filterConfig) throws IllegalStateException {
        return this;
    }

    public Collection<DbusPartitionInfo> getPartitions() {
        HashSet<DbusPartitionInfo> partitions = new HashSet<DbusPartitionInfo>();
        for (DbusPartitionInfo p : this.regMap.keySet()) {
            partitions.add(p);
        }
        return partitions;
    }

    public Checkpoint getLastPersistedCheckpoint() {
        throw new RuntimeException("Operation Not supported !!");
    }

    public boolean storeCheckpoint(Checkpoint ckpt) throws IllegalStateException {
        throw new RuntimeException("Operation Not supported !!");
    }

    public DbusEventsStatisticsCollectorMBean getRelayEventStats() {
        return (DbusEventsStatisticsCollectorMBean)this._relayEventStatsMerger.getStatsCollector();
    }

    public DbusEventsStatisticsCollectorMBean getBootstrapEventStats() {
        return (DbusEventsStatisticsCollectorMBean)this._bootstrapEventStatsMerger.getStatsCollector();
    }

    public ConsumerCallbackStatsMBean getRelayCallbackStats() {
        return (ConsumerCallbackStatsMBean)this._relayCallbackStatsMerger.getStatsCollector();
    }

    public ConsumerCallbackStatsMBean getBootstrapCallbackStats() {
        return (ConsumerCallbackStatsMBean)this._bootstrapCallbackStatsMerger.getStatsCollector();
    }

    public UnifiedClientStatsMBean getUnifiedClientStats() {
        return (UnifiedClientStatsMBean)this._unifiedClientStatsMerger.getStatsCollector();
    }

    public RelayFindMaxSCNResult fetchMaxSCN(FetchMaxSCNRequest request) throws InterruptedException {
        throw new RuntimeException("Operation Not supported !!");
    }

    public RelayFlushMaxSCNResult flush(RelayFindMaxSCNResult fetchSCNResult, FlushRequest flushRequest) throws InterruptedException {
        throw new RuntimeException("Operation Not supported !!");
    }

    public RelayFlushMaxSCNResult flush(FetchMaxSCNRequest maxScnRequest, FlushRequest flushRequest) throws InterruptedException {
        throw new RuntimeException("Operation Not supported !!");
    }

    public RegistrationId getRegistrationId() {
        return this._id;
    }

    @Override
    public Map<DbusPartitionInfo, DatabusRegistration> getPartitionRegs() {
        return this.regMap;
    }

    protected String getStatusName() {
        return "Status" + (this._id != null ? "_" + this._id.getId() : "");
    }

    protected synchronized void initializeStatsCollectors() {
        MBeanServer mbeanServer = null;
        int ownerId = -1;
        long pullerThreadDeadnessThresholdMs = 300000L;
        if (null != this._client) {
            mbeanServer = this._client.getMbeanServer();
            ownerId = this._client.getContainerStaticConfig().getId();
            pullerThreadDeadnessThresholdMs = this._client.getClientStaticConfig().getPullerThreadDeadnessThresholdMs();
        }
        String regId = null != this._id ? this._id.getId() : "unknownReg";
        ConsumerCallbackStats relayConsumerStats = new ConsumerCallbackStats(ownerId, regId + ".callback.relay", regId, true, false, new ConsumerCallbackStatsEvent());
        ConsumerCallbackStats bootstrapConsumerStats = new ConsumerCallbackStats(ownerId, regId + ".callback.bootstrap", regId, true, false, new ConsumerCallbackStatsEvent());
        UnifiedClientStats unifiedClientStats = new UnifiedClientStats(ownerId, regId + ".callback.unified", regId, true, false, pullerThreadDeadnessThresholdMs, new UnifiedClientStatsEvent());
        this._relayCallbackStatsMerger = new StatsCollectors((StatsCollectorMergeable)relayConsumerStats);
        this._bootstrapCallbackStatsMerger = new StatsCollectors((StatsCollectorMergeable)bootstrapConsumerStats);
        this._unifiedClientStatsMerger = new StatsCollectors((StatsCollectorMergeable)unifiedClientStats);
        this._relayEventStatsMerger = new StatsCollectors((StatsCollectorMergeable)new AggregatedDbusEventsStatisticsCollector(ownerId, regId + ".inbound", true, false, mbeanServer));
        this._bootstrapEventStatsMerger = new StatsCollectors((StatsCollectorMergeable)new AggregatedDbusEventsStatisticsCollector(ownerId, regId + ".inbound.bs", true, false, mbeanServer));
        if (null != this._client) {
            this._client.getBootstrapEventsStats().addStatsCollector(regId, this._bootstrapEventStatsMerger.getStatsCollector());
            this._client.getInBoundStatsCollectors().addStatsCollector(regId, this._relayEventStatsMerger.getStatsCollector());
            this._client.getRelayConsumerStatsCollectors().addStatsCollector(regId, this._relayCallbackStatsMerger.getStatsCollector());
            this._client.getBootstrapConsumerStatsCollectors().addStatsCollector(regId, this._bootstrapCallbackStatsMerger.getStatsCollector());
            this._client.getUnifiedClientStatsCollectors().addStatsCollector(regId, this._unifiedClientStatsMerger.getStatsCollector());
            this._client.getGlobalStatsMerger().registerStatsCollector(this._relayEventStatsMerger);
            this._client.getGlobalStatsMerger().registerStatsCollector(this._bootstrapEventStatsMerger);
            this._client.getGlobalStatsMerger().registerStatsCollector(this._relayCallbackStatsMerger);
            this._client.getGlobalStatsMerger().registerStatsCollector(this._bootstrapCallbackStatsMerger);
            this._client.getGlobalStatsMerger().registerStatsCollector(this._unifiedClientStatsMerger);
        }
    }

    private synchronized void addPartition(DbusPartitionInfo partition) throws IllegalStateException, DatabusClientException {
        if (this._state == DatabusRegistration.RegistrationState.REGISTERED) {
            this._partitionSet.add(partition);
        } else if (this._state.isRunning()) {
            if (!this._partitionSet.contains(partition)) {
                this._partitionSet.add(partition);
                this.activateOnePartition(partition);
            } else {
                this._log.info((Object)("Partition (" + partition + ") already added !!"));
            }
        } else {
            throw new IllegalStateException("Registration is not in correct state to add a partition !! State :" + this._state);
        }
    }

    private synchronized void activatePartitions() throws DatabusClientException {
        this._state = DatabusRegistration.RegistrationState.STARTED;
        for (DbusPartitionInfo p : this._partitionSet) {
            this.activateOnePartition(p);
        }
    }

    private synchronized void activateOnePartition(DbusPartitionInfo partition) throws DatabusClientException {
        this._log.info((Object)("Trying to activate partition :" + partition));
        try {
            if (this.regMap.containsKey(partition)) {
                this._log.info((Object)("Partition (" + partition + ") is already added and is currently in state : " + this.regMap.get(partition).getState() + " skipping !!"));
                return;
            }
            Collection consumers = this._consumerFactory.createPartitionedConsumers(this._clusterInfo, partition);
            DbusKeyCompositeFilterConfig filterConfig = null;
            if (this._serverSideFilterFactory != null) {
                filterConfig = this._serverSideFilterFactory.createServerSideFilter(this._clusterInfo, partition);
            }
            if (null == consumers || consumers.isEmpty()) {
                this._log.error((Object)("ConsumerFactory for cluster (" + this._clusterInfo + ") returned null or empty consumers "));
                throw new DatabusClientException("ConsumerFactory for cluster (" + this._clusterInfo + ") returned null or empty consumers");
            }
            RegistrationId id = new RegistrationId(this._id + "-" + partition.getPartitionId());
            CheckpointPersistenceProvider ckptProvider = this.createCheckpointPersistenceProvider(partition);
            DatabusV2RegistrationImpl reg = this.createChildRegistration(id, this._client, ckptProvider);
            reg.addDatabusConsumers(consumers);
            String[] srcs = new String[this._sources.size()];
            srcs = this._sources.toArray(srcs);
            reg.addSubscriptions(srcs);
            this.regMap.put(partition, reg);
            reg.onRegister();
            if (null != filterConfig) {
                reg.withServerSideFilter(filterConfig);
            }
            if (null != this._partitionListener) {
                this._partitionListener.onAddPartition(partition, (DatabusRegistration)reg);
            }
            try {
                reg.start();
            }
            catch (DatabusClientException e) {
                this._log.error((Object)("Got exception while starting the registration for partition (" + partition + ")"), (Throwable)e);
                throw e;
            }
            this._relayEventStatsMerger.addStatsCollector(id.getId(), (StatsCollectorMergeable)((DbusEventsStatisticsCollector)reg.getRelayEventStats()));
            this._bootstrapEventStatsMerger.addStatsCollector(id.getId(), (StatsCollectorMergeable)((DbusEventsStatisticsCollector)reg.getBootstrapEventStats()));
            this._relayCallbackStatsMerger.addStatsCollector(id.getId(), (StatsCollectorMergeable)((ConsumerCallbackStats)reg.getRelayCallbackStats()));
            this._bootstrapCallbackStatsMerger.addStatsCollector(id.getId(), (StatsCollectorMergeable)((ConsumerCallbackStats)reg.getBootstrapCallbackStats()));
            this._log.info((Object)("Partition (" + partition + ") started !!"));
        }
        catch (DatabusException e) {
            this._log.error((Object)("Got exception while activating partition(" + partition + ")"), (Throwable)e);
            throw new DatabusClientException((Throwable)e);
        }
        catch (ClusterCheckpointPersistenceProvider.ClusterCheckpointException e) {
            this._log.error((Object)("Got exception while activating partition(" + partition + ")"), (Throwable)e);
            throw new DatabusClientException((Throwable)e);
        }
    }

    private DatabusV2RegistrationImpl createChildRegistration(RegistrationId id, DatabusHttpClientImpl client, CheckpointPersistenceProvider ckptProvider) {
        return new DatabusClusterChildRegistrationImpl(id, client, this, ckptProvider);
    }

    private synchronized void dropOnePartition(DbusPartitionInfo partition) throws DatabusException {
        if (this._state == DatabusRegistration.RegistrationState.REGISTERED) {
            this._partitionSet.remove(partition);
        } else if (this._state.isRunning()) {
            if (!this.regMap.containsKey(partition)) {
                this._log.warn((Object)("Partition (" + partition + ") not available to be dropped. skipping !! Active Partitions :" + this.regMap.keySet()));
                return;
            }
            DatabusRegistration reg = this.regMap.get(partition);
            reg.deregister();
            this.regMap.remove(partition);
            this._partitionSet.remove(partition);
            this._relayEventStatsMerger.removeStatsCollector(reg.getRegistrationId().getId());
            this._bootstrapEventStatsMerger.removeStatsCollector(reg.getRegistrationId().getId());
            this._relayCallbackStatsMerger.removeStatsCollector(reg.getRegistrationId().getId());
            this._bootstrapCallbackStatsMerger.removeStatsCollector(reg.getRegistrationId().getId());
            this._partitionListener.onDropPartition(partition, reg);
        } else {
            throw new IllegalStateException("Registration is not in correct state to drop a partition !! State :" + this._state);
        }
    }

    public synchronized void onGainedPartitionOwnership(int partition) {
        this._log.info((Object)("Partition (" + partition + ") getting added !!"));
        DbusPartitionInfoImpl partitionInfo = new DbusPartitionInfoImpl(partition);
        try {
            this.addPartition(partitionInfo);
        }
        catch (DatabusClientException e) {
            this._log.error((Object)"Unable to add partition. Shutting down the cluster !!", (Throwable)e);
            this.deregister();
        }
    }

    public synchronized void onLostPartitionOwnership(int partition) {
        this._log.info((Object)("Partition (" + partition + ") getting removed !!"));
        DbusPartitionInfoImpl partitionInfo = new DbusPartitionInfoImpl(partition);
        try {
            this.dropOnePartition(partitionInfo);
        }
        catch (DatabusException e) {
            this._log.error((Object)"Unable to drop partition. Shutting down the cluster !!", (Throwable)e);
            this.deregister();
        }
    }

    public synchronized void onError(int partition) {
        this._log.error((Object)"Error notification received for partition");
        this.onLostPartitionOwnership(partition);
    }

    public synchronized void onReset(int partition) {
        this._log.error((Object)"Reset notification received for partition");
        this.onLostPartitionOwnership(partition);
    }

    public synchronized void addSubscriptions(String ... sources) throws IllegalStateException {
        if (!this._state.isPreStartState()) {
            throw new IllegalStateException("Cannot add sources when state is running/shutdown. Current State :" + this._state);
        }
        for (String s : sources) {
            if (this._sources.contains(s)) continue;
            this._sources.add(s);
        }
    }

    public synchronized void removeSubscriptions(String ... sources) throws IllegalStateException {
        if (!this._state.isRunning()) {
            throw new IllegalStateException("Cannot remove sources when state is running. Current State :" + this._state);
        }
        for (String s : sources) {
            this._sources.remove(s);
        }
    }

    public synchronized void onRegister() {
        this._state = DatabusRegistration.RegistrationState.REGISTERED;
    }

    public synchronized void onInstanceChange(List<String> activeNodes) {
        if (this._currentActiveNodes == null) {
            this._currentActiveNodes = new ArrayList<String>();
        }
        ArrayList<String> newOfflineNodes = new ArrayList<String>();
        ArrayList<String> newOnlineNodes = new ArrayList<String>();
        DatabusV2ClusterRegistrationImpl.findDiff(this._currentActiveNodes, activeNodes, newOfflineNodes);
        DatabusV2ClusterRegistrationImpl.findDiff(activeNodes, this._currentActiveNodes, newOnlineNodes);
        if (!newOfflineNodes.isEmpty()) {
            this._log.info((Object)("The following nodes went offline :" + newOfflineNodes));
        }
        if (!newOnlineNodes.isEmpty()) {
            this._log.info((Object)("The following nodes came online :" + newOnlineNodes));
        }
        this._currentActiveNodes.clear();
        this._currentActiveNodes.addAll(activeNodes);
    }

    private static void findDiff(Collection<String> lhs, Collection<String> rhs, Collection<String> result) {
        for (String c : lhs) {
            if (rhs.contains(c)) continue;
            result.add(c);
        }
    }

    public synchronized void onPartitionMappingChange(Map<Integer, String> activePartitionMap) {
        int i = 0;
        while ((long)i < this._clusterInfo.getNumTotalPartitions()) {
            if (!activePartitionMap.containsKey(i)) {
                this._log.error((Object)("No Client listening to partition : " + i));
            }
            ++i;
        }
        if (null == this._activePartitionMap) {
            this._activePartitionMap = new HashMap<Integer, String>();
        }
        this._activePartitionMap.clear();
        this._activePartitionMap.putAll(activePartitionMap);
        this._log.info((Object)("Current Partition to Client mapping :" + activePartitionMap));
    }

    protected DatabusCluster createCluster() throws Exception {
        return new DatabusCluster(this._clientClusterConfig);
    }

    protected CheckpointPersistenceProvider createCheckpointPersistenceProvider(DbusPartitionInfo partition) throws InvalidConfigException, ClusterCheckpointPersistenceProvider.ClusterCheckpointException {
        return new ClusterCheckpointPersistenceProvider(partition.getPartitionId(), this._ckptPersistenceProviderConfig);
    }

    public List<String> getCurrentActiveNodes() {
        return this._currentActiveNodes;
    }

    public Map<Integer, String> getActivePartitionMap() {
        return this._activePartitionMap;
    }

    public DbusClusterInfo getClusterInfo() {
        return this._clusterInfo;
    }

    public DbusKeyCompositeFilterConfig getFilterConfig() {
        return null;
    }

    private static class DatabusClusterChildRegistrationImpl
    extends DatabusV2RegistrationImpl {
        public DatabusClusterChildRegistrationImpl(RegistrationId id, DatabusHttpClientImpl client, DatabusRegistration parentReg, CheckpointPersistenceProvider ckptProvider) {
            super(id, client, ckptProvider);
            this.setParent(parentReg);
        }

        @Override
        protected void deregisterFromClient() {
        }

        @Override
        protected synchronized void initializeStatsCollectors() {
            MBeanServer mbeanServer = null;
            if (null != this._client) {
                mbeanServer = this._client.getClientStaticConfig().isEnablePerConnectionStats() ? this._client.getMbeanServer() : null;
            }
            int ownerId = null == this._client ? -1 : this._client.getContainerStaticConfig().getId();
            String regId = null != this._id ? this._id.getId() : "unknownReg";
            this.initializeStatsCollectors(regId, ownerId, mbeanServer);
            if (null != this._client && this._client.getClientStaticConfig().isEnablePerConnectionStats()) {
                this._client.getBootstrapEventsStats().addStatsCollector(regId, (StatsCollectorMergeable)this._bootstrapEventsStatsCollector);
                this._client.getInBoundStatsCollectors().addStatsCollector(regId, (StatsCollectorMergeable)this._inboundEventsStatsCollector);
                this._client.getRelayConsumerStatsCollectors().addStatsCollector(regId, (StatsCollectorMergeable)this._relayConsumerStats);
                this._client.getBootstrapConsumerStatsCollectors().addStatsCollector(regId, (StatsCollectorMergeable)this._bootstrapConsumerStats);
                this._client.getUnifiedClientStatsCollectors().addStatsCollector(regId, (StatsCollectorMergeable)this._unifiedClientStats);
            }
        }
    }

    public class Status
    extends DatabusComponentStatus {
        public Status() {
            super(DatabusV2ClusterRegistrationImpl.this.getStatusName());
        }

        public void start() {
            try {
                DatabusV2ClusterRegistrationImpl.this.start();
            }
            catch (Exception e) {
                this._log.error((Object)"Got exception while trying to start the registration", (Throwable)e);
                throw new RuntimeException(e);
            }
            super.start();
        }

        public void shutdown() {
            DatabusV2ClusterRegistrationImpl.this.shutdown();
            super.shutdown();
        }

        public void pause() {
            DatabusV2ClusterRegistrationImpl.this.pause();
            super.pause();
        }

        public void resume() {
            DatabusV2ClusterRegistrationImpl.this.resume();
            super.resume();
        }

        public void suspendOnError(Throwable error) {
            DatabusV2ClusterRegistrationImpl.this.suspendOnError(error);
            super.suspendOnError(error);
        }
    }
}

