/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir;

import com.github.ylgrgyq.reservoir.Codec;
import com.github.ylgrgyq.reservoir.DeserializationException;
import com.github.ylgrgyq.reservoir.ObjectQueueBuilder;
import com.github.ylgrgyq.reservoir.ObjectQueueConsumer;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.SerializedObjectWithId;
import com.github.ylgrgyq.reservoir.StorageException;
import java.util.Base64;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;

final class ManualCommitObjectQueueConsumer<E, S>
implements ObjectQueueConsumer<E> {
    private final ObjectQueueStorage<S> storage;
    private final BlockingQueue<DeserializedObjectWithId<E>> queue;
    private final ReentrantLock lock;
    private final int batchSize;
    private final Codec<E, S> deserializer;
    private long lastCommittedId;
    private volatile boolean closed;

    ManualCommitObjectQueueConsumer(ObjectQueueBuilder<E, S> builder) throws StorageException {
        Objects.requireNonNull(builder, "builder");
        this.storage = builder.getStorage();
        this.batchSize = builder.getConsumerFetchBatchSize();
        this.queue = new ArrayBlockingQueue<DeserializedObjectWithId<E>>(2 * this.batchSize);
        this.lastCommittedId = this.storage.getLastCommittedId();
        this.deserializer = builder.getCodec();
        this.lock = new ReentrantLock();
    }

    @Override
    public E fetch() throws InterruptedException, StorageException {
        DeserializedObjectWithId payload;
        while ((payload = (DeserializedObjectWithId)this.queue.peek()) == null) {
            this.blockFetchFromStorage(0L, TimeUnit.NANOSECONDS);
        }
        return (E)payload.object;
    }

    @Override
    @Nullable
    public E fetch(long timeout, TimeUnit unit) throws InterruptedException, StorageException {
        DeserializedObjectWithId payload;
        Objects.requireNonNull(unit);
        while ((payload = (DeserializedObjectWithId)this.queue.peek()) == null && this.blockFetchFromStorage(timeout, unit)) {
        }
        return (E)(payload == null ? null : payload.object);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commit() throws StorageException {
        DeserializedObjectWithId payload = (DeserializedObjectWithId)this.queue.poll();
        if (payload == null) {
            throw new NoSuchElementException();
        }
        this.lock.lock();
        try {
            long id = payload.id;
            if (id > this.lastCommittedId) {
                this.lastCommittedId = id;
                this.storage.commitId(id);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        this.lock.lock();
        try {
            this.storage.close();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean blockFetchFromStorage(long timeout, TimeUnit unit) throws InterruptedException, StorageException {
        this.lock.lock();
        try {
            if (this.closed) {
                throw new InterruptedException("consumer closed");
            }
            if (!this.queue.isEmpty()) {
                boolean bl = true;
                return bl;
            }
            long lastId = this.lastCommittedId;
            List<SerializedObjectWithId<S>> payloads = timeout == 0L ? this.storage.fetch(lastId, this.batchSize) : this.storage.fetch(lastId, this.batchSize, timeout, unit);
            for (SerializedObjectWithId<S> p : payloads) {
                S serializeP = p.getSerializedObject();
                try {
                    E pObj = this.deserializer.deserialize(serializeP);
                    this.queue.put(new DeserializedObjectWithId<E>(p.getId(), pObj));
                }
                catch (InterruptedException ex) {
                    throw ex;
                }
                catch (Exception ex) {
                    String msg = "deserialize object with id: " + p.getId() + " failed. Content is: " + (serializeP instanceof byte[] ? Base64.getEncoder().encodeToString((byte[])serializeP) + " (Base64)" : serializeP);
                    throw new DeserializationException(msg, ex);
                }
            }
            boolean bl = !payloads.isEmpty();
            return bl;
        }
        finally {
            this.lock.unlock();
        }
    }

    private static final class DeserializedObjectWithId<E> {
        private final E object;
        private final long id;

        DeserializedObjectWithId(long id, E object) {
            this.object = object;
            this.id = id;
        }
    }
}

