/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir.benchmark.storage;

import com.github.ylgrgyq.reservoir.Bits;
import com.github.ylgrgyq.reservoir.NamedThreadFactory;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.ObjectWithId;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RocksDbStorage
implements ObjectQueueStorage {
    private static final Logger logger = LoggerFactory.getLogger(RocksDbStorage.class);
    private static final String DEFAULT_QUEUE_NAME = "reservoir_queue";
    private static final byte[] CONSUMER_COMMIT_ID_META_KEY = "consumer_committed_id".getBytes(StandardCharsets.UTF_8);
    private static final ThreadFactory threadFactory = new NamedThreadFactory("storage-background-truncate-handler-");
    private final Thread backgroundTruncateHandler;
    private final List<ColumnFamilyOptions> cfOptions = new ArrayList<ColumnFamilyOptions>();
    private final long readRetryIntervalMillis;
    private final RocksDB db;
    private final ColumnFamilyHandle defaultColumnFamilyHandle;
    private final ColumnFamilyHandle columnFamilyHandle;
    private final List<ColumnFamilyHandle> columnFamilyHandles;
    private final WriteOptions writeOptions;
    private final ReadOptions readOptions;
    private final DBOptions dbOptions;
    private volatile boolean closed;

    public RocksDbStorage(String path) throws InterruptedException {
        this(path, false, 500L, TimeUnit.MINUTES.toMillis(1L));
    }

    public RocksDbStorage(String path, boolean destroyPreviousDbFiles) throws InterruptedException {
        this(path, destroyPreviousDbFiles, 500L, TimeUnit.MINUTES.toMillis(1L));
    }

    public RocksDbStorage(String path, boolean destroyPreviousDbFiles, long readRetryIntervalMillis) throws InterruptedException {
        this(path, destroyPreviousDbFiles, readRetryIntervalMillis, TimeUnit.MINUTES.toMillis(1L));
    }

    public RocksDbStorage(String path, boolean destroyPreviousDbFiles, long readRetryIntervalMillis, long detectTruncateIntervalMillis) throws InterruptedException {
        this.readRetryIntervalMillis = readRetryIntervalMillis;
        try {
            DBOptions dbOptions = this.createDefaultRocksDBOptions();
            dbOptions.setCreateMissingColumnFamilies(true);
            dbOptions.setCreateIfMissing(true);
            this.dbOptions = dbOptions;
            WriteOptions writeOptions = new WriteOptions();
            writeOptions.setSync(false);
            this.writeOptions = writeOptions;
            ReadOptions totalOrderReadOptions = new ReadOptions();
            totalOrderReadOptions.setTotalOrderSeek(true);
            this.readOptions = totalOrderReadOptions;
            File dir = new File(path);
            if (dir.exists() && !dir.isDirectory()) {
                throw new IllegalStateException("Invalid log path, it's a regular file: " + path);
            }
            ColumnFamilyOptions columnFamilyOptions = this.createDefaultColumnFamilyOptions();
            if (destroyPreviousDbFiles) {
                try (Options destroyOptions = new Options(dbOptions, columnFamilyOptions);){
                    RocksDB.destroyDB(path, destroyOptions);
                }
            }
            ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>();
            columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
            ColumnFamilyOptions options = this.createDefaultColumnFamilyOptions();
            columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_QUEUE_NAME.getBytes(StandardCharsets.UTF_8), options));
            this.columnFamilyHandles = new ArrayList<ColumnFamilyHandle>();
            this.db = RocksDB.open(dbOptions, path, columnFamilyDescriptors, this.columnFamilyHandles);
            this.defaultColumnFamilyHandle = this.columnFamilyHandles.get(0);
            this.columnFamilyHandle = this.columnFamilyHandles.get(1);
            this.backgroundTruncateHandler = threadFactory.newThread(new BackgroundTruncateHandler(detectTruncateIntervalMillis));
            this.backgroundTruncateHandler.start();
        }
        catch (RocksDBException ex) {
            String msg = String.format("init RocksDb on path %s failed", path);
            this.close();
            throw new IllegalStateException(msg, ex);
        }
    }

    @Override
    public void commitId(long id) {
        try {
            byte[] bs = new byte[8];
            Bits.putLong(bs, 0, id);
            this.db.put(this.defaultColumnFamilyHandle, this.writeOptions, CONSUMER_COMMIT_ID_META_KEY, bs);
        }
        catch (RocksDBException ex) {
            throw new IllegalStateException("fail to commit id: " + id, ex);
        }
    }

    @Override
    public long getLastCommittedId() {
        try {
            byte[] commitIdInBytes = this.db.get(this.defaultColumnFamilyHandle, this.readOptions, CONSUMER_COMMIT_ID_META_KEY);
            if (commitIdInBytes != null) {
                return Bits.getLong(commitIdInBytes, 0);
            }
            return 0L;
        }
        catch (RocksDBException ex) {
            throw new IllegalStateException("fail to get last committed id: ", ex);
        }
    }

    long getLastProducedId() {
        try (RocksIterator it = this.db.newIterator(this.columnFamilyHandle, this.readOptions);){
            it.seekToLast();
            if (it.isValid()) {
                long l = Bits.getLong(it.key(), 0);
                return l;
            }
            long l = 0L;
            return l;
        }
    }

    @Override
    public List<ObjectWithId> fetch(long fromId, int limit) throws InterruptedException {
        if (fromId < 0L) {
            throw new IllegalArgumentException("fromId: " + fromId + " (expect: >=0)");
        }
        ArrayList<ObjectWithId> entries = new ArrayList<ObjectWithId>(limit);
        while (true) {
            try (RocksIterator it = this.db.newIterator(this.columnFamilyHandle, this.readOptions);){
                it.seek(this.getKeyBytes(fromId));
                while (it.isValid() && entries.size() < limit) {
                    long id = Bits.getLong(it.key(), 0);
                    if (id != fromId) {
                        ObjectWithId entry = new ObjectWithId(id, it.value());
                        entries.add(entry);
                    }
                    it.next();
                }
            }
            if (!entries.isEmpty()) break;
            Thread.sleep(this.readRetryIntervalMillis);
        }
        return Collections.unmodifiableList(entries);
    }

    @Override
    public List<ObjectWithId> fetch(long fromId, int limit, long timeout, TimeUnit unit) throws InterruptedException {
        if (fromId < 0L) {
            throw new IllegalArgumentException("fromId: " + fromId + " (expect: >=0)");
        }
        long end = System.nanoTime() + unit.toNanos(timeout);
        ArrayList<ObjectWithId> entries = new ArrayList<ObjectWithId>(limit);
        while (true) {
            long remain;
            try (RocksIterator it = this.db.newIterator(this.columnFamilyHandle, this.readOptions);){
                it.seek(this.getKeyBytes(fromId));
                while (it.isValid() && entries.size() < limit) {
                    long id = Bits.getLong(it.key(), 0);
                    if (id != fromId) {
                        ObjectWithId entry = new ObjectWithId(id, it.value());
                        entries.add(entry);
                    }
                    it.next();
                }
            }
            if (!entries.isEmpty() || (remain = TimeUnit.NANOSECONDS.toMillis(end - System.nanoTime())) <= 0L) break;
            Thread.sleep(Math.min(remain, this.readRetryIntervalMillis));
        }
        return Collections.unmodifiableList(entries);
    }

    @Override
    public void store(List<byte[]> queue) {
        Objects.requireNonNull(queue, "queue");
        try {
            WriteBatch batch = new WriteBatch();
            long id = this.getLastProducedId();
            for (byte[] e : queue) {
                batch.put(this.columnFamilyHandle, this.getKeyBytes(++id), e);
            }
            this.db.write(this.writeOptions, batch);
        }
        catch (RocksDBException e) {
            throw new IllegalStateException("fail to append entry", e);
        }
    }

    @Override
    public void close() throws InterruptedException {
        this.closed = true;
        if (this.backgroundTruncateHandler != null) {
            this.backgroundTruncateHandler.interrupt();
            this.backgroundTruncateHandler.join();
        }
        this.closeDB();
        this.closeOptions();
    }

    private DBOptions createDefaultRocksDBOptions() {
        DBOptions opts = new DBOptions();
        opts.setCreateIfMissing(true);
        opts.setCreateMissingColumnFamilies(true);
        opts.setMaxOpenFiles(-1);
        int cpus = Runtime.getRuntime().availableProcessors();
        opts.setMaxBackgroundCompactions(Math.min(cpus, 4));
        opts.setMaxBackgroundFlushes(1);
        return opts;
    }

    private ColumnFamilyOptions createDefaultColumnFamilyOptions() {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig().setIndexType(IndexType.kHashSearch).setBlockSize(4096L).setFilterPolicy(new BloomFilter(16, false)).setCacheIndexAndFilterBlocks(true).setBlockCache(new LRUCache(0x20000000L, 8));
        ColumnFamilyOptions options = new ColumnFamilyOptions();
        this.cfOptions.add(options);
        return options;
    }

    private void closeDB() {
        for (ColumnFamilyHandle handle : this.columnFamilyHandles) {
            handle.close();
        }
        if (this.db != null) {
            this.db.close();
        }
    }

    private void closeOptions() {
        if (this.dbOptions != null) {
            this.dbOptions.close();
        }
        if (this.cfOptions != null) {
            for (ColumnFamilyOptions opt : this.cfOptions) {
                opt.close();
            }
            this.cfOptions.clear();
        }
        if (this.writeOptions != null) {
            this.writeOptions.close();
        }
        if (this.readOptions != null) {
            this.readOptions.close();
        }
    }

    private byte[] getKeyBytes(long id) {
        byte[] ks = new byte[8];
        Bits.putLong(ks, 0, id);
        return ks;
    }

    static {
        RocksDB.loadLibrary();
    }

    private class BackgroundTruncateHandler
    implements Runnable {
        private final long detectTruncateIntervalMillis;

        BackgroundTruncateHandler(long detectTruncateIntervalMillis) {
            this.detectTruncateIntervalMillis = detectTruncateIntervalMillis;
        }

        @Override
        public void run() {
            while (!RocksDbStorage.this.closed) {
                try {
                    long lastCommittedId = RocksDbStorage.this.getLastCommittedId();
                    long truncateId = Math.max(0L, lastCommittedId - 1000L);
                    try {
                        RocksDbStorage.this.db.deleteRange(RocksDbStorage.this.columnFamilyHandle, RocksDbStorage.this.getKeyBytes(0L), RocksDbStorage.this.getKeyBytes(truncateId));
                    }
                    catch (RocksDBException e) {
                        logger.error("Fail to truncatePrefix {}", (Object)truncateId, (Object)e);
                    }
                    Thread.sleep(this.detectTruncateIntervalMillis);
                }
                catch (InterruptedException lastCommittedId) {
                }
                catch (Exception ex) {
                    logger.error("Truncate handler failed for entry", ex);
                    break;
                }
            }
        }
    }
}

