/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.comms;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.olacabs.fabric.compute.comms.BlockingQueueCommsChannel;
import com.olacabs.fabric.compute.comms.CommsChannel;
import com.olacabs.fabric.compute.comms.CommsFrameworkMessage;
import com.olacabs.fabric.compute.comms.CommsMessageHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CommsChannel} implementation backed by an LMAX {@link Disruptor} ring buffer.
 *
 * <p>Each published payload is written into a {@link CommsFrameworkMessage} slot of the ring
 * buffer; once {@link #start()} has been called, the registered event handler forwards every
 * slot's payload to the {@link CommsMessageHandler} supplied at construction time.
 *
 * @param <E> type of the payload carried through the channel
 */
public class DisruptorCommsChannel<E>
implements CommsChannel<E> {
    private static final Logger log = LoggerFactory.getLogger(DisruptorCommsChannel.class);
    private final String name;
    private final boolean isSingleProducer;
    private final CommsMessageHandler<E> handler;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    private int bufferSize;
    private Disruptor<CommsFrameworkMessage<E>> disruptor;

    /**
     * Creates the channel and its underlying disruptor. The disruptor is not started here;
     * call {@link #start()} to begin delivering messages.
     *
     * @param name             channel name; also written as the source of every published message
     * @param isSingleProducer {@code true} to build the disruptor with {@link ProducerType#SINGLE},
     *                         {@code false} for {@link ProducerType#MULTI}
     * @param waitStrategy     one of {@code "block"}, {@code "lite"}, {@code "timeout"},
     *                         {@code "sleep"}, {@code "yield"}, {@code "busy"}; {@code null} or any
     *                         other value falls back to the sleeping strategy
     * @param bufferSize       ring buffer size; must be a positive power of 2
     * @param handler          receiver of every payload that passes through the channel
     * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of 2
     */
    public DisruptorCommsChannel(String name, boolean isSingleProducer, String waitStrategy, int bufferSize, CommsMessageHandler<E> handler) {
        // The bit trick alone lets 0 and Integer.MIN_VALUE through, so reject non-positive sizes first.
        if (bufferSize < 1 || (bufferSize & (bufferSize - 1)) != 0) {
            throw new IllegalArgumentException("Disruptor buffer size must always be a power of 2");
        }
        this.name = name;
        this.isSingleProducer = isSingleProducer;
        this.handler = handler;
        this.bufferSize = bufferSize;
        ProducerType producerType = isSingleProducer ? ProducerType.SINGLE : ProducerType.MULTI;
        this.disruptor = new Disruptor<>(CommsFrameworkMessage::new, bufferSize, this.executorService, producerType, createWaitStrategy(waitStrategy));
    }

    /**
     * Maps a configuration name onto a Disruptor wait strategy.
     *
     * @param strategyName configured strategy name, may be {@code null}
     * @return the matching strategy, or a {@link SleepingWaitStrategy} when the name is
     *         {@code null} or not recognised
     */
    private static WaitStrategy createWaitStrategy(String strategyName) {
        if (strategyName == null) {
            // A switch on a null String would throw; treat null like any other unknown name.
            return new SleepingWaitStrategy();
        }
        switch (strategyName) {
            case "block": {
                return new BlockingWaitStrategy();
            }
            case "lite": {
                return new LiteBlockingWaitStrategy();
            }
            case "timeout": {
                return new TimeoutBlockingWaitStrategy(10L, TimeUnit.MILLISECONDS);
            }
            case "yield": {
                return new YieldingWaitStrategy();
            }
            case "busy": {
                return new BusySpinWaitStrategy();
            }
            case "sleep":
            default: {
                return new SleepingWaitStrategy();
            }
        }
    }

    /**
     * @return the channel name given at construction
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Publishes a payload onto the ring buffer. The claimed slot gets the ring buffer sequence
     * as its id, this channel's name as its source and {@code sourceEvent} as its payload.
     *
     * <p>Failures are logged (with their cause) and not propagated to the caller.
     *
     * @param sourceEvent payload to deliver to the handler
     */
    @Override
    public void publish(E sourceEvent) {
        try {
            RingBuffer<CommsFrameworkMessage<E>> ringBuffer = this.disruptor.getRingBuffer();
            ringBuffer.publishEvent((event, sequence) -> {
                event.setId(sequence);
                event.setSource(this.name);
                event.setPayload(sourceEvent);
            });
        }
        catch (Exception e) {
            // Keep the best-effort contract (no rethrow) but do not lose the cause.
            log.error("Failed to publish message on comms channel {}", this.name, e);
        }
    }

    /**
     * Registers the event handler that forwards each message payload to the
     * {@link CommsMessageHandler} and then starts the disruptor.
     *
     * <p>Anything thrown by the handler is rethrown from the event handler wrapped in a
     * {@link RuntimeException}.
     */
    @Override
    public void start() {
        EventHandler<CommsFrameworkMessage<E>> dispatcher = (event, sequence, endOfBatch) -> {
            try {
                this.handler.handlePipelineMessage(event.getPayload());
            }
            catch (Throwable t) {
                throw new RuntimeException("Error sending message to processor: ", t);
            }
        };
        this.disruptor.handleEventsWith(dispatcher);
        this.disruptor.start();
    }

    /**
     * Halts the disruptor and shuts down the executor service currently held by this channel
     * so that its worker thread does not outlive the channel.
     *
     * <p>The executor rejects new tasks after this call, so the channel is not expected to be
     * started again afterwards.
     */
    @Override
    public void stop() {
        this.disruptor.halt();
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    /**
     * Replaces the executor service reference held by this channel. The disruptor built in the
     * constructor was already given the previous executor; this setter does not rebuild it.
     *
     * @param executorService the new executor service reference
     */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * Overwrites the recorded buffer size. This only updates the field; it performs no
     * validation and does not resize the existing disruptor.
     *
     * @param bufferSize the value to record
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /**
     * Replaces the disruptor used by {@link #publish(Object)}, {@link #start()} and
     * {@link #stop()}.
     *
     * @param disruptor the disruptor to use from now on
     */
    public void setDisruptor(Disruptor<CommsFrameworkMessage<E>> disruptor) {
        this.disruptor = disruptor;
    }

    /**
     * @return the channel name given at construction
     */
    public String getName() {
        return this.name;
    }

    /**
     * @return {@code true} if the channel was constructed for a single producer
     */
    public boolean isSingleProducer() {
        return this.isSingleProducer;
    }

    /**
     * @return the handler that receives message payloads
     */
    public CommsMessageHandler<E> getHandler() {
        return this.handler;
    }

    /**
     * @return the executor service currently held by this channel
     */
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    /**
     * @return the recorded ring buffer size
     */
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * @return the disruptor currently used by this channel
     */
    public Disruptor<CommsFrameworkMessage<E>> getDisruptor() {
        return this.disruptor;
    }
}

