/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.foxtrot.client.senders;

import com.flipkart.foxtrot.client.Document;
import com.flipkart.foxtrot.client.EventSender;
import com.flipkart.foxtrot.client.EventSerializationHandler;
import com.flipkart.foxtrot.client.FoxtrotClientConfig;
import com.flipkart.foxtrot.client.cluster.FoxtrotCluster;
import com.flipkart.foxtrot.client.cluster.FoxtrotClusterMember;
import com.flipkart.foxtrot.client.serialization.SerializationException;
import com.google.common.base.Preconditions;
import com.google.common.net.MediaType;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpAsyncEventSender
extends EventSender {
    private static final Logger logger = LoggerFactory.getLogger((String)HttpAsyncEventSender.class.getSimpleName());
    private final String table;
    private final FoxtrotCluster client;
    private final ScheduledExecutorService executorService;
    private CloseableHttpAsyncClient httpClient;

    public HttpAsyncEventSender(FoxtrotClientConfig config, FoxtrotCluster client, EventSerializationHandler serializationHandler) throws IOReactorException {
        super(serializationHandler);
        this.table = config.getTable();
        this.client = client;
        DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
        PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager((ConnectingIOReactor)ioReactor);
        cm.setMaxTotal(100);
        this.httpClient = HttpAsyncClients.custom().setConnectionManager((NHttpClientConnectionManager)cm).build();
        Evictor connEvictor = new Evictor((NHttpClientConnectionManager)cm);
        this.executorService = Executors.newScheduledThreadPool(1);
        this.executorService.scheduleWithFixedDelay(connEvictor, 1L, 5L, TimeUnit.SECONDS);
        this.httpClient.start();
    }

    @Override
    public void send(Document document) {
        this.send(Collections.singletonList(document));
    }

    @Override
    public void send(List<Document> documents) {
        try {
            this.send(this.getSerializationHandler().serialize(documents));
        }
        catch (SerializationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        this.executorService.shutdownNow();
        logger.debug("Shut down connection evictor");
        this.httpClient.close();
        logger.debug("Closed HTTP Client");
    }

    public void send(byte[] payload) {
        FoxtrotClusterMember clusterMember = this.client.member();
        Preconditions.checkNotNull((Object)clusterMember, (Object)"No members found in foxtrot cluster");
        try {
            URI requestURI = new URIBuilder().setScheme("http").setHost(clusterMember.getHost()).setPort(clusterMember.getPort()).setPath(String.format("/foxtrot/v1/document/%s/bulk", this.table)).build();
            HttpPost post = new HttpPost(requestURI);
            post.setHeader("Content-Type", MediaType.JSON_UTF_8.toString());
            post.setEntity((HttpEntity)new ByteArrayEntity(payload));
            this.httpClient.execute((HttpUriRequest)post, (FutureCallback)new FutureCallback<HttpResponse>(){

                public void completed(HttpResponse response) {
                    if (response.getStatusLine().getStatusCode() != 201) {
                        try {
                            logger.error("Could not send event: {}", (Object)EntityUtils.toString((HttpEntity)response.getEntity()));
                        }
                        catch (IOException e) {
                            logger.error("Could not deserialize API response.", (Throwable)e);
                        }
                    }
                }

                public void failed(Exception ex) {
                    logger.error("Could not send event.", (Throwable)ex);
                }

                public void cancelled() {
                    logger.error("Call to foxtrot cancelled.");
                }
            });
            logger.debug("Sent event to {}:{}", (Object)clusterMember.getHost(), (Object)clusterMember.getPort());
        }
        catch (URISyntaxException e) {
            logger.error("Invalid syntax: ", (Throwable)e);
        }
    }

    private static class Evictor
    implements Runnable {
        private final NHttpClientConnectionManager connectionManager;

        public Evictor(NHttpClientConnectionManager connectionManager) {
            this.connectionManager = connectionManager;
        }

        @Override
        public void run() {
            try {
                this.connectionManager.closeExpiredConnections();
                this.connectionManager.closeIdleConnections(5L, TimeUnit.SECONDS);
            }
            catch (Exception ex) {
                logger.error("Error cleaning up connections.", (Throwable)ex);
            }
        }
    }
}

