/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.registration;

import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.consumer.AbstractDatabusCombinedConsumer;
import com.linkedin.databus.client.consumer.DatabusV2ConsumerRegistration;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.DatabusClientException;
import com.linkedin.databus.client.pub.DatabusCombinedConsumer;
import com.linkedin.databus.client.pub.DatabusRegistration;
import com.linkedin.databus.client.pub.DbusPartitionInfo;
import com.linkedin.databus.client.pub.FetchMaxSCNRequest;
import com.linkedin.databus.client.pub.FlushRequest;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.client.pub.RelayFindMaxSCNResult;
import com.linkedin.databus.client.pub.RelayFlushMaxSCNResult;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStats;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStatsMBean;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStatsMBean;
import com.linkedin.databus.client.pub.monitoring.events.ConsumerCallbackStatsEvent;
import com.linkedin.databus.client.pub.monitoring.events.UnifiedClientStatsEvent;
import com.linkedin.databus.client.registration.RegistrationIdGenerator;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollectorMBean;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectorMergeable;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

/**
 * A single (non-partitioned) Databus V2 client registration.
 *
 * <p>The registration keeps a list of source names and a list of consumers, and moves
 * through the {@link DatabusRegistration.RegistrationState} life-cycle. {@link #start()}
 * builds the event buffers, the stats collectors and a {@link DatabusSourcesConnection}
 * from the owning client's configuration and starts that connection.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance; the current
 * state itself is held in a {@code volatile} field so that {@link #getState()} can be
 * read without taking the monitor.
 */
public class DatabusV2RegistrationImpl
implements DatabusRegistration {
    /** Current life-cycle state; written only under this object's monitor. */
    private volatile DatabusRegistration.RegistrationState _state;
    protected RegistrationId _id;
    private final Logger _log;
    /** May be null; every use checks for that. */
    private final CheckpointPersistenceProvider _checkpointPersistenceProvider;
    protected DbusEventsStatisticsCollector _inboundEventsStatsCollector;
    protected DbusEventsStatisticsCollector _bootstrapEventsStatsCollector;
    protected ConsumerCallbackStats _relayConsumerStats;
    protected ConsumerCallbackStats _bootstrapConsumerStats;
    protected UnifiedClientStats _unifiedClientStats;
    private final List<DatabusCombinedConsumer> _consumers;
    private final List<String> _sources;
    private DatabusSourcesConnection _sourcesConnection;
    private DatabusRegistration _parent = null;
    protected final DatabusHttpClientImpl _client;
    private Status _status = null;
    private DbusKeyCompositeFilterConfig _filterConfig = null;
    private List<DatabusV2ConsumerRegistration> _streamConsumerRawRegistrations;
    private List<DatabusV2ConsumerRegistration> _bootstrapConsumerRawRegistrations;
    public static final String STREAM_EVENT_STATS_SUFFIX_NAME = ".inbound";
    public static final String BOOTSTRAP_EVENT_STATS_SUFFIX_NAME = ".inbound.bs";
    public static final String RELAY_CONSUMER_STATS_SUFFIX_NAME = ".callback.relay";
    public static final String BOOTSTRAP_CONSUMER_STATS_SUFFIX_NAME = ".callback.bootstrap";
    public static final String UNIFIED_CLIENT_STATS_SUFFIX_NAME = ".callback.unified";

    /**
     * Creates a registration that uses the client's own checkpoint persistence provider
     * and starts with no sources and no consumers.
     *
     * @param id     registration id (may be null)
     * @param client owning client; dereferenced here, so it must not be null
     */
    public DatabusV2RegistrationImpl(RegistrationId id, DatabusHttpClientImpl client) {
        this(id, client, client.getCheckpointPersistenceProvider(), null, null);
    }

    /**
     * Creates a registration with an explicit checkpoint persistence provider and no
     * sources or consumers.
     *
     * @param id           registration id (may be null)
     * @param client       owning client
     * @param ckptProvider checkpoint persistence provider (may be null)
     */
    public DatabusV2RegistrationImpl(RegistrationId id, DatabusHttpClientImpl client, CheckpointPersistenceProvider ckptProvider) {
        this(id, client, ckptProvider, null, null);
    }

    /**
     * Creates a registration in the {@code INIT} state.
     *
     * @param id           registration id; when null the logger is named after this class
     * @param client       owning client
     * @param ckptProvider checkpoint persistence provider (may be null)
     * @param sources      initial source names, or null for none
     * @param consumers    initial consumers, or null for none
     */
    public DatabusV2RegistrationImpl(RegistrationId id, DatabusHttpClientImpl client, CheckpointPersistenceProvider ckptProvider, String[] sources, AbstractDatabusCombinedConsumer[] consumers) {
        _id = id;
        // Status derives its name from _id, so _id has to be assigned first.
        _status = new Status();
        _client = client;
        _checkpointPersistenceProvider = ckptProvider;
        _state = DatabusRegistration.RegistrationState.INIT;
        _sources = new ArrayList<String>();
        _consumers = new ArrayList<DatabusCombinedConsumer>();
        String loggerName = null != _id ? _id.getId() : getClass().getName();
        _log = Logger.getLogger(loggerName);
        if (null != sources) {
            _sources.addAll(Arrays.asList(sources));
        }
        if (null != consumers) {
            _consumers.addAll(Arrays.asList(consumers));
        }
    }

    /**
     * Adds the given source names, skipping any that are already present.
     *
     * @throws IllegalStateException if the registration is no longer in a pre-start state
     */
    public synchronized void addSubscriptions(String ... sources) throws IllegalStateException {
        if (!_state.isPreStartState()) {
            throw new IllegalStateException("Cannot add sources when state is running or shut down. Current State :" + _state);
        }
        for (String source : sources) {
            if (!_sources.contains(source)) {
                _sources.add(source);
            }
        }
    }

    /**
     * Removes the given source names from this registration.
     *
     * @throws IllegalStateException if the registration is running
     */
    public synchronized void removeSubscriptions(String ... sources) throws IllegalStateException {
        // The guard used to be negated, which rejected removal in every state except
        // running -- the opposite of what the message says.
        if (_state.isRunning()) {
            throw new IllegalStateException("Cannot remove sources when state is running. Current State :" + _state);
        }
        for (String source : sources) {
            _sources.remove(source);
        }
    }

    /**
     * Adds the given consumers, skipping any that are already present.
     *
     * @throws IllegalStateException if the registration is no longer in a pre-start state
     */
    public synchronized void addDatabusConsumers(Collection<DatabusCombinedConsumer> consumers) throws IllegalStateException {
        if (!_state.isPreStartState()) {
            throw new IllegalStateException("Cannot add consumers when state is running/shutdown. Current State :" + _state);
        }
        for (DatabusCombinedConsumer consumer : consumers) {
            if (!_consumers.contains(consumer)) {
                _consumers.add(consumer);
            }
        }
    }

    /**
     * Removes the given consumers from this registration.
     *
     * @throws IllegalStateException if the registration is running
     */
    public synchronized void removeDatabusConsumers(Collection<AbstractDatabusCombinedConsumer> consumers) {
        // Same inverted-guard fix as removeSubscriptions().
        if (_state.isRunning()) {
            throw new IllegalStateException("Cannot remove consumers when state is running. Current State :" + _state);
        }
        _consumers.removeAll(consumers);
    }

    /** Marks this registration as {@code REGISTERED}. */
    public synchronized void onRegister() {
        _state = DatabusRegistration.RegistrationState.REGISTERED;
    }

    /**
     * Creates the per-registration stats collectors and, when a client is present, adds
     * them to the client's collector groups under the registration id
     * ({@code "unknownReg"} when the id is null).
     */
    protected synchronized void initializeStatsCollectors() {
        MBeanServer mbeanServer = null;
        int ownerId = -1;
        if (null != _client) {
            mbeanServer = _client.getMbeanServer();
            ownerId = _client.getContainerStaticConfig().getId();
        }
        String regId = null != _id ? _id.getId() : "unknownReg";
        initializeStatsCollectors(regId, ownerId, mbeanServer);
        if (null != _client) {
            _client.getBootstrapEventsStats().addStatsCollector(regId, _bootstrapEventsStatsCollector);
            _client.getInBoundStatsCollectors().addStatsCollector(regId, _inboundEventsStatsCollector);
            _client.getRelayConsumerStatsCollectors().addStatsCollector(regId, _relayConsumerStats);
            _client.getBootstrapConsumerStatsCollectors().addStatsCollector(regId, _bootstrapConsumerStats);
            _client.getUnifiedClientStatsCollectors().addStatsCollector(regId, _unifiedClientStats);
        }
    }

    /**
     * Instantiates the five stats collectors, naming each one {@code regId} plus the
     * matching {@code *_SUFFIX_NAME} constant.
     *
     * @param regId       registration id used as the name prefix
     * @param ownerId     owner id passed to every collector
     * @param mbeanServer MBean server passed to the event collectors (may be null)
     */
    protected void initializeStatsCollectors(String regId, int ownerId, MBeanServer mbeanServer) {
        _inboundEventsStatsCollector = new DbusEventsStatisticsCollector(ownerId, regId + STREAM_EVENT_STATS_SUFFIX_NAME, true, false, mbeanServer);
        _bootstrapEventsStatsCollector = new DbusEventsStatisticsCollector(ownerId, regId + BOOTSTRAP_EVENT_STATS_SUFFIX_NAME, true, false, mbeanServer);
        _relayConsumerStats = new ConsumerCallbackStats(ownerId, regId + RELAY_CONSUMER_STATS_SUFFIX_NAME, regId, true, false, new ConsumerCallbackStatsEvent());
        _bootstrapConsumerStats = new ConsumerCallbackStats(ownerId, regId + BOOTSTRAP_CONSUMER_STATS_SUFFIX_NAME, regId, true, false, new ConsumerCallbackStatsEvent());
        // NOTE(review): _client is dereferenced here although the no-arg overload
        // tolerates a null client; no fallback threshold is visible in this file, so the
        // behaviour is left unchanged -- confirm whether a null client can reach this.
        _unifiedClientStats = new UnifiedClientStats(ownerId, regId + UNIFIED_CLIENT_STATS_SUFFIX_NAME, regId, true, false, _client.getClientStaticConfig().getPullerThreadDeadnessThresholdMs(), new UnifiedClientStatsEvent());
    }

    /**
     * Starts this registration: selects the relays (and, if bootstrap is enabled and a
     * consumer supports it, the bootstrap servers) that can serve all sources, creates
     * the stats collectors and event buffers, optionally clears the stored checkpoint,
     * and creates and starts the sources connection.
     *
     * @return {@code false} if the registration was already running, {@code true} once
     *         it has been started
     * @throws IllegalStateException  if the registration is not in the {@code REGISTERED} state
     * @throws DatabusClientException if sources, consumers, relays or required bootstrap
     *                                servers are missing, or the subscriptions cannot be created
     */
    public synchronized boolean start() throws IllegalStateException, DatabusClientException {
        _log.info("Starting registration (" + this.toString() + ") !!");
        if (_state.isRunning()) {
            _log.info("Registration (" + _id + ") already started !!");
            return false;
        }
        if (_state != DatabusRegistration.RegistrationState.REGISTERED) {
            throw new IllegalStateException("Registration (" + _id + ") not in startable state !! Current State is :" + _state);
        }
        if (null == _sources || _sources.isEmpty()) {
            throw new DatabusClientException("Registration (" + _id + ") does not have any sources to start !!");
        }
        if (null == _consumers || _consumers.isEmpty()) {
            throw new DatabusClientException("Registration (" + _id + ") does not have any consumers to start !!");
        }

        List<ServerInfo> relays = _client.getRelays();
        List<ServerInfo> bootstrapServers = _client.getBootstrapServices();
        if (null == relays || relays.isEmpty()) {
            throw new DatabusClientException("No configured relays in the client to start");
        }
        Set<ServerInfo> candidateRelays = selectCandidates(relays);
        if (candidateRelays.isEmpty()) {
            throw new DatabusClientException("No candidate relays for source : " + _sources);
        }

        // Every consumer receives stream events; only those that can bootstrap are
        // registered for bootstrap events.
        List<DatabusCombinedConsumer> streamConsumers = new ArrayList<DatabusCombinedConsumer>(_consumers);
        _streamConsumerRawRegistrations = new ArrayList<DatabusV2ConsumerRegistration>();
        _streamConsumerRawRegistrations.add(new DatabusV2ConsumerRegistration(streamConsumers, _sources, _filterConfig));
        List<DatabusCombinedConsumer> bootstrapConsumers = new ArrayList<DatabusCombinedConsumer>();
        for (DatabusCombinedConsumer consumer : _consumers) {
            if (consumer.canBootstrap()) {
                bootstrapConsumers.add(consumer);
            }
        }

        boolean bootstrapEnabled = _client.getClientStaticConfig().getRuntime().getBootstrap().isEnabled();
        boolean useBootstrap = bootstrapEnabled && !bootstrapConsumers.isEmpty();
        Set<ServerInfo> candidateBootstrapServers = new HashSet<ServerInfo>();
        if (useBootstrap) {
            if (null == bootstrapServers || bootstrapServers.isEmpty()) {
                throw new DatabusClientException("No configured bootstrap servers in the client to start");
            }
            candidateBootstrapServers = selectCandidates(bootstrapServers);
            if (candidateBootstrapServers.isEmpty()) {
                throw new DatabusClientException("No candidate bootstrap servers for source : " + _sources);
            }
            _bootstrapConsumerRawRegistrations = new ArrayList<DatabusV2ConsumerRegistration>();
            _bootstrapConsumerRawRegistrations.add(new DatabusV2ConsumerRegistration(bootstrapConsumers, _sources, _filterConfig));
        }

        initializeStatsCollectors();

        // Use the connection config specific to these sources, else the client defaults.
        DatabusSourcesConnection.StaticConfig connConfig = _client.getClientStaticConfig().getConnection(_sources);
        if (null == connConfig) {
            connConfig = _client.getClientStaticConfig().getConnectionDefaults();
        }
        DbusEventBuffer eventBuffer = createEventBuffer(connConfig.getEventBuffer(), "_stream_", true);
        DbusEventBuffer bootstrapBuffer = null;
        if (useBootstrap) {
            bootstrapBuffer = createEventBuffer(connConfig.getBstEventBuffer(), "_bootstrap_", false);
        }

        List<DatabusSubscription> subs = createSubscriptions(_sources);
        if (null != _checkpointPersistenceProvider && _client.getClientStaticConfig().getCheckpointPersistence().isClearBeforeUse()) {
            _log.info("Clearing checkpoint for sources :" + _sources + " with regId :" + _id);
            _checkpointPersistenceProvider.removeCheckpoint(_sources);
        }

        _sourcesConnection = createConnection(connConfig, subs, candidateRelays, candidateBootstrapServers, eventBuffer, bootstrapBuffer);
        _sourcesConnection.start();
        _state = DatabusRegistration.RegistrationState.STARTED;
        _status.start();
        return true;
    }

    /** Returns the subset of {@code servers} that can serve every source of this registration. */
    private Set<ServerInfo> selectCandidates(List<ServerInfo> servers) {
        Set<ServerInfo> candidates = new HashSet<ServerInfo>();
        for (ServerInfo server : servers) {
            if (canServe(server, _sources)) {
                candidates.add(server);
            }
        }
        return candidates;
    }

    /**
     * Builds and starts an event buffer from {@code cfg}. Its mmap directory is the
     * configured directory path followed by {@code dirInfix} and the registration id.
     */
    private DbusEventBuffer createEventBuffer(DbusEventBuffer.StaticConfig cfg, String dirInfix, boolean dropOldEvents) {
        DbusEventBuffer buffer = new DbusEventBuffer(cfg.getMaxSize(), cfg.getMaxIndividualBufferSize(), cfg.getScnIndexSize(), cfg.getReadBufferSize(), cfg.getMaxEventSize(), cfg.getAllocationPolicy(), new File(cfg.getMmapDirectory().getAbsolutePath() + dirInfix + _id), cfg.getQueuePolicy(), cfg.getTrace(), null, cfg.getAssertLevel(), cfg.getBufferRemoveWaitPeriod(), cfg.getRestoreMMappedBuffers(), cfg.getRestoreMMappedBuffersValidateEvents(), cfg.isEnableScnIndex(), _client.getEventFactory());
        buffer.setDropOldEvents(dropOldEvents);
        buffer.start(0L);
        return buffer;
    }

    /**
     * Converts source names into subscriptions.
     *
     * @throws DatabusClientException wrapping any exception raised during the conversion
     */
    private List<DatabusSubscription> createSubscriptions(List<String> sources) throws DatabusClientException {
        try {
            return DatabusSubscription.createFromUriList(sources);
        }
        catch (Exception ex) {
            throw new DatabusClientException(ex);
        }
    }

    /**
     * Creates (but does not start) the sources connection for this registration from the
     * given configuration, candidate servers and buffers, plus the collaborators
     * obtained from the owning client.
     */
    protected synchronized DatabusSourcesConnection createConnection(DatabusSourcesConnection.StaticConfig connConfig, List<DatabusSubscription> subs, Set<ServerInfo> candidateRelays, Set<ServerInfo> candidateBootstrapServers, DbusEventBuffer eventBuffer, DbusEventBuffer bootstrapBuffer) {
        _log.info("Creating Sources Connection : Candidate Relays :" + candidateRelays + ", CandidateBootstrapServers :" + candidateBootstrapServers + ", Subscriptions :" + subs);
        ConnectionStateFactory connStateFactory = new ConnectionStateFactory(DatabusSubscription.getStrList(subs));
        return new DatabusSourcesConnection(connConfig, subs, candidateRelays, candidateBootstrapServers, _streamConsumerRawRegistrations, _bootstrapConsumerRawRegistrations, eventBuffer, bootstrapBuffer, _client.getDefaultExecutorService(), _client.getContainerStatsCollector(), _inboundEventsStatsCollector, _bootstrapEventsStatsCollector, _relayConsumerStats, _bootstrapConsumerStats, _unifiedClientStats, _checkpointPersistenceProvider, _client.getRelayConnFactory(), _client.getBootstrapConnFactory(), _client.getHttpStatsCollector(), null, _client, _id.toString(), _client.getEventFactory(), null, connStateFactory);
    }

    /**
     * Stops the sources connection, marks the registration {@code SHUTDOWN} and removes
     * its stats collectors from the client.
     *
     * @throws IllegalStateException if the registration is not running
     */
    public synchronized void shutdown() throws IllegalStateException {
        if (!_state.isRunning()) {
            throw new IllegalStateException("Registration (" + _id + ") is not in running state to be shutdown. Current state :" + _state);
        }
        _sourcesConnection.unregisterMbeans();
        _sourcesConnection.stop();
        _status.shutdown();
        _state = DatabusRegistration.RegistrationState.SHUTDOWN;
        String regId = _id.getId();
        _client.getBootstrapEventsStats().removeStatsCollector(regId);
        _client.getInBoundStatsCollectors().removeStatsCollector(regId);
        _client.getRelayConsumerStatsCollectors().removeStatsCollector(regId);
        _client.getBootstrapConsumerStatsCollectors().removeStatsCollector(regId);
        _client.getUnifiedClientStatsCollectors().removeStatsCollector(regId);
    }

    /**
     * Pauses the connection; does nothing if already paused.
     *
     * @throws IllegalStateException unless the state is {@code STARTED} or {@code RESUMED}
     */
    public synchronized void pause() throws IllegalStateException {
        if (_state == DatabusRegistration.RegistrationState.PAUSED) {
            return;
        }
        if (_state != DatabusRegistration.RegistrationState.STARTED && _state != DatabusRegistration.RegistrationState.RESUMED) {
            throw new IllegalStateException("Registration (" + _id + ") is not in correct state to be paused. Current state :" + _state);
        }
        _sourcesConnection.getConnectionStatus().pause();
        _status.pause();
        _state = DatabusRegistration.RegistrationState.PAUSED;
    }

    /**
     * Suspends the connection because of {@code ex}; does nothing if already suspended.
     *
     * @throws IllegalStateException if the registration is not running
     */
    public synchronized void suspendOnError(Throwable ex) throws IllegalStateException {
        if (_state == DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR) {
            return;
        }
        if (!_state.isRunning()) {
            throw new IllegalStateException("Registration (" + _id + ") is not in correct state to be suspended. Current state :" + _state);
        }
        _sourcesConnection.getConnectionStatus().suspendOnError(ex);
        _status.suspendOnError(ex);
        _state = DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR;
    }

    /**
     * Resumes a paused or suspended connection; does nothing if already resumed.
     *
     * @throws IllegalStateException unless the state is {@code PAUSED} or {@code SUSPENDED_ON_ERROR}
     */
    public synchronized void resume() throws IllegalStateException {
        if (_state == DatabusRegistration.RegistrationState.RESUMED) {
            return;
        }
        if (_state != DatabusRegistration.RegistrationState.PAUSED && _state != DatabusRegistration.RegistrationState.SUSPENDED_ON_ERROR) {
            throw new IllegalStateException("Registration (" + _id + ") is not in correct state to be resumed. Current state :" + _state);
        }
        _sourcesConnection.getConnectionStatus().resume();
        _status.resume();
        _state = DatabusRegistration.RegistrationState.RESUMED;
    }

    /** Returns the current life-cycle state (volatile read; no lock taken). */
    public DatabusRegistration.RegistrationState getState() {
        return _state;
    }

    /**
     * Shuts the registration down if it is running, removes it from the client and marks
     * it {@code DEREGISTERED}.
     *
     * @return {@code false} if the state was already {@code DEREGISTERED} or still
     *         {@code INIT}; {@code true} otherwise
     */
    public synchronized boolean deregister() throws IllegalStateException {
        if (_state == DatabusRegistration.RegistrationState.DEREGISTERED || _state == DatabusRegistration.RegistrationState.INIT) {
            return false;
        }
        if (_state.isRunning()) {
            shutdown();
        }
        deregisterFromClient();
        _state = DatabusRegistration.RegistrationState.DEREGISTERED;
        return true;
    }

    /** Asks the owning client to drop this registration. */
    protected void deregisterFromClient() {
        _client.deregister(this);
    }

    /** Returns the subscriptions built from the current source names. */
    public Collection<DatabusSubscription> getSubscriptions() {
        return DatabusSubscription.createSubscriptionList(_sources);
    }

    /** Returns the component status object of this registration. */
    public synchronized DatabusComponentStatus getStatus() {
        return _status;
    }

    /** Returns the logger created at construction time. */
    public synchronized Logger getLogger() {
        return _log;
    }

    /** Returns the parent registration, or null if none has been set. */
    public DatabusRegistration getParent() {
        return _parent;
    }

    /** Sets the parent registration. */
    protected void setParent(DatabusRegistration parent) {
        _parent = parent;
    }

    /**
     * Replaces the registration id and recreates the status object so that its name
     * reflects the new id. Returns immediately when {@code regId} equals the current id.
     * The logger keeps the name it was given at construction.
     *
     * @return this registration
     * @throws DatabusClientException if {@link RegistrationIdGenerator#isIdValid} rejects {@code regId}
     * @throws IllegalStateException  if the registration is running
     */
    public synchronized DatabusRegistration withRegId(RegistrationId regId) throws DatabusClientException, IllegalStateException {
        if (null != _id && _id.equals(regId)) {
            return this;
        }
        if (!RegistrationIdGenerator.isIdValid(regId)) {
            throw new DatabusClientException("Another registration with the same regId (" + regId + ") already present !!");
        }
        if (_state.isRunning()) {
            throw new IllegalStateException("Cannot update regId when registration is in running state. RegId :" + _id + ", State :" + _state);
        }
        _id = regId;
        _status = new Status();
        return this;
    }

    /**
     * Sets the server-side filter used by registrations created in {@link #start()}.
     *
     * @return this registration
     * @throws IllegalStateException if the registration is running
     */
    public synchronized DatabusRegistration withServerSideFilter(DbusKeyCompositeFilterConfig filterConfig) throws IllegalStateException {
        if (_state.isRunning()) {
            throw new IllegalStateException("Cannot update server-side filter when registration is in running state. RegId :" + _id + ", State :" + _state);
        }
        _filterConfig = filterConfig;
        return this;
    }

    /** Always returns null; this implementation exposes no partition information. */
    public List<DbusPartitionInfo> getPartitions() {
        return null;
    }

    /**
     * Loads the checkpoint stored for this registration's sources.
     *
     * @return the checkpoint handed back by the provider, or null when no checkpoint
     *         persistence provider is configured
     */
    public Checkpoint getLastPersistedCheckpoint() {
        if (null == _checkpointPersistenceProvider) {
            return null;
        }
        return _checkpointPersistenceProvider.loadCheckpoint(_sources);
    }

    /**
     * Stores {@code ckpt} for this registration's sources.
     *
     * @return {@code true} on success; {@code false} if no provider is configured or the
     *         provider failed with an {@link IOException} (which is logged)
     */
    public synchronized boolean storeCheckpoint(Checkpoint ckpt) throws IllegalStateException {
        if (null == _checkpointPersistenceProvider) {
            _log.warn("Cannot store checkpoint: no checkpoint persistence provider configured for registration (" + _id + ")");
            return false;
        }
        try {
            _checkpointPersistenceProvider.storeCheckpoint(_sources, ckpt);
        }
        catch (IOException ioe) {
            _log.error("Storing checkpoint failed with exception", ioe);
            return false;
        }
        return true;
    }

    /** Returns the stream (relay) event stats collector; null until stats are initialized. */
    public DbusEventsStatisticsCollectorMBean getRelayEventStats() {
        return _inboundEventsStatsCollector;
    }

    /** Returns the bootstrap event stats collector; null until stats are initialized. */
    public DbusEventsStatisticsCollectorMBean getBootstrapEventStats() {
        return _bootstrapEventsStatsCollector;
    }

    /** Returns the relay consumer callback stats; null until stats are initialized. */
    public ConsumerCallbackStatsMBean getRelayCallbackStats() {
        return _relayConsumerStats;
    }

    /** Returns the bootstrap consumer callback stats; null until stats are initialized. */
    public ConsumerCallbackStatsMBean getBootstrapCallbackStats() {
        return _bootstrapConsumerStats;
    }

    /** Returns the unified client stats; null until stats are initialized. */
    public UnifiedClientStatsMBean getUnifiedClientStats() {
        return _unifiedClientStats;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public RelayFindMaxSCNResult fetchMaxSCN(FetchMaxSCNRequest request) throws InterruptedException {
        throw new UnsupportedOperationException("Not yet supported !!");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public RelayFlushMaxSCNResult flush(RelayFindMaxSCNResult fetchSCNResult, FlushRequest flushRequest) throws InterruptedException {
        throw new UnsupportedOperationException("Not yet supported !!");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public RelayFlushMaxSCNResult flush(FetchMaxSCNRequest maxScnRequest, FlushRequest flushRequest) throws InterruptedException {
        throw new UnsupportedOperationException("Not yet supported !!");
    }

    /** Returns {@code "Status"}, followed by {@code "_" + id} when an id is set. */
    protected synchronized String getStatusName() {
        return "Status" + (null != _id ? "_" + _id.getId() : "");
    }

    /** Returns true when {@code s} lists every entry of {@code sources} among its sources. */
    private static boolean canServe(ServerInfo s, Collection<String> sources) {
        List<String> supportedSources = s.getSources();
        for (String src : sources) {
            if (!supportedSources.contains(src)) {
                return false;
            }
        }
        return true;
    }

    /** Returns the current registration id (may be null). */
    public synchronized RegistrationId getRegistrationId() {
        return _id;
    }

    /** Returns a diagnostic string with state, id, sources, status, filter and raw registrations. */
    @Override
    public synchronized String toString() {
        return "DatabusV2RegistrationImpl [_state=" + _state + ", _id=" + _id + ", _sources=" + _sources + ", _status=" + _status + ", _filterConfig=" + _filterConfig + ", _streamConsumerRawRegistrations=" + _streamConsumerRawRegistrations + ", _bootstrapConsumerRawRegistrations=" + _bootstrapConsumerRawRegistrations + "]";
    }

    /** Returns the server-side filter config (may be null). */
    public synchronized DbusKeyCompositeFilterConfig getFilterConfig() {
        return _filterConfig;
    }

    /** Component status named via the enclosing registration's {@link #getStatusName()}. */
    public class Status
    extends DatabusComponentStatus {
        public Status() {
            super(DatabusV2RegistrationImpl.this.getStatusName());
        }
    }
}

