/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.foxtrot.client.senders;

import com.flipkart.foxtrot.client.Document;
import com.flipkart.foxtrot.client.EventSender;
import com.flipkart.foxtrot.client.FoxtrotClientConfig;
import com.flipkart.foxtrot.client.cluster.FoxtrotCluster;
import com.flipkart.foxtrot.client.cluster.FoxtrotClusterMember;
import com.flipkart.foxtrot.client.senders.impl.CustomKeepAliveStrategy;
import com.flipkart.foxtrot.client.serialization.EventSerializationHandler;
import com.flipkart.foxtrot.client.serialization.SerializationException;
import com.google.common.base.Preconditions;
import com.google.common.net.MediaType;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpAsyncEventSender
extends EventSender {
    private static final Logger logger = LoggerFactory.getLogger((String)HttpAsyncEventSender.class.getSimpleName());
    private final String table;
    private final FoxtrotCluster client;
    private final ScheduledExecutorService executorService;
    private CloseableHttpAsyncClient httpClient;

    public HttpAsyncEventSender(FoxtrotClientConfig config, FoxtrotCluster client, EventSerializationHandler serializationHandler) throws IOReactorException {
        super(serializationHandler);
        this.table = config.getTable();
        this.client = client;
        DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
        PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager((ConnectingIOReactor)ioReactor);
        cm.setMaxTotal(1024);
        cm.setDefaultMaxPerRoute(1);
        this.httpClient = HttpAsyncClients.custom().setConnectionManager((NHttpClientConnectionManager)cm).setKeepAliveStrategy((ConnectionKeepAliveStrategy)new CustomKeepAliveStrategy(config)).build();
        Evictor connEvictor = new Evictor(this.table, (NHttpClientConnectionManager)cm);
        this.executorService = Executors.newScheduledThreadPool(1);
        this.executorService.scheduleWithFixedDelay(connEvictor, 1L, 5L, TimeUnit.SECONDS);
        this.httpClient.start();
    }

    @Override
    public void send(Document document) {
        this.send(Collections.singletonList(document));
    }

    @Override
    public void send(List<Document> documents) {
        try {
            this.send(this.getSerializationHandler().serialize(documents));
        }
        catch (SerializationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        logger.info("table={} shutting_down_executor_service", new Object[]{this.table});
        this.executorService.shutdownNow();
        logger.info("table={} executor_service_shutdown_completed", new Object[]{this.table});
        logger.info("table={} closing_down_http_client", new Object[]{this.table});
        this.httpClient.close();
        logger.info("table={} closed_http_client", new Object[]{this.table});
    }

    public void send(byte[] payload) {
        FoxtrotClusterMember clusterMember = this.client.member();
        Preconditions.checkNotNull((Object)clusterMember, (Object)"No members found in foxtrot cluster");
        try {
            URI requestURI = new URIBuilder().setScheme("http").setHost(clusterMember.getHost()).setPort(clusterMember.getPort()).setPath(String.format("/foxtrot/v1/document/%s/bulk", this.table)).build();
            HttpPost post = new HttpPost(requestURI);
            post.setHeader("Content-Type", MediaType.JSON_UTF_8.toString());
            post.setEntity((HttpEntity)new ByteArrayEntity(payload));
            this.httpClient.execute((HttpUriRequest)post, (FutureCallback)new FutureCallback<HttpResponse>(){

                public void completed(HttpResponse response) {
                    try {
                        if (response.getStatusLine().getStatusCode() != 201) {
                            logger.error("table={} message_sending_failed api_response={}", new Object[]{HttpAsyncEventSender.this.table, EntityUtils.toString((HttpEntity)response.getEntity())});
                        }
                        logger.debug("Payload: {}", (Object)EntityUtils.toString((HttpEntity)response.getEntity()));
                    }
                    catch (IOException e) {
                        logger.error("table={} api_response_deserialization_failed", (Object)new Object[]{HttpAsyncEventSender.this.table}, (Object)e);
                    }
                }

                public void failed(Exception ex) {
                    logger.error("table={} message_sending_failed", (Object)new Object[]{HttpAsyncEventSender.this.table}, (Object)ex);
                }

                public void cancelled() {
                    logger.error("table={} call_to_foxtrot_cancelled", new Object[]{HttpAsyncEventSender.this.table});
                }
            });
            logger.debug("table={} messages_sent host={} port={}", new Object[]{this.table, clusterMember.getHost(), clusterMember.getPort()});
        }
        catch (URISyntaxException e) {
            logger.error("table={} invalid_uri_syntax", (Object)new Object[]{this.table}, (Object)e);
        }
    }

    private static class Evictor
    implements Runnable {
        private String table;
        private final NHttpClientConnectionManager connectionManager;

        public Evictor(String table, NHttpClientConnectionManager connectionManager) {
            this.table = table;
            this.connectionManager = connectionManager;
        }

        @Override
        public void run() {
            try {
                this.connectionManager.closeExpiredConnections();
                this.connectionManager.closeIdleConnections(5L, TimeUnit.SECONDS);
            }
            catch (Exception ex) {
                logger.error("table={} connection_cleanup_failed", (Object)new Object[]{this.table}, (Object)ex);
            }
        }
    }
}

