/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.redriver.service;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.SharedMetricRegistries;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.flux.redriver.service.MessageManagerService;
import com.flipkart.flux.task.redriver.RedriverRegistry;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically reads batches of {@link ScheduledMessage}s from the
 * {@link MessageManagerService} and hands every message whose scheduled time
 * has already passed to the {@link RedriverRegistry} for redriving.
 *
 * <p>The polling task runs on a dedicated single-threaded scheduled executor
 * that is instrumented through the shared {@code mainMetricRegistry}.
 * {@link #start()} and {@link #stop()} are serialized on this instance's
 * monitor; {@link #isRunning()} is lock-free and relies on the scheduled
 * future handle being {@code volatile}.
 */
@Singleton
public class RedriverService {
    private static final Logger logger = LoggerFactory.getLogger(RedriverService.class);
    private static final String SCHEDULED_EXECUTOR_SVC_NAME = "redriver-batch-read-executor-svc";

    /** Delay, in milliseconds, between the end of one redrive run and the start of the next. */
    private final Integer batchReadInterval;
    /** Number of messages requested from the message service per read. */
    private final Integer batchSize;
    private final MessageManagerService messageService;
    private final RedriverRegistry redriverRegistry;
    private final InstrumentedScheduledExecutorService scheduledExecutorService;

    /** Delay, in milliseconds, before the first redrive run after {@link #start()}. */
    private Long initialDelay = 10000L;

    /**
     * Handle of the currently scheduled polling task, or {@code null} if the
     * service has never been started. Written only while holding this
     * instance's monitor; volatile so that {@link #isRunning()} can read it
     * without locking.
     */
    private volatile ScheduledFuture<?> scheduledFuture;

    /**
     * Creates the service together with its single-threaded scheduled executor.
     * Nothing is scheduled until {@link #start()} is called.
     *
     * @param messageService    source of the scheduled messages
     * @param redriverRegistry  receiver of the task ids that are due for redriving
     * @param batchReadInterval delay between two consecutive redrive runs, in milliseconds
     * @param batchSize         number of messages to request per read
     */
    @Inject
    public RedriverService(MessageManagerService messageService, RedriverRegistry redriverRegistry, @Named(value="redriver.batchread.intervalms") Integer batchReadInterval, @Named(value="redriver.batchread.batchsize") Integer batchSize) {
        this.redriverRegistry = redriverRegistry;
        this.batchReadInterval = batchReadInterval;
        this.batchSize = batchSize;
        this.messageService = messageService;
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Drop cancelled tasks from the work queue immediately instead of
        // keeping them until their delay elapses.
        executor.setRemoveOnCancelPolicy(true);
        this.scheduledExecutorService = new InstrumentedScheduledExecutorService(Executors.unconfigurableScheduledExecutorService(executor), SharedMetricRegistries.getOrCreate("mainMetricRegistry"), SCHEDULED_EXECUTOR_SVC_NAME);
    }

    /**
     * Schedules the periodic redrive task unless one is already active.
     * Calling this while the service is running is a no-op.
     */
    public void start() {
        synchronized (this) {
            ScheduledFuture<?> current = this.scheduledFuture;
            if (current == null || current.isDone()) {
                this.scheduledFuture = this.scheduledExecutorService.scheduleWithFixedDelay(
                        this::redriveSafely,
                        this.initialDelay.longValue(),
                        this.batchReadInterval.longValue(),
                        TimeUnit.MILLISECONDS);
                logger.info("RedriverService started");
            }
        }
    }

    /**
     * Cancels the periodic redrive task without interrupting a run that is
     * already in progress. Safe to call when the service was never started.
     */
    public void stop() {
        synchronized (this) {
            ScheduledFuture<?> current = this.scheduledFuture;
            // Guard against stop() being invoked before start(): there is
            // nothing to cancel in that case.
            if (current != null) {
                current.cancel(false);
            }
            logger.info("RedriverService stopped");
        }
    }

    /**
     * @return {@code true} if a redrive task has been scheduled and has neither
     *         completed nor been cancelled, {@code false} otherwise
     */
    public Boolean isRunning() {
        // Read the volatile field once so the null check and isDone() see the same handle.
        ScheduledFuture<?> current = this.scheduledFuture;
        return current != null && !current.isDone();
    }

    /**
     * Runs one redrive pass and logs any exception instead of propagating it,
     * because an exception escaping a periodic task suppresses all of its
     * subsequent executions.
     */
    private void redriveSafely() {
        try {
            this.redrive();
        }
        catch (Exception e) {
            logger.error("Error while running redrive", e);
        }
    }

    /**
     * Pages through the stored messages in batches of {@code batchSize} and
     * redrives every task whose scheduled time lies before the moment this
     * pass started. Paging stops at the first batch that is empty, not full,
     * or whose last message is not yet due.
     */
    private void redrive() {
        List<ScheduledMessage> messages;
        int offset = 0;
        final int pageSize = this.batchSize.intValue();
        // Fixed reference time for the whole pass so every batch uses the same cut-off.
        final long now = System.currentTimeMillis();
        do {
            // NOTE(review): the stop condition inspects only the last element, which
            // presumably relies on retrieveOldest returning messages ordered by
            // scheduled time ascending; confirm against MessageManagerService.
            messages = this.messageService.retrieveOldest(offset, this.batchSize);
            messages.stream()
                    .filter(message -> message.getScheduledTime() < now)
                    .forEach(message -> this.redriverRegistry.redriveTask(message.getTaskId()));
            offset += pageSize;
            // The isEmpty() check keeps get(size - 1) from being evaluated on an
            // empty batch, which could otherwise happen when batchSize is 0.
        } while (!messages.isEmpty()
                && messages.size() == pageSize
                && messages.get(messages.size() - 1).getScheduledTime() < now);
    }

    /**
     * Overrides the delay before the first redrive run. The value is read when
     * {@link #start()} schedules the task, so it has no effect on a task that
     * is already scheduled.
     *
     * @param initialDelay delay in milliseconds
     */
    public void setInitialDelay(Long initialDelay) {
        this.initialDelay = initialDelay;
    }
}

