/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.redriver.service;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.SharedMetricRegistries;
import com.flipkart.flux.redriver.dao.MessageDao;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.polyguice.core.Initializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MessageManagerService
implements Initializable {
    private static final Logger logger = LoggerFactory.getLogger(MessageManagerService.class);
    private static final String scheduledDeletionSvcName = "redriver-batch-delete-executor-svc";
    private static final String taskRegisterSvcName = "redriver-task-register-executor-svc";
    private final MessageDao messageDao;
    private final Integer batchDeleteInterval;
    private final Integer batchSize;
    private final ConcurrentLinkedQueue<Long> messagesToDelete;
    private final InstrumentedScheduledExecutorService scheduledDeletionService;
    private final InstrumentedExecutorService persistenceExecutorService;

    @Inject
    public MessageManagerService(MessageDao messageDao, @Named(value="redriver.noOfPersistenceWorkers") int noOfPersistenceWorkers, @Named(value="redriver.batchDelete.intervalms") Integer batchDeleteInterval, @Named(value="redriver.batchDelete.batchSize") Integer batchSize) {
        this.messageDao = messageDao;
        this.batchDeleteInterval = batchDeleteInterval;
        this.batchSize = batchSize;
        this.messagesToDelete = new ConcurrentLinkedQueue();
        this.scheduledDeletionService = new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(2), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"), scheduledDeletionSvcName);
        this.persistenceExecutorService = new InstrumentedExecutorService(Executors.newFixedThreadPool(noOfPersistenceWorkers), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"), taskRegisterSvcName);
    }

    public List<ScheduledMessage> retrieveOldest(int offset, int count) {
        return this.messageDao.retrieveOldest(offset, count);
    }

    public void saveMessage(ScheduledMessage message) {
        this.persistenceExecutorService.execute(() -> this.messageDao.save(message));
    }

    public void scheduleForRemoval(Long taskId) {
        this.messagesToDelete.add(taskId);
    }

    public void initialize() {
        this.scheduledDeletionService.scheduleAtFixedRate(() -> {
            Long currentMessageIdToDelete = null;
            ArrayList<Long> messageIdsToDelete = new ArrayList<Long>(this.batchSize);
            while (messageIdsToDelete.size() < this.batchSize && (currentMessageIdToDelete = this.messagesToDelete.poll()) != null) {
                messageIdsToDelete.add(currentMessageIdToDelete);
            }
            if (!messageIdsToDelete.isEmpty()) {
                this.messageDao.deleteInBatch(messageIdsToDelete);
            }
        }, 0L, (long)this.batchDeleteInterval.intValue(), TimeUnit.MILLISECONDS);
        this.registerShutdownHook((ExecutorService)this.scheduledDeletionService, 2, "Could not shutdown executorService redriver-batch-delete-executor-svc");
        this.registerShutdownHook((ExecutorService)this.persistenceExecutorService, 5, "Error occurred while terminating Redriver's persistence executor service");
    }

    private void registerShutdownHook(final ExecutorService executorService, final int timeout, final String customErrorMsg) {
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                executorService.shutdown();
                try {
                    executorService.awaitTermination(timeout, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    logger.error(customErrorMsg, (Throwable)e);
                }
            }
        });
    }
}

