/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core;

import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.Encoding;
import com.linkedin.databus.core.InternalDatabusEventsListenerAbstract;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.RangeBasedReaderWriterLock;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

@Deprecated
public class EventLogWriter
extends InternalDatabusEventsListenerAbstract
implements Runnable {
    /** Fully-qualified class name; used as the name of {@link #LOG}. */
    public static final String MODULE = EventLogWriter.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    // Buffer whose events are persisted; this writer registers itself on it as an internal listener.
    private final DbusEventBuffer _eventBuffer;
    // Top-level log directory; created/validated in the constructor and otherwise only logged.
    private final File _writeDir;
    // Obtained from the event buffer; used to acquire and release reader locks on buffer ranges.
    private final RangeBasedReaderWriterLock _lockProvider;
    // Passed to DbusEventBuffer.batchWrite and used to choose the log file extension.
    private final Encoding _encoding;
    // When true the run() loop exits and onEvent() ignores further events.
    private final AtomicBoolean _stopRunning;
    // Reader-lock tokens for buffer ranges waiting to be written by run().
    private final ArrayBlockingQueue<RangeBasedReaderWriterLock.LockToken> _contiguousRanges;
    // Span (next offset - start offset) above which the current batch is queued for writing.
    private final long _batchLimit;
    // true: queueWriteRequest() blocks when the queue is full; false: it gives up and stops the writer.
    private final boolean _blockOnWrite;
    // Once this many bytes were written to the current file, the next write rolls over to a new file.
    private final long _individualFileMaxBytes;
    // Batch bookkeeping below is read and written under _writeLock in onEvent() and run().
    private BatchState _batchState;
    private long _batchStartOffset;
    private long _batchNextOffset;
    // Bytes written since construction, across all files.
    private long _totalBytesWritten;
    // Bytes written to the file currently open; reset when a new file is opened.
    private long _currentFileBytesWritten;
    // Channel of the file currently being written; null until the first write.
    private FileChannel _currentWritableByteChannel;
    // Incremented for every new file and embedded in its name.
    private int _currentWriteFileIndex;
    // Files created so far, bounded by maxFiles; the oldest is deleted when the queue is full.
    private final ArrayBlockingQueue<File> _writeFileHandles;
    // When false, run() stops immediately and performWrite() is a no-op.
    private final boolean _enabled;
    // Directory that receives the eventBuffer_N files; emptied (or created) at the start of run().
    private final File _writeSessionDir;
    // Guards the batch bookkeeping fields.
    private final ReentrantLock _writeLock;
    // Minimum time between fsyncs; also the poll timeout of the run() loop.
    private final long _fsyncIntervalInMillis;
    // Wall-clock time (ms) recorded when the channel was last forced during file rollover.
    private long _lastFsyncTime;

    /**
     * Reports how many bytes this writer has written so far, summed over all
     * log files it has produced.
     *
     * @return the running byte total maintained by the write path
     */
    public long getTotalBytesWritten() {
        return _totalBytesWritten;
    }

    /**
     * Creates a writer and registers it as an internal listener on {@code eventBuffer}.
     *
     * @param enabled                     when false, {@link #run()} stops immediately and nothing is written
     * @param eventBuffer                 buffer whose events are persisted
     * @param writeDir                    top-level log directory; created if it does not exist
     * @param writeSessionDir             directory that receives the event log files
     * @param encoding                    encoding used when writing events
     * @param blockOnWrite                if true, producers block when the pending-write queue is full
     * @param writeBatchSizeInBytes       batch span after which a write request is queued
     * @param maxPendingWritesBeforeAbort pending-write queue capacity when {@code blockOnWrite} is false
     * @param individualFileMaxBytes      size at which the writer rolls over to a new file
     * @param maxFiles                    maximum number of log files kept
     * @param fsyncIntervalInMillis       minimum interval between fsyncs; also the writer poll timeout
     * @throws RuntimeException if {@code writeDir} cannot be created or is not writable
     */
    public EventLogWriter(boolean enabled, DbusEventBuffer eventBuffer, File writeDir, File writeSessionDir, Encoding encoding, boolean blockOnWrite, long writeBatchSizeInBytes, int maxPendingWritesBeforeAbort, long individualFileMaxBytes, int maxFiles, long fsyncIntervalInMillis) {
        this._enabled = enabled;
        this._eventBuffer = eventBuffer;
        this._lockProvider = eventBuffer.getRwLockProvider();
        this._encoding = encoding;
        this._batchState = BatchState.INIT;
        this._stopRunning = new AtomicBoolean(false);
        this._writeLock = new ReentrantLock();
        this._fsyncIntervalInMillis = fsyncIntervalInMillis;
        this._blockOnWrite = blockOnWrite;
        // A blocking writer uses a fixed capacity of 10; a non-blocking one uses the configured abort threshold.
        this._contiguousRanges = new ArrayBlockingQueue<RangeBasedReaderWriterLock.LockToken>(blockOnWrite ? 10 : maxPendingWritesBeforeAbort);
        this._batchLimit = writeBatchSizeInBytes;
        this._individualFileMaxBytes = individualFileMaxBytes;
        this._totalBytesWritten = 0L;
        this._currentFileBytesWritten = 0L;
        if (!writeDir.isDirectory() && !writeDir.mkdir()) {
            throw new RuntimeException(writeDir + " is not a directory, and I could not create one");
        }
        if (!writeDir.canWrite()) {
            throw new RuntimeException("Do not have write privileges on " + writeDir);
        }
        this._writeDir = writeDir;
        this._writeSessionDir = writeSessionDir;
        this._writeFileHandles = new ArrayBlockingQueue<File>(maxFiles);
        LOG.info("Configured with writeDirectory :" + this._writeDir.getAbsolutePath());
        LOG.info("Configured with writeSession Directory :" + this._writeSessionDir.getAbsolutePath());
        // Kept last so that all fields are assigned before the buffer can call back into onEvent().
        this._eventBuffer.addInternalListener(this);
    }

    /**
     * Convenience constructor that unpacks every setting from a {@link StaticConfig}
     * and forwards it to the full constructor.
     *
     * @param staticConfig source of all writer settings, including the event buffer
     */
    public EventLogWriter(StaticConfig staticConfig) {
        this(
            staticConfig.isEnabled(),
            staticConfig.getEventBuffer(),
            staticConfig.getTopLevelLogDir(),
            staticConfig.getWriteSessionDir(),
            staticConfig.getEncoding(),
            staticConfig.getBlockOnWrite(),
            staticConfig.getWriteBatchSizeInBytes(),
            staticConfig.getMaxPendingWrites(),
            staticConfig.getIndividualFileMaxBytes(),
            staticConfig.getMaxFiles(),
            staticConfig.getFsyncIntervalInMillis());
    }

    /**
     * Writer loop.
     *
     * <p>Prepares the session directory (deleting any files already in it, or creating
     * it), then repeatedly takes a queued buffer range, writes it and releases its reader
     * lock. When no range arrives within the fsync interval, an open batch is queued so
     * that it gets written on a later iteration. The loop ends once the stop flag is set
     * (by {@link #stop()}, by a failure, or immediately when the writer is disabled);
     * {@code flush()} is called last.
     */
    @Override
    public void run() {
        this._stopRunning.set(!this._enabled);
        if (this._writeSessionDir.exists()) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            File[] staleFiles = this._writeSessionDir.listFiles();
            if (staleFiles == null) {
                LOG.error("cannot list directory: " + this._writeSessionDir);
                this._stopRunning.set(true);
            } else {
                for (File f : staleFiles) {
                    LOG.info("deleting file " + f.getAbsolutePath() + " in directory");
                    if (!f.delete()) {
                        LOG.error("deleting failed: " + f.getAbsolutePath());
                        this._stopRunning.set(true);
                    }
                }
            }
        } else if (!this._writeSessionDir.mkdir()) {
            LOG.error("directory creation failed: " + this._writeSessionDir);
            this._stopRunning.set(true);
        }
        while (!this._stopRunning.get()) {
            // Scoped to the iteration: a token from a previous pass has already been released
            // and must never be written or released again if poll() is interrupted.
            RangeBasedReaderWriterLock.LockToken readRangeLock = null;
            try {
                readRangeLock = this._contiguousRanges.poll(this._fsyncIntervalInMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.info("interrupted");
            }
            if (readRangeLock != null) {
                try {
                    this.performWrite(readRangeLock);
                } catch (IOException e) {
                    LOG.error("error writing events:" + e.getMessage(), e);
                    this._stopRunning.set(true);
                } finally {
                    this._lockProvider.releaseReaderLock(readRangeLock);
                }
            } else {
                // Nothing arrived in time: push out a batch that is still open.
                this._writeLock.lock();
                try {
                    if (this._batchState == BatchState.STARTED) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Timeout: will queue write request");
                        }
                        this.queueWriteRequest();
                        this._batchState = BatchState.INIT;
                    }
                } finally {
                    this._writeLock.unlock();
                }
            }
        }
        this.flush();
    }

    /**
     * Listener callback: folds the event at {@code [offset, offset + size)} into the
     * current batch. Events are ignored once the writer has been stopped.
     *
     * <p>The first event after a reset opens a batch. Later events extend it, unless the
     * event does not start where the batch ends or the batch span already exceeds the
     * batch limit; in those cases the batch is queued for writing and a new one starts at
     * this event.
     *
     * @param event  the event (only used for debug logging)
     * @param offset buffer offset at which the event starts
     * @param size   size of the event in bytes
     */
    @Override
    public void onEvent(DbusEvent event, long offset, int size) {
        if (_stopRunning.get()) {
            return;
        }
        final long eventEnd = offset + (long) size;
        _writeLock.lock();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("EventLogWriter:onEvent:" + event);
            }
            if (_batchState == BatchState.INIT) {
                _batchStartOffset = offset;
                _batchNextOffset = eventEnd;
                _batchState = BatchState.STARTED;
            } else if (_batchState == BatchState.STARTED) {
                final boolean notContiguous = _batchNextOffset != offset;
                final boolean overLimit = _batchNextOffset - _batchStartOffset > _batchLimit;
                if (notContiguous || overLimit) {
                    queueWriteRequest();
                    _batchStartOffset = offset;
                }
                _batchNextOffset = eventEnd;
            }
        } finally {
            _writeLock.unlock();
        }
    }

    /**
     * Acquires a reader lock on the current batch range and hands the resulting token to
     * the writer thread through the pending-write queue.
     *
     * <p>If the lock cannot be acquired the writer is stopped and nothing is queued. In
     * blocking mode the call waits for queue space; otherwise a full queue makes the
     * writer give up: it stops, releases every pending token and deletes the files in the
     * session directory.
     */
    private void queueWriteRequest() {
        RangeBasedReaderWriterLock.LockToken rangeToken;
        try {
            rangeToken = this._lockProvider.acquireReaderLock(this._batchStartOffset, this._batchNextOffset, this._eventBuffer.getBufferPositionParser(), "EventLogWriter.queueWriteRequest");
        } catch (InterruptedException e1) {
            LOG.warn("queueWriteRequest read lock wait interrupted", e1);
            this._stopRunning.set(true);
            // No token was obtained; ArrayBlockingQueue rejects null, so there is nothing to queue.
            return;
        } catch (TimeoutException e1) {
            LOG.error("queueWriteRequest read lock wait timed out", e1);
            this._stopRunning.set(true);
            return;
        }
        if (rangeToken == null) {
            LOG.error("queueWriteRequest obtained no read lock token");
            this._stopRunning.set(true);
            return;
        }
        if (this._blockOnWrite) {
            try {
                this._contiguousRanges.put(rangeToken);
            } catch (InterruptedException e) {
                LOG.info("interrupted");
                // The token never reached the queue, so nobody else will release it.
                this._lockProvider.releaseReaderLock(rangeToken);
            }
            return;
        }
        if (this._contiguousRanges.offer(rangeToken)) {
            return;
        }
        LOG.error("Not able to keep up with event load, abandoning attempts to persist the buffer");
        this._stopRunning.set(true);
        // Remove tokens while releasing them so the writer thread cannot pick one up
        // afterwards and release it a second time.
        RangeBasedReaderWriterLock.LockToken pending;
        while ((pending = this._contiguousRanges.poll()) != null) {
            this._lockProvider.releaseReaderLock(pending);
        }
        this._lockProvider.releaseReaderLock(rangeToken);
        if (this._writeSessionDir.exists()) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            File[] sessionFiles = this._writeSessionDir.listFiles();
            if (sessionFiles != null) {
                for (File f : sessionFiles) {
                    LOG.info("deleting file " + f.getAbsolutePath() + " in directory");
                    if (!f.delete()) {
                        LOG.error("deleting failed: " + f.getAbsolutePath());
                    }
                }
            }
        }
    }

    /**
     * Asks the writer to stop: raises the stop flag that the {@link #run()} loop and
     * {@code onEvent} check.
     */
    public void stop() {
        _stopRunning.set(true);
    }

    /**
     * Writes the buffer range covered by {@code readRangeLock} to the current log file
     * and fsyncs the file if the fsync interval has elapsed. Does nothing when the writer
     * is disabled.
     *
     * @param readRangeLock token whose range is written; the caller releases it
     * @throws IOException if no log file could be opened or the write/fsync fails
     */
    private void performWrite(RangeBasedReaderWriterLock.LockToken readRangeLock) throws IOException {
        if (!this._enabled) {
            return;
        }
        FileChannel writeChannel = this.getCurrentWriteChannel();
        if (writeChannel == null) {
            // Opening the next file failed; report it instead of failing later with an NPE.
            throw new IOException("no writable event log file available in " + this._writeSessionDir);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("BatchWrite:" + readRangeLock.getRange().start + ":" + readRangeLock.getRange().end);
        }
        int bytesWritten = this._eventBuffer.batchWrite(readRangeLock.getRange(), writeChannel, this._encoding);
        this._totalBytesWritten += (long) bytesWritten;
        this._currentFileBytesWritten += (long) bytesWritten;
        long now = System.currentTimeMillis();
        if (now - this._lastFsyncTime > this._fsyncIntervalInMillis) {
            writeChannel.force(true);
            // Record the fsync; otherwise every later batch would fsync again.
            this._lastFsyncTime = now;
        }
    }

    /**
     * Last step of {@link #run()}: writes whatever is still pending and closes the
     * current log file.
     *
     * <p>NOTE(review): the decompiler could not recover the original body (it emitted a
     * stub that always threw {@code IllegalStateException}). This implementation is a
     * reconstruction built from the other methods of this class, not the recovered
     * original; confirm it against the original source before relying on exact behaviour.
     */
    private void flush() {
        // Drain first so that queueing the open batch below finds room in the queue.
        this.writeQueuedRanges();
        this._writeLock.lock();
        try {
            if (this._batchState == BatchState.STARTED) {
                this.queueWriteRequest();
                this._batchState = BatchState.INIT;
            }
        } finally {
            this._writeLock.unlock();
        }
        this.writeQueuedRanges();
        if (this._currentWritableByteChannel != null) {
            try {
                try {
                    this._currentWritableByteChannel.force(true);
                    this._lastFsyncTime = System.currentTimeMillis();
                } finally {
                    this._currentWritableByteChannel.close();
                }
            } catch (IOException e) {
                LOG.error("error closing event log file: " + e.getMessage(), e);
            }
            this._currentWritableByteChannel = null;
        }
    }

    /** Writes and releases every range currently in the pending-write queue, without waiting. */
    private void writeQueuedRanges() {
        RangeBasedReaderWriterLock.LockToken token;
        while ((token = this._contiguousRanges.poll()) != null) {
            try {
                this.performWrite(token);
            } catch (IOException e) {
                LOG.error("error writing events:" + e.getMessage(), e);
            } finally {
                this._lockProvider.releaseReaderLock(token);
            }
        }
    }

    /**
     * Returns the channel to write to, rolling over to a new file when the current one
     * has reached the per-file size limit and opening the first file on demand.
     *
     * @return the open channel, or {@code null} if the next file could not be opened
     */
    private FileChannel getCurrentWriteChannel() {
        if (this._currentWritableByteChannel != null && this._currentFileBytesWritten >= this._individualFileMaxBytes) {
            try {
                try {
                    this._currentWritableByteChannel.force(true);
                    this._lastFsyncTime = System.currentTimeMillis();
                } finally {
                    // Close even when force() fails so the file descriptor is not leaked.
                    this._currentWritableByteChannel.close();
                }
            } catch (IOException e) {
                LOG.error("error closing event log file: " + e.getMessage(), e);
            }
            this._currentWritableByteChannel = null;
        }
        if (this._currentWritableByteChannel == null) {
            File writeFile = this.getNextWriteFile();
            try {
                // Closing the channel later also closes the underlying stream.
                this._currentWritableByteChannel = new FileOutputStream(writeFile).getChannel();
            } catch (FileNotFoundException e) {
                LOG.error("cannot open event log file " + writeFile.getAbsolutePath(), e);
            }
            this._currentFileBytesWritten = 0L;
        }
        return this._currentWritableByteChannel;
    }

    /**
     * Creates the next log file in the session directory and records it in the bounded
     * file queue, deleting the oldest recorded files while the queue is full.
     *
     * <p>The file is named {@code eventBuffer_<index>.} followed by {@code bin} for
     * BINARY or {@code json} for JSON and JSON_PLAIN_VALUE; other encodings get no
     * extension.
     *
     * @return the newly created file
     */
    private File getNextWriteFile() {
        ++this._currentWriteFileIndex;
        String fileName = this._writeSessionDir.getAbsolutePath() + File.separator + "eventBuffer_" + this._currentWriteFileIndex + ".";
        switch (this._encoding) {
            case BINARY: {
                fileName = fileName + "bin";
                break;
            }
            case JSON:
            case JSON_PLAIN_VALUE: {
                fileName = fileName + "json";
                break;
            }
            default: {
                break;
            }
        }
        LOG.info("Creating new file " + fileName);
        File writeFile = new File(fileName);
        try {
            writeFile.createNewFile();
        } catch (IOException e) {
            LOG.error("cannot create event log file " + fileName, e);
        }
        assert (writeFile.canWrite());
        // Make room by dropping (and deleting) the oldest files until the new one fits.
        while (!this._writeFileHandles.offer(writeFile)) {
            File deleteHandle = this._writeFileHandles.poll();
            if (deleteHandle != null && !deleteHandle.delete()) {
                LOG.error("deleting failed:" + deleteHandle.getAbsolutePath());
            }
        }
        return writeFile;
    }

    /**
     * Mutable builder for {@link StaticConfig}. Holds the writer settings as simple bean
     * properties and validates them in {@link #build()}.
     */
    public static class Config implements ConfigBuilder<StaticConfig> {
        public static final boolean DEFAULT_ENABLED = false;
        public static final String DEFAULT_TOP_LEVEL_LOG_DIR = "eventLog";
        public static final String DEFAULT_WRITE_SESSION_DIR = null;
        public static final String DEFAULT_ENCODING = Encoding.BINARY.name();
        public static final long DEFAULT_WRITE_BATCH_SIZE_IN_BYTES = 65536L;
        public static final boolean DEFAULT_BLOCK_ON_WRITE = false;
        private static final int FIVE_HUNDRED_MEGABYTES_IN_BYTES = 512000000;
        public static final int DEFAULT_INDIVIDUAL_FILE_MAX_SIZE = FIVE_HUNDRED_MEGABYTES_IN_BYTES;
        public static final int DEFAULT_MAX_FILES = 5;
        public static final long DEFAULT_FSYNC_INTERVAL_IN_MILLIS = 10000L;
        public static final int DEFAULT_MAX_PENDING_WRITES = 20;
        protected DbusEventBuffer _eventBuffer;
        protected boolean _enabled;
        protected String _topLevelLogDir;
        protected String _writeSessionDir;
        protected long _individualFileMaxBytes;
        protected int _maxFiles;
        protected String _encoding;
        protected boolean _blockOnWrite;
        protected long _writeBatchSizeInBytes;
        protected int _maxPendingWrites;
        protected long _fsyncIntervalInMillis;

        /** Creates a builder populated with the {@code DEFAULT_*} values. */
        public Config() {
            this._enabled = DEFAULT_ENABLED;
            this._topLevelLogDir = DEFAULT_TOP_LEVEL_LOG_DIR;
            this._writeSessionDir = DEFAULT_WRITE_SESSION_DIR;
            this._encoding = DEFAULT_ENCODING;
            this._writeBatchSizeInBytes = DEFAULT_WRITE_BATCH_SIZE_IN_BYTES;
            this._blockOnWrite = DEFAULT_BLOCK_ON_WRITE;
            this._individualFileMaxBytes = DEFAULT_INDIVIDUAL_FILE_MAX_SIZE;
            this._maxFiles = DEFAULT_MAX_FILES;
            this._fsyncIntervalInMillis = DEFAULT_FSYNC_INTERVAL_IN_MILLIS;
            this._maxPendingWrites = DEFAULT_MAX_PENDING_WRITES;
        }

        /**
         * Copy constructor.
         *
         * @param other builder whose settings are copied, including the write session directory
         */
        public Config(Config other) {
            this._enabled = other._enabled;
            this._topLevelLogDir = other._topLevelLogDir;
            // Previously omitted, which silently discarded an explicitly configured session directory.
            this._writeSessionDir = other._writeSessionDir;
            this._encoding = other._encoding;
            this._writeBatchSizeInBytes = other._writeBatchSizeInBytes;
            this._blockOnWrite = other._blockOnWrite;
            this._individualFileMaxBytes = other._individualFileMaxBytes;
            this._eventBuffer = other._eventBuffer;
            this._maxFiles = other._maxFiles;
            this._fsyncIntervalInMillis = other._fsyncIntervalInMillis;
            this._maxPendingWrites = other._maxPendingWrites;
        }

        /**
         * Validates the settings and produces an immutable {@link StaticConfig}.
         *
         * <p>If no write session directory was set, one named {@code session_<currentTimeMillis>}
         * under the top-level log directory is chosen and stored back into this builder.
         *
         * @return the validated configuration
         * @throws InvalidConfigException if the top-level directory exists but is not writable,
         *         the encoding name is not a valid {@link Encoding}, or {@code maxFiles} /
         *         {@code maxPendingWrites} is not positive
         */
        @Override
        public StaticConfig build() throws InvalidConfigException {
            LOG.info("Event Log Writer enabled: " + this._enabled);
            File writeDir = new File(this._topLevelLogDir);
            if (writeDir.exists() && !writeDir.canWrite()) {
                throw new InvalidConfigException("Invalid Config value : Cannot write to writeDir: " + this._topLevelLogDir);
            }
            LOG.info("Event Log Writer writeDir: " + writeDir.getAbsolutePath());
            if (this._writeSessionDir == null) {
                File writeSessionDir = new File(writeDir.getAbsolutePath() + File.separator + "session" + "_" + System.currentTimeMillis());
                this._writeSessionDir = writeSessionDir.getAbsolutePath();
            }
            LOG.info("Event Log Writer writeSessionDir: " + this._writeSessionDir);
            Encoding encoding = null;
            try {
                encoding = Encoding.valueOf(this._encoding);
            } catch (Exception e) {
                throw new InvalidConfigException("Invalid Config Value for encoding: " + this._encoding);
            }
            if (this._maxFiles <= 0) {
                throw new InvalidConfigException("EventLogWriter: maxFiles configured <= 0 : " + this._maxFiles);
            }
            if (this._maxPendingWrites <= 0) {
                throw new InvalidConfigException("EventLogWriter: maxPendingWrites configured <=0 : " + this._maxPendingWrites);
            }
            LOG.info("Event Log Writer encoding: " + this._encoding);
            LOG.info("Event Log Writer writeBatchSizeInBytes: " + this._writeBatchSizeInBytes);
            LOG.info("Event Log Writer blockOnWrite: " + this._blockOnWrite);
            LOG.info("Event Log Writer individualFileMaxBytes: " + this._individualFileMaxBytes);
            LOG.info("Event Log Writer maxFiles: " + this._maxFiles);
            LOG.info("Event Log Writer maxPendingWrites: " + this._maxPendingWrites);
            LOG.info("Event Log Writer fsyncIntervalInMillis: " + this._fsyncIntervalInMillis);
            return new StaticConfig(this._eventBuffer, this._enabled, this._topLevelLogDir, this._writeSessionDir, encoding, this._writeBatchSizeInBytes, this._blockOnWrite, this._individualFileMaxBytes, this._maxFiles, this._maxPendingWrites, this._fsyncIntervalInMillis);
        }

        public boolean isEnabled() {
            return this._enabled;
        }

        public void setEnabled(boolean enabled) {
            this._enabled = enabled;
        }

        public String getTopLevelLogDir() {
            return this._topLevelLogDir;
        }

        /** Sets the top-level log directory (historical name; see {@link #setTopLevelLogDir(String)}). */
        public void setTopLevelDir(String topLevelLogDir) {
            this._topLevelLogDir = topLevelLogDir;
        }

        /** Sets the top-level log directory; named to match {@link #getTopLevelLogDir()}. */
        public void setTopLevelLogDir(String topLevelLogDir) {
            this.setTopLevelDir(topLevelLogDir);
        }

        public String getWriteSessionDir() {
            return this._writeSessionDir;
        }

        public void setWriteSessionDir(String writeSessionDir) {
            this._writeSessionDir = writeSessionDir;
        }

        /** Returns the encoding as the name of an {@link Encoding} constant. */
        public String getEncoding() {
            return this._encoding;
        }

        public void setEncoding(String encoding) {
            this._encoding = encoding;
        }

        public long getWriteBatchSizeInBytes() {
            return this._writeBatchSizeInBytes;
        }

        public void setWriteBatchSizeInBytes(long writeBatchSizeInBytes) {
            this._writeBatchSizeInBytes = writeBatchSizeInBytes;
        }

        public boolean getBlockOnWrite() {
            return this._blockOnWrite;
        }

        public void setBlockOnWrite(boolean blockOnWrite) {
            this._blockOnWrite = blockOnWrite;
        }

        public long getIndividualFileMaxBytes() {
            return this._individualFileMaxBytes;
        }

        public void setIndividualFileMaxBytes(long individualFileMaxBytes) {
            this._individualFileMaxBytes = individualFileMaxBytes;
        }

        public int getMaxFiles() {
            return this._maxFiles;
        }

        public void setMaxFiles(int maxFiles) {
            this._maxFiles = maxFiles;
        }

        public int getMaxPendingWrites() {
            return this._maxPendingWrites;
        }

        public void setMaxPendingWrites(int maxPendingWrites) {
            this._maxPendingWrites = maxPendingWrites;
        }

        public long getFsyncIntervalInMillis() {
            return this._fsyncIntervalInMillis;
        }

        public void setFsyncIntervalInMillis(long fsyncIntervalInMillis) {
            this._fsyncIntervalInMillis = fsyncIntervalInMillis;
        }
    }

    /**
     * Read-only view of the writer settings, as produced by {@link Config#build()}.
     * Only the event buffer can change after construction, through
     * {@link #getOrCreateEventLogWriter(DbusEventBuffer)}.
     */
    public static class StaticConfig {
        protected DbusEventBuffer _eventBuffer;
        private final boolean _enabled;
        private final File _topLevelLogDir;
        private final File _writeSessionDir;
        private final Encoding _encoding;
        private final long _writeBatchSizeInBytes;
        private final boolean _blockOnWrite;
        private final long _individualFileMaxBytes;
        private final int _maxFiles;
        private final int _maxPendingWrites;
        private final long _fsyncIntervalInMillis;
        private EventLogWriter _existingLogWriter;

        /**
         * Stores the given settings; the two directory paths are wrapped in {@link File} objects.
         */
        public StaticConfig(DbusEventBuffer eventBuffer, boolean enabled, String topLevelLogsDir, String writeSessionDir, Encoding encoding, long writeBatchSizeInBytes, boolean blockOnWrite, long individualFileMaxBytes, int maxFiles, int maxPendingWrites, long fsyncIntervalInMillis) {
            _eventBuffer = eventBuffer;
            _enabled = enabled;
            _topLevelLogDir = new File(topLevelLogsDir);
            _writeSessionDir = new File(writeSessionDir);
            _encoding = encoding;
            _writeBatchSizeInBytes = writeBatchSizeInBytes;
            _blockOnWrite = blockOnWrite;
            _individualFileMaxBytes = individualFileMaxBytes;
            _maxFiles = maxFiles;
            _maxPendingWrites = maxPendingWrites;
            _fsyncIntervalInMillis = fsyncIntervalInMillis;
        }

        /** @return the event buffer most recently associated with this configuration */
        public DbusEventBuffer getEventBuffer() {
            return _eventBuffer;
        }

        /** @return whether the writer is enabled */
        public boolean isEnabled() {
            return _enabled;
        }

        /** @return the top-level log directory */
        public File getTopLevelLogDir() {
            return _topLevelLogDir;
        }

        /** @return the directory that receives the event log files */
        public File getWriteSessionDir() {
            return _writeSessionDir;
        }

        /** @return the encoding used when writing events */
        public Encoding getEncoding() {
            return _encoding;
        }

        /** @return the write batch size in bytes */
        public long getWriteBatchSizeInBytes() {
            return _writeBatchSizeInBytes;
        }

        /** @return whether producers block when the pending-write queue is full */
        public boolean getBlockOnWrite() {
            return _blockOnWrite;
        }

        /** @return the maximum size of a single log file in bytes */
        public long getIndividualFileMaxBytes() {
            return _individualFileMaxBytes;
        }

        /** @return the maximum number of log files kept */
        public int getMaxFiles() {
            return _maxFiles;
        }

        /** @return the pending-write queue capacity used in non-blocking mode */
        public int getMaxPendingWrites() {
            return _maxPendingWrites;
        }

        /** @return the fsync interval in milliseconds */
        public long getFsyncIntervalInMillis() {
            return _fsyncIntervalInMillis;
        }

        /**
         * Points this configuration at {@code eventBuffer} and builds a writer for it.
         * Despite the name, every call constructs a new {@link EventLogWriter}; the most
         * recent one is remembered in a field.
         *
         * @param eventBuffer buffer the new writer listens to
         * @return the newly created writer
         */
        public EventLogWriter getOrCreateEventLogWriter(DbusEventBuffer eventBuffer) {
            _eventBuffer = eventBuffer;
            EventLogWriter writer = new EventLogWriter(this);
            _existingLogWriter = writer;
            return writer;
        }
    }

    /** State of the batch of events currently being accumulated by {@code onEvent}. */
    private static enum BatchState {
        /** No batch is open; the next event starts one. */
        INIT,
        /** A batch is open and being extended. */
        STARTED,
        /** Not assigned anywhere in the code visible in this file. */
        ENDED
    }
}

