/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.comms;

import com.olacabs.fabric.compute.comms.CommsChannel;
import com.olacabs.fabric.compute.comms.CommsFrameworkMessage;
import com.olacabs.fabric.compute.comms.CommsMessageHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CommsChannel} that hands published events to a {@link CommsMessageHandler}
 * through a bounded {@link LinkedBlockingQueue} drained by one dedicated worker thread.
 *
 * <p>Lifecycle: call {@link #start()} before {@link #publish(Object)}; call {@link #stop()}
 * to cancel the consumer and release its thread. The channel may be started again after
 * it has been stopped; each start uses a fresh, empty queue.
 *
 * @param <E> type of the events carried by this channel
 */
public class BlockingQueueCommsChannel<E>
implements CommsChannel<E> {
    private static final Logger log = LoggerFactory.getLogger(BlockingQueueCommsChannel.class);
    /** Maximum number of undelivered messages; {@link #publish(Object)} blocks while the queue is full. */
    private static final int QUEUE_CAPACITY = 8;
    private final String name;
    /** Stored from the constructor; not consulted anywhere in this class. */
    private final boolean isSingleProducer;
    private final CommsMessageHandler<E> handler;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    /** Created by {@link #start()}; volatile so publishing threads see the queue installed by the starting thread. */
    private volatile LinkedBlockingQueue<CommsFrameworkMessage<E>> queue;
    private Future<?> jobFuture;

    /**
     * Creates a channel that is not yet started.
     *
     * @param name             channel name, returned by {@link #name()} and used as the source of every message
     * @param isSingleProducer stored as-is; this class does not act on it
     * @param handler          receives the payload of every published message on the worker thread
     */
    BlockingQueueCommsChannel(String name, boolean isSingleProducer, CommsMessageHandler<E> handler) {
        this.isSingleProducer = isSingleProducer;
        this.handler = handler;
        this.name = name;
    }

    /**
     * @return the name given at construction time
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Wraps the event in a framework message (id {@code 0}, source = channel name) and enqueues it,
     * blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the event is dropped, the error is
     * logged and the thread's interrupt flag is restored so callers can observe the interruption.
     *
     * @param sourceEvent the event to deliver to the handler
     * @throws IllegalStateException if the channel has never been started
     */
    @Override
    public void publish(E sourceEvent) {
        // Read the volatile field once so the null check and the put use the same queue.
        final LinkedBlockingQueue<CommsFrameworkMessage<E>> target = this.queue;
        if (target == null) {
            throw new IllegalStateException("Comms channel " + this.name + " has not been started");
        }
        try {
            // Explicit <E> witness keeps the builder chain typed to this channel's payload type
            // (assumes builder() is the usual generic static factory; the witness is ignored otherwise).
            target.put(CommsFrameworkMessage.<E>builder().id(0L).payload(sourceEvent).source(this.name).build());
        }
        catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag for code further up the stack.
            Thread.currentThread().interrupt();
            log.error("Comms channel stopped");
        }
    }

    /**
     * Installs a fresh bounded queue and submits the consumer loop to the worker executor.
     *
     * <p>Calling this while a consumer is still running is a no-op: replacing the queue under a
     * live consumer would leave it blocked on the old queue while publishers fill the new one.
     */
    @Override
    public void start() {
        if (this.jobFuture != null && !this.jobFuture.isDone()) {
            log.warn("Comms channel {} is already started", this.name);
            return;
        }
        // stop() shuts the executor down; create a new one so the channel can be restarted.
        if (this.executorService.isShutdown()) {
            this.executorService = Executors.newSingleThreadExecutor();
        }
        final LinkedBlockingQueue<CommsFrameworkMessage<E>> channelQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        this.queue = channelQueue;
        // The consumer is bound to the queue it was started with, not to the mutable field.
        this.jobFuture = this.executorService.submit(() -> this.consume(channelQueue));
    }

    /**
     * Cancels the consumer (interrupting it if it is running) and shuts down the worker executor
     * so its thread does not outlive the channel. Safe to call when the channel was never started.
     */
    @Override
    public void stop() {
        if (null != this.jobFuture) {
            this.jobFuture.cancel(true);
        }
        this.executorService.shutdownNow();
    }

    /**
     * Consumer loop: takes messages from {@code source} and passes each payload to the handler
     * until the thread is interrupted or the handler fails.
     *
     * @param source the queue this consumer drains
     * @throws RuntimeException wrapping whatever the handler threw; the loop ends in that case
     */
    private void consume(LinkedBlockingQueue<CommsFrameworkMessage<E>> source) {
        while (!Thread.currentThread().isInterrupted()) {
            final CommsFrameworkMessage<E> message;
            try {
                message = source.take();
            }
            catch (InterruptedException e) {
                // Normal shutdown path (stop() interrupts the worker): keep the flag set and exit.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                this.handler.handlePipelineMessage(message.getPayload());
            }
            catch (Throwable t) {
                // The rethrown exception is only stored in the Future; log it so the failure is visible.
                log.error("Error sending message to processor on channel {}", this.name, t);
                throw new RuntimeException("Error sending message to processor: ", t);
            }
        }
    }
}

