/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.mbean.DatabusReadOnlyStatus;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.EventProducer;
import com.linkedin.databus2.producers.db.EventSourceStatisticsIface;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

public abstract class AbstractEventProducer
implements EventProducer {
    private final String _name;
    private EventProducerThread _thread;
    protected boolean _pauseRequested = false;
    protected boolean _shutdownRequested = false;
    private final AtomicLong _sinceSCN = new AtomicLong(-1L);
    protected final DbusEventBufferAppendable _eventBuffer;
    private final MaxSCNReaderWriter _maxScnReaderWriter;
    private final DatabusComponentStatus _status;
    private final DatabusReadOnlyStatus _statusMBean;
    protected final MBeanServer _mbeanServer;
    private long _restartScnOffset;
    private boolean _coldStart = true;
    protected final Logger _log;
    protected final Logger _eventsLog;

    public AbstractEventProducer(DbusEventBufferAppendable eventBuffer, MaxSCNReaderWriter maxScnReaderWriter, PhysicalSourceStaticConfig physicalSourceConfig, MBeanServer mbeanServer) {
        this._eventBuffer = eventBuffer;
        this._name = physicalSourceConfig.getName();
        this._maxScnReaderWriter = maxScnReaderWriter;
        this._restartScnOffset = physicalSourceConfig.getRestartScnOffset();
        this._coldStart = true;
        this._status = new DatabusComponentStatus(this._name + ".dbPuller", physicalSourceConfig.getRetries());
        this._mbeanServer = mbeanServer;
        this._statusMBean = new DatabusReadOnlyStatus(this._name, this._status, -1L);
        this._statusMBean.registerAsMbean(this._mbeanServer);
        this._log = Logger.getLogger((String)(this.getClass().getName() + "_" + this._name));
        this._eventsLog = Logger.getLogger((String)("com.linkedin.databus2.producers.db.events." + this._name));
    }

    public Logger getEventsLog() {
        return this._eventsLog;
    }

    @Override
    public String getName() {
        return this._name;
    }

    @Override
    public long getSCN() {
        return this._sinceSCN.get();
    }

    @Override
    public synchronized void start(long sinceSCN) {
        this._log.info((Object)(this.getClass().getSimpleName() + ".start: sinceScn = " + sinceSCN + "restartScnOffset = " + this._restartScnOffset));
        if (sinceSCN < 0L && null != this._maxScnReaderWriter) {
            try {
                this._log.info((Object)"attempting to read persistent event producer checkpoint ");
                sinceSCN = this._maxScnReaderWriter.getMaxScn();
                if (sinceSCN > 1L && this._coldStart && sinceSCN > this._restartScnOffset) {
                    this._log.info((Object)("sinceSCN read from SCNRW = " + sinceSCN + " restartScnOffset = " + this._restartScnOffset));
                    sinceSCN -= this._restartScnOffset;
                }
                this._log.info((Object)("Proposed sinceSCN = " + sinceSCN));
            }
            catch (Exception e) {
                this._log.error((Object)("Unable to load MaxSCN: " + e.getMessage()), (Throwable)e);
            }
        }
        if (this._thread == null) {
            this._sinceSCN.set(sinceSCN);
            this._log.info((Object)("Starting EventProducerThread from SCN: " + this._sinceSCN.get()));
            this._pauseRequested = false;
            this._shutdownRequested = false;
            this._sinceSCN.set(sinceSCN);
            if (this._coldStart) {
                this._eventBuffer.start(sinceSCN);
            }
            this._thread = new EventProducerThread(this._name);
            this._thread.setDaemon(true);
            this._thread.start();
        } else {
            this._log.warn((Object)"attempting to change checkpoint of a running event producer thread -- ignoring.");
            this._pauseRequested = false;
            this._shutdownRequested = false;
            this.notifyAll();
        }
        this._log.info((Object)("" + this._name + " started."));
    }

    @Override
    public synchronized boolean isPaused() {
        return this._pauseRequested;
    }

    @Override
    public synchronized boolean isRunning() {
        return this._thread != null && !this._shutdownRequested && !this._pauseRequested;
    }

    @Override
    public synchronized void unpause() {
        this._pauseRequested = false;
        this.notifyAll();
        if (this._thread == null) {
            this._log.warn((Object)"Unpause requested when no event thread is running.");
        }
    }

    @Override
    public synchronized void pause() {
        this._pauseRequested = true;
        this.notifyAll();
    }

    @Override
    public synchronized void shutdown() {
        this._shutdownRequested = true;
        this.notifyAll();
        if (this._thread != null) {
            this._coldStart = false;
            this._thread.interrupt();
        }
        this._statusMBean.unregisterMbean(this._mbeanServer);
    }

    @Override
    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        while (null != this._thread && this._thread.isAlive()) {
            this._thread.join();
        }
    }

    @Override
    public void waitForShutdown(long timeoutMs) throws InterruptedException, IllegalStateException {
        if (null != this._thread && this._thread.isAlive()) {
            this._thread.join(timeoutMs);
        }
        if (null != this._thread && this._thread.isAlive()) {
            throw new IllegalStateException();
        }
    }

    public DatabusReadOnlyStatus getStatusMBean() {
        return this._statusMBean;
    }

    protected abstract ReadEventCycleSummary readEventsFromAllSources(long var1) throws DatabusException, EventCreationException, UnsupportedKeyException;

    public abstract List<? extends EventSourceStatisticsIface> getSources();

    protected DbusEventBufferAppendable getEventBuffer() {
        return this._eventBuffer;
    }

    protected MaxSCNReaderWriter getMaxScnReaderWriter() {
        return this._maxScnReaderWriter;
    }

    private class EventProducerThread
    extends Thread {
        public EventProducerThread(String producerName) {
            super("EventProducerThread_" + producerName);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            boolean firstPause = true;
            while (true) {
                AbstractEventProducer abstractEventProducer = AbstractEventProducer.this;
                synchronized (abstractEventProducer) {
                    while (AbstractEventProducer.this._pauseRequested && !AbstractEventProducer.this._shutdownRequested) {
                        if (firstPause) {
                            firstPause = false;
                            AbstractEventProducer.this._log.info((Object)"EventProducerThread is pausing because a pause was requested.");
                        }
                        try {
                            AbstractEventProducer.this.wait();
                        }
                        catch (InterruptedException ex) {}
                    }
                    if (AbstractEventProducer.this._shutdownRequested) {
                        AbstractEventProducer.this._log.info((Object)"EventProducerThread is stopping because a shutdown was requested.");
                        AbstractEventProducer.this._thread = null;
                        AbstractEventProducer.this._eventBuffer.rollbackEvents();
                        return;
                    }
                }
                try {
                    ReadEventCycleSummary summary = AbstractEventProducer.this.readEventsFromAllSources(AbstractEventProducer.this._sinceSCN.get());
                    long newSinceSCN = Math.max(summary.getEndOfWindowScn(), AbstractEventProducer.this._sinceSCN.get());
                    AbstractEventProducer.this._sinceSCN.set(newSinceSCN);
                    if (AbstractEventProducer.this._eventsLog.isDebugEnabled() || AbstractEventProducer.this._eventsLog.isInfoEnabled() && summary.getTotalEventNum() > 0) {
                        AbstractEventProducer.this._eventsLog.info((Object)summary.toString());
                    }
                    if (AbstractEventProducer.this._status.getRetriesNum() > 0) {
                        AbstractEventProducer.this._status.resume();
                    }
                    AbstractEventProducer.this._status.getRetriesCounter().reset();
                    AbstractEventProducer.this._status.getRetriesCounter().sleep();
                }
                catch (EventCreationException ex) {
                    AbstractEventProducer.this._log.error((Object)("EventCreationException occurred while reading events from " + AbstractEventProducer.this._name + ". This error is most likely configuration or data dependent and may require manual intervention."), (Throwable)ex);
                    AbstractEventProducer.this._status.retryOnError(AbstractEventProducer.this._name + " error: " + ex.getMessage());
                }
                catch (UnsupportedKeyException ex) {
                    AbstractEventProducer.this._log.error((Object)("UnsupportedKeyException occurred while reading events from " + AbstractEventProducer.this._name + ". This error is most likely configuration or data dependent and may require manual intervention."), (Throwable)ex);
                    AbstractEventProducer.this._status.retryOnError(AbstractEventProducer.this._name + " error: " + ex.getMessage());
                }
                catch (DatabusException ex) {
                    AbstractEventProducer.this._log.error((Object)("DatabusException occurred while reading events from " + AbstractEventProducer.this._name + ". This error may be due to a transient issue (database is down?):" + ex.getMessage()), (Throwable)ex);
                    AbstractEventProducer.this._status.retryOnError(AbstractEventProducer.this._name + " error: " + ex.getMessage());
                }
                catch (Exception e) {
                    AbstractEventProducer.this._log.error((Object)("unknown exception occurred while reading events from " + AbstractEventProducer.this._name + ". This error may be due to a transient issue (database is down?):" + e.getMessage()), (Throwable)e);
                    AbstractEventProducer.this._status.retryOnError(AbstractEventProducer.this._name + " error: " + e.getMessage());
                }
                firstPause = false;
            }
        }
    }
}

