/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.redriver.service;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.SharedMetricRegistries;
import com.flipkart.flux.redriver.dao.MessageDao;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.polyguice.core.Disposable;
import com.flipkart.polyguice.core.Initializable;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MessageManagerService
implements Disposable,
Initializable {
    private final MessageDao messageDao;
    private final Long batchDeleteInterval;
    private final Integer batchSize;
    private final ConcurrentLinkedQueue<ScheduledMessage> messagesToDelete;
    private static final Logger logger = LoggerFactory.getLogger(MessageManagerService.class);
    private final InstrumentedScheduledExecutorService scheduledExecutorService;

    @Inject
    public MessageManagerService(MessageDao messageDao, @Named(value="redriver.batchdelete.intervalms") Integer batchDeleteInterval, @Named(value="redriver.batchdelete.batchsize") Integer batchSize) {
        this.messageDao = messageDao;
        this.batchDeleteInterval = (long)batchDeleteInterval;
        this.batchSize = batchSize;
        this.messagesToDelete = new ConcurrentLinkedQueue();
        this.scheduledExecutorService = new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(2), SharedMetricRegistries.getOrCreate((String)"mainMetricRegistry"));
    }

    public void saveMessage(ScheduledMessage message) {
        this.messageDao.save(message);
    }

    public void scheduleForRemoval(ScheduledMessage message) {
        this.messagesToDelete.add(message);
    }

    public void dispose() {
        this.scheduledExecutorService.shutdown();
        try {
            this.scheduledExecutorService.awaitTermination(2L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.error("Could not shutdown scheduled executor service", (Throwable)e);
        }
    }

    public void initialize() {
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            int i = 0;
            ScheduledMessage currentMessageToDelete = null;
            ArrayList<Long> messageIdsToDelete = new ArrayList<Long>(this.batchSize);
            do {
                if ((currentMessageToDelete = this.messagesToDelete.poll()) == null) continue;
                messageIdsToDelete.add(currentMessageToDelete.getTaskId());
            } while (currentMessageToDelete != null && ++i < this.batchSize);
            if (!messageIdsToDelete.isEmpty()) {
                this.messageDao.deleteInBatch(messageIdsToDelete);
            }
        }, 0L, this.batchDeleteInterval.longValue(), TimeUnit.MILLISECONDS);
    }
}

