/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.drift.api.service;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
import com.flipkart.drift.api.exception.ApiException;
import com.flipkart.drift.commons.utils.MetricsRegistry;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.dropwizard.lifecycle.Managed;
import io.temporal.client.WorkflowNotFoundException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisSentinelPool;

/**
 * Lets a synchronous API call wait for an asynchronous workflow event delivered over a Redis
 * pub/sub channel.
 *
 * <p>For each call to {@link #subscribeAndExecute}, a task is submitted to a dedicated thread pool.
 * The task borrows a connection from the {@link JedisSentinelPool}, subscribes to the per-workflow
 * channel, runs the caller-supplied action once the subscription is confirmed, and then waits for
 * the first message on that channel. The calling thread blocks until that message arrives, an
 * error occurs, or the wait times out.
 *
 * <p>The pool uses a {@link SynchronousQueue} with {@link ThreadPoolExecutor.AbortPolicy}, so there
 * is no queueing: once all {@value #MAX_POOL_SIZE} threads are busy, further subscriptions are
 * rejected immediately and surfaced as an {@link ApiException}.
 */
@Singleton
public class RedisPubSubService
implements Managed {
    private static final Logger log = LoggerFactory.getLogger(RedisPubSubService.class);

    /** Threads kept alive in the subscriber pool even when idle. */
    private static final int CORE_POOL_SIZE = 10;
    /** Upper bound on concurrently running subscriptions. */
    private static final int MAX_POOL_SIZE = 50;
    /** Idle time after which threads above the core size are reclaimed. */
    private static final long KEEP_ALIVE_SECONDS = 60L;
    /** How long a caller waits for the Redis event before giving up. */
    private static final long RESPONSE_TIMEOUT_SECONDS = 5L;
    /** Prefix of the per-workflow channel name; the workflow id is appended to it. */
    private static final String CHANNEL_PREFIX = "+async-await:";

    private final JedisSentinelPool jedisSentinelPool;
    private final ExecutorService redisThreadPool;

    /**
     * Creates the service, its subscriber thread pool, and registers the pool/connection gauges.
     *
     * @param jedisSentinelPool connection pool from which subscriber connections are borrowed
     */
    @Inject
    public RedisPubSubService(JedisSentinelPool jedisSentinelPool) {
        this.jedisSentinelPool = jedisSentinelPool;
        this.redisThreadPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy());
        this.publishGaugeMetrics();
    }

    /**
     * Registers gauges for the Jedis connection pool and, when the executor is a
     * {@link ThreadPoolExecutor}, for the subscriber thread pool.
     */
    private void publishGaugeMetrics() {
        MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) () -> this.jedisSentinelPool.getNumActive()), new String[]{"jedis", "numActiveConnections"});
        MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) () -> this.jedisSentinelPool.getNumIdle()), new String[]{"jedis", "numIdleConnections"});
        MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) () -> this.jedisSentinelPool.getNumWaiters()), new String[]{"jedis", "numWaiters"});
        if (this.redisThreadPool instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.redisThreadPool;
            MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) threadPoolExecutor::getActiveCount), new String[]{"threadpool", "activeThreads"});
            MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) threadPoolExecutor::getPoolSize), new String[]{"threadpool", "poolSize"});
            MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Long>) threadPoolExecutor::getCompletedTaskCount), new String[]{"threadpool", "completedTasks"});
            MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Long>) threadPoolExecutor::getTaskCount), new String[]{"threadpool", "totalTasks"});
            MetricsRegistry.registerGauge(this.getClass(), (Metric) ((Gauge<Integer>) threadPoolExecutor::getLargestPoolSize), new String[]{"threadpool", "largestPoolSize"});
        }
    }

    /**
     * Subscribes to the channel of the given workflow, runs {@code onSubscribeAction} once the
     * subscription is confirmed, and blocks until the first message arrives on that channel.
     *
     * @param workflowId        id appended to {@code "+async-await:"} to form the channel name
     * @param onSubscribeAction action executed on the subscriber thread right after subscribing
     * @param action            label used as the first segment of the latency timer name
     * @throws ApiException              if the subscription task is rejected, the wait times out
     *                                   ({@code REQUEST_TIMEOUT}), the calling thread is interrupted,
     *                                   or the subscription/action fails
     * @throws WorkflowNotFoundException if the subscription or the action failed with that exception
     */
    public void subscribeAndExecute(String workflowId, Callable<Void> onSubscribeAction, String action) {
        CompletableFuture<Void> redisFuture = new CompletableFuture<Void>();
        String channelName = CHANNEL_PREFIX + workflowId;
        JedisPubSub pubSub = this.createPubSubListener(redisFuture, onSubscribeAction, action);
        Future<?> redisTask = null;
        try {
            redisTask = this.submitRedisSubscriptionTask(channelName, pubSub, redisFuture);
            this.waitForRedisResponse(pubSub, redisFuture, channelName);
        } finally {
            // Mark the wait as over (no-op when the future already completed). The listener checks
            // this in onSubscribe so that a subscription which only gets established after the
            // caller has left is torn down instead of blocking a pool thread indefinitely.
            redisFuture.cancel(false);
            if (redisTask != null) {
                redisTask.cancel(true);
            }
        }
    }

    /**
     * Builds the listener that drives one subscribe/execute/await cycle.
     *
     * @param redisFuture       completed normally on the first message, exceptionally when the
     *                          on-subscribe action throws
     * @param onSubscribeAction action to run once the subscription is confirmed
     * @param action            label used as the first segment of the latency timer name
     * @return a listener bound to the given future and action
     */
    private JedisPubSub createPubSubListener(final CompletableFuture<Void> redisFuture, final Callable<Void> onSubscribeAction, final String action) {
        return new JedisPubSub() {

            public void onMessage(String channel, String message) {
                log.info("Message received on channel {}:{}", channel, message);
                redisFuture.complete(null);
                RedisPubSubService.this.safeUnsubscribe(this, "receiving message", channel);
            }

            public void onSubscribe(String channel, int subscribedChannels) {
                if (redisFuture.isDone()) {
                    // The caller already stopped waiting (timeout, interruption or failure) before
                    // the subscription was established. Running the action now would act on behalf
                    // of a request that has already failed, and staying subscribed would hold this
                    // thread and its connection until some message happens to arrive.
                    log.warn("Subscription to channel {} was established after the caller stopped waiting. Unsubscribing.", channel);
                    RedisPubSubService.this.safeUnsubscribe(this, "late subscription", channel);
                    return;
                }
                log.info("Subscribed to channel: {}", channel);
                // `this` is the anonymous listener, so the metric owner below is the listener's
                // class rather than RedisPubSubService; left unchanged on purpose so that whatever
                // name MetricsRegistry derives from it stays the same.
                try (Timer.Context ignored = MetricsRegistry.timerContext(this.getClass(), new String[]{action, "latency"})) {
                    onSubscribeAction.call();
                } catch (Exception e) {
                    MetricsRegistry.markMeter(this.getClass(), new String[]{"onSubscribeAction", "exception"});
                    log.error("Exception during onSubscribe action for channel: {}", channel, e);
                    RedisPubSubService.this.safeUnsubscribe(this, "onSubscribeAction error", channel);
                    redisFuture.completeExceptionally(e);
                }
            }
        };
    }

    /**
     * Submits the task that borrows a Jedis connection and subscribes to {@code channelName}.
     * The connection is returned to the pool when the subscribe call finishes.
     *
     * @param channelName channel to subscribe to
     * @param pubSub      listener receiving the subscription callbacks
     * @param redisFuture completed exceptionally if borrowing the connection or subscribing fails
     * @return handle to the submitted task
     * @throws ApiException with {@code INTERNAL_SERVER_ERROR} if the pool rejects the task
     */
    private Future<?> submitRedisSubscriptionTask(String channelName, JedisPubSub pubSub, CompletableFuture<Void> redisFuture) {
        try {
            return this.redisThreadPool.submit(() -> {
                try (Jedis jedis = this.jedisSentinelPool.getResource()) {
                    jedis.subscribe(pubSub, new String[]{channelName});
                } catch (Exception e) {
                    MetricsRegistry.markMeter(this.getClass(), new String[]{"redis", "exception"});
                    log.error("Error subscribing to Redis channel: {}: {}", channelName, e.getMessage(), e);
                    this.safeUnsubscribe(pubSub, "Redis exception", channelName);
                    redisFuture.completeExceptionally(e);
                }
            });
        } catch (RejectedExecutionException e) {
            MetricsRegistry.markMeter(this.getClass(), new String[]{"threadPool", "rejected", "exception"});
            log.error("Redis subscription task was rejected for channel: {}: {}", channelName, e.getMessage(), e);
            throw new ApiException(Response.Status.INTERNAL_SERVER_ERROR, "Redis subscription task was rejected");
        }
    }

    /**
     * Blocks the calling thread until {@code redisFuture} completes or the wait times out.
     *
     * @param pubSub      listener to unsubscribe when the caller gives up
     * @param redisFuture future completed by the listener or the subscription task
     * @param channelName channel name, used for logging
     * @throws ApiException              on timeout ({@code REQUEST_TIMEOUT}), interruption, or any
     *                                   failure other than {@link WorkflowNotFoundException}
     * @throws WorkflowNotFoundException if the future failed with that exception as its cause
     */
    private void waitForRedisResponse(JedisPubSub pubSub, CompletableFuture<Void> redisFuture, String channelName) {
        try {
            redisFuture.get(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            MetricsRegistry.markMeter(this.getClass(), new String[]{"subscribe", "timeout"});
            log.warn("Timeout waiting for Redis event on channel: {}. Unsubscribing and cancelling task.", channelName);
            this.safeUnsubscribe(pubSub, "timeout", channelName);
            throw new ApiException(Response.Status.REQUEST_TIMEOUT, "Timeout waiting for workflow response");
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            // Tear down the subscription just like on timeout; otherwise the subscriber thread
            // would keep waiting on the channel after this caller is gone.
            this.safeUnsubscribe(pubSub, "interruption", channelName);
            this.logAndMarkMeter(channelName, e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof WorkflowNotFoundException) {
                throw (WorkflowNotFoundException) cause;
            }
            this.logAndMarkMeter(channelName, e);
        }
    }

    /**
     * Records a meter named after the exception class, logs the failure, and always throws.
     *
     * @param channelName channel name, used for logging
     * @param e           the failure being reported
     * @throws ApiException always, with {@code INTERNAL_SERVER_ERROR}
     */
    private void logAndMarkMeter(String channelName, Exception e) {
        MetricsRegistry.markMeter(this.getClass(), new String[]{"subscribe", e.getClass().getName()});
        log.error("Error waiting for Redis event on channel: {}: {}", channelName, e.getMessage(), e);
        throw new ApiException(Response.Status.INTERNAL_SERVER_ERROR, "Error waiting for workflow response");
    }

    /**
     * Unsubscribes the listener if it is currently subscribed; failures are logged, never thrown.
     *
     * @param pubSub  listener to unsubscribe
     * @param context short description of why the unsubscribe happens, used for logging
     * @param channel channel name, used for logging
     */
    private void safeUnsubscribe(JedisPubSub pubSub, String context, String channel) {
        try {
            if (pubSub.isSubscribed()) {
                pubSub.unsubscribe();
            }
        } catch (Exception ex) {
            log.warn("Unsubscribe failed after {} for channel: {}", context, channel, ex);
        }
    }

    /** Lifecycle hook; the thread pool is already created in the constructor, so this only logs. */
    public void start() {
        log.info("RedisPubSubService started");
    }

    /** Lifecycle hook; shuts the subscriber thread pool down. */
    public void stop() {
        this.shutdown();
        log.info("RedisPubSubService stopped");
    }

    /** Stops the subscriber thread pool immediately, interrupting running subscription tasks. */
    public void shutdown() {
        this.redisThreadPool.shutdownNow();
    }

    /**
     * Exposes the subscriber thread pool.
     *
     * @return the executor running the Redis subscription tasks
     */
    public ExecutorService getRedisThreadPool() {
        return this.redisThreadPool;
    }
}

