/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.relay;

import com.linkedin.databus.container.netty.HttpRelay;
import com.linkedin.databus.container.request.ControlSourceEventsRequestProcessor;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.NamedObject;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.request.ProcessorRegistrationConflictException;
import com.linkedin.databus2.core.container.request.RequestProcessor;
import com.linkedin.databus2.core.container.request.RequestProcessorRegistry;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.core.seq.MultiServerSequenceNumberHandler;
import com.linkedin.databus2.core.seq.SequenceNumberHandlerFactory;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.EventProducer;
import com.linkedin.databus2.producers.EventProducerServiceProvider;
import com.linkedin.databus2.producers.RelayEventProducer;
import com.linkedin.databus2.producers.RelayEventProducersRegistry;
import com.linkedin.databus2.producers.db.OracleEventProducer;
import com.linkedin.databus2.relay.GoldenGateEventProducer;
import com.linkedin.databus2.relay.MonitoringEventProducer;
import com.linkedin.databus2.relay.OracleEventProducerFactory;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.ReplicationBitSetterStaticConfig;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;

/**
 * Relay container that owns one {@link EventProducer} per {@link PhysicalPartition}
 * and wires those producers into the HTTP relay's "controlSources" request processor.
 *
 * <p>The producer implementation is selected from the prefix of the physical source
 * URI (see {@link #addOneProducer(PhysicalSourceStaticConfig)}). For Oracle producers a
 * companion {@link MonitoringEventProducer} is created as well.
 *
 * <p>NOTE(review): the producer maps are plain {@link HashMap}s and no method in this
 * class synchronizes on them; callers that add/remove producers concurrently with
 * start/pause/resume/shutdown must provide their own coordination.
 */
public class DatabusRelayMain extends HttpRelay {
    public static final Logger LOG = Logger.getLogger(DatabusRelayMain.class.getName());
    public static final String DB_RELAY_CONFIG_FILE_OPT_NAME = "db_relay_config";

    /** URI prefix / service-provider names used to select a producer implementation. */
    private static final String JDBC_URI_PREFIX = "jdbc:";
    private static final String MOCK_URI_PREFIX = "mock";
    private static final String GOLDEN_GATE_URI_PREFIX = "gg:";
    private static final String MYSQL_URI_PREFIX = "mysql:";
    private static final String MOCK_PROVIDER_NAME = "mock";
    private static final String OPEN_REPLICATOR_PROVIDER_NAME = "or";

    private final RelayEventProducersRegistry _producersRegistry = RelayEventProducersRegistry.getInstance();
    MultiServerSequenceNumberHandler _maxScnReaderWriters;
    protected Map<PhysicalPartition, EventProducer> _producers;
    Map<PhysicalPartition, MonitoringEventProducer> _monitoringProducers;
    ControlSourceEventsRequestProcessor _csEventRequestProcessor;
    /** When true, {@link #doStart()} starts every registered producer. */
    private boolean _dbPullerStart = false;

    /**
     * Creates a relay from a default {@link HttpRelay.Config} and no physical source configs.
     */
    public DatabusRelayMain() throws IOException, InvalidConfigException, DatabusException {
        this(new HttpRelay.Config(), null);
    }

    /**
     * Creates a relay from a config builder; the builder is built and passed to the
     * static-config constructor.
     *
     * @param config   relay configuration builder
     * @param pConfigs physical source configurations handed to {@link HttpRelay}
     */
    public DatabusRelayMain(HttpRelay.Config config, PhysicalSourceStaticConfig[] pConfigs) throws IOException, InvalidConfigException, DatabusException {
        this(config.build(), pConfigs);
    }

    /**
     * Creates a relay from a static configuration. Producers are not created here;
     * call {@link #initProducers()} afterwards.
     *
     * @param config   relay static configuration
     * @param pConfigs physical source configurations handed to {@link HttpRelay}
     */
    public DatabusRelayMain(HttpRelay.StaticConfig config, PhysicalSourceStaticConfig[] pConfigs) throws IOException, InvalidConfigException, DatabusException {
        super(config, pConfigs);
        SequenceNumberHandlerFactory handlerFactory = this._relayStaticConfig.getDataSources().getSequenceNumbersHandler().createFactory();
        this._maxScnReaderWriters = new MultiServerSequenceNumberHandler(handlerFactory);
        int expectedProducers = this._pConfigs.size();
        this._producers = new HashMap<PhysicalPartition, EventProducer>(expectedProducers);
        this._monitoringProducers = new HashMap<PhysicalPartition, MonitoringEventProducer>(expectedProducers);
    }

    /** Sets whether {@link #doStart()} should start the registered producers. */
    public void setDbPullerStart(boolean s) {
        this._dbPullerStart = s;
    }

    /** @return whether {@link #doStart()} will start the registered producers */
    public boolean getDbPullerStart() {
        return this._dbPullerStart;
    }

    /**
     * Unregisters the producer and monitoring producer (if any) mapped to the physical
     * partition of {@code pConfig}, and removes them from the control-sources request
     * processor. The removed producers are not shut down by this method.
     *
     * @param pConfig physical source whose producers should be removed
     */
    @Override
    public void removeOneProducer(PhysicalSourceStaticConfig pConfig) {
        PhysicalPartition pPartition = pConfig.getPhysicalPartition();
        ArrayList<EventProducer> removed = new ArrayList<EventProducer>(2);
        if (this._producers != null) {
            EventProducer producer = this._producers.remove(pPartition);
            // A null mapping carries no producer; do not hand a null to the request processor.
            if (producer != null) {
                removed.add(producer);
            }
        }
        if (this._monitoringProducers != null) {
            MonitoringEventProducer monitoringProducer = this._monitoringProducers.remove(pPartition);
            if (monitoringProducer != null) {
                removed.add(monitoringProducer);
            }
        }
        if (!removed.isEmpty() && this._csEventRequestProcessor != null) {
            this._csEventRequestProcessor.removeEventProducers(removed);
        }
    }

    /**
     * Creates the event producer for one physical source, registers it under the source's
     * physical partition and (re)registers the "controlSources" request processor.
     *
     * <p>The producer type is chosen from the trimmed URI prefix: {@code jdbc:} (Oracle,
     * built through {@link OracleEventProducerFactory}), {@code mock}, {@code gg:}
     * ({@link GoldenGateEventProducer}), {@code mysql:} (the "or" service provider);
     * any other URI yields a {@link RelayEventProducer}.
     *
     * @param pConfig physical source configuration
     * @throws DatabusException        if the URI is null, or a {@code jdbc:} source is
     *                                 configured with the TOKEN replication-bit-setter source type
     * @throws DatabusRuntimeException if the "mock" or "or" service provider is not registered
     */
    @Override
    public void addOneProducer(PhysicalSourceStaticConfig pConfig) throws DatabusException, EventCreationException, UnsupportedKeyException, SQLException, InvalidConfigException {
        // Validate the URI first so an unusable config does not leave a SCN handler
        // and stats collectors registered behind it.
        String uri = pConfig.getUri();
        if (uri == null) {
            throw new DatabusException("Uri is required to start the relay");
        }
        uri = uri.trim();

        PhysicalPartition pPartition = pConfig.getPhysicalPartition();
        MaxSCNReaderWriter maxScnReaderWriter = this._maxScnReaderWriters.getOrCreateHandler((NamedObject) pPartition);
        LOG.info("Starting server container with maxScnReaderWriter:" + maxScnReaderWriter);
        DbusEventBufferAppendable dbusEventBuffer = this.getEventBuffer().getDbusEventBufferAppendable(pPartition);
        SchemaRegistryService schemaRegistryService = this.getSchemaRegistryService();
        this.addPhysicalPartitionCollectors(pPartition);
        DbusEventsStatisticsCollector statsCollector =
                (DbusEventsStatisticsCollector) this._inBoundStatsCollectors.getStatsCollector(pPartition.toSimpleString());

        EventProducer producer = this.createEventProducer(pConfig, uri, schemaRegistryService, dbusEventBuffer, statsCollector, maxScnReaderWriter);

        ArrayList<EventProducer> added = new ArrayList<EventProducer>(2);
        this._producers.put(pPartition, producer);
        added.add(producer);
        if (producer instanceof OracleEventProducer) {
            MonitoringEventProducer monitoringProducer = new MonitoringEventProducer(
                    "dbMonitor." + pPartition.toSimpleString(),
                    pConfig.getName(),
                    pConfig.getUri(),
                    ((OracleEventProducer) producer).getMonitoredSourceInfos(),
                    this.getMbeanServer());
            this._monitoringProducers.put(pPartition, monitoringProducer);
            added.add(monitoringProducer);
        }

        if (this._csEventRequestProcessor == null) {
            this._csEventRequestProcessor = new ControlSourceEventsRequestProcessor(null, (HttpRelay) this, added);
        } else {
            this._csEventRequestProcessor.addEventProducers(added);
        }
        RequestProcessorRegistry processorRegistry = this.getProcessorRegistry();
        processorRegistry.reregister("controlSources", (RequestProcessor) this._csEventRequestProcessor);
    }

    /** Builds the producer implementation selected by the prefix of {@code uri}. */
    private EventProducer createEventProducer(PhysicalSourceStaticConfig pConfig, String uri, SchemaRegistryService schemaRegistryService, DbusEventBufferAppendable dbusEventBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter maxScnReaderWriter) throws DatabusException, EventCreationException, UnsupportedKeyException, SQLException, InvalidConfigException {
        if (uri.startsWith(JDBC_URI_PREFIX)) {
            ReplicationBitSetterStaticConfig.SourceType sourceType = pConfig.getReplBitSetter().getSourceType();
            if (ReplicationBitSetterStaticConfig.SourceType.TOKEN.equals(sourceType)) {
                throw new DatabusException("Token Source-type for Replication bit setter config cannot be set for trigger-based Databus relay !!");
            }
            return new OracleEventProducerFactory().buildEventProducer(pConfig, schemaRegistryService, dbusEventBuffer, this.getMbeanServer(), statsCollector, maxScnReaderWriter);
        }
        if (uri.startsWith(MOCK_URI_PREFIX)) {
            return this.requireProvider(MOCK_PROVIDER_NAME).createProducer(pConfig, schemaRegistryService, dbusEventBuffer, statsCollector, maxScnReaderWriter);
        }
        if (uri.startsWith(GOLDEN_GATE_URI_PREFIX)) {
            return new GoldenGateEventProducer(pConfig, schemaRegistryService, dbusEventBuffer, statsCollector, maxScnReaderWriter);
        }
        if (uri.startsWith(MYSQL_URI_PREFIX)) {
            LOG.info("Adding OpenReplicatorEventProducer for uri :" + uri);
            return this.requireProvider(OPEN_REPLICATOR_PROVIDER_NAME).createProducer(pConfig, schemaRegistryService, dbusEventBuffer, statsCollector, maxScnReaderWriter);
        }
        // Any other URI is treated as an upstream relay to chain from.
        RelayEventProducer.DatabusClientNettyThreadPools nettyThreadPools = new RelayEventProducer.DatabusClientNettyThreadPools(0, this.getNetworkTimeoutTimer(), this.getBossExecutorService(), this.getIoExecutorService(), this.getHttpChannelGroup());
        return new RelayEventProducer(pConfig, dbusEventBuffer, statsCollector, maxScnReaderWriter, nettyThreadPools);
    }

    /** Looks up a registered producer service provider, failing if it is not available. */
    private EventProducerServiceProvider requireProvider(String providerName) {
        EventProducerServiceProvider provider = this._producersRegistry.getEventProducerServiceProvider(providerName);
        if (null == provider) {
            throw new DatabusRuntimeException("relay event producer not available: " + providerName);
        }
        return provider;
    }

    /**
     * Creates a producer for every configured physical source and then copies the
     * "start db puller" setting from the relay static configuration.
     */
    public void initProducers() throws InvalidConfigException, DatabusException, EventCreationException, UnsupportedKeyException, SQLException, ProcessorRegistrationConflictException {
        LOG.info("initializing producers");
        for (PhysicalSourceStaticConfig pConfig : this._pConfigs) {
            this.addOneProducer(pConfig);
        }
        this.setDbPullerStart(this._relayStaticConfig.getStartDbPuller());
        LOG.info("done initializing producers");
    }

    /**
     * Returns the max-SCN reader/writer for the physical partition of {@code pConfig},
     * creating it if necessary.
     *
     * @param pConfig physical source configuration
     * @return the handler, or {@code null} if it could not be obtained (the failure is logged)
     */
    public MaxSCNReaderWriter getMaxSCNReaderWriter(PhysicalSourceStaticConfig pConfig) {
        try {
            return this._maxScnReaderWriters.getOrCreateHandler((NamedObject) pConfig.getPhysicalPartition());
        }
        catch (DatabusException e) {
            // Pass the exception as the throwable too so the stack trace is not lost.
            LOG.warn("Cannot get maxScnReaderWriter for " + pConfig.getPhysicalPartition() + " error=" + e, e);
            return null;
        }
    }

    /**
     * Command-line entry point: processes the arguments and relay config through
     * {@link HttpRelay.Cli}, constructs the relay, initializes its producers, registers
     * the shutdown hook and calls {@code startAndBlock()}.
     */
    public static void main(String[] args) throws Exception {
        HttpRelay.Cli cli = new HttpRelay.Cli();
        cli.processCommandLineArgs(args);
        cli.parseRelayConfig();
        PhysicalSourceStaticConfig[] pStaticConfigs = cli.getPhysicalSourceStaticConfigs();
        HttpRelay.StaticConfig staticConfig = cli.getRelayConfigBuilder().build();
        DatabusRelayMain serverContainer = new DatabusRelayMain(staticConfig, pStaticConfigs);
        serverContainer.initProducers();
        serverContainer.registerShutdownHook();
        serverContainer.startAndBlock();
    }

    /**
     * Starts the container and, when the db-puller flag is set, every non-null
     * registered producer.
     */
    @Override
    protected void doStart() {
        super.doStart();
        LOG.info("Starting. Producers are :" + this._producers);
        if (!this.getDbPullerStart()) {
            return;
        }
        for (EventProducer producer : this._producers.values()) {
            if (producer == null) {
                continue;
            }
            LOG.info("starting db puller: " + producer.getName());
            // NOTE(review): -1 presumably means "no explicit starting SCN" -- confirm against EventProducer.start.
            producer.start(-1L);
            LOG.info("db puller started: " + producer.getName());
        }
    }

    /** Sends a pause to every running producer; already-paused producers are only logged. */
    @Override
    public void pause() {
        for (EventProducer producer : this._producers.values()) {
            if (null == producer) {
                continue;
            }
            if (producer.isRunning()) {
                producer.pause();
                LOG.info("EventProducer :" + producer.getName() + "  pause sent");
            } else if (producer.isPaused()) {
                LOG.info("EventProducer :" + producer.getName() + "  already paused");
            }
        }
    }

    /** Unpauses every paused producer; already-running producers are only logged. */
    @Override
    public void resume() {
        for (EventProducer producer : this._producers.values()) {
            if (null == producer) {
                continue;
            }
            if (producer.isPaused()) {
                producer.unpause();
                LOG.info("EventProducer :" + producer.getName() + "  resume sent");
            } else if (producer.isRunning()) {
                LOG.info("EventProducer :" + producer.getName() + "  already running");
            }
        }
    }

    /**
     * Shuts down every running or paused producer and its monitoring producer, then
     * shuts down the container.
     *
     * <p>Waiting is not abandoned when the thread is interrupted; instead the interrupt
     * is remembered and the thread's interrupt status is restored once shutdown has
     * finished, so callers can still observe it.
     */
    @Override
    protected void doShutdown() {
        LOG.warn("Shutting down Relay!");
        boolean interrupted = false;
        try {
            for (Map.Entry<PhysicalPartition, EventProducer> entry : this._producers.entrySet()) {
                EventProducer producer = entry.getValue();
                if (null != producer && (producer.isRunning() || producer.isPaused())) {
                    producer.shutdown();
                    try {
                        producer.waitForShutdown();
                    }
                    catch (InterruptedException ie) {
                        interrupted = true;
                    }
                    LOG.info("EventProducer is shutdown!");
                }

                MonitoringEventProducer monitoringProducer = this._monitoringProducers.get(entry.getKey());
                if (monitoringProducer == null) {
                    continue;
                }
                // Request shutdown for a paused monitor as well: the loop below waits while
                // the monitor is running OR paused, so a paused monitor that never received
                // shutdown() could keep this loop from terminating.
                // NOTE(review): assumes MonitoringEventProducer.shutdown() is valid in the paused state -- confirm.
                if (monitoringProducer.isRunning() || monitoringProducer.isPaused()) {
                    monitoringProducer.shutdown();
                }
                while (monitoringProducer.isRunning() || monitoringProducer.isPaused()) {
                    try {
                        monitoringProducer.waitForShutdown();
                    }
                    catch (InterruptedException ie) {
                        // Keep waiting (the throw cleared the flag); restore the status at the end.
                        interrupted = true;
                    }
                }
                monitoringProducer.unregisterMBeans();
            }
            super.doShutdown();
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** @return a new array holding the currently registered producers */
    public EventProducer[] getProducers() {
        return this._producers.values().toArray(new EventProducer[this._producers.size()]);
    }

    /** @return a new array holding the currently registered monitoring producers */
    public MonitoringEventProducer[] getMonitoringProducers() {
        return this._monitoringProducers.values().toArray(new MonitoringEventProducer[this._monitoringProducers.size()]);
    }

    /** Delegates to the inherited {@code awaitShutdown()}. */
    @Override
    public void awaitShutdown() {
        super.awaitShutdown();
    }
}

