/*
 * Decompiled with CFR 0.152.
 */
package io.dropwizard.actors.actor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import io.dropwizard.actors.actor.ActorConfig;
import io.dropwizard.actors.actor.DelayType;
import io.dropwizard.actors.actor.MessageHandlingFunction;
import io.dropwizard.actors.connectivity.RMQConnection;
import io.dropwizard.actors.retry.RetryStrategy;
import io.dropwizard.actors.retry.RetryStrategyFactory;
import io.dropwizard.lifecycle.Managed;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnmanagedBaseActor<Message>
implements Managed {
    private static final Logger log = LoggerFactory.getLogger(UnmanagedBaseActor.class);
    private final String name;
    private final ActorConfig config;
    private final RMQConnection connection;
    private final ObjectMapper mapper;
    private final Class<? extends Message> clazz;
    private final int prefetchCount;
    private final MessageHandlingFunction<Message, Boolean> handlerFunction;
    private final Function<Throwable, Boolean> errorCheckFunction;
    private final String queueName;
    private final RetryStrategy retryStrategy;
    private Channel publishChannel;
    private List<Handler> handlers = Lists.newArrayList();

    protected UnmanagedBaseActor(String name, ActorConfig config, RMQConnection connection, ObjectMapper mapper, RetryStrategyFactory retryStrategyFactory, Class<? extends Message> clazz, MessageHandlingFunction<Message, Boolean> handlerFunction, Function<Throwable, Boolean> errorCheckFunction) {
        this.name = name;
        this.config = config;
        this.connection = connection;
        this.mapper = mapper;
        this.clazz = clazz;
        this.prefetchCount = config.getPrefetchCount();
        this.handlerFunction = handlerFunction;
        this.errorCheckFunction = errorCheckFunction;
        this.queueName = String.format("%s.%s", config.getPrefix(), name);
        this.retryStrategy = retryStrategyFactory.create(config.getRetryConfig());
    }

    private boolean handle(Message message) throws Exception {
        return this.handlerFunction.apply(message);
    }

    public final void publishWithDelay(Message message, long delayMilliseconds) throws Exception {
        log.info("Publishing message to exchange with delay: {}", (Object)delayMilliseconds);
        if (!this.config.isDelayed()) {
            log.warn("Publishing delayed message to non-delayed queue queue:{}", (Object)this.queueName);
        }
        if (this.config.getDelayType() == DelayType.TTL) {
            this.publishChannel.basicPublish(this.ttlExchange(this.config), this.queueName, new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayMilliseconds)).deliveryMode(Integer.valueOf(2)).build(), this.mapper().writeValueAsBytes(message));
        } else {
            this.publish(message, new AMQP.BasicProperties.Builder().headers(Collections.singletonMap("x-delay", delayMilliseconds)).deliveryMode(Integer.valueOf(2)).build());
        }
    }

    public final void publish(Message message) throws Exception {
        this.publish(message, MessageProperties.MINIMAL_PERSISTENT_BASIC);
    }

    public final void publish(Message message, AMQP.BasicProperties properties) throws Exception {
        this.publishChannel.basicPublish(this.config.getExchange(), this.queueName, properties, this.mapper().writeValueAsBytes(message));
    }

    public void start() throws Exception {
        String exchange = this.config.getExchange();
        String dlx = this.config.getExchange() + "_SIDELINE";
        if (this.config.isDelayed()) {
            this.ensureDelayedExchange(exchange);
        } else {
            this.ensureExchange(exchange);
        }
        this.ensureExchange(dlx);
        this.publishChannel = this.connection.newChannel();
        this.connection.ensure(this.queueName + "_SIDELINE", this.queueName, dlx);
        this.connection.ensure(this.queueName, this.config.getExchange(), this.connection.rmqOpts(dlx));
        if (this.config.getDelayType() == DelayType.TTL) {
            this.connection.ensure(this.ttlQueue(this.queueName), this.queueName, this.ttlExchange(this.config), this.connection.rmqOpts(exchange));
        }
        for (int i = 1; i <= this.config.getConcurrency(); ++i) {
            Channel consumeChannel = this.connection.newChannel();
            Handler handler = new Handler(consumeChannel, this.mapper, this.clazz, this.prefetchCount);
            String tag = consumeChannel.basicConsume(this.queueName, false, (Consumer)handler);
            handler.setTag(tag);
            this.handlers.add(handler);
            log.info("Started consumer {} of type {}", (Object)i, (Object)this.name);
        }
    }

    private void ensureExchange(String exchange) throws IOException {
        this.connection.channel().exchangeDeclare(exchange, "direct", true, false, (Map)ImmutableMap.builder().put((Object)"x-ha-policy", (Object)"all").put((Object)"ha-mode", (Object)"all").build());
    }

    private void ensureDelayedExchange(String exchange) throws IOException {
        if (this.config.getDelayType() == DelayType.TTL) {
            this.ensureExchange(this.ttlExchange(this.config));
        } else {
            this.connection.channel().exchangeDeclare(exchange, "x-delayed-message", true, false, (Map)ImmutableMap.builder().put((Object)"x-ha-policy", (Object)"all").put((Object)"ha-mode", (Object)"all").put((Object)"x-delayed-type", (Object)"direct").build());
        }
    }

    private String ttlExchange(ActorConfig actorConfig) {
        return String.format("%s_TTL", actorConfig.getExchange());
    }

    private String ttlQueue(String queueName) {
        return String.format("%s_TTL", queueName);
    }

    public void stop() throws Exception {
        try {
            this.publishChannel.close();
        }
        catch (Exception e) {
            log.error(String.format("Error closing publisher:%s", this.name), (Throwable)e);
        }
        this.handlers.forEach(handler -> {
            try {
                Channel channel = handler.getChannel();
                channel.basicCancel(handler.getTag());
                channel.close();
            }
            catch (Exception e) {
                log.error(String.format("Error cancelling consumer: %s", handler.getTag()), (Throwable)e);
            }
        });
    }

    protected final RMQConnection connection() throws Exception {
        return this.connection;
    }

    protected final ObjectMapper mapper() {
        return this.mapper;
    }

    public String getName() {
        return this.name;
    }

    public ActorConfig getConfig() {
        return this.config;
    }

    public RMQConnection getConnection() {
        return this.connection;
    }

    public ObjectMapper getMapper() {
        return this.mapper;
    }

    public Class<? extends Message> getClazz() {
        return this.clazz;
    }

    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    public MessageHandlingFunction<Message, Boolean> getHandlerFunction() {
        return this.handlerFunction;
    }

    public Function<Throwable, Boolean> getErrorCheckFunction() {
        return this.errorCheckFunction;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public RetryStrategy getRetryStrategy() {
        return this.retryStrategy;
    }

    public Channel getPublishChannel() {
        return this.publishChannel;
    }

    public List<Handler> getHandlers() {
        return this.handlers;
    }

    public void setPublishChannel(Channel publishChannel) {
        this.publishChannel = publishChannel;
    }

    public void setHandlers(List<Handler> handlers) {
        this.handlers = handlers;
    }

    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof UnmanagedBaseActor)) {
            return false;
        }
        UnmanagedBaseActor other = (UnmanagedBaseActor)o;
        if (!other.canEqual(this)) {
            return false;
        }
        String this$name = this.getName();
        String other$name = other.getName();
        if (this$name == null ? other$name != null : !this$name.equals(other$name)) {
            return false;
        }
        ActorConfig this$config = this.getConfig();
        ActorConfig other$config = other.getConfig();
        if (this$config == null ? other$config != null : !((Object)this$config).equals(other$config)) {
            return false;
        }
        RMQConnection this$connection = this.getConnection();
        RMQConnection other$connection = other.getConnection();
        if (this$connection == null ? other$connection != null : !this$connection.equals(other$connection)) {
            return false;
        }
        ObjectMapper this$mapper = this.getMapper();
        ObjectMapper other$mapper = other.getMapper();
        if (this$mapper == null ? other$mapper != null : !this$mapper.equals(other$mapper)) {
            return false;
        }
        Class<Message> this$clazz = this.getClazz();
        Class<Message> other$clazz = other.getClazz();
        if (this$clazz == null ? other$clazz != null : !this$clazz.equals(other$clazz)) {
            return false;
        }
        if (this.getPrefetchCount() != other.getPrefetchCount()) {
            return false;
        }
        MessageHandlingFunction<Message, Boolean> this$handlerFunction = this.getHandlerFunction();
        MessageHandlingFunction<Message, Boolean> other$handlerFunction = other.getHandlerFunction();
        if (this$handlerFunction == null ? other$handlerFunction != null : !this$handlerFunction.equals(other$handlerFunction)) {
            return false;
        }
        Function<Throwable, Boolean> this$errorCheckFunction = this.getErrorCheckFunction();
        Function<Throwable, Boolean> other$errorCheckFunction = other.getErrorCheckFunction();
        if (this$errorCheckFunction == null ? other$errorCheckFunction != null : !this$errorCheckFunction.equals(other$errorCheckFunction)) {
            return false;
        }
        String this$queueName = this.getQueueName();
        String other$queueName = other.getQueueName();
        if (this$queueName == null ? other$queueName != null : !this$queueName.equals(other$queueName)) {
            return false;
        }
        RetryStrategy this$retryStrategy = this.getRetryStrategy();
        RetryStrategy other$retryStrategy = other.getRetryStrategy();
        if (this$retryStrategy == null ? other$retryStrategy != null : !this$retryStrategy.equals(other$retryStrategy)) {
            return false;
        }
        Channel this$publishChannel = this.getPublishChannel();
        Channel other$publishChannel = other.getPublishChannel();
        if (this$publishChannel == null ? other$publishChannel != null : !this$publishChannel.equals(other$publishChannel)) {
            return false;
        }
        List<Handler> this$handlers = this.getHandlers();
        List<Handler> other$handlers = other.getHandlers();
        return !(this$handlers == null ? other$handlers != null : !((Object)this$handlers).equals(other$handlers));
    }

    protected boolean canEqual(Object other) {
        return other instanceof UnmanagedBaseActor;
    }

    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        String $name = this.getName();
        result = result * 59 + ($name == null ? 43 : $name.hashCode());
        ActorConfig $config = this.getConfig();
        result = result * 59 + ($config == null ? 43 : ((Object)$config).hashCode());
        RMQConnection $connection = this.getConnection();
        result = result * 59 + ($connection == null ? 43 : $connection.hashCode());
        ObjectMapper $mapper = this.getMapper();
        result = result * 59 + ($mapper == null ? 43 : $mapper.hashCode());
        Class<Message> $clazz = this.getClazz();
        result = result * 59 + ($clazz == null ? 43 : $clazz.hashCode());
        result = result * 59 + this.getPrefetchCount();
        MessageHandlingFunction<Message, Boolean> $handlerFunction = this.getHandlerFunction();
        result = result * 59 + ($handlerFunction == null ? 43 : $handlerFunction.hashCode());
        Function<Throwable, Boolean> $errorCheckFunction = this.getErrorCheckFunction();
        result = result * 59 + ($errorCheckFunction == null ? 43 : $errorCheckFunction.hashCode());
        String $queueName = this.getQueueName();
        result = result * 59 + ($queueName == null ? 43 : $queueName.hashCode());
        RetryStrategy $retryStrategy = this.getRetryStrategy();
        result = result * 59 + ($retryStrategy == null ? 43 : $retryStrategy.hashCode());
        Channel $publishChannel = this.getPublishChannel();
        result = result * 59 + ($publishChannel == null ? 43 : $publishChannel.hashCode());
        List<Handler> $handlers = this.getHandlers();
        result = result * 59 + ($handlers == null ? 43 : ((Object)$handlers).hashCode());
        return result;
    }

    public String toString() {
        return "UnmanagedBaseActor(name=" + this.getName() + ", config=" + this.getConfig() + ", connection=" + this.getConnection() + ", mapper=" + this.getMapper() + ", clazz=" + this.getClazz() + ", prefetchCount=" + this.getPrefetchCount() + ", handlerFunction=" + this.getHandlerFunction() + ", errorCheckFunction=" + this.getErrorCheckFunction() + ", queueName=" + this.getQueueName() + ", retryStrategy=" + this.getRetryStrategy() + ", publishChannel=" + this.getPublishChannel() + ", handlers=" + this.getHandlers() + ")";
    }

    private class Handler
    extends DefaultConsumer {
        private final ObjectMapper mapper;
        private final Class<? extends Message> clazz;
        private String tag;

        private Handler(Channel channel, ObjectMapper mapper, Class<? extends Message> clazz, int prefetchCount) throws Exception {
            super(channel);
            this.mapper = mapper;
            this.clazz = clazz;
            this.getChannel().basicQos(prefetchCount);
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            try {
                Object message = this.mapper.readValue(body, this.clazz);
                boolean success = UnmanagedBaseActor.this.retryStrategy.execute(() -> UnmanagedBaseActor.this.handle(message));
                if (success) {
                    this.getChannel().basicAck(envelope.getDeliveryTag(), false);
                } else {
                    this.getChannel().basicReject(envelope.getDeliveryTag(), false);
                }
            }
            catch (Throwable t) {
                log.error("Error processing message...", t);
                if (((Boolean)UnmanagedBaseActor.this.errorCheckFunction.apply(t)).booleanValue()) {
                    log.warn("Acked message due to exception: ", t);
                    this.getChannel().basicAck(envelope.getDeliveryTag(), false);
                }
                this.getChannel().basicReject(envelope.getDeliveryTag(), false);
            }
        }

        public String getTag() {
            return this.tag;
        }

        public void setTag(String tag) {
            this.tag = tag;
        }
    }
}

