/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir;

import com.github.ylgrgyq.reservoir.Codec;
import com.github.ylgrgyq.reservoir.DeserializationException;
import com.github.ylgrgyq.reservoir.ObjectQueueBuilder;
import com.github.ylgrgyq.reservoir.ObjectQueueConsumer;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.SerializedObjectWithId;
import com.github.ylgrgyq.reservoir.StorageException;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;

final class AutoCommitObjectQueueConsumer<E, S>
implements ObjectQueueConsumer<E> {
    private final ObjectQueueStorage<S> storage;
    private final BlockingQueue<E> queue;
    private final int batchSize;
    private final ReentrantLock lock;
    private final Codec<E, S> deserializer;
    private long lastCommittedId;
    private volatile boolean closed;

    AutoCommitObjectQueueConsumer(ObjectQueueBuilder<E, S> builder) throws StorageException {
        Objects.requireNonNull(builder, "builder");
        this.storage = builder.getStorage();
        this.batchSize = builder.getConsumerFetchBatchSize();
        this.queue = new ArrayBlockingQueue(2 * this.batchSize);
        this.lock = new ReentrantLock();
        this.lastCommittedId = this.storage.getLastCommittedId();
        this.deserializer = builder.getCodec();
    }

    @Override
    public E fetch() throws InterruptedException, StorageException {
        Object obj;
        while ((obj = this.queue.poll()) == null) {
            this.blockFetchFromStorage(0L, TimeUnit.NANOSECONDS);
        }
        return obj;
    }

    @Override
    @Nullable
    public E fetch(long timeout, TimeUnit unit) throws InterruptedException, StorageException {
        Object obj;
        Objects.requireNonNull(unit);
        while ((obj = this.queue.poll()) == null && this.blockFetchFromStorage(timeout, unit)) {
        }
        return obj;
    }

    @Override
    public void commit() {
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        this.lock.lock();
        try {
            this.storage.close();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean blockFetchFromStorage(long timeout, TimeUnit unit) throws InterruptedException, StorageException {
        this.lock.lock();
        try {
            if (this.closed) {
                throw new InterruptedException("consumer closed");
            }
            if (!this.queue.isEmpty()) {
                boolean bl = true;
                return bl;
            }
            long lastId = this.lastCommittedId;
            List<SerializedObjectWithId<S>> payloads = timeout == 0L ? this.storage.fetch(lastId, this.batchSize) : this.storage.fetch(lastId, this.batchSize, timeout, unit);
            if (!payloads.isEmpty()) {
                for (SerializedObjectWithId<S> p : payloads) {
                    S serializeP = p.getSerializedObject();
                    try {
                        E pObj = this.deserializer.deserialize(serializeP);
                        this.queue.put(pObj);
                    }
                    catch (InterruptedException ex) {
                        throw ex;
                    }
                    catch (Exception ex) {
                        String msg = "deserialize object with id: " + p.getId() + " failed. Content is: " + (serializeP instanceof byte[] ? Base64.getEncoder().encodeToString((byte[])serializeP) + " (Base64)" : serializeP);
                        throw new DeserializationException(msg, ex);
                    }
                    lastId = p.getId();
                }
                this.storage.commitId(lastId);
                this.lastCommittedId = lastId;
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.lock.unlock();
        }
    }
}

