/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFormat;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;

/**
 * {@link ProcedureStore} implementation that persists procedure state changes
 * into a sequence of write-ahead log files ("state-&lt;id&gt;.log") inside a
 * single directory.
 *
 * <p>Writers serialize their change into a {@link ByteSlot} and hand it to
 * {@link #pushData(ByteSlot)}; a dedicated sync thread batches the pending slots,
 * writes them to the current stream and flushes it, then wakes the writers up.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALProcedureStore
implements ProcedureStore {
    private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);

    /** Number of failed sync attempts after which the listeners are asked to abort. */
    private static final int MAX_RETRIES_BEFORE_ABORT = 3;

    /** Max time (msec) the sync thread waits for more slots before syncing a partial batch. */
    private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
    private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

    /** When true the stream is flushed with hsync(), otherwise with hflush(). */
    private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
    private static final boolean DEFAULT_USE_HSYNC = true;

    /** Amount of synced bytes above which delete() may roll to a new log (see {@link #delete(long)}). */
    private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
    private static final long DEFAULT_ROLL_THRESHOLD = 32L * 1024 * 1024; // 32M

    private final CopyOnWriteArrayList<ProcedureStore.ProcedureStoreListener> listeners = new CopyOnWriteArrayList<>();
    /** Known log files; new logs are appended at the tail by {@link #rollWriter(long)}. */
    private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
    private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled when the first slot of a batch arrives (or on stop) to wake the sync thread. */
    private final Condition waitCond = this.lock.newCondition();
    /** Signalled when the slot array is full so the sync thread stops waiting for more data. */
    private final Condition slotCond = this.lock.newCondition();
    /** Signalled by the sync thread once a batch has been synced. */
    private final Condition syncCond = this.lock.newCondition();
    private final LeaseRecovery leaseRecovery;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path logDir;
    private final AtomicBoolean inSync = new AtomicBoolean(false);
    private LinkedTransferQueue<ByteSlot> slotsCache = null;
    private Set<ProcedureWALFile> corruptedLogs = null;
    private final AtomicLong totalSynced = new AtomicLong(0L);
    private FSDataOutputStream stream = null;
    private long lastRollTs = 0L;
    private long flushLogId = 0L;
    private int slotIndex = 0;
    private Thread syncThread;
    private ByteSlot[] slots;
    private long rollThreshold;
    private boolean useHsync;
    private int syncWaitMsec;

    /**
     * Creates a store; no I/O is performed until {@link #start(int)} and
     * {@link #recoverLease()} are called.
     *
     * @param conf configuration the tunables are read from in {@link #start(int)}
     * @param fs file system holding the log directory
     * @param logDir directory containing the "state-*.log" files
     * @param leaseRecovery callback used to recover the lease of each pre-existing log
     */
    public WALProcedureStore(Configuration conf, FileSystem fs, Path logDir, LeaseRecovery leaseRecovery) {
        this.fs = fs;
        this.conf = conf;
        this.logDir = logDir;
        this.leaseRecovery = leaseRecovery;
    }

    /**
     * Marks the store as running, allocates the slot structures, reads the
     * tunables and starts the sync thread. A second call while running is a no-op.
     *
     * @param numSlots size of the slot array, i.e. max number of entries per sync batch
     */
    @Override
    public void start(int numSlots) throws IOException {
        if (this.running.getAndSet(true)) {
            return;
        }

        // Slot array filled by the writers plus a cache of reusable slots.
        this.slots = new ByteSlot[numSlots];
        this.slotsCache = new LinkedTransferQueue<>();
        while (this.slotsCache.size() < numSlots) {
            this.slotsCache.offer(new ByteSlot());
        }

        // Tunables; the defaults come from the named constants above.
        this.rollThreshold = this.conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
        this.syncWaitMsec = this.conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
        this.useHsync = this.conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);

        this.syncThread = new Thread("WALProcedureStoreSyncThread"){

            @Override
            public void run() {
                while (WALProcedureStore.this.running.get()) {
                    try {
                        WALProcedureStore.this.syncLoop();
                    }
                    catch (IOException e) {
                        LOG.error("got an exception from the sync-loop", e);
                        WALProcedureStore.this.sendAbortProcessSignal();
                    }
                }
            }
        };
        this.syncThread.start();
    }

    /**
     * Stops the store: wakes the sync thread, optionally waits for it, then
     * closes the current stream and every known log. A call while not running
     * is a no-op.
     *
     * @param abort when false the call joins the sync thread before closing
     */
    @Override
    public void stop(boolean abort) {
        if (!this.running.getAndSet(false)) {
            return;
        }
        LOG.info("Stopping the WAL Procedure Store");

        // Best-effort wake-up of the sync thread; skipped if the lock is busy.
        if (this.lock.tryLock()) {
            try {
                this.waitCond.signalAll();
            }
            finally {
                this.lock.unlock();
            }
        }

        if (!abort) {
            try {
                this.syncThread.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        this.closeStream();
        for (ProcedureWALFile log : this.logs) {
            log.close();
        }
        this.logs.clear();
    }

    /** @return true between a successful {@link #start(int)} and {@link #stop(boolean)} */
    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    /** @return the number of slots passed to {@link #start(int)}, or 0 if never started */
    @Override
    public int getNumThreads() {
        return this.slots == null ? 0 : this.slots.length;
    }

    /** @return the tracker updated by insert/update/delete */
    public ProcedureStoreTracker getStoreTracker() {
        return this.storeTracker;
    }

    @Override
    public void registerListener(ProcedureStore.ProcedureStoreListener listener) {
        this.listeners.add(listener);
    }

    @Override
    public boolean unregisterListener(ProcedureStore.ProcedureStoreListener listener) {
        return this.listeners.remove(listener);
    }

    /**
     * Opens the existing logs and creates a new log with id (max existing id + 1).
     * The attempt is repeated, while the store is running, whenever the new log
     * already exists or a log with a higher id shows up after ours was created.
     *
     * @throws IOException if listing, opening or creating a log fails
     */
    @Override
    public void recoverLease() throws IOException {
        LOG.info("Starting WAL Procedure Store lease recovery");
        while (this.running.get()) {
            // Re-list on every attempt: retrying with a stale listing would compute
            // the same id again (and could reference a file removed below).
            FileStatus[] oldLogs = this.getLogFiles();
            this.flushLogId = this.initOldLogs(oldLogs) + 1L;

            if (!this.rollWriter(this.flushLogId)) {
                LOG.debug("someone else has already created log " + this.flushLogId);
                continue;
            }

            // Verify nobody created a newer log while we were creating ours.
            oldLogs = this.getLogFiles();
            if (this.getMaxLogId(oldLogs) > this.flushLogId) {
                LOG.debug("someone else created new logs. expected maxLogId < " + this.flushLogId);
                this.logs.getLast().removeFile();
                continue;
            }

            LOG.info("lease acquired flushLogId=" + this.flushLogId);
            break;
        }
    }

    /**
     * Replays the logs that existed before {@link #recoverLease()} created the
     * current one.
     *
     * @return the iterator produced by {@link ProcedureWALFormat#load}, or null
     *         when the only known log is the current one
     * @throws RuntimeException if no log is known, i.e. recoverLease() was not called
     * @throws IOException if the replay fails
     */
    @Override
    public Iterator<Procedure> load() throws IOException {
        if (this.logs.isEmpty()) {
            throw new RuntimeException("recoverLease() must be called before loading data");
        }
        if (this.logs.size() == 1) {
            LOG.debug("No state logs to replay");
            return null;
        }

        final ArrayList<ProcedureWALFile> toRemove = new ArrayList<>();
        Iterator<ProcedureWALFile> it = this.logs.descendingIterator();
        it.next(); // skip the tail of the list (the log added last)
        try {
            return ProcedureWALFormat.load(it, this.storeTracker, new ProcedureWALFormat.Loader(){

                @Override
                public void removeLog(ProcedureWALFile log) {
                    toRemove.add(log);
                }

                @Override
                public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
                    if (WALProcedureStore.this.corruptedLogs == null) {
                        WALProcedureStore.this.corruptedLogs = new HashSet<ProcedureWALFile>();
                    }
                    WALProcedureStore.this.corruptedLogs.add(log);
                }
            });
        }
        finally {
            // Logs flagged by the loader are deleted whether or not the load succeeded.
            for (ProcedureWALFile log : toRemove) {
                this.removeLogFile(log);
            }
        }
    }

    /**
     * Serializes an insert entry, waits for it to be pushed through the sync
     * thread and records it in the tracker.
     *
     * @param proc the procedure to insert
     * @param subprocs its children, or null (in which case proc must have no parent)
     * @throws RuntimeException wrapping the IOException if serialization fails
     */
    @Override
    public void insert(Procedure proc, Procedure[] subprocs) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs));
        }
        ByteSlot slot = this.acquireSlot();
        try {
            if (subprocs != null) {
                ProcedureWALFormat.writeInsert(slot, proc, subprocs);
            } else {
                assert (!proc.hasParent());
                ProcedureWALFormat.writeInsert(slot, proc);
            }
            this.pushData(slot);
        }
        catch (IOException e) {
            LOG.fatal("Unable to serialize one of the procedure: proc=" + proc + " subprocs=" + Arrays.toString(subprocs), e);
            throw new RuntimeException(e);
        }
        finally {
            this.releaseSlot(slot);
        }
        synchronized (this.storeTracker) {
            this.storeTracker.insert(proc, subprocs);
        }
    }

    /**
     * Serializes an update entry, pushes it and records it in the tracker. If the
     * entry went to the current log and the tracker reports every tracked
     * procedure as updated, all logs older than the current one are removed.
     *
     * @param proc the procedure whose state changed
     * @throws RuntimeException wrapping the IOException if serialization fails
     */
    @Override
    public void update(Procedure proc) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("update " + proc);
        }
        ByteSlot slot = this.acquireSlot();
        long logId = -1L;
        try {
            ProcedureWALFormat.writeUpdate(slot, proc);
            logId = this.pushData(slot);
        }
        catch (IOException e) {
            LOG.fatal("Unable to serialize the procedure: " + proc, e);
            throw new RuntimeException(e);
        }
        finally {
            this.releaseSlot(slot);
        }

        boolean removeOldLogs = false;
        synchronized (this.storeTracker) {
            this.storeTracker.update(proc);
            if (logId == this.flushLogId) {
                removeOldLogs = this.storeTracker.isUpdated();
            }
        }
        if (removeOldLogs) {
            this.removeAllLogs(logId - 1L);
        }
    }

    /**
     * Serializes a delete entry, pushes it and records it in the tracker. If the
     * entry went to the current log, the tracker is now empty and more than the
     * roll threshold has been synced, the writer is rolled and, on success, every
     * log up to and including the previous one is removed.
     *
     * @param procId id of the procedure to delete
     * @throws RuntimeException wrapping the IOException if serialization fails
     */
    @Override
    public void delete(long procId) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("delete " + procId);
        }
        ByteSlot slot = this.acquireSlot();
        long logId = -1L;
        try {
            ProcedureWALFormat.writeDelete(slot, procId);
            logId = this.pushData(slot);
        }
        catch (IOException e) {
            LOG.fatal("Unable to serialize the procedure: " + procId, e);
            throw new RuntimeException(e);
        }
        finally {
            this.releaseSlot(slot);
        }

        boolean removeOldLogs = false;
        synchronized (this.storeTracker) {
            this.storeTracker.delete(procId);
            if (logId == this.flushLogId && this.storeTracker.isEmpty() && this.totalSynced.get() > this.rollThreshold) {
                removeOldLogs = this.rollWriterOrDie(logId + 1L);
            }
        }
        if (removeOldLogs) {
            this.removeAllLogs(logId);
        }
    }

    /** Takes a slot from the cache, or allocates a new one when the cache is empty. */
    private ByteSlot acquireSlot() {
        ByteSlot slot = this.slotsCache.poll();
        return slot != null ? slot : new ByteSlot();
    }

    /** Resets the slot and puts it back into the cache. */
    private void releaseSlot(ByteSlot slot) {
        slot.reset();
        this.slotsCache.offer(slot);
    }

    /**
     * Queues the slot for the sync thread and blocks until the sync thread
     * signals {@code syncCond}.
     *
     * @return the id of the log that was current when the slot was queued, or -1
     *         if the thread was interrupted before the slot could be queued
     */
    private long pushData(ByteSlot slot) {
        assert (this.isRunning() && !this.logs.isEmpty()) : "recoverLease() must be called before inserting data";
        long logId = -1L;
        this.lock.lock();
        try {
            // Wait until no sync is in progress and there is a free slot.
            while (true) {
                if (this.inSync.get()) {
                    this.syncCond.await();
                } else if (this.slotIndex == this.slots.length) {
                    this.slotCond.signal();
                    this.syncCond.await();
                } else {
                    break;
                }
            }

            this.slots[this.slotIndex++] = slot;
            logId = this.flushLogId;

            // First entry of the batch: wake the sync thread up.
            if (this.slotIndex == 1) {
                this.waitCond.signal();
            }
            // Batch full: tell the sync thread not to wait for more data.
            if (this.slotIndex == this.slots.length) {
                this.waitCond.signal();
                this.slotCond.signal();
            }

            // Block until the sync thread signals completion.
            this.syncCond.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.sendAbortProcessSignal();
        }
        finally {
            this.lock.unlock();
        }
        return logId;
    }

    /**
     * Body of the sync thread: waits for data, gives the writers up to
     * {@code syncWaitMsec} to fill the remaining slots, syncs the batch and wakes
     * the writers. Loops until the store is stopped.
     */
    private void syncLoop() throws IOException {
        this.inSync.set(false);
        while (this.running.get()) {
            this.lock.lock();
            try {
                // Nothing queued: sleep until a writer (or stop()) signals waitCond.
                if (this.slotIndex == 0) {
                    if (LOG.isTraceEnabled()) {
                        float rollTsSec = (float)(System.currentTimeMillis() - this.lastRollTs) / 1000.0f;
                        LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", StringUtils.humanSize(this.totalSynced.get()), StringUtils.humanSize((float)this.totalSynced.get() / rollTsSec)));
                    }
                    this.waitCond.await();
                    if (this.slotIndex == 0) {
                        // Woken up without data (e.g. stop()): re-check the running flag.
                        continue;
                    }
                }

                // Give the writers a chance to fill the batch.
                long syncWaitSt = System.currentTimeMillis();
                if (this.slotIndex != this.slots.length) {
                    this.slotCond.await(this.syncWaitMsec, TimeUnit.MILLISECONDS);
                }
                long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
                if (LOG.isTraceEnabled() && (syncWaitMs > 10L || this.slotIndex < this.slots.length)) {
                    float rollSec = (float)(System.currentTimeMillis() - this.lastRollTs) / 1000.0f;
                    LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + " slotIndex=" + this.slotIndex + " totalSynced=" + StringUtils.humanSize(this.totalSynced.get()) + " " + StringUtils.humanSize((float)this.totalSynced.get() / rollSec) + "/sec");
                }

                // Flush the batch, reset the slot array and release the writers.
                this.inSync.set(true);
                this.totalSynced.addAndGet(this.syncSlots());
                this.slotIndex = 0;
                this.inSync.set(false);
                this.syncCond.signalAll();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.sendAbortProcessSignal();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Syncs the queued slots, retrying on failure for as long as the store is
     * running. The abort signal is sent once, when the number of failed attempts
     * reaches {@link #MAX_RETRIES_BEFORE_ABORT}.
     *
     * @return the number of bytes written by the successful attempt, or 0 if the
     *         store was stopped before any attempt succeeded
     */
    private long syncSlots() {
        int retry = 0;
        long totalSynced = 0L;
        do {
            try {
                totalSynced = this.syncSlots(this.stream, this.slots, 0, this.slotIndex);
                break;
            }
            catch (Throwable e) {
                if (++retry == MAX_RETRIES_BEFORE_ABORT) {
                    LOG.error("sync slot failed, abort.", e);
                    this.sendAbortProcessSignal();
                }
            }
            // The running flag is checked after every failure so that a stopped
            // store never spins here (stop(false) joins this thread).
        } while (this.running.get());
        return totalSynced;
    }

    /**
     * Writes {@code count} slots starting at {@code offset} to the stream and
     * flushes it with hsync() or hflush() depending on the configuration.
     *
     * @return the sum of the sizes of the written slots
     * @throws IOException if a write or the flush fails
     */
    protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) throws IOException {
        long totalSynced = 0L;
        for (int i = 0; i < count; ++i) {
            ByteSlot data = slots[offset + i];
            data.writeTo(stream);
            totalSynced += (long)data.size();
        }
        if (this.useHsync) {
            stream.hsync();
        } else {
            stream.hflush();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Sync slots=" + count + '/' + slots.length + " flushed=" + StringUtils.humanSize(totalSynced));
        }
        return totalSynced;
    }

    /** Calls {@code abortProcess()} on every registered listener. */
    private void sendAbortProcessSignal() {
        for (ProcedureStore.ProcedureStoreListener listener : this.listeners) {
            listener.abortProcess();
        }
    }

    /**
     * Rolls the writer; on IOException logs it, sends the abort signal and
     * reports failure instead of propagating.
     *
     * @return true if the new log was created
     */
    private boolean rollWriterOrDie(long logId) {
        try {
            return this.rollWriter(logId);
        }
        catch (IOException e) {
            LOG.warn("Unable to roll the log", e);
            this.sendAbortProcessSignal();
            return false;
        }
    }

    /**
     * Creates the log file for {@code logId}, writes its header and makes it the
     * current log (closing the previous stream and resetting the tracker updates
     * and the synced-bytes counter).
     *
     * @return false if a file for that id already exists, true otherwise
     * @throws IOException if the file cannot be created or the header written
     */
    private boolean rollWriter(long logId) throws IOException {
        // NOTE(review): version 1 / type 0 are inlined literals; presumably they
        // mirror constants of ProcedureWALFormat - confirm.
        ProcedureProtos.ProcedureWALHeader header = ProcedureProtos.ProcedureWALHeader.newBuilder().setVersion(1).setType(0).setMinProcId(this.storeTracker.getMinProcId()).setLogId(logId).build();
        FSDataOutputStream newStream = null;
        Path newLogFile = null;
        long startPos = -1L;
        try {
            newLogFile = this.getLogFilePath(logId);
            newStream = this.fs.create(newLogFile, false); // overwrite=false
            ProcedureWALFormat.writeHeader(newStream, header);
            startPos = newStream.getPos();
        }
        catch (FileAlreadyExistsException e) {
            LOG.error("Log file with id=" + logId + " already exists", e);
            return false;
        }
        catch (IOException e) {
            // The file was created but the header could not be written:
            // do not leak the open stream.
            if (newStream != null) {
                try {
                    newStream.close();
                }
                catch (IOException closeFailure) {
                    LOG.warn("Unable to close the stream of log id=" + logId, closeFailure);
                }
            }
            throw e;
        }

        this.lock.lock();
        try {
            this.closeStream();
            synchronized (this.storeTracker) {
                this.storeTracker.resetUpdates();
            }
            this.stream = newStream;
            this.flushLogId = logId;
            this.totalSynced.set(0L);
            this.lastRollTs = System.currentTimeMillis();
            this.logs.add(new ProcedureWALFile(this.fs, newLogFile, header, startPos));
        }
        finally {
            this.lock.unlock();
        }
        LOG.info("Roll new state log: " + logId);
        return true;
    }

    /**
     * Writes the trailer to the current stream (best effort) and closes it.
     * Failures are logged, never propagated; the stream reference is always
     * cleared. Does nothing when there is no current stream.
     */
    private void closeStream() {
        if (this.stream == null) {
            return;
        }
        try {
            try {
                ProcedureWALFormat.writeTrailer(this.stream, this.storeTracker);
            }
            catch (IOException e) {
                // A missing trailer does not prevent closing the stream.
                LOG.warn("Unable to write the trailer: " + e.getMessage());
            }
            this.stream.close();
        }
        catch (IOException e) {
            LOG.error("Unable to close the stream", e);
        }
        finally {
            this.stream = null;
        }
    }

    /**
     * Removes, oldest first, every log whose id is less than or equal to
     * {@code lastLogId}. Stops at the first log that cannot be removed.
     */
    private void removeAllLogs(long lastLogId) {
        LOG.info("Remove all state logs with ID less then " + lastLogId);
        while (!this.logs.isEmpty()) {
            ProcedureWALFile log = this.logs.getFirst();
            if (lastLogId < log.getLogId()) {
                break;
            }
            if (!this.removeLogFile(log)) {
                // The log is still at the head of the list; retrying here would
                // loop forever on the same file.
                break;
            }
        }
    }

    /**
     * Deletes the log file and drops it from the list of known logs.
     *
     * @return false if the file could not be removed (the log stays in the list)
     */
    private boolean removeLogFile(ProcedureWALFile log) {
        try {
            LOG.debug("remove log: " + log);
            log.removeFile();
            this.logs.remove(log);
        }
        catch (IOException e) {
            LOG.error("unable to remove log " + log, e);
            return false;
        }
        return true;
    }

    /** @return the logs flagged as corrupted during {@link #load()}, or null if none was flagged */
    public Set<ProcedureWALFile> getCorruptedLogs() {
        return this.corruptedLogs;
    }

    /** @return the directory containing the log files */
    public Path getLogDir() {
        return this.logDir;
    }

    /** @return the file system holding the log directory */
    public FileSystem getFileSystem() {
        return this.fs;
    }

    /** @return the path "state-&lt;logId zero-padded to 20 digits&gt;.log" under the log directory */
    protected Path getLogFilePath(long logId) throws IOException {
        return new Path(this.logDir, String.format("state-%020d.log", logId));
    }

    /**
     * Extracts the numeric id from a log file name, i.e. the text between the
     * last '-' and the last ".log".
     *
     * @throws NumberFormatException if that text is not a valid long
     */
    private static long getLogIdFromName(String name) {
        int end = name.lastIndexOf(".log");
        int start = name.lastIndexOf('-') + 1;
        // Long.parseLong accepts leading zeros, so the padding needs no manual
        // stripping (stripping would turn an all-zero id into an empty string).
        return Long.parseLong(name.substring(start, end));
    }

    /**
     * Lists the "state-*.log" files of the log directory.
     *
     * @return the matching files, or null if the directory does not exist
     */
    private FileStatus[] getLogFiles() throws IOException {
        try {
            return this.fs.listStatus(this.logDir, new PathFilter(){

                @Override
                public boolean accept(Path path) {
                    String name = path.getName();
                    return name.startsWith("state-") && name.endsWith(".log");
                }
            });
        }
        catch (FileNotFoundException e) {
            LOG.warn("log directory not found: " + e.getMessage());
            return null;
        }
    }

    /** @return the highest log id among the given files, or 0 for a null/empty array */
    private long getMaxLogId(FileStatus[] logFiles) {
        long maxLogId = 0L;
        if (logFiles != null) {
            for (FileStatus logFile : logFiles) {
                maxLogId = Math.max(maxLogId, WALProcedureStore.getLogIdFromName(logFile.getPath().getName()));
            }
        }
        return maxLogId;
    }

    /**
     * Rebuilds the list of known logs from the given files: recovers the lease of
     * each file, opens it, sorts the usable ones and initializes the tracker from
     * the last of them.
     *
     * @return the highest log id among the given files (including the unusable
     *         ones), or 0 for a null/empty array
     */
    private long initOldLogs(FileStatus[] logFiles) throws IOException {
        this.logs.clear();
        long maxLogId = 0L;
        if (logFiles != null && logFiles.length > 0) {
            for (FileStatus logFile : logFiles) {
                Path logPath = logFile.getPath();
                this.leaseRecovery.recoverFileLease(this.fs, logPath);
                maxLogId = Math.max(maxLogId, WALProcedureStore.getLogIdFromName(logPath.getName()));
                ProcedureWALFile log = this.initOldLog(logFile);
                if (log != null) {
                    this.logs.add(log);
                }
            }
            Collections.sort(this.logs);
            this.initTrackerFromOldLogs();
        }
        return maxLogId;
    }

    /**
     * Loads the tracker from the last known log; if that fails the tracker is
     * cleared and flagged as partial.
     */
    private void initTrackerFromOldLogs() {
        if (this.logs.isEmpty()) {
            return;
        }
        ProcedureWALFile log = this.logs.getLast();
        try {
            log.readTracker(this.storeTracker);
        }
        catch (IOException e) {
            LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
            this.storeTracker.clear();
            this.storeTracker.setPartialFlag(true);
        }
    }

    /**
     * Opens a pre-existing log file.
     *
     * @return the opened log, or null if the file was unusable (empty, invalid
     *         data, or compacted with an unreadable trailer) and has been removed
     * @throws IOException if the file cannot be read for any other reason
     */
    private ProcedureWALFile initOldLog(FileStatus logFile) throws IOException {
        ProcedureWALFile log = new ProcedureWALFile(this.fs, logFile);
        if (logFile.getLen() == 0L) {
            LOG.warn("Remove uninitialized log " + logFile);
            log.removeFile();
            return null;
        }

        LOG.debug("opening state-log: " + logFile);
        try {
            log.open();
        }
        catch (ProcedureWALFormat.InvalidWALDataException e) {
            LOG.warn("Remove uninitialized log " + logFile, e);
            log.removeFile();
            return null;
        }
        catch (IOException e) {
            String msg = "Unable to read state log: " + logFile;
            LOG.error(msg, e);
            throw new IOException(msg, e);
        }

        if (log.isCompacted()) {
            try {
                log.readTrailer();
            }
            catch (IOException e) {
                LOG.warn("Unfinished compacted log " + logFile, e);
                log.removeFile();
                return null;
            }
        }
        return log;
    }

    /** Callback invoked by the store on each pre-existing log file during lease recovery. */
    public static interface LeaseRecovery {
        /**
         * @param fs file system holding the log
         * @param path path of the log file whose lease must be recovered
         */
        public void recoverFileLease(FileSystem fs, Path path) throws IOException;
    }
}

