/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.producers.ConstantPartitionFunction;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.EventProducer;
import com.linkedin.databus2.producers.EventProducerServiceProvider;
import com.linkedin.databus2.producers.OpenReplicatorAvroEventFactory;
import com.linkedin.databus2.producers.OpenReplicatorEventProducer;
import com.linkedin.databus2.producers.PartitionFunction;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.schemas.NoSuchSchemaException;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import java.util.ArrayList;
import org.apache.log4j.Logger;

public class OpenReplicatorEventProducerServiceProvider
implements EventProducerServiceProvider {
    public static final String SCHEME = "or";
    public static final String MODULE = OpenReplicatorEventProducerServiceProvider.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    public static final String JMX_DOMAIN = "com.linkedin.databus2";

    public String getUriScheme() {
        return SCHEME;
    }

    public EventProducer createProducer(PhysicalSourceStaticConfig physicalSourceConfig, SchemaRegistryService schemaRegistryService, DbusEventBufferAppendable eventBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter checkpointWriter) throws InvalidConfigException {
        ArrayList<OpenReplicatorAvroEventFactory> eventFactories = new ArrayList<OpenReplicatorAvroEventFactory>();
        for (LogicalSourceStaticConfig sourceConfig : physicalSourceConfig.getSources()) {
            OpenReplicatorAvroEventFactory factory = null;
            try {
                factory = this.buildEventFactory(sourceConfig, physicalSourceConfig, schemaRegistryService);
            }
            catch (Exception ex) {
                LOG.error((Object)("Got exception while building monitored sources for config :" + sourceConfig), (Throwable)ex);
                throw new InvalidConfigException((Throwable)ex);
            }
            eventFactories.add(factory);
        }
        OpenReplicatorEventProducer producer = null;
        try {
            producer = new OpenReplicatorEventProducer(eventFactories, eventBuffer, checkpointWriter, physicalSourceConfig, statsCollector, null, null, schemaRegistryService, JMX_DOMAIN);
        }
        catch (DatabusException e) {
            LOG.error((Object)("Got databus exception when instantiating Open Replicator event producer for source : " + physicalSourceConfig.getName()), (Throwable)e);
            throw new InvalidConfigException((Throwable)e);
        }
        return producer;
    }

    public ConfigBuilder<? extends PhysicalSourceStaticConfig> createConfigBuilder(String propPrefix) {
        return new PhysicalSourceConfig();
    }

    protected OpenReplicatorAvroEventFactory createEventFactory(String eventViewSchema, String eventView, LogicalSourceStaticConfig sourceConfig, PhysicalSourceStaticConfig pConfig, String eventSchema, PartitionFunction partitionFunction) throws DatabusException {
        return new OpenReplicatorAvroEventFactory(sourceConfig.getId(), (short)pConfig.getId(), eventSchema, partitionFunction, pConfig.getReplBitSetter());
    }

    public OpenReplicatorAvroEventFactory buildEventFactory(LogicalSourceStaticConfig sourceConfig, PhysicalSourceStaticConfig pConfig, SchemaRegistryService schemaRegistryService) throws DatabusException, EventCreationException, UnsupportedKeyException, InvalidConfigException {
        String eventView;
        String eventViewSchema;
        String schema = null;
        try {
            schema = schemaRegistryService.fetchLatestSchemaBySourceName(sourceConfig.getName());
        }
        catch (NoSuchSchemaException e) {
            throw new InvalidConfigException("Unable to load the schema for source (" + sourceConfig.getName() + ").");
        }
        if (schema == null) {
            throw new InvalidConfigException("Unable to load the schema for source (" + sourceConfig.getName() + ").");
        }
        LOG.info((Object)("Loading schema for source id " + sourceConfig.getId() + ": " + schema));
        if (sourceConfig.getUri().indexOf(46) != -1) {
            String[] parts = sourceConfig.getUri().split("\\.");
            eventViewSchema = parts[0];
            eventView = parts[1];
        } else {
            eventViewSchema = null;
            eventView = sourceConfig.getUri();
        }
        PartitionFunction partitionFunction = this.buildPartitionFunction(sourceConfig);
        OpenReplicatorAvroEventFactory factory = this.createEventFactory(eventViewSchema, eventView, sourceConfig, pConfig, schema, partitionFunction);
        return factory;
    }

    public PartitionFunction buildPartitionFunction(LogicalSourceStaticConfig sourceConfig) throws InvalidConfigException {
        String partitionFunction = sourceConfig.getPartitionFunction();
        if (partitionFunction.startsWith("constant:")) {
            try {
                String numberPart = partitionFunction.substring("constant:".length()).trim();
                short constantPartitionNumber = Short.valueOf(numberPart);
                return new ConstantPartitionFunction(constantPartitionNumber);
            }
            catch (Exception ex) {
                throw new InvalidConfigException("Invalid partition configuration (" + partitionFunction + "). " + "Could not parse the constant partition number.");
            }
        }
        throw new InvalidConfigException("Invalid partition configuration (" + partitionFunction + ").");
    }
}

