/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.BootstrapDispatcher;
import com.linkedin.databus.client.BootstrapPullThread;
import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.DatabusBootstrapConnectionFactory;
import com.linkedin.databus.client.DatabusHttpClientImpl;
import com.linkedin.databus.client.DatabusRelayConnectionFactory;
import com.linkedin.databus.client.GenericDispatcher;
import com.linkedin.databus.client.RelayDispatcher;
import com.linkedin.databus.client.RelayPullThread;
import com.linkedin.databus.client.consumer.BootstrapConsumerCallbackFactory;
import com.linkedin.databus.client.consumer.ConsumerCallbackFactory;
import com.linkedin.databus.client.consumer.DatabusV2ConsumerRegistration;
import com.linkedin.databus.client.consumer.LoggingConsumer;
import com.linkedin.databus.client.consumer.MultiConsumerCallback;
import com.linkedin.databus.client.consumer.StreamConsumerCallbackFactory;
import com.linkedin.databus.client.pub.CheckpointPersistenceProvider;
import com.linkedin.databus.client.pub.DatabusCombinedConsumer;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStats;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.async.LifecycleMessage;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.data_model.LogicalSource;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.NamedThreadFactory;
import com.linkedin.databus.core.util.Range;
import com.linkedin.databus.core.util.UncaughtExceptionTrackingThread;
import com.linkedin.databus2.core.BackoffTimerStaticConfig;
import com.linkedin.databus2.core.BackoffTimerStaticConfigBuilder;
import com.linkedin.databus2.core.container.monitoring.mbean.ContainerStatisticsCollector;
import com.linkedin.databus2.core.container.monitoring.mbean.HttpStatisticsCollector;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class DatabusSourcesConnection {
    public static final int MAX_QUEUED_MESSAGES = 10;
    public static final long MESSAGE_QUEUE_POLL_TIMEOUT_MS = 100L;
    public static final int MAX_CONNECT_RETRY_NUM = 3;
    public static final long CONNECT_TIMEOUT_MS = 100L;
    public static final long REGISTER_TIMEOUT_MS = 1000L;
    public final Logger _log;
    private final String _name;
    private final String _prettyName;
    private final StaticConfig _connectionConfig;
    private final List<DatabusSubscription> _subscriptions;
    private final RelayPullThread _relayPuller;
    private final GenericDispatcher<DatabusCombinedConsumer> _relayDispatcher;
    private final BootstrapPullThread _bootstrapPuller;
    private final GenericDispatcher<DatabusCombinedConsumer> _bootstrapDispatcher;
    private final DbusEventBuffer _dataEventsBuffer;
    private final DbusEventBuffer _bootstrapEventsBuffer;
    private final ExecutorService _ioThreadPool;
    private final CheckpointPersistenceProvider _checkpointPersistenceProvider;
    private final ContainerStatisticsCollector _containerStatisticsCollector;
    private final DbusEventsStatisticsCollector _inboundEventsStatsCollector;
    private final DbusEventsStatisticsCollector _bootstrapEventsStatsCollector;
    private final HttpStatisticsCollector _relayCallsStatsCollector;
    private final HttpStatisticsCollector _localRelayCallsStatsCollector;
    private final DatabusRelayConnectionFactory _relayConnFactory;
    private final DatabusBootstrapConnectionFactory _bootstrapConnFactory;
    private final List<DatabusV2ConsumerRegistration> _relayRegistrations;
    private final ConsumerCallbackStats _relayConsumerStats;
    private final ConsumerCallbackStats _bootstrapConsumerStats;
    private final UnifiedClientStats _unifiedClientStats;
    private final NannyRunnable _nannyRunnable;
    private final DbusEventFactory _eventFactory;
    private final ConnectionStateFactory _connStateFactory;
    private final List<DatabusV2ConsumerRegistration> _bootstrapRegistrations;
    private final SourcesConnectionStatus _connectionStatus;
    private UncaughtExceptionTrackingThread _relayPullerThread;
    private UncaughtExceptionTrackingThread _relayDispatcherThread;
    private UncaughtExceptionTrackingThread _bootstrapPullerThread;
    private UncaughtExceptionTrackingThread _bootstrapDispatcherThread;
    private final Thread _messageQueuesMonitorThread;
    private Thread _nannyThread;
    private ExecutorService _consumerCallbackExecutor;
    private final boolean _isBootstrapEnabled;
    private final RegistrationId _registrationId;
    private final String _connRawId;
    private ReentrantLock _v3BootstrapLock = null;

    public ExecutorService getIoThreadPool() {
        return this._ioThreadPool;
    }

    public DatabusSourcesConnection(StaticConfig connConfig, List<DatabusSubscription> subscriptions, Set<ServerInfo> relays, Set<ServerInfo> bootstrapServices, List<DatabusV2ConsumerRegistration> registrations, List<DatabusV2ConsumerRegistration> bootstrapRegistrations, DbusEventBuffer dataEventsBuffer, DbusEventBuffer bootstrapEventsBuffer, ExecutorService ioThreadPool, ContainerStatisticsCollector containerStatsCollector, DbusEventsStatisticsCollector inboundEventsStatsCollector, DbusEventsStatisticsCollector bootstrapEventsStatsCollector, ConsumerCallbackStats relayCallbackStats, ConsumerCallbackStats bootstrapCallbackStats, UnifiedClientStats unifiedClientStats, CheckpointPersistenceProvider checkpointPersistenceProvider, DatabusRelayConnectionFactory relayConnFactory, DatabusBootstrapConnectionFactory bootstrapConnFactory, HttpStatisticsCollector relayCallsStatsCollector, RegistrationId registrationId, DatabusHttpClientImpl serverHandle, DbusEventFactory eventFactory, ConnectionStateFactory connStateFactory) {
        this(connConfig, subscriptions, relays, bootstrapServices, registrations, bootstrapRegistrations, dataEventsBuffer, bootstrapEventsBuffer, ioThreadPool, containerStatsCollector, inboundEventsStatsCollector, bootstrapEventsStatsCollector, relayCallbackStats, bootstrapCallbackStats, unifiedClientStats, checkpointPersistenceProvider, relayConnFactory, bootstrapConnFactory, relayCallsStatsCollector, registrationId, serverHandle, registrationId != null ? registrationId.toString() : null, eventFactory, null, connStateFactory);
    }

    public DatabusSourcesConnection(StaticConfig connConfig, List<DatabusSubscription> subscriptions, Set<ServerInfo> relays, Set<ServerInfo> bootstrapServices, List<DatabusV2ConsumerRegistration> registrations, List<DatabusV2ConsumerRegistration> bootstrapRegistrations, DbusEventBuffer dataEventsBuffer, DbusEventBuffer bootstrapEventsBuffer, ExecutorService ioThreadPool, ContainerStatisticsCollector containerStatsCollector, DbusEventsStatisticsCollector inboundEventsStatsCollector, DbusEventsStatisticsCollector bootstrapEventsStatsCollector, ConsumerCallbackStats relayCallbackStats, ConsumerCallbackStats bootstrapCallbackStats, UnifiedClientStats unifiedClientStats, CheckpointPersistenceProvider checkpointPersistenceProvider, DatabusRelayConnectionFactory relayConnFactory, DatabusBootstrapConnectionFactory bootstrapConnFactory, HttpStatisticsCollector relayCallsStatsCollector, RegistrationId registrationId, DatabusHttpClientImpl serverHandle, String connRawId, DbusEventFactory eventFactory, ReentrantLock v3BootstrapLock, ConnectionStateFactory connStateFactory) {
        int consumerParallelism;
        DbusKeyCompositeFilterConfig conf;
        this._eventFactory = eventFactory;
        this._connectionConfig = connConfig;
        this._dataEventsBuffer = dataEventsBuffer;
        this._bootstrapEventsBuffer = bootstrapEventsBuffer;
        this._subscriptions = subscriptions;
        this._ioThreadPool = ioThreadPool;
        this._checkpointPersistenceProvider = checkpointPersistenceProvider;
        this._containerStatisticsCollector = containerStatsCollector;
        this._inboundEventsStatsCollector = inboundEventsStatsCollector;
        this._bootstrapEventsStatsCollector = bootstrapEventsStatsCollector;
        this._relayConsumerStats = relayCallbackStats;
        this._bootstrapConsumerStats = bootstrapCallbackStats;
        this._unifiedClientStats = unifiedClientStats;
        this._relayConnFactory = relayConnFactory;
        this._bootstrapConnFactory = bootstrapConnFactory;
        this._relayRegistrations = registrations;
        this._bootstrapRegistrations = bootstrapRegistrations;
        this._relayCallsStatsCollector = relayCallsStatsCollector;
        this._localRelayCallsStatsCollector = null != relayCallsStatsCollector ? relayCallsStatsCollector.createForClientConnection(this.toString()) : null;
        this._registrationId = registrationId;
        this._connRawId = null == connRawId ? "" : connRawId;
        this._name = this.composeName(connRawId);
        this._prettyName = this.constructPrettyNameForLogging(subscriptions, connRawId);
        this._log = Logger.getLogger((String)this._prettyName);
        this._connectionStatus = new SourcesConnectionStatus();
        this._v3BootstrapLock = v3BootstrapLock;
        this._connStateFactory = connStateFactory;
        ArrayList<DbusKeyCompositeFilterConfig> relayFilterConfigs = new ArrayList<DbusKeyCompositeFilterConfig>();
        ArrayList<DbusKeyCompositeFilterConfig> bootstrapFilterConfigs = new ArrayList<DbusKeyCompositeFilterConfig>();
        if (null != registrations) {
            for (DatabusV2ConsumerRegistration reg : registrations) {
                conf = reg.getFilterConfig();
                if (null == conf) continue;
                relayFilterConfigs.add(conf);
            }
        }
        if (null != bootstrapRegistrations) {
            for (DatabusV2ConsumerRegistration reg : bootstrapRegistrations) {
                conf = reg.getFilterConfig();
                if (null == conf) continue;
                bootstrapFilterConfigs.add(conf);
            }
        }
        this._consumerCallbackExecutor = 1 == (consumerParallelism = connConfig.getConsumerParallelism()) ? Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("callback")) : Executors.newFixedThreadPool(consumerParallelism, (ThreadFactory)new NamedThreadFactory("callback"));
        LoggingConsumer loggingConsumer = null;
        if (serverHandle != null) {
            try {
                loggingConsumer = new LoggingConsumer(serverHandle.getClientStaticConfig().getLoggingListener());
            }
            catch (InvalidConfigException e) {
                throw new DatabusRuntimeException((Throwable)e);
            }
        }
        MultiConsumerCallback relayAsyncCallback = new MultiConsumerCallback(null != this._relayRegistrations ? this._relayRegistrations : new ArrayList(), this._consumerCallbackExecutor, connConfig.getConsumerTimeBudgetMs(), (ConsumerCallbackFactory)new StreamConsumerCallbackFactory(this._relayConsumerStats, this._unifiedClientStats), this._relayConsumerStats, this._unifiedClientStats, loggingConsumer, this._log);
        MultiConsumerCallback bootstrapAsyncCallback = new MultiConsumerCallback(null != this._bootstrapRegistrations ? this._bootstrapRegistrations : new ArrayList(), this._consumerCallbackExecutor, connConfig.getBstConsumerTimeBudgetMs(), (ConsumerCallbackFactory)new BootstrapConsumerCallbackFactory(this._bootstrapConsumerStats, this._unifiedClientStats), this._bootstrapConsumerStats, this._unifiedClientStats, loggingConsumer, this._log);
        this._bootstrapPuller = this._bootstrapEventsBuffer != null ? new BootstrapPullThread(this._connRawId + "-BootstrapPuller", this, this._bootstrapEventsBuffer, this._connStateFactory, bootstrapServices, bootstrapFilterConfigs, connConfig.getPullerUtilizationPct(), ManagementFactory.getPlatformMBeanServer(), this._eventFactory, this._v3BootstrapLock, this._log) : null;
        this._relayDispatcher = new RelayDispatcher(this._connRawId + "-RelayDispatcher", connConfig, this.getSubscriptions(), checkpointPersistenceProvider, dataEventsBuffer, relayAsyncCallback, this._bootstrapPuller, ManagementFactory.getPlatformMBeanServer(), serverHandle, this._registrationId, this._log);
        this._relayPuller = new RelayPullThread(this._connRawId + "-RelayPuller", this, this._dataEventsBuffer, this._connStateFactory, relays, relayFilterConfigs, connConfig.getConsumeCurrent(), connConfig.isReadLatestScnOnErrorEnabled(), connConfig.getPullerUtilizationPct(), connConfig.getNoEventsConnectionResetTimeSec(), ManagementFactory.getPlatformMBeanServer(), this._eventFactory, this._log);
        this._relayPuller.enqueueMessage(LifecycleMessage.createStartMessage());
        this._bootstrapDispatcher = this._bootstrapEventsBuffer != null ? new BootstrapDispatcher(this._connRawId + "-BootstrapDispatcher", connConfig, this.getSubscriptions(), checkpointPersistenceProvider, bootstrapEventsBuffer, bootstrapAsyncCallback, this._relayPuller, ManagementFactory.getPlatformMBeanServer(), serverHandle, this._registrationId, this._log) : null;
        this._messageQueuesMonitorThread = new Thread(new MessageQueuesMonitor());
        this._messageQueuesMonitorThread.setDaemon(true);
        this._isBootstrapEnabled = null != this.getBootstrapServices() && !this.getBootstrapServices().isEmpty() && null != this.getBootstrapRegistrations() && 0 != this.getBootstrapRegistrations().size() && this._bootstrapEventsBuffer != null;
        this._log.info((Object)(" Is Service Empty : " + (null == this.getBootstrapServices() || this.getBootstrapServices().isEmpty())));
        this._log.info((Object)(" Is Consumers Empty : " + (null == this.getBootstrapRegistrations() || 0 == this.getBootstrapRegistrations().size())));
        this._nannyRunnable = new NannyRunnable();
    }

    private DatabusSourcesConnection() {
        this._log = null;
        this._name = null;
        this._prettyName = null;
        this._connectionConfig = null;
        this._subscriptions = null;
        this._relayPuller = null;
        this._relayDispatcher = null;
        this._bootstrapPuller = null;
        this._bootstrapDispatcher = null;
        this._dataEventsBuffer = null;
        this._bootstrapEventsBuffer = null;
        this._ioThreadPool = null;
        this._checkpointPersistenceProvider = null;
        this._containerStatisticsCollector = null;
        this._inboundEventsStatsCollector = null;
        this._bootstrapEventsStatsCollector = null;
        this._relayCallsStatsCollector = null;
        this._localRelayCallsStatsCollector = null;
        this._relayConnFactory = null;
        this._bootstrapConnFactory = null;
        this._relayRegistrations = null;
        this._relayConsumerStats = null;
        this._bootstrapConsumerStats = null;
        this._unifiedClientStats = null;
        this._nannyRunnable = null;
        this._eventFactory = null;
        this._connStateFactory = null;
        this._bootstrapRegistrations = null;
        this._connectionStatus = null;
        this._relayPullerThread = null;
        this._relayDispatcherThread = null;
        this._bootstrapPullerThread = null;
        this._bootstrapDispatcherThread = null;
        this._messageQueuesMonitorThread = null;
        this._nannyThread = null;
        this._consumerCallbackExecutor = null;
        this._isBootstrapEnabled = false;
        this._registrationId = null;
        this._connRawId = null;
        this._v3BootstrapLock = null;
    }

    public static DatabusSourcesConnection createDatabusSourcesConnectionForTesting() {
        DatabusSourcesConnection dsc = new DatabusSourcesConnection();
        return dsc;
    }

    private String composeName(String id) {
        StringBuilder shortSourcesListBuilder = new StringBuilder();
        String separatorChar = "[";
        for (DatabusSubscription sub : this.getSubscriptions()) {
            int lastDotIdx;
            shortSourcesListBuilder.append(separatorChar);
            PhysicalPartition p = sub.getPhysicalPartition();
            String sourceName = "AnySource";
            LogicalSource source = sub.getLogicalPartition().getSource();
            if (!source.isAllSourcesWildcard() && (lastDotIdx = (sourceName = source.getName()).lastIndexOf(46)) >= 0) {
                sourceName = sourceName.substring(lastDotIdx + 1);
            }
            String partString = "AnyPPart_";
            if (!p.isAnyPartitionWildcard()) {
                partString = p.getId() + "_";
            }
            shortSourcesListBuilder.append(partString + sourceName);
            separatorChar = "_";
        }
        shortSourcesListBuilder.append(']');
        String shortSourcesList = shortSourcesListBuilder.toString();
        return "conn" + shortSourcesList + (id == null ? "" : "_" + id);
    }

    public boolean isBootstrapEnabled() {
        return this._isBootstrapEnabled;
    }

    public void start() {
        this._log.info((Object)("Starting http relay connection for sources:" + this._subscriptions));
        this._nannyThread = new Thread((Runnable)this._nannyRunnable, this._connRawId + "-Nanny");
        this._nannyThread.setDaemon(true);
        this._connectionStatus.start();
        this._messageQueuesMonitorThread.start();
        this._nannyThread.start();
    }

    public boolean isRunning() {
        boolean pullThreadRunning = this._relayPullerThread.isAlive();
        boolean dispatcherThreadRunning = this._relayDispatcherThread.isAlive();
        if (!pullThreadRunning) {
            this._log.info((Object)"Pull thread is DEAD!");
        }
        if (null != this._relayPullerThread.getLastException()) {
            this._log.error((Object)(" Reason: " + this._relayPullerThread.getLastException().getMessage()), this._relayPullerThread.getLastException());
        }
        if (!dispatcherThreadRunning) {
            this._log.info((Object)"Dispatch thread is DEAD!");
        }
        if (null != this._relayDispatcherThread.getLastException()) {
            this._log.error((Object)(" Reason: " + this._relayDispatcherThread.getLastException().getMessage()), this._relayDispatcherThread.getLastException());
        }
        return pullThreadRunning && dispatcherThreadRunning;
    }

    public void await() {
        boolean running = this.isRunning();
        this._log.info((Object)("waiting for shutdown: " + running));
        while (running) {
            this._relayPuller.awaitShutdown();
            this._relayDispatcher.awaitShutdown();
            running = this.isRunning();
            this._log.info((Object)("waiting for shutdown: " + running));
        }
    }

    public void stop() {
        this._log.info((Object)("Stopping ... :" + this.isRunning()));
        this.unregisterMbeans();
        this._connectionStatus.shutdown();
        if (this._relayPullerThread.isAlive()) {
            this._log.info((Object)"shutting down relay puller ...");
            this._relayPuller.awaitShutdown();
        }
        if (this._relayDispatcherThread.isAlive()) {
            this._log.info((Object)"shutting down relay dispatcher ...");
            this._relayDispatcher.awaitShutdown();
        }
        if (this._isBootstrapEnabled) {
            if (this._bootstrapDispatcherThread.isAlive()) {
                this._log.info((Object)"shutting down bootstrap dispatcher ...");
                this._bootstrapDispatcher.awaitShutdown();
            }
            if (this._bootstrapPullerThread.isAlive()) {
                this._log.info((Object)"shutting down bootstrap puller ...");
                this._bootstrapPuller.awaitShutdown();
            }
        }
        this._consumerCallbackExecutor.shutdown();
        this._log.info((Object)"Stopped ... ");
    }

    public List<String> getSourcesNames() {
        return DatabusSubscription.getStrList(this._subscriptions);
    }

    public List<DatabusSubscription> getSubscriptions() {
        return this._subscriptions;
    }

    public ConsumerCallbackStats getRelayConsumerStats() {
        return this._relayConsumerStats;
    }

    public ConsumerCallbackStats getBootstrapConsumerStats() {
        return this._bootstrapConsumerStats;
    }

    public UnifiedClientStats getUnifiedClientStats() {
        return this._unifiedClientStats;
    }

    public static void main(String[] args) throws Exception {
    }

    public DatabusComponentStatus getConnectionStatus() {
        return this._connectionStatus;
    }

    public BootstrapPullThread getBootstrapPuller() {
        return this._bootstrapPuller;
    }

    public GenericDispatcher<DatabusCombinedConsumer> getBootstrapDispatcher() {
        return this._bootstrapDispatcher;
    }

    public CheckpointPersistenceProvider getCheckpointPersistenceProvider() {
        return this._checkpointPersistenceProvider;
    }

    public ContainerStatisticsCollector getContainerStatisticsCollector() {
        return this._containerStatisticsCollector;
    }

    public Set<ServerInfo> getRelays() {
        return this._relayPuller != null ? this._relayPuller.getServers() : null;
    }

    public Set<ServerInfo> getBootstrapServices() {
        return this._bootstrapPuller != null ? this._bootstrapPuller.getServers() : null;
    }

    public DbusEventsStatisticsCollector getInboundEventsStatsCollector() {
        return this._inboundEventsStatsCollector;
    }

    public GenericDispatcher<DatabusCombinedConsumer> getRelayDispatcher() {
        return this._relayDispatcher;
    }

    public DatabusRelayConnectionFactory getRelayConnFactory() {
        return this._relayConnFactory;
    }

    public DatabusBootstrapConnectionFactory getBootstrapConnFactory() {
        return this._bootstrapConnFactory;
    }

    public DbusEventBuffer getDataEventsBuffer() {
        return this._dataEventsBuffer;
    }

    public DbusEventBuffer getBootstrapEventsBuffer() {
        return this._bootstrapEventsBuffer;
    }

    public Checkpoint loadPersistentCheckpoint() {
        if (this._checkpointPersistenceProvider != null) {
            return this._checkpointPersistenceProvider.loadCheckpointV3(this.getSubscriptions(), this._registrationId);
        }
        Checkpoint cp = Checkpoint.createFlexibleCheckpoint();
        return cp;
    }

    public List<DatabusV2ConsumerRegistration> getBootstrapRegistrations() {
        return this._bootstrapRegistrations;
    }

    public StaticConfig getConnectionConfig() {
        return this._connectionConfig;
    }

    public List<DatabusV2ConsumerRegistration> getRelayRegistrations() {
        return this._relayRegistrations;
    }

    public DbusEventsStatisticsCollector getBootstrapEventsStatsCollector() {
        return this._bootstrapEventsStatsCollector;
    }

    public HttpStatisticsCollector getRelayCallsStatsCollector() {
        return this._relayCallsStatsCollector;
    }

    public HttpStatisticsCollector getLocalRelayCallsStatsCollector() {
        return this._localRelayCallsStatsCollector;
    }

    public RelayPullThread getRelayPullThread() {
        return this._relayPuller;
    }

    public BootstrapPullThread getBootstrapPullThread() {
        return this._bootstrapPuller;
    }

    public void removeRegistration(DatabusV2ConsumerRegistration reg) {
        this._relayDispatcher.getAsyncCallback().removeRegistration(reg);
    }

    public void unregisterMbeans() {
        if (this._relayConsumerStats != null) {
            this._relayConsumerStats.unregisterAsMbean();
        }
        if (this._bootstrapConsumerStats != null) {
            this._bootstrapConsumerStats.unregisterAsMbean();
        }
        if (this._unifiedClientStats != null) {
            this._unifiedClientStats.unregisterAsMbean();
        }
    }

    public String constructPrettyNameForLogging(List<DatabusSubscription> subscriptions, String regId) {
        StringBuilder sb = new StringBuilder();
        sb.append(DatabusSubscription.getPrettyNameForListOfSubscriptions(subscriptions));
        sb.append("_");
        sb.append(regId);
        return sb.toString();
    }

    class MessageQueuesMonitor
    implements Runnable {
        private static final long ERROR_SLEEP_MS = 300000L;
        private static final long INFO_SLEEP_MS = 300000L;
        private static final long DEBUG_SLEEP_MS = 100L;
        private static final long TRACE_SLEEP_MS = 10L;
        private String _lastMessage;

        MessageQueuesMonitor() {
        }

        @Override
        public void run() {
            while (DatabusSourcesConnection.this._connectionStatus.getStatus() != DatabusComponentStatus.Status.SHUTDOWN) {
                String newMessage;
                StringBuilder sb = new StringBuilder(1000);
                if (null != DatabusSourcesConnection.this._relayPuller) {
                    DatabusSourcesConnection.this._relayPuller.getQueueListString(sb);
                }
                sb.append(' ');
                if (null != DatabusSourcesConnection.this._relayDispatcher) {
                    DatabusSourcesConnection.this._relayDispatcher.getQueueListString(sb);
                }
                sb.append(' ');
                if (null != DatabusSourcesConnection.this._bootstrapPuller) {
                    DatabusSourcesConnection.this._bootstrapPuller.getQueueListString(sb);
                }
                sb.append(' ');
                if (null != DatabusSourcesConnection.this._bootstrapDispatcher) {
                    DatabusSourcesConnection.this._bootstrapDispatcher.getQueueListString(sb);
                }
                if (!(newMessage = sb.toString()).equals(this._lastMessage)) {
                    DatabusSourcesConnection.this._log.info((Object)newMessage);
                    this._lastMessage = newMessage;
                }
                long sleepDuration = 300000L;
                Level logLevel = DatabusSourcesConnection.this._log.getEffectiveLevel();
                if (Level.TRACE == logLevel) {
                    sleepDuration = 10L;
                } else if (Level.DEBUG == logLevel) {
                    sleepDuration = 100L;
                } else if (Level.INFO == logLevel) {
                    sleepDuration = 300000L;
                }
                try {
                    Thread.sleep(sleepDuration);
                }
                catch (InterruptedException ie) {}
            }
        }
    }

    public static class Config
    implements ConfigBuilder<StaticConfig> {
        private static final long DEFAULT_KEY_RANGE_MIN = -1L;
        private static final long DEFAULT_KEY_RANGE_MAX = -1L;
        private static final long DEFAULT_MAX_BUFFER_SIZE = 0xA00000L;
        private static final int DEFAULT_INIT_READBUFFER_SIZE = 20480;
        private static final int DEFAULT_MAX_SCNINDEX_SIZE = 0x100000;
        private static final boolean DEFAULT_PULLER_MESSAGE_QUEUE_LOGGING = false;
        private static int DEFAULT_MAX_RETRY_NUM = -1;
        private static int DEFAULT_INIT_SLEEP = 100;
        private static double DEFAULT_SLEEP_INC_FACTOR = 1.1;
        private static int DEFAULT_BSPULLER_CKPTCLEANUP_MAX_RETRY_NUM = 1000;
        private static int DEFAULT_BSPULLER_CKPTCLEANUP_INIT_SLEEP = 1000;
        private static int DEFAULT_BSPULLER_CKPTCLEANUP_SLEEP_INC_DELTA = 1000;
        private static double DEFAULT_BSPULLER_CKPTCLEANUP_SLEEP_INC_FACTOR = 1.5;
        private static int DEFAULT_FREE_BUFFER_THRESHOLD = 10240;
        private static int DEFAULT_RETRY_ON_FELLOFF_MAX_RETRY_NUM = 5;
        private final Logger _log = Logger.getLogger(Config.class);
        private DbusEventBuffer.Config _eventBuffer;
        private DbusEventBuffer.Config _bstEventBuffer = null;
        private long _consumerTimeBudgetMs = 300000L;
        private long _bstConsumerTimeBudgetMs = 300000L;
        private boolean _setBstConsumerTimeBudgetCalled = false;
        private int _consumerParallelism = 1;
        private double _checkpointThresholdPct;
        private long _keyMin;
        private long _keyMax;
        private BackoffTimerStaticConfigBuilder _bsPullerRetriesBeforeCkptCleanup;
        private BackoffTimerStaticConfigBuilder _pullerRetries;
        private BackoffTimerStaticConfigBuilder _bstPullerRetries;
        private BackoffTimerStaticConfigBuilder _dispatcherRetries;
        private BackoffTimerStaticConfigBuilder _bstDispatcherRetries = null;
        private int _numRetriesOnFallOff;
        private int _freeBufferThreshold = DEFAULT_FREE_BUFFER_THRESHOLD;
        private boolean _consumeCurrent = false;
        private boolean _readLatestScnOnError = false;
        private double _pullerBufferUtilizationPct = 100.0;
        private int _id;
        private boolean _enablePullerMessageQueueLogging;
        private int _noEventsConnectionResetTimeSec = 900;

        private void makeEvbConfig(DbusEventBuffer.Config evbConfig, DbusEventBuffer.QueuePolicy qPolicy, boolean enableScnIndex, double defaultMemUsage) {
            evbConfig.setQueuePolicy(qPolicy.toString());
            evbConfig.setEnableScnIndex(enableScnIndex);
            evbConfig.setDefaultMemUsage(defaultMemUsage);
            if (evbConfig.getMaxSize() > 0xA00000L) {
                this._log.warn((Object)("Setting buffer size to 10485760 instead of requested size " + evbConfig.getMaxSize()));
                evbConfig.setMaxSize(0xA00000L);
            }
            if (evbConfig.getScnIndexSize() > 0x100000) {
                evbConfig.setScnIndexSize(0x100000);
            }
        }

        public Config() {
            this._eventBuffer = new DbusEventBuffer.Config();
            this.makeEvbConfig(this._eventBuffer, DbusEventBuffer.QueuePolicy.BLOCK_ON_WRITE, false, 0.1);
            this._checkpointThresholdPct = 75.0;
            this._keyMin = -1L;
            this._keyMax = -1L;
            this._pullerRetries = new BackoffTimerStaticConfigBuilder();
            this._pullerRetries.setInitSleep((long)DEFAULT_INIT_SLEEP);
            this._pullerRetries.setSleepIncFactor(DEFAULT_SLEEP_INC_FACTOR);
            this._pullerRetries.setMaxRetryNum(DEFAULT_MAX_RETRY_NUM);
            this._bsPullerRetriesBeforeCkptCleanup = new BackoffTimerStaticConfigBuilder();
            this._bsPullerRetriesBeforeCkptCleanup.setInitSleep((long)DEFAULT_BSPULLER_CKPTCLEANUP_INIT_SLEEP);
            this._bsPullerRetriesBeforeCkptCleanup.setSleepIncDelta((long)DEFAULT_BSPULLER_CKPTCLEANUP_SLEEP_INC_DELTA);
            this._bsPullerRetriesBeforeCkptCleanup.setMaxRetryNum(DEFAULT_BSPULLER_CKPTCLEANUP_MAX_RETRY_NUM);
            this._bsPullerRetriesBeforeCkptCleanup.setSleepIncFactor(DEFAULT_BSPULLER_CKPTCLEANUP_SLEEP_INC_FACTOR);
            this._numRetriesOnFallOff = DEFAULT_RETRY_ON_FELLOFF_MAX_RETRY_NUM;
            this._dispatcherRetries = new BackoffTimerStaticConfigBuilder();
            this._dispatcherRetries.setSleepIncFactor(1.1);
            this._dispatcherRetries.setMaxRetryNum(-1);
            this._enablePullerMessageQueueLogging = false;
        }

        public Config(Config other) {
            this._eventBuffer = new DbusEventBuffer.Config(other.getEventBuffer());
            this._bstEventBuffer = other.hasBstEventBuffer() ? new DbusEventBuffer.Config(other.getBstEventBuffer()) : null;
        }

        public DbusEventBuffer.Config getEventBuffer() {
            return this._eventBuffer;
        }

        public DbusEventBuffer.Config getBstEventBuffer() {
            if (this._bstEventBuffer != null) {
                return this._bstEventBuffer;
            }
            this._bstEventBuffer = new DbusEventBuffer.Config();
            this.makeEvbConfig(this._bstEventBuffer, DbusEventBuffer.QueuePolicy.BLOCK_ON_WRITE, false, 0.1);
            return this._bstEventBuffer;
        }

        public boolean hasBstEventBuffer() {
            return this._bstEventBuffer != null;
        }

        public void setEventBuffer(DbusEventBuffer.Config eventBuffer) {
            this._eventBuffer = eventBuffer;
        }

        public double computeSafeCheckpointThresholdPct(DbusEventBuffer.Config bufCfg) {
            int safeMaxEventSize = (int)((100.0 - this._checkpointThresholdPct) * (double)bufCfg.maxMaxEventSize() / 100.0);
            if (-1 == bufCfg.getMaxEventSize()) {
                return this._checkpointThresholdPct;
            }
            if (safeMaxEventSize >= bufCfg.getMaxEventSize()) {
                return this._checkpointThresholdPct;
            }
            return 100.0 - (double)(bufCfg.getMaxEventSize() + this._freeBufferThreshold) / (double)bufCfg.maxMaxEventSize() * 100.0;
        }

        private void validateBufferConfig(StaticConfig connConfig, DbusEventBuffer.StaticConfig bufferConfig) throws InvalidConfigException {
            long bufferCapacityInBytes = bufferConfig.getMaxSize();
            long maxWindowSizeInBytes = (long)(connConfig.getCheckpointThresholdPct() / 100.0 * (double)bufferCapacityInBytes);
            if (maxWindowSizeInBytes + (long)connConfig.getFreeBufferThreshold() > bufferCapacityInBytes) {
                throw new InvalidConfigException("Invalid configuration. Could lead to deadlock: ((checkPointThresholdPct*maxSize) + freeBufferThreshold) > maxSize freeBufferThreshold=" + this._freeBufferThreshold + " checkpointThresholdPct=" + this._checkpointThresholdPct + " maxSize=" + bufferCapacityInBytes);
            }
            int readBufferSize = bufferConfig.getReadBufferSize();
            if (readBufferSize <= connConfig.getFreeBufferThreshold()) {
                throw new InvalidConfigException("Invalid configuration. Could lead to deadlock: readBufferSize <= freeBufferThreshold. Increase readBufferSize to be greater than freeBufferThreshold  readBufferSize=" + readBufferSize + " freeBufferThreshold= " + this._freeBufferThreshold);
            }
        }

        private void validateConfigs(StaticConfig connConfig) throws InvalidConfigException {
            this.validateBufferConfig(connConfig, connConfig.getEventBuffer());
            if (connConfig.getBstEventBuffer() != null) {
                this.validateBufferConfig(connConfig, connConfig.getBstEventBuffer());
            }
        }

        public StaticConfig build() throws InvalidConfigException {
            Range keyRange = null;
            if (this._keyMin >= 0L && this._keyMax > 0L) {
                keyRange = new Range(this._keyMin, this._keyMax);
            }
            if (this.getConsumerParallelism() < 1) {
                throw new InvalidConfigException("Invalid consumer parallelism:" + this.getConsumerParallelism());
            }
            if (this._checkpointThresholdPct <= 0.0 || this._checkpointThresholdPct > 100.0) {
                throw new InvalidConfigException("checkpointThresholdPct must be in (0, 100]");
            }
            this.validateMaxEventSize(this._eventBuffer);
            if (this._bstEventBuffer != null) {
                this.validateMaxEventSize(this._bstEventBuffer);
            }
            StaticConfig config = new StaticConfig(this._eventBuffer.build(), this._bstEventBuffer != null ? this._bstEventBuffer.build() : this._eventBuffer.build(), this.getConsumerTimeBudgetMs(), this.getBstConsumerTimeBudgetMs(), this.getConsumerParallelism(), this.getCheckpointThresholdPct(), keyRange, this._bsPullerRetriesBeforeCkptCleanup.build(), this._pullerRetries.build(), this._bstPullerRetries != null ? this._bstPullerRetries.build() : this._pullerRetries.build(), this._dispatcherRetries.build(), this._bstDispatcherRetries != null ? this._bstDispatcherRetries.build() : this._dispatcherRetries.build(), this._numRetriesOnFallOff, this._freeBufferThreshold, this._consumeCurrent, this._readLatestScnOnError, this._pullerBufferUtilizationPct, this._id, this._enablePullerMessageQueueLogging, this._noEventsConnectionResetTimeSec);
            this._log.info((Object)("Init readBufferSize=" + config.getEventBuffer().getReadBufferSize()));
            this.validateConfigs(config);
            return config;
        }

        private void validateMaxEventSize(DbusEventBuffer.Config bufCfg) {
            int safeMaxEventSize = (int)((100.0 - this._checkpointThresholdPct) * (double)bufCfg.maxMaxEventSize() / 100.0);
            if (-1 == bufCfg.getMaxEventSize()) {
                bufCfg.setMaxEventSize(safeMaxEventSize);
            } else if (safeMaxEventSize < bufCfg.getMaxEventSize()) {
                this._log.warn((Object)String.format("max event size %d is unsafe and may lead to a dead-lock; using %d instead. To keep the current buffer maxEventSize, you should increase buffers maxSize or decrease checkpointThresholdPct.", bufCfg.getMaxEventSize(), safeMaxEventSize));
                bufCfg.setMaxEventSize(safeMaxEventSize);
            }
            this._log.info((Object)("maxEventSize=" + bufCfg.getMaxEventSize()));
        }

        public int getNoEventsConnectionResetTimeSec() {
            return this._noEventsConnectionResetTimeSec;
        }

        public void setNoEventsConnectionResetTimeSec(int noEventsConnectionResetTimeSec) {
            this._noEventsConnectionResetTimeSec = noEventsConnectionResetTimeSec;
        }

        public boolean getReadLatestScnOnError() {
            return this._readLatestScnOnError;
        }

        public void setReadLatestScnOnError(boolean r) {
            this._readLatestScnOnError = r;
        }

        public int getId() {
            return this._id;
        }

        public void setId(int id) {
            this._id = id;
        }

        public double getPullerBufferUtilizationPct() {
            return this._pullerBufferUtilizationPct;
        }

        public void setPullerBufferUtilizationPct(double p) {
            this._pullerBufferUtilizationPct = p;
        }

        public boolean getConsumeCurrent() {
            return this._consumeCurrent;
        }

        public void setConsumeCurrent(boolean currentConsume) {
            this._consumeCurrent = currentConsume;
        }

        public long getConsumerTimeBudgetMs() {
            return this._consumerTimeBudgetMs;
        }

        public long getBstConsumerTimeBudgetMs() {
            if (this._setBstConsumerTimeBudgetCalled) {
                return this._bstConsumerTimeBudgetMs;
            }
            return this._consumerTimeBudgetMs;
        }

        public void setBstConsumerTimeBudgetMs(long consumerTimeBudgetMs) {
            this._setBstConsumerTimeBudgetCalled = true;
            this._bstConsumerTimeBudgetMs = consumerTimeBudgetMs;
        }

        public void setConsumerTimeBudgetMs(long consumerTimeBudgetMs) {
            this._consumerTimeBudgetMs = consumerTimeBudgetMs;
        }

        public int getConsumerParallelism() {
            return this._consumerParallelism;
        }

        public void setConsumerParallelism(int consumerParallelism) {
            this._consumerParallelism = consumerParallelism;
        }

        public double getCheckpointThresholdPct() {
            return this._checkpointThresholdPct;
        }

        public void setCheckpointThresholdPct(double checkpointThresholdPct) {
            this._checkpointThresholdPct = checkpointThresholdPct;
        }

        public long getKeyMin() {
            return this._keyMin;
        }

        public int getFreeBufferThreshold() {
            return this._freeBufferThreshold;
        }

        public void setFreeBufferThreshold(int freeBufferThreshold) {
            if (freeBufferThreshold > DEFAULT_FREE_BUFFER_THRESHOLD) {
                this._log.warn((Object)("Trying to set parameter: 'freeBufferThreshold' to " + freeBufferThreshold + " has no effect. Value=" + this._freeBufferThreshold));
            } else {
                this._freeBufferThreshold = freeBufferThreshold;
            }
        }

        public void setKeyMin(long keyMin) {
            this._keyMin = keyMin;
        }

        public long getKeyMax() {
            return this._keyMax;
        }

        public void setKeyMax(long keyMax) {
            this._keyMax = keyMax;
        }

        public BackoffTimerStaticConfigBuilder getBsPullerRetriesBeforeCkptCleanup() {
            return this._bsPullerRetriesBeforeCkptCleanup;
        }

        public int getNumRetriesOnFallOff() {
            return this._numRetriesOnFallOff;
        }

        public void setNumRetriesOnFallOff(int numRetriesOnFallOff) {
            this._numRetriesOnFallOff = numRetriesOnFallOff;
        }

        public BackoffTimerStaticConfigBuilder getPullerRetries() {
            return this._pullerRetries;
        }

        public BackoffTimerStaticConfigBuilder getBstPullerRetries() {
            if (this._bstPullerRetries != null) {
                return this._bstPullerRetries;
            }
            this._bstPullerRetries = new BackoffTimerStaticConfigBuilder();
            this._bstPullerRetries.setInitSleep((long)DEFAULT_INIT_SLEEP);
            this._bstPullerRetries.setSleepIncFactor(DEFAULT_SLEEP_INC_FACTOR);
            this._bstPullerRetries.setMaxRetryNum(DEFAULT_MAX_RETRY_NUM);
            return this._bstPullerRetries;
        }

        public BackoffTimerStaticConfigBuilder getDispatcherRetries() {
            return this._dispatcherRetries;
        }

        public boolean hasBstPullerRetries() {
            return this._bstPullerRetries != null;
        }

        public BackoffTimerStaticConfigBuilder getBstDispatcherRetries() {
            if (this._bstDispatcherRetries != null) {
                return this._bstDispatcherRetries;
            }
            this._bstDispatcherRetries = new BackoffTimerStaticConfigBuilder();
            this._bstDispatcherRetries.setSleepIncFactor(1.1);
            this._bstDispatcherRetries.setMaxRetryNum(-1);
            return this._bstDispatcherRetries;
        }

        public boolean hasBstDispatcherRetries() {
            return this._bstDispatcherRetries != null;
        }

        public boolean getEnablePullerMessageQueueLogging() {
            return this._enablePullerMessageQueueLogging;
        }

        public void setEnablePullerMessageQueueLogging(boolean enablePullerMessageQueueLogging) {
            this._enablePullerMessageQueueLogging = enablePullerMessageQueueLogging;
        }
    }

    public static class StaticConfig {
        private final DbusEventBuffer.StaticConfig _eventBuffer;
        private final DbusEventBuffer.StaticConfig _bstEventBuffer;
        private final long _consumerTimeBudgetMs;
        private final long _bstConsumerTimeBudgetMs;
        private final int _consumerParallelism;
        private final double _checkpointThresholdPct;
        private final Range _keyRange;
        private final BackoffTimerStaticConfig _bsPullerRetriesBeforeCkptCleanup;
        private final BackoffTimerStaticConfig _pullerRetries;
        private final BackoffTimerStaticConfig _bstPullerRetries;
        private final BackoffTimerStaticConfig _dispatcherRetries;
        private final BackoffTimerStaticConfig _bstDispatcherRetries;
        private final int _freeBufferThreshold;
        private final boolean _consumeCurrent;
        private final boolean _readLatestScnOnError;
        private final double _pullerBufferUtilizationPct;
        private final int _id;
        private final boolean _enablePullerMessageQueueLogging;
        private final int _numRetriesOnFallOff;
        private final int _noEventsConnectionResetTimeSec;

        public StaticConfig(DbusEventBuffer.StaticConfig eventBuffer, DbusEventBuffer.StaticConfig bstEventBuffer, long consumerTimeBudgetMs, long bstConsumerTimeBudgetMs, int consumerParallelism, double checkpointThresholdPct, Range keyRange, BackoffTimerStaticConfig bsPullerRetriesBeforeCkptCleanup, BackoffTimerStaticConfig pullerRetries, BackoffTimerStaticConfig bstPullerRetries, BackoffTimerStaticConfig dispatcherRetries, BackoffTimerStaticConfig bstDispatcherRetries, int retriesOnFellOff, int freeBufferThreshold, boolean consumeCurrent, boolean readLatestScnOnError, double pullerBufferUtilizationPct, int id, boolean enablePullerMessageQueueLogging, int noEventsConnectionResetTimeSec) {
            this._eventBuffer = eventBuffer;
            this._bstEventBuffer = bstEventBuffer;
            this._consumerTimeBudgetMs = consumerTimeBudgetMs;
            this._bstConsumerTimeBudgetMs = bstConsumerTimeBudgetMs;
            this._consumerParallelism = consumerParallelism;
            this._checkpointThresholdPct = checkpointThresholdPct;
            this._keyRange = keyRange;
            this._bsPullerRetriesBeforeCkptCleanup = bsPullerRetriesBeforeCkptCleanup;
            this._pullerRetries = pullerRetries;
            this._bstPullerRetries = bstPullerRetries;
            this._dispatcherRetries = dispatcherRetries;
            this._bstDispatcherRetries = bstDispatcherRetries;
            this._numRetriesOnFallOff = retriesOnFellOff;
            this._freeBufferThreshold = freeBufferThreshold > eventBuffer.getReadBufferSize() ? eventBuffer.getReadBufferSize() / 2 : freeBufferThreshold;
            this._consumeCurrent = consumeCurrent;
            this._readLatestScnOnError = readLatestScnOnError;
            this._pullerBufferUtilizationPct = pullerBufferUtilizationPct;
            this._id = id;
            this._enablePullerMessageQueueLogging = enablePullerMessageQueueLogging;
            this._noEventsConnectionResetTimeSec = noEventsConnectionResetTimeSec;
        }

        public int getNoEventsConnectionResetTimeSec() {
            return this._noEventsConnectionResetTimeSec;
        }

        public DbusEventBuffer.StaticConfig getBstEventBuffer() {
            return this._bstEventBuffer;
        }

        public long getBstConsumerTimeBudgetMs() {
            return this._bstConsumerTimeBudgetMs;
        }

        public BackoffTimerStaticConfig getBstDispatcherRetries() {
            return this._bstDispatcherRetries;
        }

        public boolean getReadLatestScnOnError() {
            return this._readLatestScnOnError;
        }

        public boolean isReadLatestScnOnErrorEnabled() {
            return this._readLatestScnOnError;
        }

        public double getPullerUtilizationPct() {
            return this._pullerBufferUtilizationPct;
        }

        public int getId() {
            return this._id;
        }

        public boolean getConsumeCurrent() {
            return this._consumeCurrent;
        }

        public DbusEventBuffer.StaticConfig getEventBuffer() {
            return this._eventBuffer;
        }

        public long getConsumerTimeBudgetMs() {
            return this._consumerTimeBudgetMs;
        }

        public int getConsumerParallelism() {
            return this._consumerParallelism;
        }

        public double getCheckpointThresholdPct() {
            return this._checkpointThresholdPct;
        }

        public Range getKeyRange() {
            return this._keyRange;
        }

        public BackoffTimerStaticConfig getPullerRetries() {
            return this._pullerRetries;
        }

        public BackoffTimerStaticConfig getBstPullerRetries() {
            return this._bstPullerRetries;
        }

        public BackoffTimerStaticConfig getDispatcherRetries() {
            return this._dispatcherRetries;
        }

        public BackoffTimerStaticConfig getBsPullerRetriesBeforeCkptCleanup() {
            return this._bsPullerRetriesBeforeCkptCleanup;
        }

        public int getNumRetriesOnFallOff() {
            return this._numRetriesOnFallOff;
        }

        public int getFreeBufferThreshold() {
            return this._freeBufferThreshold;
        }

        public boolean isPullerMessageQueueLoggingEnabled() {
            return this._enablePullerMessageQueueLogging;
        }

        public String toString() {
            return "StaticConfig [_eventBuffer=" + this._eventBuffer + ", _bstEventBuffer=" + this._bstEventBuffer + ", _consumerTimeBudgetMs=" + this._consumerTimeBudgetMs + ", _bstConsumerTimeBudgetMs=" + this._bstConsumerTimeBudgetMs + ", _consumerParallelism=" + this._consumerParallelism + ", _checkpointThresholdPct=" + this._checkpointThresholdPct + ", _keyRange=" + this._keyRange + ", _bsPullerRetriesBeforeCkptCleanup=" + this._bsPullerRetriesBeforeCkptCleanup + ", _pullerRetries=" + this._pullerRetries + ", _bstPullerRetries=" + this._bstPullerRetries + ", _dispatcherRetries=" + this._dispatcherRetries + ", _bstDispatcherRetries=" + this._bstDispatcherRetries + ", _freeBufferThreshold=" + this._freeBufferThreshold + ", _enablePullerMessageQueueLogging=" + this._enablePullerMessageQueueLogging + "]";
        }
    }

    public class SourcesConnectionStatus
    extends DatabusComponentStatus {
        public SourcesConnectionStatus() {
            super(DatabusSourcesConnection.this._name);
        }

        public void start() {
            super.start();
            DatabusSourcesConnection.this._relayPullerThread = new UncaughtExceptionTrackingThread((Runnable)((Object)DatabusSourcesConnection.this._relayPuller), DatabusSourcesConnection.this._relayPuller.getName());
            DatabusSourcesConnection.this._relayPullerThread.setDaemon(true);
            DatabusSourcesConnection.this._relayPullerThread.start();
            DatabusSourcesConnection.this._relayDispatcherThread = new UncaughtExceptionTrackingThread((Runnable)((Object)DatabusSourcesConnection.this._relayDispatcher), DatabusSourcesConnection.this._relayDispatcher.getName());
            DatabusSourcesConnection.this._relayDispatcherThread.setDaemon(true);
            DatabusSourcesConnection.this._relayDispatcherThread.start();
            if (DatabusSourcesConnection.this._isBootstrapEnabled) {
                DatabusSourcesConnection.this._bootstrapPullerThread = new UncaughtExceptionTrackingThread((Runnable)((Object)DatabusSourcesConnection.this._bootstrapPuller), DatabusSourcesConnection.this._bootstrapPuller.getName());
                DatabusSourcesConnection.this._bootstrapPullerThread.setDaemon(true);
                DatabusSourcesConnection.this._bootstrapPullerThread.start();
                DatabusSourcesConnection.this._bootstrapDispatcherThread = new UncaughtExceptionTrackingThread((Runnable)((Object)DatabusSourcesConnection.this._bootstrapDispatcher), DatabusSourcesConnection.this._bootstrapDispatcher.getName());
                DatabusSourcesConnection.this._bootstrapDispatcherThread.setDaemon(true);
                DatabusSourcesConnection.this._bootstrapDispatcherThread.start();
            }
        }

        public void shutdown() {
            this._log.info((Object)"shutting down connection ...");
            DatabusSourcesConnection.this._relayPuller.shutdown();
            DatabusSourcesConnection.this._relayDispatcher.shutdown();
            if (DatabusSourcesConnection.this._bootstrapPuller != null) {
                DatabusSourcesConnection.this._bootstrapPuller.shutdown();
            }
            if (DatabusSourcesConnection.this._bootstrapDispatcher != null) {
                DatabusSourcesConnection.this._bootstrapDispatcher.shutdown();
            }
            DatabusSourcesConnection.this._relayPullerThread.interrupt();
            DatabusSourcesConnection.this._relayDispatcherThread.interrupt();
            if (DatabusSourcesConnection.this._isBootstrapEnabled) {
                DatabusSourcesConnection.this._bootstrapPullerThread.interrupt();
                DatabusSourcesConnection.this._bootstrapDispatcherThread.interrupt();
            }
            super.shutdown();
            DatabusSourcesConnection.this._nannyThread.interrupt();
            this._log.info((Object)"connection shut down.");
        }

        public void pause() {
            DatabusSourcesConnection.this._relayPuller.enqueueMessage(LifecycleMessage.createPauseMessage());
            if (DatabusSourcesConnection.this._isBootstrapEnabled) {
                DatabusSourcesConnection.this._bootstrapPuller.enqueueMessage(LifecycleMessage.createPauseMessage());
            }
            super.pause();
        }

        public void resume() {
            DatabusSourcesConnection.this._relayPuller.enqueueMessage(LifecycleMessage.createResumeMessage());
            if (DatabusSourcesConnection.this._isBootstrapEnabled) {
                DatabusSourcesConnection.this._bootstrapPuller.enqueueMessage(LifecycleMessage.createResumeMessage());
            }
            super.resume();
        }

        public void suspendOnError(Throwable cause) {
            DatabusSourcesConnection.this._relayPuller.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)cause));
            if (DatabusSourcesConnection.this._isBootstrapEnabled) {
                DatabusSourcesConnection.this._bootstrapPuller.enqueueMessage(LifecycleMessage.createSuspendOnErroMessage((Throwable)cause));
            }
            super.suspendOnError(cause);
        }
    }

    class NannyRunnable
    implements Runnable {
        public static final int SLEEP_DURATION_MS = 1000;

        NannyRunnable() {
        }

        @Override
        public void run() {
            while (DatabusSourcesConnection.this.getConnectionStatus().getStatus() != DatabusComponentStatus.Status.SHUTDOWN) {
                boolean runShutdown = false;
                if (null != DatabusSourcesConnection.this._relayPuller && DatabusSourcesConnection.this._relayPuller.getComponentStatus().getStatus() == DatabusComponentStatus.Status.SHUTDOWN) {
                    DatabusSourcesConnection.this._log.error((Object)"nanny: detected that the relay puller is shutdown!");
                    runShutdown = true;
                }
                if (null != DatabusSourcesConnection.this._relayDispatcher && DatabusSourcesConnection.this._relayDispatcher.getComponentStatus().getStatus() == DatabusComponentStatus.Status.SHUTDOWN) {
                    DatabusSourcesConnection.this._log.error((Object)"nanny: detected that the relay dispatcher is shutdown!");
                    runShutdown = true;
                }
                if (null != DatabusSourcesConnection.this._bootstrapPuller && DatabusSourcesConnection.this._bootstrapPuller.getComponentStatus().getStatus() == DatabusComponentStatus.Status.SHUTDOWN) {
                    DatabusSourcesConnection.this._log.error((Object)"nanny: detected that the bootstrap puller is shutdown!");
                    runShutdown = true;
                }
                if (null != DatabusSourcesConnection.this._bootstrapDispatcher && DatabusSourcesConnection.this._bootstrapDispatcher.getComponentStatus().getStatus() == DatabusComponentStatus.Status.SHUTDOWN) {
                    DatabusSourcesConnection.this._log.error((Object)"nanny: detected that the bootstrap dispatcher is shutdown!");
                    runShutdown = true;
                }
                if (runShutdown) {
                    DatabusSourcesConnection.this.stop();
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    DatabusSourcesConnection.this._log.info((Object)"nanny: who woke me up?");
                }
            }
        }
    }
}

