/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.foxtrot.client.senders;

import com.flipkart.foxtrot.client.Document;
import com.flipkart.foxtrot.client.EventSender;
import com.flipkart.foxtrot.client.EventSerializationHandler;
import com.google.common.collect.Lists;
import com.leansoft.bigqueue.BigQueueImpl;
import com.leansoft.bigqueue.IBigQueue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventSender} that does not deliver events directly. Instead every event is
 * serialized and appended to a {@link IBigQueue} rooted at a caller-supplied directory, and a
 * scheduled background task drains that queue in batches into a delegate {@link EventSender}.
 *
 * <p>Two tasks run on an internal two-thread scheduler:
 * <ul>
 *   <li>a sender task that drains the queue and forwards batches to the delegate, and</li>
 *   <li>a cleaner task that calls {@link IBigQueue#gc()} every 30 seconds.</li>
 * </ul>
 *
 * <p>Batches that cannot be delivered after all attempts are serialized again and put back at
 * the tail of the queue, so delivery order is not guaranteed.
 */
public class QueuedSender
extends EventSender {
    private static final Logger logger = LoggerFactory.getLogger(QueuedSender.class.getSimpleName());

    /** Number of additional attempts made after the first failed attempt to send a batch. */
    private static final int RETRIES = 5;

    /** Upper bound on the accounted size of one batch; also the limit for a single message. */
    private static final int MAX_PAYLOAD_SIZE = 2000000;

    /** Seconds between two consecutive queue GC runs. */
    private static final long QUEUE_GC_INTERVAL_SECONDS = 30L;

    private final EventSender eventSender;
    private final IBigQueue messageQueue;
    private final ScheduledExecutorService scheduler;

    /**
     * Creates the queue directory, opens the queue and starts the background tasks.
     *
     * @param eventSender              delegate that actually delivers batches of events
     * @param serializationHandler     converts events to and from the bytes stored in the queue
     * @param path                     directory holding the queue; created (with parents) if missing
     * @param batchSize                maximum number of queue entries pulled into one batch
     * @param numSecondsBetweenRefresh delay, in seconds, between the end of one sender run and
     *                                 the start of the next
     * @throws Exception if the directory cannot be created or the queue cannot be opened
     */
    public QueuedSender(EventSender eventSender, EventSerializationHandler serializationHandler, String path, int batchSize, int numSecondsBetweenRefresh) throws Exception {
        super(serializationHandler);
        this.eventSender = eventSender;
        // NOTE(review): the directory is requested world-writable and POSIX attributes are
        // required, so this fails on file systems without POSIX permission support — confirm
        // both are intended.
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        Files.createDirectories(Paths.get(path), attr);
        this.messageQueue = new BigQueueImpl(path, "foxtrot-messages");
        MessageSenderThread messageSenderThread = new MessageSenderThread(this, eventSender, this.messageQueue, this.getSerializationHandler(), batchSize);
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.scheduler.scheduleWithFixedDelay(messageSenderThread, 0L, numSecondsBetweenRefresh, TimeUnit.SECONDS);
        this.scheduler.scheduleAtFixedRate(new QueueCleaner(this.messageQueue), 0L, QUEUE_GC_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Serializes a single event and appends it to the queue; delivery happens later on the
     * scheduler.
     *
     * @param event the event to enqueue
     * @throws Exception if serialization or the enqueue operation fails
     */
    @Override
    public void send(Document event) throws Exception {
        this.messageQueue.enqueue(this.getSerializationHandler().serialize(event));
    }

    /**
     * Serializes a list of events into one queue entry and appends it to the queue.
     *
     * @param events the events to enqueue
     * @throws Exception if serialization or the enqueue operation fails
     */
    @Override
    public void send(List<Document> events) throws Exception {
        this.messageQueue.enqueue(this.getSerializationHandler().serialize(events));
    }

    /**
     * Blocks, polling once per second, until the queue is empty, then stops the scheduler and
     * closes the delegate sender.
     *
     * <p>The scheduler and the delegate are released even when the wait is interrupted, so an
     * interrupted {@code close()} no longer leaves the background threads running.
     *
     * @throws Exception if the wait is interrupted or the delegate fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            while (!this.messageQueue.isEmpty()) {
                Thread.sleep(1000L);
                logger.debug("Message queue is not empty .. waiting");
            }
        } finally {
            this.scheduler.shutdownNow();
            logger.debug("Shut down sender thread");
            this.eventSender.close();
            logger.debug("Shut down scheduled sender");
        }
    }

    /** Periodic task that asks the queue to garbage-collect itself and logs how long it took. */
    private static final class QueueCleaner
    implements Runnable {
        private final IBigQueue messageQueue;

        private QueueCleaner(IBigQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                this.messageQueue.gc();
                logger.info("Ran GC on queue. Took: {} milliseconds", System.currentTimeMillis() - startTime);
            }
            catch (Exception e) {
                // Catch unchecked failures as well: an exception escaping a periodic task makes
                // the scheduler suppress every later run, which would silently stop queue GC.
                logger.error("Could not perform GC on foxtrot message queue: ", e);
            }
        }
    }

    /**
     * Periodic task that drains the queue in batches and forwards each batch to the delegate
     * sender, retrying failed sends and re-queuing batches that could not be delivered.
     */
    private static final class MessageSenderThread
    implements Runnable {
        /**
         * Fixed amount added to every message's length when accounting batch size.
         * NOTE(review): the meaning of the two components (24 and 8) is not visible in this
         * file — presumably per-entry bookkeeping overhead; confirm.
         */
        private static final int PER_MESSAGE_OVERHEAD = 24 + 8;

        private final QueuedSender sender;
        private final EventSender eventSender;
        private final IBigQueue messageQueue;
        private final EventSerializationHandler serializationHandler;
        private final int batchSize;

        public MessageSenderThread(QueuedSender queuedSender, EventSender eventSender, IBigQueue messageQueue, EventSerializationHandler serializationHandler, int batchSize) {
            this.sender = queuedSender;
            this.eventSender = eventSender;
            this.messageQueue = messageQueue;
            this.serializationHandler = serializationHandler;
            this.batchSize = batchSize;
        }

        /**
         * Drains the queue batch by batch until it is empty. Stops early for this run when a
         * batch could not be delivered; that batch is put back on the queue first.
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void run() {
            try {
                while (!this.messageQueue.isEmpty()) {
                    logger.info("There are messages in the foxtrot message queue. Sender invoked.");
                    List entries = this.nextBatch();
                    if (entries.isEmpty()) {
                        logger.info("Nothing to send to foxtrot");
                        continue;
                    }
                    if (this.sendWithRetries(entries)) {
                        continue;
                    }
                    logger.error("Could not send event. Probably foxtrot api is down. Re-queuing the messages. Order will be screwed up. But will appear proper on graph once ingested.");
                    this.sender.send(entries);
                    // The delegate looks unavailable; give up until the next scheduled run.
                    break;
                }
            }
            catch (Exception e) {
                logger.error("Could not send message: ", e);
            }
        }

        /**
         * Dequeues up to {@code batchSize} messages whose accounted size fits into
         * {@link #MAX_PAYLOAD_SIZE} and deserializes them.
         *
         * <p>A message that on its own exceeds the limit is dropped and does not count towards
         * the batch size. A message that would push the batch over the limit is put back on the
         * queue (at its tail) and ends the batch.
         *
         * @return the deserialized batch; empty when nothing usable was dequeued
         * @throws Exception if a queue operation or deserialization fails
         */
        // The element type is left raw because the return type of deserialize(...) is declared
        // outside this file.
        @SuppressWarnings({"rawtypes", "unchecked"})
        private List nextBatch() throws Exception {
            List entries = Lists.newArrayListWithExpectedSize(this.batchSize);
            int sizeOfPayload = 0;
            for (int i = 0; i < this.batchSize; ++i) {
                byte[] data = this.messageQueue.dequeue();
                if (data == null) {
                    break;
                }
                int messageSize = data.length + PER_MESSAGE_OVERHEAD;
                if (messageSize > MAX_PAYLOAD_SIZE) {
                    // Checked before touching the running total so that a dropped message does
                    // not force the rest of the batch to be truncated.
                    // NOTE(review): decodes with the platform default charset and writes the
                    // entire payload to the log.
                    logger.error("Dropping message as size > 2MB  packet : {}", new String(data));
                    continue;
                }
                if (sizeOfPayload + messageSize > MAX_PAYLOAD_SIZE) {
                    logger.info("data size {} > 2MB threshold, hence truncating batch size for this and enqueing the last overriding data to pass on in next batch.", sizeOfPayload + messageSize);
                    this.messageQueue.enqueue(data);
                    break;
                }
                sizeOfPayload += messageSize;
                entries.add(this.serializationHandler.deserialize(data));
            }
            return entries;
        }

        /**
         * Sends a batch through the delegate, making one attempt plus up to {@link #RETRIES}
         * retries.
         *
         * @param entries the batch to deliver
         * @return {@code true} as soon as one attempt succeeds (including the last one),
         *         {@code false} when every attempt failed
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private boolean sendWithRetries(List entries) {
            for (int attempt = 0; attempt <= RETRIES; ++attempt) {
                try {
                    this.eventSender.send(entries);
                    logger.info("Sent {} events to foxtrot.", entries.size());
                    return true;
                }
                catch (Throwable t) {
                    logger.error("Could not send events: ", t);
                }
            }
            return false;
        }
    }
}

