/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.convert.chronosQ.impl.rmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import flipkart.cp.convert.chronosQ.core.SchedulerEntry;
import flipkart.cp.convert.chronosQ.core.SchedulerSink;
import flipkart.cp.convert.chronosQ.exceptions.ErrorCode;
import flipkart.cp.convert.chronosQ.exceptions.SchedulerException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SchedulerSink} implementation that publishes each scheduler entry's payload
 * to a RabbitMQ exchange through a caller-supplied {@link Channel}.
 *
 * <p>Publishing is performed synchronously on the calling thread; the returned
 * futures are already completed by the time the methods return.
 *
 * <p>NOTE(review): this class does not open, close, or synchronize access to the
 * channel; lifecycle and thread-confinement are presumably the caller's
 * responsibility - confirm against the wiring code.
 */
public class RmqSchedulerSink
implements SchedulerSink {
    private static final Logger log = LoggerFactory.getLogger(RmqSchedulerSink.class);

    /** Routing key used when the caller does not supply one. */
    private static final String DEFAULT_ROUTING_KEY = "";

    private final Channel channel;
    private final String exchange;
    /** Passed to {@code basicPublish} as the routing key. */
    private final String queueName;
    /** Message properties sent with every publish; {@code null} means none. */
    private final AMQP.BasicProperties properties;

    /**
     * Creates a sink that publishes without any message properties.
     *
     * @param channel   channel used for publishing
     * @param exchange  exchange to publish to
     * @param queueName routing key for published messages
     */
    public RmqSchedulerSink(Channel channel, String exchange, String queueName) {
        this(channel, exchange, queueName, null);
    }

    /**
     * Creates a sink that publishes with an empty routing key and no message
     * properties.
     *
     * <p>Previously this constructor left the routing key {@code null}, which was
     * then handed straight to {@code basicPublish}; an empty routing key is used
     * instead so the sink is usable (e.g. with exchanges that ignore the key).
     *
     * @param channel  channel used for publishing
     * @param exchange exchange to publish to
     */
    public RmqSchedulerSink(Channel channel, String exchange) {
        this(channel, exchange, DEFAULT_ROUTING_KEY, null);
    }

    /**
     * Creates a sink that attaches the given properties to every published message.
     *
     * @param channel    channel used for publishing
     * @param exchange   exchange to publish to
     * @param queueName  routing key for published messages
     * @param properties message properties to send with each publish; may be {@code null}
     */
    public RmqSchedulerSink(Channel channel, String exchange, String queueName, AMQP.BasicProperties properties) {
        this.channel = channel;
        this.exchange = exchange;
        this.queueName = queueName;
        this.properties = properties;
    }

    /**
     * Publishes the payload of a single entry to the configured exchange.
     *
     * @param schedulerEntry entry whose payload is published as UTF-8 bytes
     * @return an already-completed future
     * @throws SchedulerException with {@link ErrorCode#SCHEDULER_SINK_ERROR} if the
     *         publish fails with an {@link IOException}
     */
    public CompletableFuture<Void> giveExpiredForProcessing(SchedulerEntry schedulerEntry) throws SchedulerException {
        try {
            log.info("Got message to be published {}", schedulerEntry);
            // Explicit charset: the no-arg getBytes() depends on the JVM's default encoding.
            byte[] body = schedulerEntry.getPayload().getBytes(StandardCharsets.UTF_8);
            // Forward the configured properties; they were previously dropped in favour of null.
            this.channel.basicPublish(this.exchange, this.queueName, this.properties, body);
            log.info("Message published -{}", schedulerEntry);
            return CompletableFuture.completedFuture(null);
        }
        catch (IOException ex) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("Unable to publish message to queue - {}-{}", schedulerEntry, ex.getMessage(), ex);
            throw new SchedulerException((Throwable)ex, ErrorCode.SCHEDULER_SINK_ERROR);
        }
    }

    /**
     * Publishes every entry in the list, in iteration order, stopping at the first
     * failure (entries already published are not rolled back).
     *
     * @param schedulerEntries entries to publish
     * @return an already-completed future
     * @throws SchedulerException if publishing any entry fails
     */
    public CompletableFuture<Void> giveExpiredListForProcessing(List<SchedulerEntry> schedulerEntries) throws SchedulerException {
        for (SchedulerEntry value : schedulerEntries) {
            this.giveExpiredForProcessing(value);
        }
        return CompletableFuture.completedFuture(null);
    }
}

