/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core.test;

import com.linkedin.databus.core.BootstrapCheckpointHandler;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventInternalReadable;
import com.linkedin.databus.core.DbusEventInternalWritable;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusEventV2Factory;
import com.linkedin.databus.core.InvalidEventException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.test.DbusEventBufferReflector;
import com.linkedin.databus.core.test.DbusEventCorrupter;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Vector;
import java.util.logging.Logger;

/**
 * Test helper that replays a prepared list of events into a {@link DbusEventBuffer}.
 *
 * <p>{@link #run()} walks the list in order. Every time the value returned by
 * {@code DbusEvent.sequence()} changes, the appender treats it as a window boundary and
 * either calls {@code endEvents} on the buffer or appends a bootstrap checkpoint event,
 * depending on the configured number of bootstrap checkpoints per window. After every
 * boundary and every appended data event the buffer is validated through a
 * {@link DbusEventBufferReflector}; a failed validation raises a {@link RuntimeException}.
 *
 * <p>The class keeps plain, unsynchronized counters. NOTE(review): callers presumably read
 * {@link #eventsEmitted()} / {@link #numDataEvents()} only after the running thread has
 * finished - confirm against the tests that use it.
 */
public class DbusEventAppender implements Runnable {
    /** Fraction value that lets {@link #run()} consider the complete event list. */
    private static final double WHOLE_LIST = 1.0;
    /** Any negative skip limit disables skipping in {@link #addEventToBuffer(DbusEvent, int)}. */
    private static final int NEVER_SKIP = -1;
    /** With zero checkpoints per window every sequence change ends the window directly. */
    private static final int END_EVERY_WINDOW = 0;

    // Kept as public, non-final instance fields exactly as before so existing callers still link.
    public String MODULE = DbusEventAppender.class.getName();
    public Logger LOG = Logger.getLogger(this.MODULE);

    /** Creates the initial checkpoint when no explicit bootstrap checkpoint has been set. */
    private final BootstrapCheckpointHandler _bstCheckpointHandler = new BootstrapCheckpointHandler("source1", "source2", "source3");

    private final Vector<DbusEvent> _events;
    private final DbusEventBuffer _buffer;
    private final DbusEventBufferReflector _bufferReflector;
    protected DbusEventsStatisticsCollector _stats;

    private final double _fraction;
    private final boolean _invokeStartOnBuffer;
    private final int _numDataEventsBeforeSkip;
    private final boolean _callInternalListeners;
    private final int _bootstrapCheckpointPerWindow;

    private Checkpoint _bootstrapCheckpoint;
    private long _count;
    private long _dataEvents;

    /**
     * Replays the whole list, calls {@code start} on the buffer, never skips events and ends
     * the window on every sequence change.
     *
     * @throws Exception propagated from the canonical constructor
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, DbusEventsStatisticsCollector stats) throws Exception {
        this(events, buffer, stats, WHOLE_LIST, true, NEVER_SKIP, true, END_EVERY_WINDOW);
    }

    /**
     * Same defaults as the three-argument constructor, but with an explicit number of
     * bootstrap checkpoints per window.
     *
     * @throws Exception propagated from the canonical constructor
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, int bootstrapCheckpointsPerWindow, DbusEventsStatisticsCollector stats) throws Exception {
        this(events, buffer, stats, WHOLE_LIST, true, NEVER_SKIP, true, bootstrapCheckpointsPerWindow);
    }

    /**
     * Same defaults as the three-argument constructor, but lets the caller decide whether
     * {@link #run()} calls {@code start} on the buffer before the first window.
     *
     * @throws Exception propagated from the canonical constructor
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, DbusEventsStatisticsCollector stats, boolean invokeStartOnBuffer) throws Exception {
        this(events, buffer, stats, WHOLE_LIST, invokeStartOnBuffer, NEVER_SKIP, true, END_EVERY_WINDOW);
    }

    /**
     * Same defaults as the three-argument constructor, but only a fraction of the list is
     * replayed (see the canonical constructor for how the fraction is applied).
     *
     * @throws Exception propagated from the canonical constructor
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, DbusEventsStatisticsCollector stats, double fraction) throws Exception {
        this(events, buffer, stats, fraction, true, NEVER_SKIP, true, END_EVERY_WINDOW);
    }

    /**
     * Variant that invokes internal listeners and ends the window on every sequence change.
     *
     * @throws Exception propagated from the canonical constructor
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, DbusEventsStatisticsCollector stats, double fraction, boolean invokeStartOnBuffer, int numDataEventsBeforeSkip) throws Exception {
        this(events, buffer, stats, fraction, invokeStartOnBuffer, numDataEventsBeforeSkip, true, END_EVERY_WINDOW);
    }

    /**
     * Canonical constructor; all other constructors funnel into it.
     *
     * @param events                       events to replay, in list order
     * @param buffer                       destination buffer; also wrapped in a {@link DbusEventBufferReflector}
     * @param stats                        statistics collector handed to every buffer call
     * @param fraction                     {@link #run()} stops once the number of appended data events reaches
     *                                     {@code (int)(fraction * events.size())}
     * @param invokeStartOnBuffer          whether {@link #run()} calls {@code start(firstSequence - 1)} on the buffer
     * @param numDataEventsBeforeSkip      negative to append every event; otherwise events are appended only
     *                                     while the running data-event count is below this value
     * @param callInternalListeners        selects which {@code endEvents} overload closes a window
     * @param bootstrapCheckpointPerWindow initial value of the per-window checkpoint counter used by {@link #run()}
     * @throws Exception declared for compatibility; the only call that can raise here is the
     *                   {@link DbusEventBufferReflector} construction
     */
    public DbusEventAppender(Vector<DbusEvent> events, DbusEventBuffer buffer, DbusEventsStatisticsCollector stats, double fraction, boolean invokeStartOnBuffer, int numDataEventsBeforeSkip, boolean callInternalListeners, int bootstrapCheckpointPerWindow) throws Exception {
        this._events = events;
        this._buffer = buffer;
        this._stats = stats;
        this._bufferReflector = new DbusEventBufferReflector(buffer);
        this._fraction = fraction;
        this._invokeStartOnBuffer = invokeStartOnBuffer;
        this._numDataEventsBeforeSkip = numDataEventsBeforeSkip;
        this._callInternalListeners = callInternalListeners;
        this._bootstrapCheckpointPerWindow = bootstrapCheckpointPerWindow;
        this._count = 0L;
    }

    /**
     * Returns the counter maintained by the last {@link #run()}: it is bumped once for every
     * event passed to {@link #addEventToBuffer(DbusEvent, int)}, once for every sequence change
     * after the first, and once for the closing {@code endEvents} call.
     */
    public long eventsEmitted() {
        return this._count;
    }

    /**
     * Copies the payload of {@code ev} and appends it to the buffer unless the skip limit has
     * been reached.
     *
     * @param ev             event to copy; must also be a {@link DbusEventInternalReadable}
     * @param dataEventCount number of data events appended so far
     * @return {@code dataEventCount + 1} when the event was appended, otherwise
     *         {@code dataEventCount} unchanged
     * @throws RuntimeException if the buffer fails validation after the append
     */
    public int addEventToBuffer(DbusEvent ev, int dataEventCount) {
        // The payload is read before the skip check, exactly as before.
        final int payloadSize = ((DbusEventInternalReadable)ev).payloadLength();
        final byte[] payload = new byte[payloadSize];
        ev.value().get(payload);

        final boolean skipLimitReached = this._numDataEventsBeforeSkip >= 0 && dataEventCount >= this._numDataEventsBeforeSkip;
        if (skipLimitReached) {
            return dataEventCount;
        }

        this._buffer.appendEvent(new DbusEventKey(ev.key()), ev.physicalPartitionId(), ev.logicalPartitionId(), ev.timestampInNanos(), ev.srcId(), ev.schemaId(), payload, false, this._stats);
        if (!this._bufferReflector.validateBuffer()) {
            throw new RuntimeException("Buffer validation 3 failed");
        }
        return dataEventCount + 1;
    }

    /**
     * Builds a checkpoint event and appends it to the buffer {@code numCheckpoints} times.
     *
     * <p>The checkpoint is the one set through {@link #setBootstrapCheckpoint(Checkpoint)} when
     * present (it is updated in place), otherwise a fresh initial bootstrap checkpoint.
     *
     * @param lastScn        stored as the checkpoint's window SCN
     * @param dataEventCount stored as the checkpoint's snapshot offset
     * @param numCheckpoints how many copies of the checkpoint event to append
     */
    public void addBootstrapCheckpointEventToBuffer(long lastScn, long dataEventCount, int numCheckpoints) {
        Checkpoint cp = this._bootstrapCheckpoint;
        if (cp == null) {
            cp = this._bstCheckpointHandler.createInitialBootstrapCheckpoint(null, 0L);
        }
        if (cp.getBootstrapStartScn() == -1L) {
            cp.setBootstrapStartScn(0L);
        }
        cp.setWindowScn(lastScn);
        cp.setSnapshotOffset(dataEventCount);

        final DbusEventInternalReadable checkpointEvent = new DbusEventV2Factory().createCheckpointEvent(cp);
        final ByteBuffer serialized = checkpointEvent.value();
        final byte[] payload = new byte[serialized.remaining()];
        serialized.get(payload);

        int appended = 0;
        while (appended < numCheckpoints) {
            this._buffer.appendEvent(new DbusEventKey(checkpointEvent.key()), checkpointEvent.physicalPartitionId(), checkpointEvent.logicalPartitionId(), checkpointEvent.timestampInNanos(), checkpointEvent.srcId(), checkpointEvent.schemaId(), payload, false, this._stats);
            ++appended;
        }
    }

    /** Returns the reflector used to validate the destination buffer. */
    public DbusEventBufferReflector getDbusEventReflector() {
        return this._bufferReflector;
    }

    /** Returns the explicitly configured bootstrap checkpoint, or {@code null} if none was set. */
    public Checkpoint getBootstrapCheckpoint() {
        return this._bootstrapCheckpoint;
    }

    /**
     * Sets the checkpoint used for bootstrap checkpoint events. When it is non-null,
     * {@link #run()} also appends one checkpoint event right after opening the first window.
     */
    public void setBootstrapCheckpoint(Checkpoint checkpoint) {
        this._bootstrapCheckpoint = checkpoint;
    }

    /**
     * Replays the configured events into the buffer, handling a window boundary whenever the
     * event sequence changes, and records the resulting counters.
     *
     * @throws RuntimeException if the buffer fails validation at any point
     */
    @Override
    public void run() {
        this._count = 0L;
        long lastScn = -1L;
        int dataEventCount = 0;
        int checkpointsLeft = this._bootstrapCheckpointPerWindow;
        final int maxCount = (int)(this._fraction * (double)this._events.size());

        for (DbusEvent ev : this._events) {
            if (dataEventCount >= maxCount) {
                break;
            }
            final long evScn = ev.sequence();
            if (evScn != lastScn) {
                if (lastScn == -1L) {
                    this.openFirstWindow(evScn, dataEventCount);
                } else {
                    checkpointsLeft = this.rollWindow(lastScn, dataEventCount, checkpointsLeft);
                }
                if (!this._bufferReflector.validateBuffer()) {
                    throw new RuntimeException("Buffer validation 2 failed");
                }
                lastScn = evScn;
            }
            dataEventCount = this.addEventToBuffer(ev, dataEventCount);
            ++this._count;
        }

        // The last window is only closed when the full list was eligible for replay.
        if (lastScn != -1L && maxCount == this._events.size()) {
            ++this._count;
            this._buffer.endEvents(lastScn, this._stats);
        }
        this._dataEvents = dataEventCount;
    }

    /** Starts the buffer (if configured), opens the first window and adds the initial checkpoint event if one is set. */
    private void openFirstWindow(long firstScn, int dataEventCount) {
        if (this._invokeStartOnBuffer) {
            this._buffer.start(firstScn - 1L);
        }
        this._buffer.startEvents();
        if (this._bootstrapCheckpoint != null) {
            this.addBootstrapCheckpointEventToBuffer(firstScn - 1L, dataEventCount, 1);
        }
    }

    /**
     * Handles a sequence change after the first window: ends the window or appends a
     * checkpoint event, validates the buffer, and returns the updated checkpoint counter.
     */
    private int rollWindow(long closingScn, int dataEventCount, int checkpointsLeft) {
        ++this._count;
        if (checkpointsLeft != 0) {
            this.addBootstrapCheckpointEventToBuffer(closingScn, dataEventCount, 1);
        } else if (this._callInternalListeners) {
            this._buffer.endEvents(closingScn, this._stats);
        } else {
            this._buffer.endEvents(true, closingScn, false, false, this._stats);
        }
        int remaining = checkpointsLeft - 1;

        if (!this._bufferReflector.validateBuffer()) {
            throw new RuntimeException("Buffer validation 1 failed");
        }
        // A counter below zero means the window was just ended: open the next one and re-arm.
        if (remaining < 0) {
            this._buffer.startEvents();
            remaining = this._bootstrapCheckpointPerWindow;
        }
        return remaining;
    }

    /** Returns the number of data events appended by the last {@link #run()}. */
    public long numDataEvents() {
        return this._dataEvents;
    }

    /**
     * Toggles corruption on the buffer events found at the given positions.
     *
     * <p>Positions are matched in array order during a single forward pass over the buffer, so
     * an entry is only reachable if it is larger than the previously matched one. For
     * {@code PAYLOAD} corruption control messages are ignored and do not count towards the
     * position index.
     *
     * @param positions zero-based event positions to corrupt
     * @param type      kind of corruption to toggle
     * @return number of events whose corruption was toggled
     * @throws InvalidEventException propagated from {@code DbusEventCorrupter.toggleEventCorruption}
     */
    public int tarnishEventsInBuffer(int[] positions, DbusEventCorrupter.EventCorruptionType type) throws InvalidEventException {
        final boolean dataEventsOnly = type == DbusEventCorrupter.EventCorruptionType.PAYLOAD;
        final Iterator<DbusEventInternalWritable> bufferEvents = this._buffer.iterator();

        // One counter suffices: it is both the index of the next wanted position and the hit count.
        int tarnished = 0;
        int eventIndex = 0;
        while (tarnished < positions.length && bufferEvents.hasNext()) {
            final DbusEventInternalWritable candidate = bufferEvents.next();
            final boolean ignored = dataEventsOnly && candidate.isControlMessage();
            if (!ignored) {
                if (eventIndex == positions[tarnished]) {
                    DbusEventCorrupter.toggleEventCorruption(type, candidate);
                    ++tarnished;
                }
                ++eventIndex;
            }
        }
        return tarnished;
    }
}

