/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir;

import com.github.ylgrgyq.reservoir.Codec;
import com.github.ylgrgyq.reservoir.LogExceptionHandler;
import com.github.ylgrgyq.reservoir.NamedThreadFactory;
import com.github.ylgrgyq.reservoir.ObjectQueueBuilder;
import com.github.ylgrgyq.reservoir.ObjectQueueProducer;
import com.github.ylgrgyq.reservoir.ObjectQueueStorage;
import com.github.ylgrgyq.reservoir.SerializationException;
import com.github.ylgrgyq.reservoir.StorageException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.spotify.futures.CompletableFutures;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DisruptorBackedObjectQueueProducer<E>
implements ObjectQueueProducer<E> {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorBackedObjectQueueProducer.class);
    private final ObjectQueueStorage storage;
    private final Disruptor<ProducerEvent> disruptor;
    private final RingBuffer<ProducerEvent> ringBuffer;
    private final EventTranslatorThreeArg<ProducerEvent, byte[], CompletableFuture<Void>, Boolean> translator;
    private final ExecutorService executor;
    private final Codec<E> serializer;
    private volatile boolean closed;

    public DisruptorBackedObjectQueueProducer(ObjectQueueBuilder<E> builder) {
        Objects.requireNonNull(builder, "builder");
        this.storage = builder.getStorage();
        this.disruptor = new Disruptor<ProducerEvent>(() -> new ProducerEvent(), builder.getRingBufferSize(), new NamedThreadFactory("producer-worker-"));
        this.disruptor.handleEventsWith(new ProduceHandler(builder.getBatchSize()));
        this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<ProducerEvent>("object-queue-producer", (event, ex) -> {
            if (((ProducerEvent)event).future != null) {
                ((ProducerEvent)event).future.completeExceptionally(ex);
            }
        }));
        this.disruptor.start();
        this.translator = new ProducerTranslator();
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.executor = builder.getExecutorService();
        this.serializer = builder.getCodec();
    }

    @Override
    public CompletableFuture<Void> produce(E object) {
        Objects.requireNonNull(object, "object");
        if (this.closed) {
            return CompletableFutures.exceptionallyCompletedFuture(new IllegalStateException("producer has been closed"));
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            byte[] payload = this.serializer.serialize(object);
            this.ringBuffer.publishEvent(this.translator, payload, future, Boolean.FALSE);
        }
        catch (SerializationException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    @Override
    public CompletableFuture<Void> flush() {
        if (this.closed) {
            return CompletableFutures.exceptionallyCompletedFuture(new IllegalStateException("producer has been closed"));
        }
        return this.doFlush();
    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        CompletableFuture<Void> future = this.doFlush();
        future.join();
        this.disruptor.shutdown();
        this.executor.shutdown();
        this.storage.close();
    }

    private CompletableFuture<Void> doFlush() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.ringBuffer.publishEvent(this.translator, null, future, Boolean.TRUE);
        return future;
    }

    private final class ProduceHandler
    implements EventHandler<ProducerEvent> {
        private final int batchSize;
        private final List<byte[]> batchPayload;
        private final List<CompletableFuture<Void>> batchFutures;

        ProduceHandler(int batchSize) {
            this.batchSize = batchSize;
            this.batchPayload = new ArrayList<byte[]>(batchSize);
            this.batchFutures = new ArrayList<CompletableFuture<Void>>(batchSize);
        }

        @Override
        public void onEvent(ProducerEvent event, long sequence, boolean endOfBatch) {
            assert (event.future != null);
            if (event.flush) {
                if (!this.batchPayload.isEmpty()) {
                    this.flush();
                }
                DisruptorBackedObjectQueueProducer.this.executor.execute(() -> event.future.complete(null));
            } else {
                this.batchPayload.add(event.payload);
                this.batchFutures.add(event.future);
                if (this.batchPayload.size() >= this.batchSize || endOfBatch) {
                    this.flush();
                }
            }
            assert (this.batchPayload.size() == this.batchFutures.size()) : "batchPayload: " + this.batchPayload.size() + " batchFutures: " + this.batchFutures.size();
        }

        private void flush() {
            try {
                DisruptorBackedObjectQueueProducer.this.storage.store(this.batchPayload);
                this.completeFutures(this.batchFutures);
            }
            catch (StorageException ex) {
                this.completeFutures(this.batchFutures, ex);
            }
            this.batchPayload.clear();
            this.batchFutures.clear();
        }

        private void completeFutures(List<CompletableFuture<Void>> futures) {
            for (CompletableFuture<Void> future : futures) {
                try {
                    DisruptorBackedObjectQueueProducer.this.executor.execute(() -> future.complete(null));
                }
                catch (Exception ex) {
                    logger.error("Submit complete future task failed", ex);
                }
            }
        }

        private void completeFutures(List<CompletableFuture<Void>> futures, Throwable t) {
            for (CompletableFuture<Void> future : futures) {
                try {
                    DisruptorBackedObjectQueueProducer.this.executor.execute(() -> future.completeExceptionally(t));
                }
                catch (Exception ex) {
                    logger.error("Submit complete future task failed", ex);
                }
            }
        }
    }

    private static final class ProducerEvent {
        @Nullable
        private byte[] payload;
        @Nullable
        private CompletableFuture<Void> future;
        private boolean flush;

        private ProducerEvent() {
        }

        void reset() {
            this.payload = null;
            this.future = null;
            this.flush = false;
        }

        public String toString() {
            return "ProducerEvent{payload=" + Base64.getEncoder().encodeToString(this.payload) + ", flush=" + this.flush + '}';
        }

        static /* synthetic */ byte[] access$302(ProducerEvent x0, byte[] x1) {
            x0.payload = x1;
            return x1;
        }
    }

    private static final class ProducerTranslator
    implements EventTranslatorThreeArg<ProducerEvent, byte[], CompletableFuture<Void>, Boolean> {
        private ProducerTranslator() {
        }

        @Override
        public void translateTo(ProducerEvent event, long sequence, byte[] payload, CompletableFuture<Void> future, Boolean flush) {
            event.reset();
            event.future = future;
            if (Boolean.TRUE.equals(flush)) {
                event.flush = true;
            }
            if (payload != null) {
                ProducerEvent.access$302(event, payload);
            }
        }
    }
}

