/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers;

import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.util.RngUtils;
import com.linkedin.databus.eventgenerator.DataGenerator;
import com.linkedin.databus.eventgenerator.UnknownTypeException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
import com.linkedin.databus2.producers.EventProducer;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import com.linkedin.databus2.schemas.NoSuchSchemaException;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.log4j.Logger;

/**
 * Event producer that fills a {@link DbusEventBufferAppendable} with randomly generated
 * Avro records for every logical source of a physical source configuration.
 *
 * <p>Events are produced by a single daemon worker thread in cycles: each cycle opens an
 * event window, appends random events for every source, closes the window at the current
 * SCN, sleeps for the remainder of the cycle, persists the SCN (when a
 * {@link MaxSCNReaderWriter} was supplied) and then advances the SCN by a random amount.
 *
 * <p>Threading: the control methods ({@link #start(long)}, {@link #pause()},
 * {@link #unpause()}, {@link #shutdown()}) may be called from any thread. The request flags
 * and the SCN are {@code volatile} because the worker reads them without holding a lock.
 */
public class RelayEventGenerator
implements EventProducer {
    public static final String MODULE = RelayEventGenerator.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);
    private final SchemaRegistryService _schemaRegistryService;
    private final DbusEventBufferAppendable _buffer;
    private final PhysicalSourceStaticConfig _pConfig;
    private final DbusEventsStatisticsCollector _statsCollector;
    private final MaxSCNReaderWriter _scnReaderWriter;
    private final long _eventRatePerSec;
    // Per-source lookup tables keyed by logical source name. A source appears in all three
    // maps or in none of them (see the constructor).
    private final HashMap<String, DataGenerator> _dataGenerators;
    private final HashMap<String, byte[]> _schemaIds;
    private final HashMap<String, String> _schemaKeys;
    // Written by start() before the worker is started and afterwards only by the worker;
    // volatile so that getSCN() callers on other threads see the latest value.
    private volatile long _scn = RngUtils.randomPositiveLong(2L, 50000L);
    private volatile State _state = State.INIT;
    // Request flags are set by caller threads and polled by the worker outside any lock,
    // hence volatile.
    volatile boolean _shutdownRequested = false;
    volatile boolean _pauseRequested = false;
    private long _restartScnOffset = 0L;
    Thread _worker;
    private final Lock _pauseLock = new ReentrantLock();
    private final Condition _pausedCondition = this._pauseLock.newCondition();

    /**
     * Creates a generator and prepares one random-record generator per logical source.
     *
     * <p>For each source the latest schema is fetched from the schema registry. A source
     * whose schema cannot be resolved is logged and left unregistered; the worker skips
     * such sources instead of failing.
     *
     * @param pConfig               physical source configuration (sources, event rate,
     *                              restart SCN offset, cycle duration)
     * @param schemaRegistryService registry used to look up the latest schema per source
     * @param dbusEventBuffer       buffer the generated events are appended to
     * @param statsCollector        statistics collector handed to the buffer on append
     * @param scnReaderWriter       optional SCN persistence; may be {@code null}
     */
    public RelayEventGenerator(PhysicalSourceStaticConfig pConfig, SchemaRegistryService schemaRegistryService, DbusEventBufferAppendable dbusEventBuffer, DbusEventsStatisticsCollector statsCollector, MaxSCNReaderWriter scnReaderWriter) {
        this._pConfig = pConfig;
        this._schemaRegistryService = schemaRegistryService;
        this._buffer = dbusEventBuffer;
        this._statsCollector = statsCollector;
        this._eventRatePerSec = pConfig.getEventRatePerSec();
        this._dataGenerators = new HashMap<String, DataGenerator>(5);
        this._schemaIds = new HashMap<String, byte[]>(5);
        this._schemaKeys = new HashMap<String, String>(5);
        this._scnReaderWriter = scnReaderWriter;
        this._restartScnOffset = pConfig.getRestartScnOffset();
        for (LogicalSourceStaticConfig lconf : pConfig.getSources()) {
            String sourceName = lconf.getName();
            try {
                String schema = this._schemaRegistryService.fetchLatestSchemaBySourceName(sourceName);
                // Compute everything first and register afterwards, so that a failure in
                // any step cannot leave the three maps out of sync for this source.
                DataGenerator generator = new DataGenerator(schema);
                byte[] schemaId = SchemaHelper.getSchemaId(schema);
                Schema eventSchema = Schema.parse(schema);
                // The key column defaults to "key" unless the schema's "pk" meta field
                // names a different one.
                String keyColumnName = "key";
                String keyNameOverride = SchemaHelper.getMetaField(eventSchema, "pk");
                if (null != keyNameOverride) {
                    keyColumnName = keyNameOverride;
                }
                this._dataGenerators.put(sourceName, generator);
                this._schemaIds.put(sourceName, schemaId);
                this._schemaKeys.put(sourceName, keyColumnName);
            }
            catch (NoSuchSchemaException e) {
                LOG.error("Cannot find schema for " + sourceName + " " + e);
            }
            catch (DatabusException e) {
                LOG.error("Databus exception while attempting to find schema for " + sourceName + " " + e);
            }
        }
    }

    /**
     * Requests the worker to stop and interrupts it so that a pending sleep or pause wait
     * returns promptly. Does not wait for the worker to finish.
     */
    public synchronized void shutdown() {
        this._shutdownRequested = true;
        if (this._worker != null) {
            this._worker.interrupt();
        }
        LOG.warn("Shut down request sent to thread");
    }

    /**
     * Blocks until the worker thread has terminated. Returns immediately if the generator
     * is already shut down or was never started.
     *
     * <p>The join happens outside the instance monitor so that other threads can still
     * call {@link #shutdown()} and the other synchronized methods while this call waits.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForShutdown() throws InterruptedException, IllegalStateException {
        Thread worker = this.workerToJoin();
        if (worker != null) {
            worker.join();
        }
    }

    /**
     * Same as {@link #waitForShutdown()} but waits at most {@code timeout} milliseconds.
     *
     * @param timeout maximum time to wait, passed to {@link Thread#join(long)}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForShutdown(long timeout) throws InterruptedException, IllegalStateException {
        Thread worker = this.workerToJoin();
        if (worker != null) {
            worker.join(timeout);
        }
    }

    /**
     * Returns the worker to join, or {@code null} when there is nothing to wait for
     * (already shut down, or never started).
     */
    private synchronized Thread workerToJoin() {
        return this._state != State.SHUTDOWN ? this._worker : null;
    }

    /** Returns the physical source name with a {@code ".mock"} suffix. */
    public String getName() {
        return this._pConfig.getName() + ".mock";
    }

    /** Returns the SCN that the current (or next) event window is closed with. */
    public long getSCN() {
        return this._scn;
    }

    /**
     * Starts the worker thread unless one is already active.
     *
     * <p>Starting SCN selection: a positive {@code sinceSCN} is used as is. Otherwise, when
     * an SCN reader/writer is available, the saved max SCN minus the configured restart
     * offset is used if that is positive. In every other case the randomly chosen initial
     * SCN is kept.
     *
     * @param sinceSCN SCN to start from; values {@code <= 0} mean "use the saved or random SCN"
     */
    public synchronized void start(long sinceSCN) {
        // The worker flips _state to RUNNING asynchronously and a paused worker is not
        // RUNNING, so liveness of the thread is checked as well; otherwise a second
        // start() could spawn a second worker on the same buffer.
        boolean workerActive = this._worker != null && this._worker.isAlive();
        if (this._state == State.RUNNING || workerActive) {
            LOG.error("Thread already running! ");
            return;
        }
        if (sinceSCN > 0L) {
            this._scn = sinceSCN;
        } else if (this._scnReaderWriter != null) {
            try {
                long scn = this._scnReaderWriter.getMaxScn();
                long newScn = scn >= this._restartScnOffset ? scn - this._restartScnOffset : 0L;
                LOG.info("Checkpoint read = " + scn + " restartScnOffset=" + this._restartScnOffset + " Adjusted SCN=" + newScn);
                if (newScn > 0L) {
                    this._scn = newScn;
                }
            }
            catch (DatabusException e) {
                LOG.warn("Could not read saved maxScn: Defaulting to random startSCN=" + this._scn, e);
            }
        }
        LOG.info("Starting with scn=" + this._scn);
        this._worker = new WorkerThread();
        this._worker.setDaemon(true);
        this._worker.start();
    }

    /** Returns {@code true} while the worker is in the {@link State#RUNNING} state. */
    public synchronized boolean isRunning() {
        return this._state == State.RUNNING;
    }

    /** Returns {@code true} while the worker is in the {@link State#PAUSED} state. */
    public synchronized boolean isPaused() {
        return this._state == State.PAUSED;
    }

    /**
     * Clears the pause request and wakes the worker if it is waiting in the paused state.
     */
    public synchronized void unpause() {
        this._pauseLock.lock();
        try {
            this._pauseRequested = false;
            this._pausedCondition.signalAll();
        }
        finally {
            this._pauseLock.unlock();
        }
    }

    /**
     * Requests a pause. The worker notices the request at its next check, stops generating
     * events and waits until {@link #unpause()} or {@link #shutdown()} is called.
     */
    public synchronized void pause() {
        this._pauseLock.lock();
        try {
            this._pauseRequested = true;
        }
        finally {
            this._pauseLock.unlock();
        }
    }

    /**
     * Serializes {@code record} with Avro binary encoding and appends it to {@code buffer}
     * as one event with random partition ids and the current time as timestamp.
     *
     * @param source         logical source name, used only for error logging
     * @param id             logical source id stored with the event
     * @param record         record to serialize; {@code null} means nothing is appended
     * @param key            event key; {@code null} means nothing is appended
     * @param schemaId       schema id stored with the event
     * @param statsCollector statistics collector passed to the buffer
     * @param buffer         buffer to append to
     * @return {@code 1} if an event was appended, {@code 0} if the record or key was
     *         missing or serialization failed
     */
    int populateEvents(String source, short id, GenericRecord record, DbusEventKey key, byte[] schemaId, DbusEventsStatisticsCollector statsCollector, DbusEventBufferAppendable buffer) {
        if (record == null || key == null) {
            return 0;
        }
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = new BinaryEncoder(bos);
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
            writer.write(record, encoder);
            byte[] serializedValue = bos.toByteArray();
            short pPartitionId = RngUtils.randomPositiveShort();
            short lPartitionId = RngUtils.randomPositiveShort();
            // Milliseconds scaled by 10^6, i.e. the timestamp is handed over in nanoseconds.
            long timeStamp = System.currentTimeMillis() * 1000000L;
            buffer.appendEvent(key, pPartitionId, lPartitionId, timeStamp, id, schemaId, serializedValue, false, statsCollector);
            return 1;
        }
        catch (IOException io) {
            LOG.error("Cannot create byte stream payload: " + source, io);
            return 0;
        }
    }

    /**
     * Body of the worker thread: runs generation cycles until shutdown is requested.
     * The state is set to {@link State#SHUTDOWN} on every exit path, including an
     * unexpected runtime exception, so that {@link #isRunning()} never reports a dead
     * worker as running.
     */
    private void runGeneratorLoop() {
        this._state = State.RUNNING;
        try {
            this._buffer.start(this._scn - 1L);
            while (!this._shutdownRequested) {
                this.awaitWhilePaused();
                if (!this._shutdownRequested) {
                    this._state = State.RUNNING;
                }
                if (this._state != State.RUNNING) {
                    continue;
                }
                this.runCycle();
            }
        }
        finally {
            this._state = State.SHUTDOWN;
        }
    }

    /**
     * If a pause was requested, enters the {@link State#PAUSED} state and blocks until the
     * pause request is cleared or shutdown is requested. The pause lock is always released,
     * even if waiting fails unexpectedly.
     */
    private void awaitWhilePaused() {
        this._pauseLock.lock();
        try {
            if (this._pauseRequested && this._state != State.PAUSED) {
                this._state = State.PAUSED;
                LOG.warn("Pausing event generator");
                while (this._state == State.PAUSED && !this._shutdownRequested && this._pauseRequested) {
                    try {
                        this._pausedCondition.await();
                    }
                    catch (InterruptedException e) {
                        // Interrupts are how shutdown() wakes a paused worker. The loop
                        // condition decides whether to keep waiting, so the interrupt is
                        // only logged; re-asserting it would make await() throw again
                        // immediately.
                        LOG.warn("Paused thread interrupted! Shutdown requested=" + this._shutdownRequested);
                    }
                }
            }
        }
        finally {
            this._pauseLock.unlock();
        }
    }

    /**
     * Runs one generation cycle: opens an event window, appends events for every source,
     * closes the window at the current SCN, sleeps out the rest of the cycle, saves the
     * SCN and advances it by a random positive amount.
     */
    private void runCycle() {
        this._buffer.startEvents();
        long cycleDurationMs = this._pConfig.getRetries().getInitSleep();
        long numEventsToGenerate = (long)((double)(this._eventRatePerSec * cycleDurationMs) / 1000.0);
        long startTime = System.currentTimeMillis();
        long totalEvents = 0L;
        for (LogicalSourceStaticConfig lconf : this._pConfig.getSources()) {
            totalEvents += this.generateForSource(lconf, numEventsToGenerate);
        }
        this._buffer.endEvents(this._scn, this._statsCollector);
        long timeTaken = System.currentTimeMillis() - startTime;
        LOG.debug("Time taken to populate " + totalEvents + " = " + timeTaken);
        long timeLeft = cycleDurationMs - timeTaken;
        if (timeLeft > 0L && !this._pauseRequested && !this._shutdownRequested) {
            try {
                Thread.sleep(timeLeft);
            }
            catch (InterruptedException e) {
                // Logged only: the outer loop re-checks the shutdown flag right after
                // this cycle, and a re-asserted interrupt would break the next sleep.
                LOG.error("Thread interrupted from sleep: state=" + this._state);
            }
        }
        if (this._scnReaderWriter != null) {
            try {
                this._scnReaderWriter.saveMaxScn(this._scn);
            }
            catch (DatabusException e) {
                LOG.error("Cannot save scn!", e);
            }
        }
        // Only the worker thread modifies _scn after start(), so this read-modify-write
        // on a volatile field has a single writer.
        this._scn += RngUtils.randomPositiveLong(1L, 100000L);
    }

    /**
     * Appends up to {@code numEventsToGenerate} random events for one logical source,
     * stopping early when a pause or shutdown is requested.
     *
     * @param lconf               logical source to generate events for
     * @param numEventsToGenerate upper bound on the number of generation attempts
     * @return number of events actually appended to the buffer
     */
    private long generateForSource(LogicalSourceStaticConfig lconf, long numEventsToGenerate) {
        String sourceName = lconf.getName();
        DataGenerator generator = this._dataGenerators.get(sourceName);
        if (generator == null) {
            // Schema lookup failed in the constructor (already logged there as an error);
            // skip the source rather than killing the worker with a NullPointerException.
            LOG.debug("No data generator registered for source " + sourceName + "; skipping");
            return 0L;
        }
        String keyColumnName = this._schemaKeys.get(sourceName);
        byte[] schemaId = this._schemaIds.get(sourceName);
        long appended = 0L;
        for (long i = 0L; i < numEventsToGenerate && !this._pauseRequested && !this._shutdownRequested; ++i) {
            GenericRecord record;
            try {
                record = generator.generateRandomRecord();
            }
            catch (UnknownTypeException e) {
                LOG.error("Could not generate record for source: " + sourceName);
                continue;
            }
            DbusEventKey eventKey = null;
            try {
                Object key = record.get(keyColumnName);
                eventKey = new DbusEventKey(key);
            }
            catch (UnsupportedKeyException e) {
                // eventKey stays null; populateEvents() then appends nothing for this record.
                LOG.error("Unable to get key for " + sourceName);
            }
            appended += (long)this.populateEvents(sourceName, lconf.getId(), record, eventKey, schemaId, this._statsCollector, this._buffer);
        }
        return appended;
    }

    /**
     * Worker thread that drives the generator loop. The loop itself lives in the enclosing
     * class: inside a {@link Thread} subclass the simple name {@code State} would resolve
     * to the inherited {@code Thread.State} rather than the generator's own enum.
     */
    private class WorkerThread
    extends Thread {
        private WorkerThread() {
        }

        @Override
        public void run() {
            RelayEventGenerator.this.runGeneratorLoop();
        }
    }

    /** Lifecycle states of the generator as reported by the worker thread. */
    public static enum State {
        INIT,
        PAUSED,
        RUNNING,
        SHUTDOWN;

    }
}

