/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.emp.connector;

import com.salesforce.emp.connector.BayeuxParameters;
import com.salesforce.emp.connector.CannotSubscribe;
import com.salesforce.emp.connector.ReplayExtension;
import com.salesforce.emp.connector.TopicSubscription;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmpConnector {
    private static final String ERROR = "error";
    private static final String FAILURE = "failure";
    public static long REPLAY_FROM_EARLIEST = -2L;
    public static long REPLAY_FROM_TIP = -1L;
    private static String AUTHORIZATION = "Authorization";
    private static final Logger log = LoggerFactory.getLogger(EmpConnector.class);
    private volatile BayeuxClient client;
    private final HttpClient httpClient;
    private volatile ScheduledFuture<?> keepAlive;
    private final BayeuxParameters parameters;
    private final ConcurrentMap<String, Long> replay = new ConcurrentHashMap<String, Long>();
    private final AtomicBoolean running = new AtomicBoolean();
    private final ScheduledExecutorService scheduler;

    public EmpConnector(BayeuxParameters parameters) {
        this(parameters, Executors.newSingleThreadScheduledExecutor());
    }

    public EmpConnector(BayeuxParameters parameters, ScheduledExecutorService scheduler) {
        this.parameters = parameters;
        this.httpClient = new HttpClient(parameters.sslContextFactory());
        this.httpClient.getProxyConfiguration().getProxies().addAll(parameters.proxies());
        this.scheduler = scheduler;
    }

    public Future<Boolean> start() {
        if (this.running.compareAndSet(false, true)) {
            return this.connect();
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        future.complete(true);
        return future;
    }

    public void stop() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        if (this.keepAlive != null) {
            this.keepAlive.cancel(true);
            this.keepAlive = null;
        }
        if (this.client != null) {
            this.client.disconnect();
            this.client = null;
        }
        if (this.httpClient != null) {
            try {
                this.httpClient.stop();
            }
            catch (Exception e) {
                log.error("Unable to stop HTTP transport[{}]", (Object)this.parameters.endpoint(), (Object)e);
            }
        }
    }

    public Future<TopicSubscription> subscribe(String topic, long replayFrom, Consumer<Map<String, Object>> consumer) {
        if (!this.running.get()) {
            throw new IllegalStateException(String.format("Connector[%s} has not been started", this.parameters.endpoint()));
        }
        if (this.replay.putIfAbsent(topic, replayFrom) != null) {
            throw new IllegalStateException(String.format("Already subscribed to %s [%s]", topic, this.parameters.endpoint()));
        }
        ClientSessionChannel channel = this.client.getChannel(topic);
        SubscriptionImpl subscription = new SubscriptionImpl(topic);
        CompletableFuture<TopicSubscription> future = new CompletableFuture<TopicSubscription>();
        channel.subscribe((c, message) -> consumer.accept(message.getDataAsMap()), (c, message) -> {
            if (message.isSuccessful()) {
                future.complete(subscription);
            } else {
                Object error = message.get((Object)ERROR);
                if (error == null) {
                    error = message.get((Object)FAILURE);
                }
                future.completeExceptionally(new CannotSubscribe(this.parameters.endpoint(), topic, replayFrom, error != null ? error : message));
            }
        });
        return future;
    }

    public Future<TopicSubscription> subscribeEarliest(String topic, Consumer<Map<String, Object>> consumer) {
        return this.subscribe(topic, REPLAY_FROM_EARLIEST, consumer);
    }

    public Future<TopicSubscription> subscribeTip(String topic, Consumer<Map<String, Object>> consumer) {
        return this.subscribe(topic, REPLAY_FROM_TIP, consumer);
    }

    private Future<Boolean> connect() {
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.replay.clear();
        try {
            this.httpClient.start();
        }
        catch (Exception e) {
            log.error("Unable to start HTTP transport[{}]", (Object)this.parameters.endpoint(), (Object)e);
            this.running.set(false);
            future.complete(false);
            return future;
        }
        LongPollingTransport httpTransport = new LongPollingTransport(this.parameters.longPollingOptions(), this.httpClient){

            protected void customize(Request request) {
                request.header(AUTHORIZATION, EmpConnector.this.parameters.bearerToken());
            }
        };
        this.client = new BayeuxClient(this.parameters.endpoint().toExternalForm(), (ClientTransport)httpTransport, new ClientTransport[0]){

            public void onFailure(Throwable failure, List<? extends Message> messages) {
                System.out.print("BayeuxClient.onFailure: ");
                failure.printStackTrace();
                System.out.println("Messages: " + messages);
                System.out.println("State: " + this.getState());
            }

            protected void onTransportFailure(String oldTransportName, String newTransportName, Throwable failure) {
                super.onTransportFailure(oldTransportName, newTransportName, failure);
                System.out.print("BayeuxClient.onTransportFailure: ");
                failure.printStackTrace();
                System.out.println("oldTransportName: " + oldTransportName);
                System.out.println("newTransportName: " + newTransportName);
            }
        };
        this.client.addExtension((ClientSession.Extension)new ReplayExtension(this.replay));
        this.client.handshake((c, m) -> {
            if (!m.isSuccessful()) {
                Object error = m.get((Object)ERROR);
                if (error == null) {
                    error = m.get((Object)FAILURE);
                }
                future.completeExceptionally(new ConnectException(String.format("Cannot connect [%s] : %s", this.parameters.endpoint(), error)));
                this.running.set(false);
            } else {
                this.keepAlive = this.scheduler.scheduleAtFixedRate(() -> {
                    log.debug("keepAlive1");
                    if (this.running.get()) {
                        log.debug("keepAlive2");
                        this.client.handshake();
                    }
                }, this.parameters.keepAlive(), this.parameters.keepAlive(), this.parameters.keepAliveUnit());
                future.complete(true);
            }
        });
        return future;
    }

    private class SubscriptionImpl
    implements TopicSubscription {
        private final String topic;

        private SubscriptionImpl(String topic) {
            this.topic = topic;
        }

        @Override
        public void cancel() {
            EmpConnector.this.replay.remove(this.topic);
            if (EmpConnector.this.running.get() && EmpConnector.this.client != null) {
                EmpConnector.this.client.getChannel(this.topic).unsubscribe();
            }
        }

        @Override
        public long getReplayFrom() {
            return EmpConnector.this.replay.getOrDefault(this.topic, REPLAY_FROM_EARLIEST);
        }

        @Override
        public String getTopic() {
            return this.topic;
        }

        public String toString() {
            return String.format("Subscription [%s:%s]", this.getTopic(), this.getReplayFrom());
        }
    }
}

