/*
 * Decompiled with CFR 0.152.
 */
package org.trpr.platform.integration.impl.messaging;

import com.rabbitmq.client.QueueingConsumer;
import java.util.List;
import org.springframework.beans.factory.DisposableBean;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;
import org.trpr.platform.core.util.PlatformUtils;
import org.trpr.platform.integration.impl.messaging.RabbitConnectionHolder;
import org.trpr.platform.integration.impl.messaging.RabbitMQConfiguration;
import org.trpr.platform.integration.spi.messaging.MessageConsumer;
import org.trpr.platform.integration.spi.messaging.MessagingException;

public class RabbitMQMessageConsumerImpl
implements MessageConsumer,
DisposableBean {
    private static final String ENCODING = "UTF-8";
    private static final long DEFAULT_WAIT_TIMEOUT = -1L;
    private static final Logger LOGGER = LogFactory.getLogger(RabbitMQMessageConsumerImpl.class);
    private List<RabbitMQConfiguration> rabbitMQConfigurations;
    private RabbitConnectionHolder[] rabbitConnectionHolders;
    private long totNoOfMessagesConsumed;
    private long waitTimeoutMillis = -1L;

    public void initialize() {
    }

    public List<RabbitMQConfiguration> getRabbitMQConfigurations() {
        return this.rabbitMQConfigurations;
    }

    public void setRabbitMQConfigurations(List<RabbitMQConfiguration> rabbitMQConfigurations) {
        this.rabbitMQConfigurations = rabbitMQConfigurations;
        this.rabbitConnectionHolders = new RabbitConnectionHolder[rabbitMQConfigurations.size()];
    }

    public long getWaitTimeoutMillis() {
        return this.waitTimeoutMillis;
    }

    public void setWaitTimeoutMillis(long waitTimeoutMillis) {
        this.waitTimeoutMillis = waitTimeoutMillis;
    }

    public String consumeString() throws MessagingException {
        return (String)this.consumeWithRoundRobinPolicy((boolean)true).message;
    }

    public Object consume() throws MessagingException {
        return this.consumeWithRoundRobinPolicy((boolean)false).message;
    }

    public void closeConnections() throws MessagingException {
        int i = 0;
        while (i < this.rabbitConnectionHolders.length) {
            if (this.rabbitConnectionHolders[i] != null && this.rabbitConnectionHolders[i].isValid()) {
                this.rabbitConnectionHolders[i].closeConnection();
                this.rabbitConnectionHolders[i] = null;
            }
            ++i;
        }
    }

    public int getQueueDepth() throws MessagingException {
        int noOfQueues = this.rabbitMQConfigurations.size();
        int attempt = 0;
        RabbitMQConfiguration lastUsedConfiguration = null;
        while (attempt < noOfQueues) {
            int connectionIndex = (int)(this.totNoOfMessagesConsumed % (long)noOfQueues);
            RabbitMQConfiguration rabbitMQConfiguration = lastUsedConfiguration = this.rabbitMQConfigurations.get(connectionIndex);
            try {
                int count;
                if (this.rabbitConnectionHolders[connectionIndex] == null || !this.rabbitConnectionHolders[connectionIndex].isValid()) {
                    this.validateAndInitConnection(connectionIndex, rabbitMQConfiguration);
                }
                int n = count = this.rabbitConnectionHolders[connectionIndex].getMessageCount();
                return n;
            }
            catch (Exception e) {
                LOGGER.error("Error while initializing Rabbit connection / getting message count. Will try others. Error is : " + e.getMessage(), (Throwable)e);
                this.rabbitConnectionHolders[connectionIndex] = null;
            }
            finally {
                ++attempt;
                ++this.totNoOfMessagesConsumed;
            }
        }
        throw new MessagingException("Unable to queue depth. All configurations failed!. Last failed configuration : " + lastUsedConfiguration, 100);
    }

    public void destroy() throws Exception {
        this.closeConnections();
    }

    protected MessageHolder consumeWithRoundRobinPolicy(boolean isString) throws MessagingException {
        MessageHolder messageHolder = null;
        int noOfQueues = this.rabbitMQConfigurations.size();
        int attempt = 0;
        RabbitMQConfiguration lastUsedConfiguration = null;
        Exception consumptionRootCause = null;
        while (attempt < noOfQueues) {
            int connectionIndex = (int)(this.totNoOfMessagesConsumed % (long)noOfQueues);
            RabbitMQConfiguration msgPubConfig = lastUsedConfiguration = this.rabbitMQConfigurations.get(connectionIndex);
            if (this.rabbitConnectionHolders[connectionIndex] == null || !this.rabbitConnectionHolders[connectionIndex].isValid()) {
                try {
                    this.validateAndInitConnection(connectionIndex, msgPubConfig);
                }
                catch (Exception e) {
                    LOGGER.error("Error while initializing Rabbit connection. Will try others. Error is : " + e.getMessage(), (Throwable)e);
                    ++attempt;
                    ++this.totNoOfMessagesConsumed;
                    consumptionRootCause = e;
                    continue;
                }
            }
            try {
                messageHolder = this.consumeFromConnection(isString, connectionIndex);
                if (messageHolder == null) continue;
                MessageHolder messageHolder2 = messageHolder;
                return messageHolder2;
            }
            catch (Exception e) {
                this.rabbitConnectionHolders[connectionIndex] = null;
                LOGGER.error("Error while consuming message from queue. Will try other configurations. Error is : " + e.getMessage(), (Throwable)e);
                consumptionRootCause = e;
            }
            finally {
                ++attempt;
                ++this.totNoOfMessagesConsumed;
            }
        }
        if (consumptionRootCause != null) {
            throw new MessagingException("Error consuming message from queue. Last used configuration is : " + lastUsedConfiguration, consumptionRootCause, 100);
        }
        throw new MessagingException("No messages available for consumption in queue.", 101);
    }

    protected MessageHolder consumeFromConnection(boolean isString, int connectionIndex) throws Exception {
        QueueingConsumer.Delivery delivery;
        MessageHolder messageHolder = null;
        RabbitMQConfiguration msgPubConfig = this.rabbitMQConfigurations.get(connectionIndex);
        QueueingConsumer.Delivery delivery2 = delivery = this.getWaitTimeoutMillis() > 0L ? this.rabbitConnectionHolders[connectionIndex].getConsumer().nextDelivery(this.getWaitTimeoutMillis()) : this.rabbitConnectionHolders[connectionIndex].getConsumer().nextDelivery();
        if (delivery != null) {
            MessageHolder messageHolder2 = messageHolder = isString ? new MessageHolder(connectionIndex, new String(delivery.getBody(), ENCODING)) : new MessageHolder(connectionIndex, PlatformUtils.toObject((byte[])delivery.getBody()));
            if (!msgPubConfig.isNoAck()) {
                this.rabbitConnectionHolders[connectionIndex].getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        return messageHolder;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void validateAndInitConnection(int connectionIndex, RabbitMQConfiguration rabbitMQConfiguration) {
        RabbitMQConfiguration rabbitMQConfiguration2 = rabbitMQConfiguration;
        synchronized (rabbitMQConfiguration2) {
            if (this.rabbitConnectionHolders[connectionIndex] == null || !this.rabbitConnectionHolders[connectionIndex].isValid()) {
                this.rabbitConnectionHolders[connectionIndex] = new RabbitConnectionHolder(rabbitMQConfiguration);
                this.rabbitConnectionHolders[connectionIndex].createConnection();
            }
        }
    }

    class MessageHolder {
        int connectionIndex;
        Object message;

        MessageHolder(int connectionIndex, Object message) {
            this.connectionIndex = connectionIndex;
            this.message = message;
        }
    }
}

