/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir.storage;

import com.github.ylgrgyq.reservoir.Bits;
import com.github.ylgrgyq.reservoir.NamedThreadFactory;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.ObjectWithId;
import com.github.ylgrgyq.reservoir.StorageException;
import com.github.ylgrgyq.reservoir.storage.BadRecordException;
import com.github.ylgrgyq.reservoir.storage.FileBasedStorageBuilder;
import com.github.ylgrgyq.reservoir.storage.FileName;
import com.github.ylgrgyq.reservoir.storage.LogReader;
import com.github.ylgrgyq.reservoir.storage.LogWriter;
import com.github.ylgrgyq.reservoir.storage.Manifest;
import com.github.ylgrgyq.reservoir.storage.ManifestRecord;
import com.github.ylgrgyq.reservoir.storage.Memtable;
import com.github.ylgrgyq.reservoir.storage.SSTableFileMetaInfo;
import com.github.ylgrgyq.reservoir.storage.SeekableIterator;
import com.github.ylgrgyq.reservoir.storage.StorageRuntimeException;
import com.github.ylgrgyq.reservoir.storage.TableBuilder;
import com.github.ylgrgyq.reservoir.storage.TableCache;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FileBasedStorage
implements ObjectQueueStorage {
    private static final Logger logger = LoggerFactory.getLogger(FileBasedStorage.class.getName());
    private static final ThreadFactory safeCloseThreadFactory = new NamedThreadFactory("safe-close-thread-");
    private final ExecutorService sstableWriterPool;
    private final String baseDir;
    private final TableCache tableCache;
    private final Manifest manifest;
    private final long readRetryIntervalMillis;
    private final FileLock storageLock;
    @Nullable
    private LogWriter dataLogWriter;
    private volatile int dataLogFileNumber;
    @Nullable
    private LogWriter consumerCommitLogWriter;
    private volatile int consumerCommitLogFileNumber;
    private long lastCommittedId;
    private long lastTryTruncateTime;
    private long truncateIntervalNanos;
    private Memtable mm;
    @Nullable
    private Memtable imm;
    @Nullable
    private volatile CompletableFuture<Void> closeFuture;
    @Nullable
    private Thread safeCloseThread;

    FileBasedStorage(FileBasedStorageBuilder builder) throws StorageException {
        Objects.requireNonNull(builder, "builder");
        String storageBaseDir = builder.getStorageBaseDir();
        Path baseDirPath = Paths.get(storageBaseDir, new String[0]);
        if (Files.exists(baseDirPath, new LinkOption[0]) && !Files.isDirectory(baseDirPath, new LinkOption[0])) {
            throw new IllegalArgumentException("\"" + storageBaseDir + "\" must be a directory");
        }
        this.sstableWriterPool = builder.getFlushMemtableExecutorService();
        this.readRetryIntervalMillis = builder.getReadRetryIntervalMillis();
        this.mm = new Memtable();
        this.baseDir = storageBaseDir;
        this.lastCommittedId = Long.MIN_VALUE;
        this.tableCache = new TableCache(this.baseDir);
        this.manifest = new Manifest(this.baseDir);
        this.truncateIntervalNanos = TimeUnit.MILLISECONDS.toNanos(builder.getTruncateIntervalMillis());
        this.closeFuture = null;
        boolean initStorageSuccess = false;
        try {
            this.createStorageDir();
            this.storageLock = this.lockStorage(storageBaseDir);
            logger.debug("Start init storage under {}", (Object)storageBaseDir);
            ManifestRecord record = ManifestRecord.newPlainRecord();
            Path currentFilePath = Paths.get(storageBaseDir, FileName.getCurrentFileName());
            if (Files.exists(currentFilePath, new LinkOption[0])) {
                this.recoverStorage(currentFilePath, record);
            }
            if (this.dataLogWriter == null) {
                this.dataLogWriter = this.createNewDataLogWriter();
            }
            if (this.consumerCommitLogWriter == null) {
                this.consumerCommitLogWriter = this.createConsumerCommitLogWriter();
            }
            record.setDataLogFileNumber(this.dataLogFileNumber);
            record.setConsumerCommitLogFileNumber(this.consumerCommitLogFileNumber);
            this.manifest.logRecord(record);
            this.lastTryTruncateTime = System.nanoTime();
            initStorageSuccess = true;
        }
        catch (StorageException | IOException t) {
            throw new IllegalStateException("init storage failed", t);
        }
        catch (Exception ex) {
            throw new StorageException(ex);
        }
        finally {
            if (!initStorageSuccess) {
                this.blockSafeClose();
            }
        }
    }

    @Override
    public synchronized void commitId(long id) throws StorageException {
        if (this.closed()) {
            throw new IllegalStateException("storage is closed");
        }
        try {
            if (id > this.lastCommittedId) {
                assert (this.consumerCommitLogWriter != null);
                byte[] bs = new byte[8];
                Bits.putLong(bs, 0, id);
                this.consumerCommitLogWriter.append(bs);
                this.lastCommittedId = id;
                if (System.nanoTime() - this.lastTryTruncateTime > this.truncateIntervalNanos) {
                    this.tryTruncate();
                }
            }
        }
        catch (IOException ex) {
            throw new StorageException(ex);
        }
    }

    @Override
    public synchronized long getLastCommittedId() {
        return this.lastCommittedId;
    }

    @Override
    public List<ObjectWithId> fetch(long fromId, int limit) throws InterruptedException, StorageException {
        List<ObjectWithId> entries;
        if (this.closed()) {
            throw new IllegalStateException("storage is closed");
        }
        while ((entries = this.doFetch(fromId, limit)).isEmpty()) {
            Thread.sleep(this.readRetryIntervalMillis);
        }
        return Collections.unmodifiableList(entries);
    }

    @Override
    public List<ObjectWithId> fetch(long fromId, int limit, long timeout, TimeUnit unit) throws InterruptedException, StorageException {
        long remain;
        List<ObjectWithId> entries;
        if (this.closed()) {
            throw new IllegalStateException("storage is closed");
        }
        long end = System.nanoTime() + unit.toNanos(timeout);
        while ((entries = this.doFetch(fromId, limit)).isEmpty() && (remain = TimeUnit.NANOSECONDS.toMillis(end - System.nanoTime())) > 0L) {
            Thread.sleep(Math.min(remain, this.readRetryIntervalMillis));
        }
        return Collections.unmodifiableList(entries);
    }

    synchronized long getLastProducedId() {
        if (!this.mm.isEmpty()) {
            return this.mm.lastId();
        }
        if (this.imm != null && !this.imm.isEmpty()) {
            return this.imm.lastId();
        }
        return this.manifest.getLastId();
    }

    @Override
    public synchronized void store(List<byte[]> batch) throws StorageException {
        Objects.requireNonNull(batch, "batch");
        if (this.closed()) {
            throw new IllegalStateException("storage is closed");
        }
        if (batch.isEmpty()) {
            logger.warn("append with empty entries");
            return;
        }
        long id = this.getLastProducedId();
        try {
            for (byte[] bs : batch) {
                ObjectWithId e = new ObjectWithId(++id, bs);
                if (this.makeRoomForEntry(false)) {
                    assert (this.dataLogWriter != null);
                    this.dataLogWriter.append(this.encodeObjectWithId(e));
                    this.mm.add(e);
                    continue;
                }
                throw new StorageException("no more room to storage data");
            }
        }
        catch (IOException ex) {
            throw new StorageException("append log on file based storage failed", ex);
        }
    }

    @Override
    public void close() {
        this.close(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void close(boolean force) {
        CompletableFuture<Void> f = this.closeFuture;
        if (f == null) {
            FileBasedStorage fileBasedStorage = this;
            synchronized (fileBasedStorage) {
                f = this.closeFuture;
                if (f == null) {
                    this.closeFuture = f = new CompletableFuture();
                    PostClose task = new PostClose(f);
                    boolean shutdownNow = force;
                    if (!force) {
                        try {
                            this.sstableWriterPool.submit(task);
                        }
                        catch (RejectedExecutionException unused) {
                            shutdownNow = true;
                        }
                    }
                    if (shutdownNow) {
                        this.sstableWriterPool.shutdownNow();
                        task.run();
                    }
                }
            }
        }
        assert (f == this.closeFuture);
        f.join();
    }

    boolean closed() {
        return this.closeFuture != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void writeMemtable(Memtable immutableMemtable) {
        Object record;
        logger.debug("start write mem table in background");
        boolean writeMemtableSuccess = false;
        try {
            record = null;
            if (!immutableMemtable.isEmpty()) {
                assert (immutableMemtable.firstId() > this.manifest.getLastId());
                SSTableFileMetaInfo meta = this.writeMemTableToSSTable(immutableMemtable);
                record = ManifestRecord.newPlainRecord();
                ((ManifestRecord)record).addMeta(meta);
                ((ManifestRecord)record).setConsumerCommitLogFileNumber(this.consumerCommitLogFileNumber);
                ((ManifestRecord)record).setDataLogFileNumber(this.dataLogFileNumber);
                this.manifest.logRecord((ManifestRecord)record);
            }
            Set remainMetasFileNumberSet = this.manifest.searchMetas(Long.MIN_VALUE).stream().map(SSTableFileMetaInfo::getFileNumber).collect(Collectors.toSet());
            for (Integer fileNumber : this.tableCache.getAllFileNumbers()) {
                if (remainMetasFileNumberSet.contains(fileNumber)) continue;
                this.tableCache.evict(fileNumber);
            }
            this.deleteOutdatedFiles(this.baseDir, this.dataLogFileNumber, this.consumerCommitLogFileNumber, this.tableCache);
            writeMemtableSuccess = true;
            logger.debug("write mem table in background done with manifest record {}", record);
        }
        catch (Throwable t) {
            logger.error("write memtable in background failed", t);
        }
        finally {
            record = this;
            synchronized (record) {
                if (!writeMemtableSuccess) {
                    this.safeClose();
                } else {
                    assert (this.imm == immutableMemtable);
                    this.imm = null;
                }
                this.notifyAll();
            }
        }
    }

    private void createStorageDir() throws IOException {
        Path storageDirPath = Paths.get(this.baseDir, new String[0]);
        try {
            Files.createDirectories(storageDirPath, new FileAttribute[0]);
        }
        catch (FileAlreadyExistsException fileAlreadyExistsException) {
            // empty catch block
        }
    }

    private FileLock lockStorage(String baseDir) throws IOException {
        FileLock lock;
        Path lockFilePath = Paths.get(baseDir, FileName.getLockFileName());
        FileChannel lockChannel = FileChannel.open(lockFilePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            lock = lockChannel.tryLock();
            if (lock == null) {
                throw new IllegalStateException("failed to lock directory: " + baseDir);
            }
        }
        catch (IOException ex) {
            lockChannel.close();
            throw ex;
        }
        return lock;
    }

    private void releaseStorageLock() throws IOException {
        if (this.storageLock != null) {
            Channel channel = this.storageLock.acquiredBy();
            try {
                this.storageLock.release();
            }
            finally {
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        }
    }

    private synchronized void recoverStorage(Path currentFilePath, ManifestRecord record) throws IOException, StorageException {
        assert (Files.exists(currentFilePath, new LinkOption[0]));
        String currentManifestFileName = new String(Files.readAllBytes(currentFilePath), StandardCharsets.UTF_8);
        if (currentManifestFileName.isEmpty()) {
            throw new StorageException("empty CURRENT file in storage dir: " + this.baseDir);
        }
        this.manifest.recover(currentManifestFileName);
        this.recoverFromDataLogFiles(record);
        this.recoverLastConsumerCommittedId();
    }

    private void recoverFromDataLogFiles(ManifestRecord record) throws IOException, StorageException {
        int dataLogFileNumber = this.manifest.getDataLogFileNumber();
        List<FileName.FileNameMeta> dataLogFileMetas = FileName.getFileNameMetas(this.baseDir, fileMeta -> fileMeta.getType() == FileName.FileType.Log && fileMeta.getFileNumber() >= dataLogFileNumber);
        for (int i = 0; i < dataLogFileMetas.size(); ++i) {
            FileName.FileNameMeta fileMeta2 = dataLogFileMetas.get(i);
            this.recoverMemtableFromDataLogFiles(fileMeta2.getFileNumber(), record, i == dataLogFileMetas.size() - 1);
        }
    }

    private void recoverMemtableFromDataLogFiles(int fileNumber, ManifestRecord record, boolean lastLogFile) throws IOException, StorageException {
        Path logFilePath = Paths.get(this.baseDir, FileName.getLogFileName(fileNumber));
        if (!Files.exists(logFilePath, new LinkOption[0])) {
            logger.warn("Log file {} was deleted. We can't recover memtable from it.", (Object)logFilePath);
            return;
        }
        FileChannel readLogChannel = FileChannel.open(logFilePath, StandardOpenOption.READ);
        boolean flushedNewTable = false;
        Memtable recoveredMm = null;
        try (LogReader reader = new LogReader(readLogChannel, true);){
            List<byte[]> logOpt;
            while (!(logOpt = reader.readLog()).isEmpty()) {
                ObjectWithId e = this.decodeObjectWithId(logOpt);
                if (recoveredMm == null) {
                    recoveredMm = new Memtable();
                }
                recoveredMm.add(e);
                if (recoveredMm.getMemoryUsedInBytes() <= 0x800000) continue;
                SSTableFileMetaInfo meta = this.writeMemTableToSSTable(recoveredMm);
                record.addMeta(meta);
                flushedNewTable = true;
                recoveredMm = null;
            }
            if (lastLogFile && !flushedNewTable) {
                assert (this.dataLogWriter == null);
                assert (this.dataLogFileNumber == 0);
                FileChannel logFile = FileChannel.open(logFilePath, StandardOpenOption.WRITE);
                this.dataLogWriter = new LogWriter(logFile, readLogChannel.position());
                this.dataLogFileNumber = fileNumber;
                if (recoveredMm != null) {
                    this.mm = recoveredMm;
                }
            } else if (recoveredMm != null) {
                SSTableFileMetaInfo meta = this.writeMemTableToSSTable(recoveredMm);
                record.addMeta(meta);
            }
        }
        catch (BadRecordException ex) {
            logger.warn("got \"{}\" record in log file:\"{}\". ", (Object)ex.getType(), (Object)logFilePath);
        }
    }

    private synchronized void recoverLastConsumerCommittedId() throws IOException, StorageException {
        FileName.FileNameMeta fileMeta2;
        int fileNumber = this.manifest.getConsumerCommittedIdLogFileNumber();
        List<FileName.FileNameMeta> consumerLogFileMetas = FileName.getFileNameMetas(this.baseDir, fileMeta -> fileMeta.getType() == FileName.FileType.ConsumerCommit && fileMeta.getFileNumber() >= fileNumber);
        for (int i = consumerLogFileMetas.size() - 1; i >= 0 && !this.recoverLastConsumerCommittedIdFromLogFile((fileMeta2 = consumerLogFileMetas.get(i)).getFileNumber()); --i) {
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private synchronized boolean recoverLastConsumerCommittedIdFromLogFile(int fileNumber) throws IOException, StorageException {
        Path logFilePath = Paths.get(this.baseDir, FileName.getConsumerCommittedIdFileName(fileNumber));
        if (!Files.exists(logFilePath, new LinkOption[0])) {
            logger.warn("Log file {} was deleted. We can't recover consumer committed id from it.", (Object)logFilePath);
            return false;
        }
        FileChannel ch = FileChannel.open(logFilePath, StandardOpenOption.READ);
        try (LogReader reader = new LogReader(ch, true);){
            List<byte[]> logOpt;
            long id = this.lastCommittedId;
            while (!(logOpt = reader.readLog()).isEmpty()) {
                id = Bits.getLong(this.compact(logOpt), 0);
            }
            long readEndPosition = ch.position();
            if (id <= this.lastCommittedId) return false;
            FileChannel logFile = FileChannel.open(logFilePath, StandardOpenOption.WRITE);
            assert (this.consumerCommitLogWriter == null);
            assert (this.consumerCommitLogFileNumber == 0);
            this.consumerCommitLogWriter = new LogWriter(logFile, readEndPosition);
            this.consumerCommitLogFileNumber = fileNumber;
            this.lastCommittedId = id;
            boolean bl = true;
            return bl;
        }
        catch (BadRecordException ex) {
            logger.warn("got \"{}\" record in data log file:\"{}\". ", (Object)ex.getType(), (Object)logFilePath);
        }
        return false;
    }

    private byte[] encodeObjectWithId(ObjectWithId obj) {
        ByteBuffer buffer = ByteBuffer.allocate(12 + obj.getObjectInBytes().length);
        buffer.putLong(obj.getId());
        buffer.putInt(obj.getObjectInBytes().length);
        buffer.put(obj.getObjectInBytes());
        return buffer.array();
    }

    private ObjectWithId decodeObjectWithId(List<byte[]> bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(this.compact(bytes));
        long id = buffer.getLong();
        int length = buffer.getInt();
        byte[] bs = new byte[length];
        buffer.get(bs);
        return new ObjectWithId(id, bs);
    }

    private byte[] compact(List<byte[]> output) {
        int size = output.stream().mapToInt(b -> ((byte[])b).length).sum();
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] bytes : output) {
            buffer.put(bytes);
        }
        return buffer.array();
    }

    private synchronized List<ObjectWithId> doFetch(long fromId, int limit) throws StorageException {
        if (!this.mm.isEmpty() && fromId >= this.mm.firstId()) {
            return this.mm.getEntries(fromId, limit);
        }
        try {
            Itr itr;
            if (this.imm != null && fromId >= this.imm.firstId()) {
                List<SeekableIterator<Long, ObjectWithId>> itrs = Arrays.asList(this.imm.iterator(), this.mm.iterator());
                for (SeekableIterator seekableIterator : itrs) {
                    seekableIterator.seek(fromId);
                }
                itr = new Itr(itrs);
            } else {
                itr = this.internalIterator(fromId);
            }
            ArrayList<ObjectWithId> ret = new ArrayList<ObjectWithId>();
            while (itr.hasNext()) {
                ObjectWithId e = itr.next();
                if (ret.size() >= limit) break;
                ret.add(e);
            }
            return ret;
        }
        catch (StorageRuntimeException ex) {
            throw new StorageException(ex.getMessage(), ex.getCause());
        }
    }

    private synchronized boolean makeRoomForEntry(boolean force) throws IOException, StorageException {
        try {
            boolean forceRun = force;
            while (true) {
                if (this.closed()) {
                    return false;
                }
                if (!forceRun && this.mm.getMemoryUsedInBytes() <= 0x800000) break;
                if (this.imm != null) {
                    this.wait();
                    continue;
                }
                forceRun = false;
                this.makeRoomForEntry0();
            }
            return true;
        }
        catch (InterruptedException t) {
            throw new StorageException("thread was interrupted when waiting room for new entry");
        }
    }

    private void makeRoomForEntry0() throws IOException, StorageException {
        LogWriter logWriter = this.createNewDataLogWriter();
        if (this.dataLogWriter != null) {
            this.dataLogWriter.close();
        }
        this.dataLogWriter = logWriter;
        this.imm = this.mm;
        this.mm = new Memtable();
        logger.debug("Trigger compaction, new log file number={}", (Object)this.dataLogFileNumber);
        try {
            this.sstableWriterPool.submit(() -> this.writeMemtable(this.imm));
        }
        catch (RejectedExecutionException ex) {
            throw new StorageException("flush memtable task was rejected", ex);
        }
    }

    private LogWriter createNewDataLogWriter() throws IOException {
        int nextLogFileNumber = this.manifest.getNextFileNumber();
        String nextLogFile = FileName.getLogFileName(nextLogFileNumber);
        FileChannel logFile = FileChannel.open(Paths.get(this.baseDir, nextLogFile), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        LogWriter writer = new LogWriter(logFile);
        this.dataLogFileNumber = nextLogFileNumber;
        return writer;
    }

    private LogWriter createConsumerCommitLogWriter() throws IOException {
        int nextLogFileNumber = this.manifest.getNextFileNumber();
        String nextLogFile = FileName.getConsumerCommittedIdFileName(nextLogFileNumber);
        FileChannel logFile = FileChannel.open(Paths.get(this.baseDir, nextLogFile), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        LogWriter writer = new LogWriter(logFile);
        this.consumerCommitLogFileNumber = nextLogFileNumber;
        return writer;
    }

    private SSTableFileMetaInfo writeMemTableToSSTable(Memtable mm3) throws IOException {
        SSTableFileMetaInfo meta = new SSTableFileMetaInfo();
        int fileNumber = this.manifest.getNextFileNumber();
        meta.setFileNumber(fileNumber);
        meta.setFirstId(mm3.firstId());
        meta.setLastId(mm3.lastId());
        String tableFileName = FileName.getSSTableName(fileNumber);
        Path tableFile = Paths.get(this.baseDir, tableFileName);
        try (FileChannel ch = FileChannel.open(tableFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);){
            TableBuilder tableBuilder = new TableBuilder(ch);
            for (ObjectWithId entry : mm3) {
                byte[] data = entry.getObjectInBytes();
                tableBuilder.add(entry.getId(), data);
            }
            long tableFileSize = tableBuilder.finishBuild();
            if (tableFileSize > 0L) {
                meta.setFileSize(tableFileSize);
                ch.force(true);
            }
        }
        this.tableCache.loadTable(fileNumber, meta.getFileSize());
        if (meta.getFileSize() <= 0L) {
            Files.deleteIfExists(tableFile);
        }
        return meta;
    }

    private Itr internalIterator(long start) throws StorageException {
        List<SeekableIterator<Long, ObjectWithId>> itrs = this.getSSTableIterators(start);
        if (this.imm != null) {
            itrs.add(this.imm.iterator().seek(start));
        }
        itrs.add(this.mm.iterator().seek(start));
        for (SeekableIterator<Long, ObjectWithId> itr : itrs) {
            itr.seek(start);
            if (!itr.hasNext()) continue;
            break;
        }
        return new Itr(itrs);
    }

    private List<SeekableIterator<Long, ObjectWithId>> getSSTableIterators(long start) throws StorageException {
        try {
            List<SSTableFileMetaInfo> metas = this.manifest.searchMetas(start);
            ArrayList<SeekableIterator<Long, ObjectWithId>> ret = new ArrayList<SeekableIterator<Long, ObjectWithId>>(metas.size());
            for (SSTableFileMetaInfo meta : metas) {
                ret.add(this.tableCache.iterator(meta.getFileNumber(), meta.getFileSize()));
            }
            return ret;
        }
        catch (IOException ex) {
            throw new StorageException(String.format("get sstable iterators start: %s from SSTable failed", start), ex);
        }
    }

    private synchronized void tryTruncate() {
        try {
            long lastCommittedId = this.getLastCommittedId();
            long truncateId = Math.max(0L, lastCommittedId);
            this.manifest.truncateToId(truncateId);
            this.lastTryTruncateTime = System.nanoTime();
        }
        catch (Exception ex) {
            logger.error("Truncate handler failed for entry", ex);
        }
    }

    private synchronized void safeClose() {
        if (this.safeCloseThread == null) {
            Thread t = safeCloseThreadFactory.newThread(() -> {
                try {
                    this.close();
                }
                catch (Exception ex) {
                    logger.error("Close storage under directory: {} failed", (Object)this.baseDir, (Object)ex);
                }
            });
            t.setDaemon(true);
            t.start();
            this.safeCloseThread = t;
        }
    }

    private void blockSafeClose() {
        this.safeClose();
        try {
            assert (this.safeCloseThread != null);
            this.safeCloseThread.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void deleteOutdatedFiles(String baseDir, int dataLogFileNumber, int consumerCommittedIdLogFileNumber, TableCache tableCache) {
        List<Path> outdatedFilePaths = FileBasedStorage.getOutdatedFiles(baseDir, dataLogFileNumber, consumerCommittedIdLogFileNumber, tableCache);
        try {
            for (Path path : outdatedFilePaths) {
                Files.deleteIfExists(path);
            }
        }
        catch (IOException t) {
            logger.error("delete outdated files:{} failed", (Object)outdatedFilePaths, (Object)t);
        }
    }

    private static List<Path> getOutdatedFiles(String baseDir, int dataLogFileNumber, int consumerCommittedIdLogFileNumber, TableCache tableCache) {
        File dirFile = new File(baseDir);
        File[] files = dirFile.listFiles();
        if (files != null) {
            return Arrays.stream(files).filter(File::isFile).map(File::getName).map(FileName::parseFileName).filter(meta -> {
                switch (meta.getType()) {
                    case ConsumerCommit: {
                        return meta.getFileNumber() < consumerCommittedIdLogFileNumber;
                    }
                    case Log: {
                        return meta.getFileNumber() < dataLogFileNumber;
                    }
                    case SSTable: {
                        return !tableCache.hasTable(meta.getFileNumber());
                    }
                }
                return false;
            }).map(meta -> Paths.get(baseDir, meta.getFileName())).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    private class PostClose
    implements Runnable {
        private final CompletableFuture<Void> closeFuture;

        private PostClose(CompletableFuture<Void> closeFuture) {
            this.closeFuture = closeFuture;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            FileBasedStorage fileBasedStorage = FileBasedStorage.this;
            synchronized (fileBasedStorage) {
                try {
                    FileBasedStorage.this.sstableWriterPool.shutdown();
                    if (FileBasedStorage.this.dataLogWriter != null) {
                        FileBasedStorage.this.dataLogWriter.close();
                    }
                    if (FileBasedStorage.this.consumerCommitLogWriter != null) {
                        FileBasedStorage.this.consumerCommitLogWriter.close();
                    }
                    FileBasedStorage.this.manifest.close();
                    FileBasedStorage.this.tableCache.evictAll();
                    FileBasedStorage.this.releaseStorageLock();
                    logger.debug("File based storage shutdown successfully");
                    this.closeFuture.complete(null);
                }
                catch (Exception ex) {
                    this.closeFuture.completeExceptionally(ex);
                }
            }
        }
    }

    private static class Itr
    implements Iterator<ObjectWithId> {
        private final List<SeekableIterator<Long, ObjectWithId>> iterators;
        private int lastItrIndex;

        Itr(List<SeekableIterator<Long, ObjectWithId>> iterators) {
            this.iterators = iterators;
        }

        @Override
        public boolean hasNext() {
            for (int i = this.lastItrIndex; i < this.iterators.size(); ++i) {
                SeekableIterator<Long, ObjectWithId> itr = this.iterators.get(i);
                if (!itr.hasNext()) continue;
                this.lastItrIndex = i;
                return true;
            }
            this.lastItrIndex = this.iterators.size();
            return false;
        }

        @Override
        public ObjectWithId next() {
            assert (this.lastItrIndex >= 0 && this.lastItrIndex < this.iterators.size());
            return (ObjectWithId)this.iterators.get(this.lastItrIndex).next();
        }
    }
}

