/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core.test;

import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventInternalWritable;
import com.linkedin.databus.core.util.EventBufferConsumer;
import java.util.HashSet;
import java.util.Vector;
import org.apache.log4j.Logger;

/**
 * Test helper that drains a {@link DbusEventBuffer} through an iterator and
 * collects copies of the events it reads into a caller-supplied vector.
 *
 * <p>Checkpoint messages, control messages, end-of-period markers and events
 * whose key has already been seen are counted but not collected.
 *
 * <p>Threading: {@link #run()} is intended to execute on its own thread (see
 * {@link #runWithTimeout(long)}), while {@link #stop()}, {@link #onInvalidEvent(long)}
 * and the status getters may be invoked from other threads. All fields shared
 * that way are {@code volatile} so that updates are visible to the consuming
 * loop. The output vector and the seen-key set are only touched by
 * {@link #run()}.
 */
public class DbusEventBufferConsumer implements Runnable, EventBufferConsumer {
    public static final Logger LOG = Logger.getLogger(DbusEventBufferConsumer.class.getName());

    private final DbusEventBuffer _buffer;
    private final Vector<DbusEvent> _out;
    private final HashSet<Long> _seenKeys;
    private final int _maxEvents;
    private final int _deletionInterval;

    // Written by stop()/onInvalidEvent()/reset(), polled by run(); volatile so
    // that a write from one thread is guaranteed to become visible to another.
    private volatile boolean _stop;
    private volatile boolean _invalidEvent;
    private volatile long _eventsReadTillInvalidEvent;
    private volatile Throwable _exceptionThrown;

    /**
     * @param buffer           the event buffer to consume from
     * @param maxEvents        number of collected (non-duplicate, non-control) events
     *                         after which {@link #run()} terminates
     * @param deletionInterval if positive, the iterator's {@code remove()} is called
     *                         every {@code deletionInterval} events read and whenever
     *                         the consumer is about to wait for more events; if zero or
     *                         negative, {@code remove()} is only called once at the end
     * @param out              destination for copies of the collected events
     */
    public DbusEventBufferConsumer(DbusEventBuffer buffer, int maxEvents, int deletionInterval, Vector<DbusEvent> out) {
        _buffer = buffer;
        _out = out;
        _seenKeys = new HashSet<Long>();
        _maxEvents = maxEvents;
        _stop = false;
        _deletionInterval = deletionInterval;
        _invalidEvent = false;
        _eventsReadTillInvalidEvent = 0L;
    }

    /**
     * Resets the consumer state and reads events until {@code maxEvents} events
     * have been collected, {@link #stop()} has been requested, the thread is
     * interrupted while waiting, or an invalid event has been reported and all
     * events up to it have been read.
     *
     * <p>Any {@link RuntimeException} or {@link Error} raised while consuming is
     * logged and made available through {@link #getExceptionThrown()} instead of
     * propagating.
     */
    @Override
    public void run() {
        reset();
        try {
            int totalEvents = 0;      // events actually copied into _out
            long allTotalEvents = 0L; // every event read, including skipped ones
            DbusEventBuffer.DbusEventIterator iDbusEvent = _buffer.acquireIterator("Test_DbusEventBufferConsumer");
            do {
                if (!iDbusEvent.hasNext()) {
                    if (!_invalidEvent) {
                        if (_deletionInterval > 0) {
                            iDbusEvent.remove();
                        }
                        try {
                            iDbusEvent.await();
                        }
                        catch (InterruptedException ie) {
                            LOG.warn("consumer interrupted while waiting for events; terminating", ie);
                            // Preserve the interrupt status for whoever owns this thread.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    } else if (allTotalEvents >= eventsReadTillInvalidEvent()) {
                        // No more events are expected once an invalid event was reported
                        // and everything before it has been read.
                        LOG.info("total events read until invalid event=" + allTotalEvents + "; terminating");
                        stop();
                    }
                    // Otherwise (invalid event reported but not yet caught up) fall
                    // through and poll the iterator again without waiting.
                }
                while (iDbusEvent.hasNext()) {
                    DbusEventInternalWritable e = iDbusEvent.next();
                    ++allTotalEvents;
                    boolean skip = e.isCheckpointMessage()
                            || e.isControlMessage()
                            || e.isEndOfPeriodMarker()
                            || _seenKeys.contains(e.key());
                    if (!skip) {
                        _out.add(e.createCopy());
                        _seenKeys.add(e.key());
                        ++totalEvents;
                    }
                    if (_deletionInterval > 0 && allTotalEvents % (long)_deletionInterval == 0L) {
                        iDbusEvent.remove();
                    }
                }
            } while (totalEvents < _maxEvents && !_stop);
            iDbusEvent.remove();
        }
        catch (RuntimeException e) {
            _exceptionThrown = e;
            LOG.error("consumer exception:" + e.getMessage(), e);
        }
        catch (Error e) {
            _exceptionThrown = e;
            LOG.error("consumer error:" + e.getMessage(), e);
        }
    }

    /**
     * Runs this consumer on a new daemon thread and waits up to {@code timeoutMs}
     * milliseconds for it to finish.
     *
     * <p>If the consumer thread is still alive after the wait (or the wait itself
     * was interrupted), the consumer is asked to stop and its thread is
     * interrupted. If the calling thread was interrupted while waiting, its
     * interrupt status is restored before returning.
     *
     * @param timeoutMs maximum time to wait, in milliseconds, as passed to
     *                  {@link Thread#join(long)}
     * @return {@code true} if the consumer thread finished before this method
     *         returned; {@code false} otherwise
     */
    public boolean runWithTimeout(long timeoutMs) {
        Thread runThread = new Thread(this, "runWithTimeout-" + this);
        runThread.setDaemon(true);
        runThread.start();
        try {
            runThread.join(timeoutMs);
        }
        catch (InterruptedException ie) {
            // Do not lose the interruption; the liveness check below still
            // decides whether the consumer has to be stopped.
            Thread.currentThread().interrupt();
        }
        boolean success = !runThread.isAlive();
        if (!success) {
            stop();
            runThread.interrupt();
        }
        return success;
    }

    /**
     * {@link EventBufferConsumer} callback recording that an invalid event was
     * encountered.
     *
     * @param numEventsRead value stored as the number of events read up to the
     *                      invalid event; {@link #run()} terminates once it has
     *                      read at least this many events and none are pending
     */
    @Override
    public void onInvalidEvent(long numEventsRead) {
        // Publish the count before the flag: a reader that observes the flag
        // must also observe the matching count rather than a stale value.
        _eventsReadTillInvalidEvent = numEventsRead;
        _invalidEvent = true;
    }

    /** Requests that {@link #run()} terminate after its current pass over the iterator. */
    public void stop() {
        _stop = true;
    }

    /** @return {@code true} if a stop has been requested since the last {@link #reset()} */
    public boolean hasStopped() {
        return _stop;
    }

    /** @return {@code true} if {@link #onInvalidEvent(long)} was called since the last {@link #reset()} */
    public boolean hasInvalidEvent() {
        return _invalidEvent;
    }

    /** @return the count passed to the last {@link #onInvalidEvent(long)} call, or 0 after a {@link #reset()} */
    public long eventsReadTillInvalidEvent() {
        return _eventsReadTillInvalidEvent;
    }

    /**
     * Clears the stop request, the invalid-event state and any recorded exception.
     * Called automatically at the start of {@link #run()}. The output vector and
     * the set of seen keys are not cleared.
     */
    public void reset() {
        _stop = false;
        _invalidEvent = false;
        _eventsReadTillInvalidEvent = 0L;
        _exceptionThrown = null;
    }

    /** @return the {@link RuntimeException} or {@link Error} caught by the last {@link #run()}, or {@code null} */
    public Throwable getExceptionThrown() {
        return _exceptionThrown;
    }
}

