/*
 * Decompiled with CFR 0.152.
 */
package io.appform.foxtrot.client.senders;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.squareup.okhttp.ConnectionPool;
import com.squareup.okhttp.OkHttpClient;
import feign.Client;
import feign.Feign;
import feign.Logger;
import feign.Response;
import feign.slf4j.Slf4jLogger;
import io.appform.foxtrot.client.Document;
import io.appform.foxtrot.client.EventSender;
import io.appform.foxtrot.client.FoxtrotClientConfig;
import io.appform.foxtrot.client.cluster.FoxtrotClusterMember;
import io.appform.foxtrot.client.cluster.IFoxtrotCluster;
import io.appform.foxtrot.client.selectors.FoxtrotTarget;
import io.appform.foxtrot.client.senders.FoxtrotHttpClient;
import io.appform.foxtrot.client.serialization.EventSerializationHandler;
import io.appform.foxtrot.client.serialization.SerializationException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpAsyncEventSender
extends EventSender {
    private static final Logger logger = LoggerFactory.getLogger((String)HttpAsyncEventSender.class.getSimpleName());
    private static final Slf4jLogger slf4jLogger = new Slf4jLogger();
    private String table;
    private IFoxtrotCluster client;
    private FoxtrotHttpClient httpClient;
    private ListeningExecutorService executorService = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));

    public HttpAsyncEventSender(FoxtrotClientConfig config, IFoxtrotCluster client, EventSerializationHandler serializationHandler) {
        super(serializationHandler);
        this.table = config.getTable();
        this.client = client;
        OkHttpClient okHttpClient = new OkHttpClient();
        okHttpClient.setConnectionPool(new ConnectionPool(config.getMaxConnections(), config.getKeepAliveTimeMillis()));
        this.httpClient = (FoxtrotHttpClient)Feign.builder().client((Client)new feign.okhttp.OkHttpClient(okHttpClient)).logger((feign.Logger)slf4jLogger).logLevel(Logger.Level.BASIC).target(new FoxtrotTarget<FoxtrotHttpClient>(FoxtrotHttpClient.class, "foxtrot", client));
    }

    @Override
    public void send(Document document) throws Exception {
        this.send(this.table, document);
    }

    @Override
    public void send(String table, Document document) throws Exception {
        this.send(table, Collections.singletonList(document));
    }

    @Override
    public void send(List<Document> documents) throws Exception {
        this.send(this.table, documents);
    }

    @Override
    public void send(String table, List<Document> documents) throws Exception {
        try {
            this.send(table, this.getSerializationHandler().serialize(documents));
        }
        catch (SerializationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    public void send(final String table, final byte[] payload) {
        final FoxtrotClusterMember clusterMember = this.client.member();
        Preconditions.checkNotNull((Object)clusterMember, (Object)"No members found in foxtrot cluster");
        ListenableFuture response = this.executorService.submit((Callable)new Callable<Response>(){

            @Override
            public Response call() throws Exception {
                return HttpAsyncEventSender.this.httpClient.send(table, payload);
            }
        });
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<Response>(){

            public void onSuccess(Response response) {
                logger.debug("table={} messages_sent host={} port={}", new Object[]{table, clusterMember.getHost(), clusterMember.getPort()});
            }

            public void onFailure(Throwable throwable) {
                logger.error("table={} message_sending_failed", (Object)new Object[]{table}, (Object)throwable);
            }
        });
    }
}

