/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core.test;

import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.OffsetNotFoundException;
import com.linkedin.databus.core.ScnNotFoundException;
import com.linkedin.databus.core.StreamEventsArgs;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import java.nio.channels.WritableByteChannel;
import org.apache.log4j.Logger;

/**
 * Runnable that repeatedly streams events from a {@link DbusEventBuffer} into a
 * {@link WritableByteChannel} and keeps a running total of the events streamed.
 *
 * <p>The mutable state ({@code _stop}, {@code _count}, {@code _expectedEvents}) is
 * {@code volatile} because {@link #run()} is meant to execute on its own thread while
 * {@link #stop()}, {@link #setExpectedEvents(long)}, {@link #eventsWritten()} and
 * {@link #expectedEvents()} are called from other threads. {@code _count} is only ever
 * written by the thread executing {@link #run()}, so {@code volatile} is sufficient for it.
 */
public class DbusEventBufferWriter
implements Runnable {
    public static final Logger LOG = Logger.getLogger(DbusEventBufferWriter.class.getName());

    private final WritableByteChannel _channel;
    private final DbusEventBuffer _buffer;
    private final int _batchsize;
    private final DbusEventsStatisticsCollector _stats;

    /** Set by {@link #stop()}; polled by the streaming loop in {@link #run()}. */
    private volatile boolean _stop;
    /** Total number of events reported as streamed during the current/last {@link #run()}. */
    private volatile long _count;
    /** Event count at which {@link #run()} finishes by itself; values &lt;= 0 disable the check. */
    private volatile long _expectedEvents;

    /**
     * @param buffer    event buffer to stream from
     * @param channel   channel the events are streamed into
     * @param batchsize value handed to every {@link StreamEventsArgs} created by {@link #run()}
     * @param stats     statistics collector attached to every {@link StreamEventsArgs}
     */
    public DbusEventBufferWriter(DbusEventBuffer buffer, WritableByteChannel channel, int batchsize, DbusEventsStatisticsCollector stats) {
        this._channel = channel;
        this._buffer = buffer;
        this._batchsize = batchsize;
        this._stats = stats;
        this._stop = false;
        this._count = 0L;
        this._expectedEvents = -1L;
    }

    /**
     * Streams events from the buffer to the channel, starting from a flexible checkpoint,
     * until {@link #stop()} is called or the expected number of events has been written.
     *
     * <p>Every invocation first clears the stop flag, resets the event counter to 0 and
     * resets the expected-event count to -1. A value passed to
     * {@link #setExpectedEvents(long)} before this method starts executing is therefore
     * overwritten; set it once the run is under way.
     *
     * <p>{@link ScnNotFoundException} and {@link OffsetNotFoundException} are logged and
     * terminate the run. The stop flag is cleared again on exit.
     */
    @Override
    public void run() {
        this._stop = false;
        this._expectedEvents = -1L;
        this._count = 0L;
        try {
            Checkpoint cp = new Checkpoint();
            cp.setFlexible();
            do {
                StreamEventsArgs args = new StreamEventsArgs(this._batchsize).setStatsCollector(this._stats);
                int streamedEvents;
                // Drain: keep streaming until a call reports that no events were streamed.
                while ((streamedEvents = this._buffer.streamEvents(cp, this._channel, args).getNumEventsStreamed()) > 0) {
                    // Single writer thread, so this read-modify-write on a volatile is safe.
                    this._count += streamedEvents;
                }
                // No pause here: the outer loop polls the buffer again immediately.
            } while (!this._stop && !this.endOfEvents());
        }
        catch (ScnNotFoundException e) {
            // Pass the exception as the throwable as well so the stack trace is not lost.
            LOG.error("SCN not found! " + e, e);
        }
        catch (OffsetNotFoundException e) {
            LOG.error("Offset not found! " + e, e);
        }
        finally {
            this._stop = false;
        }
    }

    /** Requests that {@link #run()} exit at its next check of the stop flag. */
    public void stop() {
        this._stop = true;
    }

    /** @return number of events streamed so far by the current (or most recent) {@link #run()} */
    public long eventsWritten() {
        return this._count;
    }

    /** @return the event count at which {@link #run()} finishes; -1 after construction or at run start */
    public long expectedEvents() {
        return this._expectedEvents;
    }

    /**
     * Sets the event count at which {@link #run()} finishes by itself.
     *
     * @param e expected number of events; values &lt;= 0 disable the end-of-events check
     */
    public void setExpectedEvents(long e) {
        this._expectedEvents = e;
    }

    /**
     * @return {@code true} when a positive expected count is set and at least that many
     *         events have been written
     */
    private boolean endOfEvents() {
        // Read once so a concurrent setExpectedEvents() cannot split the two comparisons.
        long expected = this._expectedEvents;
        return expected > 0L && this._count >= expected;
    }
}

