/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.foxtrot.client.senders;

import com.bluejeans.bigqueue.BigQueue;
import com.flipkart.foxtrot.client.Document;
import com.flipkart.foxtrot.client.EventSender;
import com.flipkart.foxtrot.client.serialization.EventSerializationHandler;
import com.google.common.collect.Lists;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventSender} that buffers serialized documents in a {@link BigQueue} created under a
 * caller-supplied directory and forwards them in batches to a delegate {@link EventSender}.
 *
 * <p>Two tasks are scheduled on an internal two-thread scheduler: a sender task that drains the
 * queue (fixed delay of one second) and a cleaner task that calls {@link BigQueue#gc()} (fixed
 * delay of fifteen seconds).
 *
 * <p>Only the table-less {@code send} overloads are supported; the overloads that take a table
 * name always throw {@link IllegalAccessException}.
 */
public class QueuedSender extends EventSender {
    private static final Logger logger = LoggerFactory.getLogger(QueuedSender.class.getSimpleName());

    /** Number of retries after the first failed attempt (so at most {@code RETRIES + 1} attempts). */
    private static final int RETRIES = 5;

    /** Upper bound, in bytes, on the accounted size of one batch (logged as "2MB"). */
    private static final int MAX_PAYLOAD_SIZE = 2000000;

    /**
     * Fixed number of bytes added to every message when accounting for batch size.
     * NOTE(review): presumably per-entry framing overhead of the downstream payload - confirm.
     */
    private static final int PER_MESSAGE_OVERHEAD = 24 + 8;

    private final EventSender eventSender;
    private final String path;
    private final MessageSenderThread messageSenderThread;
    private final ScheduledExecutorService scheduler;
    private final BigQueue messageQueue;

    /**
     * Creates the queue directory (if needed), opens the queue and starts the background tasks.
     *
     * @param eventSender          delegate that actually delivers batches
     * @param serializationHandler used to serialize documents into the queue and back
     * @param path                 directory in which the queue named {@code foxtrot-messages} is created
     * @param batchSize            maximum number of messages dequeued per batch; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws Exception                if the directory or the queue cannot be created
     */
    public QueuedSender(EventSender eventSender, EventSerializationHandler serializationHandler, String path, int batchSize) throws Exception {
        super(serializationHandler);
        if (batchSize <= 0) {
            // A non-positive batch size would make the sender task spin without ever dequeuing.
            throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
        }
        this.eventSender = eventSender;
        this.path = path;
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        Files.createDirectories(Paths.get(path), attr);
        this.messageQueue = new BigQueue(path, "foxtrot-messages");
        this.messageSenderThread = new MessageSenderThread(this, eventSender, this.messageQueue, path, this.getSerializationHandler(), batchSize);
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.scheduler.scheduleWithFixedDelay(this.messageSenderThread, 0L, 1L, TimeUnit.SECONDS);
        this.scheduler.scheduleWithFixedDelay(new QueueCleaner(this.messageQueue, path), 0L, 15L, TimeUnit.SECONDS);
    }

    /**
     * Serializes the document and appends it to the queue; delivery happens asynchronously.
     *
     * @param document document to enqueue
     * @throws Exception if serialization or enqueueing fails
     */
    @Override
    public void send(Document document) throws Exception {
        this.messageQueue.enqueue(this.getSerializationHandler().serialize(document));
    }

    /**
     * Not supported by the queued sender.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void send(String table, Document document) throws Exception {
        throw new IllegalAccessException("Send to table is not implemented for queued sender");
    }

    /**
     * Serializes and enqueues each document individually, in iteration order.
     *
     * @param documents documents to enqueue
     * @throws Exception if serialization or enqueueing of any document fails; documents
     *                   enqueued before the failure stay in the queue
     */
    @Override
    public void send(List<Document> documents) throws Exception {
        for (Document document : documents) {
            this.messageQueue.enqueue(this.getSerializationHandler().serialize(document));
        }
    }

    /**
     * Not supported by the queued sender.
     *
     * @throws IllegalAccessException always
     */
    @Override
    public void send(String table, List<Document> documents) throws Exception {
        throw new IllegalAccessException("Send to table is not implemented for queued sender");
    }

    /**
     * Blocks until the queue is empty and the sender task is idle, then stops the scheduler and
     * closes the delegate sender.
     *
     * <p>There is no timeout: this method polls every 500 ms for as long as the queue stays
     * non-empty.
     *
     * @throws Exception if the waiting thread is interrupted or the delegate fails to close
     */
    @Override
    public void close() throws Exception {
        logger.info("queue={} closing_queued_sender", this.path);
        while (!this.messageQueue.isEmpty()) {
            Thread.sleep(500L);
            logger.info("queue={} message_queue_not_empty waiting_for_queue_to_get_empty", this.path);
        }
        // The queue may be empty while the last dequeued batch is still being delivered.
        while (this.messageSenderThread.isRunning()) {
            Thread.sleep(500L);
            logger.info("queue={} message_sender_thread_still_running waiting_for_completion", this.path);
        }
        this.scheduler.shutdownNow();
        logger.info("queue={} shutting_down_message_sender_thread", this.path);
        this.eventSender.close();
        logger.info("queue={} shutdown_completed_for_message_sender_thread", this.path);
    }

    /** Periodic task that runs {@link BigQueue#gc()} on the message queue and logs the duration. */
    private static final class QueueCleaner implements Runnable {
        private final BigQueue messageQueue;
        private final String path;

        private QueueCleaner(BigQueue messageQueue, String path) {
            this.messageQueue = messageQueue;
            this.path = path;
        }

        @Override
        public void run() {
            try {
                long startTime = System.currentTimeMillis();
                this.messageQueue.gc();
                logger.info("queue={} ran_gc_on_foxtrot_message_queue took={}", this.path, System.currentTimeMillis() - startTime);
            } catch (Exception e) {
                // Swallowed on purpose: an exception escaping a scheduled task cancels future runs.
                logger.error("queue={} gc_failed_on_foxtrot_message_queue", this.path, e);
            }
        }
    }

    /**
     * Periodic task that drains the queue in batches and hands each batch to the delegate sender.
     * A batch holds at most {@code batchSize} messages and at most {@link #MAX_PAYLOAD_SIZE}
     * accounted bytes.
     */
    private static final class MessageSenderThread implements Runnable {
        private final QueuedSender sender;
        private final EventSerializationHandler serializationHandler;
        private final EventSender eventSender;
        private final BigQueue messageQueue;
        private final int batchSize;
        private final String path;
        private final AtomicBoolean running = new AtomicBoolean(false);

        public MessageSenderThread(QueuedSender queuedSender, EventSender eventSender, BigQueue messageQueue, String path, EventSerializationHandler serializationHandler, int batchSize) {
            this.sender = queuedSender;
            this.eventSender = eventSender;
            this.messageQueue = messageQueue;
            this.path = path;
            this.serializationHandler = serializationHandler;
            this.batchSize = batchSize;
        }

        /**
         * Drains the queue until it is empty or a batch cannot be delivered. An undeliverable
         * batch is put back on the queue and the run ends, leaving the retry to the next
         * scheduled invocation.
         */
        @Override
        public void run() {
            this.running.set(true);
            try {
                while (!this.messageQueue.isEmpty()) {
                    logger.info("queue={} messages_found_in_message_queue sender_invoked", this.path);
                    List<Document> entries = this.nextBatch();
                    if (entries.isEmpty()) {
                        logger.info("queue={} nothing_to_send_to_foxtrot", this.path);
                        continue;
                    }
                    if (this.sendWithRetries(entries)) {
                        continue;
                    }
                    logger.error("queue={} message_send_failed probably_api_down  re-queuing_messages", this.path);
                    this.sender.send(entries);
                    break;
                }
            } catch (Exception e) {
                logger.error("queue={} message_send_failed", this.path, e);
            } finally {
                // Always reset, otherwise QueuedSender.close() would wait on isRunning() forever.
                this.running.set(false);
            }
        }

        /**
         * Dequeues up to {@code batchSize} messages, stopping early when the accounted batch size
         * would exceed {@link #MAX_PAYLOAD_SIZE}. A message that is too large on its own is logged
         * and dropped without counting towards the batch size; a message that merely does not fit
         * in the current batch is put back on the queue for a later batch.
         *
         * @return the deserialized documents of the batch, possibly empty
         * @throws Exception if dequeueing, re-enqueueing or deserialization fails
         */
        private List<Document> nextBatch() throws Exception {
            List<Document> entries = Lists.newArrayListWithExpectedSize(this.batchSize);
            int sizeOfPayload = 0;
            for (int i = 0; i < this.batchSize; ++i) {
                byte[] data = this.messageQueue.dequeue();
                if (data == null) {
                    break;
                }
                int messageSize = data.length + PER_MESSAGE_OVERHEAD;
                if (messageSize > MAX_PAYLOAD_SIZE) {
                    // NOTE(review): decodes with the platform default charset - confirm the wire encoding.
                    logger.error("queue={} message_size_limit_exceeded(2MB) message={}", this.path, new String(data));
                    continue;
                }
                if (sizeOfPayload + messageSize > MAX_PAYLOAD_SIZE) {
                    logger.info("queue={} batch_data_size_exceeds_threshold(2MB) size={} truncating_batch_size enqueuing_last_message_for_next_batch", this.path, sizeOfPayload + messageSize);
                    this.messageQueue.enqueue(data);
                    break;
                }
                sizeOfPayload += messageSize;
                entries.add(this.serializationHandler.deserialize(data));
            }
            return entries;
        }

        /**
         * Hands the batch to the delegate sender, trying at most {@code RETRIES + 1} times.
         *
         * @param entries non-empty batch to deliver
         * @return {@code true} as soon as one attempt succeeds, {@code false} if every attempt failed
         */
        private boolean sendWithRetries(List<Document> entries) {
            for (int attempt = 0; attempt <= RETRIES; ++attempt) {
                try {
                    this.eventSender.send(entries);
                    logger.info("queue={} foxtrot_messages_sent count={}", this.path, entries.size());
                    return true;
                } catch (Throwable t) {
                    // Throwable is caught deliberately so that any delivery failure leads to a
                    // retry and, ultimately, to the batch being re-queued by the caller.
                    logger.error("queue={} message_send_failed count={}", this.path, entries.size(), t);
                }
            }
            return false;
        }

        private boolean isRunning() {
            return this.running.get();
        }
    }
}

