/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.redriver.service;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.SharedMetricRegistries;
import com.flipkart.flux.redriver.dao.MessageDao;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.flux.redriver.model.SmIdAndTaskIdWithExecutionVersion;
import com.flipkart.polyguice.core.Initializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Singleton
public class MessageManagerService
implements Initializable {
    private static final Logger logger = LogManager.getLogger(MessageManagerService.class);
    private static final String scheduledDeletionSvcName = "redriver-batch-delete-executor-svc";
    private static final String taskRegisterSvcName = "redriver-task-register-executor-svc";
    private static final String scheduledInsertionSvcName = "redriver-batch-insertion-executer-svc";
    private final MessageDao messageDao;
    private final Integer batchDeleteInterval;
    private final Integer batchDeleteSize;
    private final Integer batchInsertInterval;
    private final Integer batchInsertSize;
    private final ConcurrentLinkedQueue<SmIdAndTaskIdWithExecutionVersion> messagesToDelete;
    private final ConcurrentLinkedQueue<ScheduledMessage> messagesToInsertOrUpdate;
    private final InstrumentedScheduledExecutorService scheduledDeletionService;
    private final InstrumentedScheduledExecutorService scheduledInsertionService;
    private final InstrumentedExecutorService persistenceExecutorService;

    @Inject
    public MessageManagerService(MessageDao messageDao, @Named(value="redriver.noOfPersistenceWorkers") int noOfPersistenceWorkers, @Named(value="redriver.batchDelete.intervalms") Integer batchDeleteInterval, @Named(value="redriver.batchDelete.batchSize") Integer batchDeleteSize, @Named(value="redriver.batchInsert.batchSize") Integer batchInsertSize, @Named(value="redriver.batchInsert.intervalms") Integer batchInsertInterval) {
        this.messageDao = messageDao;
        this.batchDeleteInterval = batchDeleteInterval;
        this.batchDeleteSize = batchDeleteSize;
        this.batchInsertInterval = batchInsertInterval;
        this.batchInsertSize = batchInsertSize;
        this.messagesToInsertOrUpdate = new ConcurrentLinkedQueue();
        this.messagesToDelete = new ConcurrentLinkedQueue();
        this.scheduledInsertionService = new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(3), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"), scheduledInsertionSvcName);
        this.scheduledDeletionService = new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(2), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"), scheduledDeletionSvcName);
        this.persistenceExecutorService = new InstrumentedExecutorService(Executors.newFixedThreadPool(noOfPersistenceWorkers), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"), taskRegisterSvcName + this.hashCode());
    }

    public List<ScheduledMessage> retrieveOldest(int offset, int count) {
        logger.info("Retrieving messages from ScheduledMessages offset:{} count:{}", (Object)offset, (Object)count);
        return this.messageDao.retrieveOldest(offset, count);
    }

    public void saveMessage(ScheduledMessage message) {
        this.messagesToInsertOrUpdate.add(message);
    }

    public void scheduleForRemoval(String stateMachine, Long taskId, Long executionVersion) {
        this.messagesToDelete.add(new SmIdAndTaskIdWithExecutionVersion(stateMachine, taskId, executionVersion));
    }

    public void initialize() {
        this.scheduledDeletionService.scheduleAtFixedRate(() -> {
            try {
                SmIdAndTaskIdWithExecutionVersion currentMessageIdToDelete = null;
                ArrayList<SmIdAndTaskIdWithExecutionVersion> messageIdsToDelete = new ArrayList<SmIdAndTaskIdWithExecutionVersion>(this.batchDeleteSize);
                while (messageIdsToDelete.size() < this.batchDeleteSize && (currentMessageIdToDelete = this.messagesToDelete.poll()) != null) {
                    messageIdsToDelete.add(currentMessageIdToDelete);
                }
                if (!messageIdsToDelete.isEmpty()) {
                    logger.info("Running Deletion job, trying deleting {} messages", (Object)messageIdsToDelete.size());
                    int rowsAffected = this.messageDao.deleteInBatch(messageIdsToDelete);
                    logger.info("Actually Deleted {} rows", (Object)rowsAffected);
                }
            }
            catch (Throwable throwable) {
                logger.error("ScheduledDeletion Job failed for Redriver Messages.", throwable);
            }
        }, 0L, (long)this.batchDeleteInterval.intValue(), TimeUnit.MILLISECONDS);
        this.scheduledInsertionService.scheduleAtFixedRate(() -> {
            try {
                ScheduledMessage currentMessageIdToInsert = null;
                ArrayList<ScheduledMessage> messagesToInsert = new ArrayList<ScheduledMessage>(this.batchInsertSize);
                while (messagesToInsert.size() < this.batchInsertSize && (currentMessageIdToInsert = this.messagesToInsertOrUpdate.poll()) != null) {
                    messagesToInsert.add(currentMessageIdToInsert);
                }
                if (!messagesToInsert.isEmpty()) {
                    logger.info("Running Insertion job, trying inserting {} messages", (Object)messagesToInsert.size());
                    int rowsAffected = this.messageDao.bulkInsertOrUpdate(messagesToInsert);
                    logger.info("Actually Inserted/Updated {} rows", (Object)rowsAffected);
                }
            }
            catch (Throwable throwable) {
                logger.error("ScheduledInsertion Job failed for Redriver Messages.", throwable);
            }
        }, 0L, (long)this.batchInsertInterval.intValue(), TimeUnit.MILLISECONDS);
        this.registerShutdownHook((ExecutorService)this.scheduledInsertionService, 10, "Could not shutdown executorService " + this.scheduledInsertionService);
        this.registerShutdownHook((ExecutorService)this.scheduledDeletionService, 2, "Could not shutdown executorService redriver-batch-delete-executor-svc");
        this.registerShutdownHook((ExecutorService)this.persistenceExecutorService, 5, "Error occurred while terminating Redriver's persistence executor service");
    }

    private void registerShutdownHook(final ExecutorService executorService, final int timeout, final String customErrorMsg) {
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                executorService.shutdown();
                try {
                    executorService.awaitTermination(timeout, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    logger.error(customErrorMsg, (Throwable)e);
                }
            }
        });
    }
}

