/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.google.code.or.OpenReplicator;
import com.google.code.or.binlog.BinlogEventListener;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DatabusThreadBase;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.monitoring.mbean.EventSourceStatistics;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.producers.AbstractEventProducer;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.ORListener;
import com.linkedin.databus2.producers.ORMonitoredSourceInfo;
import com.linkedin.databus2.producers.OpenReplicatorAvroEventFactory;
import com.linkedin.databus2.producers.db.EventReaderSummary;
import com.linkedin.databus2.producers.db.EventSourceStatisticsIface;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.producers.ds.DbChangeEntry;
import com.linkedin.databus2.producers.ds.PerSourceTransaction;
import com.linkedin.databus2.producers.ds.Transaction;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.log4j.Logger;

public class OpenReplicatorEventProducer
extends AbstractEventProducer {
    public static final Integer DEFAULT_MYSQL_PORT = 3306;
    public static final Pattern PATH_PATTERN = Pattern.compile("/([0-9]+)/[0-9a-zA-Z-]+");
    protected final Logger _log;
    private final OpenReplicator _or;
    private String _binlogFilePrefix;
    private final MaxSCNReaderWriter _maxSCNReaderWriter;
    private final PhysicalSourceStaticConfig _physicalSourceStaticConfig;
    private final SchemaRegistryService _schemaRegistryService;
    private final String _physicalSourceName;
    private EventProducerThread _producerThread;
    private final Map<Integer, OpenReplicatorAvroEventFactory> _eventFactoryMap;
    private final DbusEventsStatisticsCollector _relayInboundStatsCollector;
    public static final short GLOBAL_SOURCE_ID = 0;
    private final Map<String, Short> _tableUriToSrcIdMap;
    private final Map<String, String> _tableUriToSrcNameMap;
    private final List<ObjectName> _registeredMbeans = new ArrayList<ObjectName>();
    private final MBeanServer _mbeanServer = ManagementFactory.getPlatformMBeanServer();
    private final String _jmxDomain;
    private final Map<Short, ORMonitoredSourceInfo> _monitoredSources = new HashMap<Short, ORMonitoredSourceInfo>();

    public OpenReplicatorEventProducer(List<OpenReplicatorAvroEventFactory> eventFactories, DbusEventBufferAppendable eventBuffer, MaxSCNReaderWriter maxSCNReaderWriter, PhysicalSourceStaticConfig physicalSourceStaticConfig, DbusEventsStatisticsCollector relayInboundStatsCollector, MBeanServer mbeanServer, Logger log, SchemaRegistryService schemaRegistryService, String jmxDomain) throws DatabusException, InvalidConfigException {
        super(eventBuffer, maxSCNReaderWriter, physicalSourceStaticConfig, mbeanServer);
        this._maxSCNReaderWriter = maxSCNReaderWriter;
        this._physicalSourceStaticConfig = physicalSourceStaticConfig;
        this._physicalSourceName = physicalSourceStaticConfig.getName();
        this._log = null != log ? log : Logger.getLogger((String)("com.linkedin.databus2.producers.or_" + this._physicalSourceName));
        this._eventFactoryMap = new HashMap<Integer, OpenReplicatorAvroEventFactory>();
        for (OpenReplicatorAvroEventFactory s : eventFactories) {
            this._eventFactoryMap.put(s.getSourceId(), s);
        }
        this._or = new OpenReplicator();
        this._jmxDomain = jmxDomain;
        try {
            this._binlogFilePrefix = OpenReplicatorEventProducer.processUri(new URI(physicalSourceStaticConfig.getUri()), this._or);
        }
        catch (URISyntaxException u) {
            throw new InvalidConfigException((Throwable)u);
        }
        this._schemaRegistryService = schemaRegistryService;
        this._relayInboundStatsCollector = relayInboundStatsCollector;
        this._tableUriToSrcIdMap = new HashMap<String, Short>();
        this._tableUriToSrcNameMap = new HashMap<String, String>();
        for (LogicalSourceStaticConfig l : this._physicalSourceStaticConfig.getSources()) {
            this._tableUriToSrcIdMap.put(l.getUri().toLowerCase(), l.getId());
            this._tableUriToSrcNameMap.put(l.getUri().toLowerCase(), l.getName());
            ORMonitoredSourceInfo source = this.buildORMonitoredSourceInfo(l, this._physicalSourceStaticConfig);
            this._monitoredSources.put(source.getSourceId(), source);
        }
        LogicalSourceStaticConfig logicalSourceStaticConfig = new LogicalSourceStaticConfig(0, this._physicalSourceStaticConfig.getName(), "", "constant:1", 0, false, null, null, null);
        ORMonitoredSourceInfo source = this.buildORMonitoredSourceInfo(logicalSourceStaticConfig, this._physicalSourceStaticConfig);
        this._monitoredSources.put(source.getSourceId(), source);
    }

    private ORMonitoredSourceInfo buildORMonitoredSourceInfo(LogicalSourceStaticConfig sourceConfig, PhysicalSourceStaticConfig pConfig) throws DatabusException, InvalidConfigException {
        EventSourceStatistics statisticsBean = new EventSourceStatistics(sourceConfig.getName());
        ORMonitoredSourceInfo sourceInfo = new ORMonitoredSourceInfo(sourceConfig.getId(), sourceConfig.getName(), statisticsBean);
        this.registerMbeans(sourceInfo);
        return sourceInfo;
    }

    public static String processUri(URI uri, OpenReplicator or) throws InvalidConfigException {
        String path;
        String userInfo = uri.getUserInfo();
        if (null == userInfo) {
            throw new InvalidConfigException("missing user info in: " + uri);
        }
        int slashPos = userInfo.indexOf(47);
        if (slashPos < 0) {
            slashPos = userInfo.length();
        } else if (0 == slashPos) {
            throw new InvalidConfigException("missing user name in user info: " + userInfo);
        }
        String userName = userInfo.substring(0, slashPos);
        String userPass = slashPos < userInfo.length() - 1 ? userInfo.substring(slashPos + 1) : null;
        String hostName = uri.getHost();
        int port = uri.getPort();
        if (port < 0) {
            port = DEFAULT_MYSQL_PORT;
        }
        if (null == (path = uri.getPath())) {
            throw new InvalidConfigException("missing path: " + uri);
        }
        Matcher m = PATH_PATTERN.matcher(path);
        if (!m.matches()) {
            throw new InvalidConfigException("invalid path:" + path);
        }
        Object[] gp = m.group().split("/");
        if (gp.length != 3) {
            throw new InvalidConfigException("Invalid format " + Arrays.toString(gp));
        }
        String serverIdStr = gp[1];
        int serverId = -1;
        try {
            serverId = Integer.parseInt(serverIdStr);
        }
        catch (NumberFormatException e) {
            throw new InvalidConfigException("incorrect mysql serverid:" + serverId);
        }
        if (null != or) {
            or.setUser(userName);
            if (null != userPass) {
                or.setPassword(userPass);
            }
            or.setHost(hostName);
            or.setPort(port);
            or.setServerId(serverId);
        }
        return gp[2];
    }

    public static int logid(long scn) {
        if (scn == -1L || scn == 0L) {
            return 1;
        }
        return (int)(scn >> 32 & 0xFFFFFFFFFFFFFFFFL);
    }

    public static int offset(long scn) {
        if (scn == -1L || scn == 0L) {
            return 4;
        }
        return (int)(scn & 0xFFFFFFFFFFFFFFFFL);
    }

    public synchronized void start(long sinceSCN) {
        long sinceSCNToUse = 0L;
        if (sinceSCN > 0L) {
            sinceSCNToUse = sinceSCN;
        } else if (this._maxSCNReaderWriter != null) {
            try {
                long scn = this._maxSCNReaderWriter.getMaxScn();
                if (scn > 0L) {
                    sinceSCNToUse = scn;
                }
            }
            catch (DatabusException e) {
                this._log.warn((Object)("Could not read saved maxScn: Defaulting to startSCN=" + sinceSCNToUse));
            }
        }
        this._producerThread = new EventProducerThread(this._physicalSourceName, sinceSCNToUse);
        this._producerThread.start();
    }

    protected ReadEventCycleSummary readEventsFromAllSources(long sinceSCN) throws DatabusException, EventCreationException, UnsupportedKeyException {
        throw new RuntimeException("Not supported !!");
    }

    public synchronized void unpause() {
        try {
            this._producerThread.unpause();
        }
        catch (InterruptedException e) {
            this._log.info((Object)"Interrupted while unpausing EventProducer", (Throwable)e);
        }
        super.unpause();
    }

    public synchronized void pause() {
        try {
            this._producerThread.pause();
        }
        catch (InterruptedException e) {
            this._log.info((Object)"Interrupted while pausing EventProducer", (Throwable)e);
        }
        super.pause();
    }

    public synchronized void shutdown() {
        this._producerThread.shutdown();
        super.shutdown();
        for (ObjectName name : this._registeredMbeans) {
            try {
                this._mbeanServer.unregisterMBean(name);
                this._log.info((Object)("Unregistered or-source mbean: " + name));
            }
            catch (MBeanRegistrationException e) {
                this._log.warn((Object)("Exception when unregistering or-source statistics mbean: " + name + e));
            }
            catch (InstanceNotFoundException e) {
                this._log.warn((Object)("Exception when unregistering or-source statistics mbean: " + name + e));
            }
        }
    }

    public List<? extends EventSourceStatisticsIface> getSources() {
        return new ArrayList<ORMonitoredSourceInfo>(this._monitoredSources.values());
    }

    private void registerMbeans(ORMonitoredSourceInfo source) throws DatabusException {
        try {
            Hashtable<String, String> props = new Hashtable<String, String>();
            props.put("type", "SourceStatistics");
            props.put("name", source.getSourceName());
            ObjectName objectName = new ObjectName(this._jmxDomain, props);
            if (this._mbeanServer.isRegistered(objectName)) {
                this._log.warn((Object)("Unregistering old or-source statistics mbean: " + objectName));
                this._mbeanServer.unregisterMBean(objectName);
            }
            this._mbeanServer.registerMBean(source.getStatisticsBean(), objectName);
            this._log.info((Object)("Registered or-source statistics mbean: " + objectName));
            this._registeredMbeans.add(objectName);
        }
        catch (Exception ex) {
            this._log.error((Object)("Failed to register the or-source statistics mbean for source (" + source.getSourceName() + ") due to an exception."), (Throwable)ex);
            throw new DatabusException("Failed to initialize or event statistics mbeans.", (Throwable)ex);
        }
    }

    public ORMonitoredSourceInfo getSource(short sourceId) {
        return this._monitoredSources.get(sourceId);
    }

    public class EventProducerThread
    extends DatabusThreadBase
    implements ORListener.TransactionProcessor {
        private final AtomicLong _startPrevScn;
        private final long _sinceScn;

        public EventProducerThread(String sourceName, long sinceScn) {
            super("OpenReplicator_" + sourceName);
            this._startPrevScn = new AtomicLong(-1L);
            this._sinceScn = sinceScn;
        }

        public void run() {
            OpenReplicatorEventProducer.this._eventBuffer.start(this._sinceScn);
            this._startPrevScn.set(this._sinceScn);
            int offset = OpenReplicatorEventProducer.offset(this._sinceScn);
            int logid = OpenReplicatorEventProducer.logid(this._sinceScn);
            String binlogFile = String.format("%s.%06d", OpenReplicatorEventProducer.this._binlogFilePrefix, logid);
            ORListener orl = new ORListener(logid, this._log, OpenReplicatorEventProducer.this._binlogFilePrefix, OpenReplicatorEventProducer.this._producerThread, OpenReplicatorEventProducer.this._tableUriToSrcIdMap, OpenReplicatorEventProducer.this._tableUriToSrcNameMap, OpenReplicatorEventProducer.this._schemaRegistryService);
            OpenReplicatorEventProducer.this._or.setBinlogFileName(binlogFile);
            OpenReplicatorEventProducer.this._or.setBinlogPosition((long)offset);
            OpenReplicatorEventProducer.this._or.setBinlogEventListener((BinlogEventListener)orl);
            try {
                this._log.info((Object)String.format("Open Replicator starting from %s@%d", binlogFile, offset));
                OpenReplicatorEventProducer.this._or.start();
            }
            catch (Exception e) {
                throw new DatabusRuntimeException("failed to start open replicator: " + e.getMessage(), (Throwable)e);
            }
            this._log.info((Object)"Event Producer Thread done");
        }

        @Override
        public void onEndTransaction(Transaction txn) throws DatabusException {
            if (!this.isShutdownRequested()) {
                if (this.isPauseRequested()) {
                    LOG.info((Object)"Pause requested for OpenReplicator. Pausing !!");
                    this.signalPause();
                    LOG.info((Object)"Pausing. Waiting for resume command");
                    try {
                        this.awaitUnPauseRequest();
                    }
                    catch (InterruptedException e) {
                        this._log.info((Object)"Interrupted !!");
                    }
                    LOG.info((Object)"Resuming OpenReplicator !!");
                    this.signalResumed();
                    LOG.info((Object)"OpenReplicator resumed !!");
                }
                try {
                    this.addTxnToBuffer(txn);
                    OpenReplicatorEventProducer.this._maxSCNReaderWriter.saveMaxScn(txn.getScn());
                }
                catch (UnsupportedKeyException e) {
                    this._log.fatal((Object)("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer"), (Throwable)e);
                    throw new DatabusException((Throwable)e);
                }
                catch (EventCreationException e) {
                    this._log.fatal((Object)("Got EventCreationException exception while adding txn (" + txn + ") to the buffer"), (Throwable)e);
                    throw new DatabusException((Throwable)e);
                }
            }
            if (this.isShutdownRequested()) {
                try {
                    OpenReplicatorEventProducer.this._or.stop(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    this._log.error((Object)"Got exception while stopping open replicator", (Throwable)e);
                }
                this.doShutdownNotify();
                return;
            }
        }

        private void addTxnToBuffer(Transaction txn) throws DatabusException, UnsupportedKeyException, EventCreationException {
            if (txn.getScn() <= this._startPrevScn.get()) {
                this._log.info((Object)("Skipping this transaction, EOP already send for this event. Txn SCN =" + txn.getScn() + ", _startPrevScn=" + this._startPrevScn.get()));
                return;
            }
            EventSourceStatistics globalStats = OpenReplicatorEventProducer.this.getSource((short)0).getStatisticsBean();
            OpenReplicatorEventProducer.this._eventBuffer.startEvents();
            long scn = txn.getScn();
            long timestamp = txn.getTxnNanoTimestamp();
            ArrayList<EventReaderSummary> summaries = new ArrayList<EventReaderSummary>();
            for (PerSourceTransaction t : txn.getOrderedPerSourceTransactions()) {
                long startDbUpdatesMs = System.currentTimeMillis();
                short sourceId = (short)t.getSrcId();
                EventSourceStatistics perSourceStats = OpenReplicatorEventProducer.this.getSource(sourceId).getStatisticsBean();
                long dbUpdatesEventsSize = 0L;
                for (DbChangeEntry c : t.getDbChangeEntrySet()) {
                    int length = 0;
                    try {
                        length = ((OpenReplicatorAvroEventFactory)OpenReplicatorEventProducer.this._eventFactoryMap.get(t.getSrcId())).createAndAppendEvent(c, OpenReplicatorEventProducer.this._eventBuffer, false, OpenReplicatorEventProducer.this._relayInboundStatsCollector);
                        if (length < 0) {
                            this._log.error((Object)("Unable to append DBChangeEntry (" + c + ") to event buffer !! EVB State : " + OpenReplicatorEventProducer.this._eventBuffer));
                            throw new DatabusException("Unable to append DBChangeEntry (" + c + ") to event buffer !!");
                        }
                        dbUpdatesEventsSize += (long)length;
                    }
                    catch (DatabusException e) {
                        this._log.error((Object)"Got databus exception :", (Throwable)e);
                        perSourceStats.addError();
                        globalStats.addEmptyEventCycle();
                        throw e;
                    }
                    catch (UnsupportedKeyException e) {
                        perSourceStats.addError();
                        globalStats.addEmptyEventCycle();
                        this._log.error((Object)"Got UnsupportedKeyException :", (Throwable)e);
                        throw e;
                    }
                    catch (EventCreationException e) {
                        perSourceStats.addError();
                        globalStats.addEmptyEventCycle();
                        this._log.error((Object)"Got EventCreationException :", (Throwable)e);
                        throw e;
                    }
                    perSourceStats.addEventCycle(1, txn.getTxnReadLatencyNanos(), (long)length, scn);
                    globalStats.addEventCycle(1, txn.getTxnReadLatencyNanos(), (long)length, scn);
                }
                long endDbUpdatesMs = System.currentTimeMillis();
                long dbUpdatesElapsedTimeMs = endDbUpdatesMs - startDbUpdatesMs;
                EventReaderSummary summary = new EventReaderSummary(sourceId, ((ORMonitoredSourceInfo)OpenReplicatorEventProducer.this._monitoredSources.get(sourceId)).getSourceName(), scn, t.getDbChangeEntrySet().size(), dbUpdatesEventsSize, -1L, dbUpdatesElapsedTimeMs, timestamp, timestamp, -1L);
                if (OpenReplicatorEventProducer.this._eventsLog.isInfoEnabled()) {
                    OpenReplicatorEventProducer.this._eventsLog.info((Object)summary.toString());
                }
                summaries.add(summary);
                long tsEnd = System.currentTimeMillis();
                perSourceStats.addTimeOfLastDBAccess(tsEnd);
                globalStats.addTimeOfLastDBAccess(tsEnd);
            }
            OpenReplicatorEventProducer.this._eventBuffer.endEvents(scn, OpenReplicatorEventProducer.this._relayInboundStatsCollector);
            ReadEventCycleSummary summary = new ReadEventCycleSummary(OpenReplicatorEventProducer.this._physicalSourceStaticConfig.getName(), summaries, scn, -1L);
            if (OpenReplicatorEventProducer.this._eventsLog.isInfoEnabled()) {
                OpenReplicatorEventProducer.this._eventsLog.info((Object)summary.toString());
            }
        }
    }
}

