/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.gojira.queuedsender;

import com.leansoft.bigqueue.BigQueueImpl;
import com.leansoft.bigqueue.IBigQueue;
import flipkart.cp.gojira.core.injectors.GuiceInjector;
import flipkart.cp.gojira.models.TestData;
import flipkart.cp.gojira.models.TestDataType;
import flipkart.cp.gojira.models.TestRequestData;
import flipkart.cp.gojira.models.TestResponseData;
import flipkart.cp.gojira.queuedsender.TestQueuedSender;
import flipkart.cp.gojira.serde.SerdeHandlerRepository;
import flipkart.cp.gojira.sinkstore.handlers.SinkHandler;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestQueuedSenderImpl
extends TestQueuedSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestQueuedSenderImpl.class.getSimpleName());
    private IBigQueue messageQueue;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    @Override
    public void setup() throws Exception {
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        Files.createDirectories(Paths.get(this.testQueuedSenderConfig.getPath(), new String[0]), attr);
        this.messageQueue = new BigQueueImpl(this.testQueuedSenderConfig.getPath(), "gojira-messages");
        MessageSenderThread messageSenderThread = new MessageSenderThread(this.messageQueue);
        this.scheduler.scheduleWithFixedDelay(messageSenderThread, 20L, this.testQueuedSenderConfig.getQueuePurgeInterval(), TimeUnit.SECONDS);
        this.scheduler.scheduleAtFixedRate(new TestQueueCleaner(this.messageQueue), 20L, this.testQueuedSenderConfig.getQueuePurgeInterval(), TimeUnit.SECONDS);
    }

    @Override
    public void shutdown() throws Exception {
        while (!this.messageQueue.isEmpty()) {
            Thread.sleep(1000L);
        }
        this.scheduler.shutdownNow();
    }

    @Override
    public <T extends TestDataType> void send(TestData<TestRequestData<T>, TestResponseData<T>, T> testData) throws Exception {
        if (this.messageQueue.size() < this.testQueuedSenderConfig.getQueueSize()) {
            LOGGER.info("TestData with id: " + testData.getId() + " enqueued.");
            this.messageQueue.enqueue(GuiceInjector.getInjector().getInstance(SerdeHandlerRepository.class).getTestDataSerdeHandler().serialize(testData));
        } else {
            LOGGER.error("messageQueue size greater than " + this.testQueuedSenderConfig.getQueueSize() + " testData.id " + testData.getId());
        }
    }

    private static final class TestQueueCleaner
    implements Runnable {
        private IBigQueue messageQueue;

        private TestQueueCleaner(IBigQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                this.messageQueue.gc();
                LOGGER.info(String.format("Ran GC on queue. Took: %d milliseconds", System.currentTimeMillis() - startTime));
            }
            catch (IOException e) {
                LOGGER.error("Could not perform GC on hyperion message queue: ", e);
            }
        }
    }

    private static final class MessageSenderThread
    implements Runnable {
        private IBigQueue messageQueue;

        public MessageSenderThread(IBigQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            try {
                while (!this.messageQueue.isEmpty()) {
                    LOGGER.info("There are messages in the hyperion message queue. Sender invoked.");
                    byte[] data = this.messageQueue.dequeue();
                    if (null != data) {
                        TestData testData = GuiceInjector.getInjector().getInstance(SerdeHandlerRepository.class).getTestDataSerdeHandler().deserialize(data, TestData.class);
                        LOGGER.info("TestData with id: " + testData.getId() + " send for DataStore write.");
                        GuiceInjector.getInjector().getInstance(SinkHandler.class).write(testData.getId(), data);
                        continue;
                    }
                    break;
                }
            }
            catch (Exception e) {
                LOGGER.error("Could not send message: ", e);
            }
        }
    }
}

