/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir;

import com.github.ylgrgyq.reservoir.Codec;
import com.github.ylgrgyq.reservoir.LogExceptionHandler;
import com.github.ylgrgyq.reservoir.NamedThreadFactory;
import com.github.ylgrgyq.reservoir.ObjectQueueBuilder;
import com.github.ylgrgyq.reservoir.ObjectQueueProducer;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.SerializationException;
import com.github.ylgrgyq.reservoir.StorageException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DisruptorBackedObjectQueueProducer<E, S>
implements ObjectQueueProducer<E> {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorBackedObjectQueueProducer.class);
    private static final ThreadFactory producerWorkerFactory = new NamedThreadFactory("reservoir-object-queue-producer-worker-");
    private final ObjectQueueStorage<S> storage;
    private final Disruptor<ProducerEvent<S>> disruptor;
    private final RingBuffer<ProducerEvent<S>> ringBuffer;
    private final EventTranslatorThreeArg<ProducerEvent<S>, S, CompletableFuture<Void>, Boolean> translator;
    private final ExecutorService executor;
    private final boolean shutdownExecutor;
    private final Codec<E, S> serializer;
    private volatile boolean closed;

    DisruptorBackedObjectQueueProducer(ObjectQueueBuilder<E, S> builder) {
        Objects.requireNonNull(builder, "builder");
        this.storage = builder.getStorage();
        this.disruptor = new Disruptor<ProducerEvent>(() -> new ProducerEvent(), builder.getProducerRingBufferSize(), producerWorkerFactory);
        this.disruptor.handleEventsWith(new ProduceHandler(builder.getConsumerFetchBatchSize()));
        this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<ProducerEvent>("object-queue-producer", (event, ex) -> {
            if (((ProducerEvent)event).future != null) {
                ((ProducerEvent)event).future.completeExceptionally(ex);
            }
        }));
        this.disruptor.start();
        this.translator = new ProducerTranslator();
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.executor = builder.getProducerExecutorService();
        this.shutdownExecutor = builder.isShutdownProducerExecutorService();
        this.serializer = builder.getCodec();
    }

    @Override
    public CompletableFuture<Void> produce(E object) {
        Objects.requireNonNull(object, "object");
        if (this.closed) {
            return DisruptorBackedObjectQueueProducer.exceptionallyCompletedFuture(new IllegalStateException("producer has been closed"));
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            S payload = this.serializer.serialize(object);
            this.ringBuffer.publishEvent(this.translator, payload, future, Boolean.FALSE);
        }
        catch (SerializationException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    @Override
    public CompletableFuture<Void> flush() {
        if (this.closed) {
            return DisruptorBackedObjectQueueProducer.exceptionallyCompletedFuture(new IllegalStateException("producer has been closed"));
        }
        return this.doFlush();
    }

    @Override
    public synchronized void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        CompletableFuture<Void> future = this.doFlush();
        future.join();
        this.disruptor.shutdown();
        if (this.shutdownExecutor) {
            this.executor.shutdown();
        }
        this.storage.close();
    }

    private CompletableFuture<Void> doFlush() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.ringBuffer.publishEvent(this.translator, null, future, Boolean.TRUE);
        return future;
    }

    private static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable throwable) {
        CompletableFuture future = new CompletableFuture();
        future.completeExceptionally(throwable);
        return future;
    }

    private final class ProduceHandler
    implements EventHandler<ProducerEvent<S>> {
        private final int batchSize;
        private final List<S> batchPayload;
        private final List<CompletableFuture<Void>> batchFutures;

        ProduceHandler(int batchSize) {
            this.batchSize = batchSize;
            this.batchPayload = new ArrayList(batchSize);
            this.batchFutures = new ArrayList<CompletableFuture<Void>>(batchSize);
        }

        @Override
        public void onEvent(ProducerEvent<S> event, long sequence, boolean endOfBatch) {
            assert (event.future != null);
            if (event.flush) {
                if (!this.batchPayload.isEmpty()) {
                    this.flush();
                }
                DisruptorBackedObjectQueueProducer.this.executor.execute(() -> event.future.complete(null));
            } else {
                this.batchPayload.add(event.payload);
                this.batchFutures.add(event.future);
                if (this.batchPayload.size() >= this.batchSize || endOfBatch) {
                    this.flush();
                }
            }
            assert (this.batchPayload.size() == this.batchFutures.size()) : "batchPayload: " + this.batchPayload.size() + " batchFutures: " + this.batchFutures.size();
        }

        private void flush() {
            try {
                DisruptorBackedObjectQueueProducer.this.storage.store(this.batchPayload);
                this.completeFutures(this.batchFutures);
            }
            catch (StorageException ex) {
                this.completeFutures(this.batchFutures, ex);
            }
            this.batchPayload.clear();
            this.batchFutures.clear();
        }

        private void completeFutures(List<CompletableFuture<Void>> futures) {
            for (CompletableFuture<Void> future : futures) {
                try {
                    DisruptorBackedObjectQueueProducer.this.executor.execute(() -> future.complete(null));
                }
                catch (Exception ex) {
                    logger.error("Submit complete future task failed", ex);
                }
            }
        }

        private void completeFutures(List<CompletableFuture<Void>> futures, Throwable t) {
            for (CompletableFuture<Void> future : futures) {
                try {
                    DisruptorBackedObjectQueueProducer.this.executor.execute(() -> future.completeExceptionally(t));
                }
                catch (Exception ex) {
                    logger.error("Submit complete future task failed", ex);
                }
            }
        }
    }

    private static final class ProducerEvent<S> {
        @Nullable
        private S payload;
        @Nullable
        private CompletableFuture<Void> future;
        private boolean flush;

        private ProducerEvent() {
        }

        void reset() {
            this.payload = null;
            this.future = null;
            this.flush = false;
        }

        public String toString() {
            return "ProducerEvent{payload=" + (this.payload instanceof byte[] ? Base64.getEncoder().encodeToString((byte[])this.payload) : this.payload) + ", flush=" + this.flush + '}';
        }
    }

    private static final class ProducerTranslator<S>
    implements EventTranslatorThreeArg<ProducerEvent<S>, S, CompletableFuture<Void>, Boolean> {
        private ProducerTranslator() {
        }

        @Override
        public void translateTo(ProducerEvent<S> event, long sequence, S payload, CompletableFuture<Void> future, Boolean flush) {
            event.reset();
            ((ProducerEvent)event).future = future;
            if (Boolean.TRUE.equals(flush)) {
                ((ProducerEvent)event).flush = true;
            }
            if (payload != null) {
                ((ProducerEvent)event).payload = payload;
            }
        }
    }
}

