/*
 * Decompiled with CFR 0.152.
 */
package org.trpr.platform.integration.impl.messaging;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.List;
import org.springframework.beans.factory.DisposableBean;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;
import org.trpr.platform.core.util.PlatformUtils;
import org.trpr.platform.integration.impl.messaging.RabbitConnectionHolder;
import org.trpr.platform.integration.impl.messaging.RabbitMQConfiguration;
import org.trpr.platform.integration.spi.messaging.MessagePublisher;
import org.trpr.platform.integration.spi.messaging.MessagingException;

public class RabbitMQMessagePublisherImpl
implements MessagePublisher,
DisposableBean {
    private static final String ENCODING = "UTF-8";
    private static final Logger LOGGER = LogFactory.getLogger(RabbitMQMessagePublisherImpl.class);
    private List<RabbitMQConfiguration> rabbitMQConfigurations;
    private RabbitConnectionHolder[] rabbitConnectionHolders;
    private long totNoOfMessagesQueued;

    public void initialize() {
    }

    public List<RabbitMQConfiguration> getRabbitMQConfigurations() {
        return this.rabbitMQConfigurations;
    }

    public void setRabbitMQConfigurations(List<RabbitMQConfiguration> rabbitMQConfigurations) {
        this.rabbitMQConfigurations = rabbitMQConfigurations;
        this.rabbitConnectionHolders = new RabbitConnectionHolder[rabbitMQConfigurations.size()];
    }

    public void publishString(String string) throws MessagingException {
        this.publish(string);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void publish(Object message) throws MessagingException {
        if (null == message) {
            throw new MessagingException("Message parameter cannot be null");
        }
        int noOfQueues = this.rabbitMQConfigurations.size();
        int attempt = 0;
        RabbitMQConfiguration lastUsedConfiguration = null;
        while (attempt < noOfQueues) {
            int connectionIndex = (int)(this.totNoOfMessagesQueued % (long)noOfQueues);
            RabbitMQConfiguration rabbitMQConfiguration = lastUsedConfiguration = this.rabbitMQConfigurations.get(connectionIndex);
            if (this.rabbitConnectionHolders[connectionIndex] == null || !this.rabbitConnectionHolders[connectionIndex].isValid()) {
                try {
                    RabbitMQConfiguration rabbitMQConfiguration2 = rabbitMQConfiguration;
                    synchronized (rabbitMQConfiguration2) {
                        if (this.rabbitConnectionHolders[connectionIndex] == null) {
                            this.rabbitConnectionHolders[connectionIndex] = new RabbitConnectionHolder(rabbitMQConfiguration);
                            this.rabbitConnectionHolders[connectionIndex].createConnection();
                        }
                    }
                }
                catch (Exception e) {
                    LOGGER.error("Error while initializing Rabbit connection. Will try others. Error is : " + e.getMessage(), (Throwable)e);
                    ++attempt;
                    ++this.totNoOfMessagesQueued;
                    continue;
                }
            }
            try {
                AMQP.BasicProperties msgProps;
                byte[] body;
                boolean isMessageOfTypeString = message instanceof String;
                byte[] byArray = body = isMessageOfTypeString ? ((String)message).getBytes(ENCODING) : PlatformUtils.toBytes((Object)message);
                AMQP.BasicProperties basicProperties = rabbitMQConfiguration.isDurable() ? (isMessageOfTypeString ? MessageProperties.PERSISTENT_TEXT_PLAIN : MessageProperties.PERSISTENT_BASIC) : (msgProps = isMessageOfTypeString ? MessageProperties.TEXT_PLAIN : MessageProperties.BASIC);
                if (rabbitMQConfiguration.isDurable()) {
                    Channel channel = this.rabbitConnectionHolders[connectionIndex].getChannel();
                    synchronized (channel) {
                        this.rabbitConnectionHolders[connectionIndex].getChannel().basicPublish(rabbitMQConfiguration.getExchangeName(), rabbitMQConfiguration.getRoutingKey(), msgProps, body);
                        if ((this.totNoOfMessagesQueued + 1L) % (long)rabbitMQConfiguration.getDurableMessageCommitCount() != 0L) return;
                        if (rabbitMQConfiguration.isDisableTX()) {
                            LOGGER.error("Configuration conflict. TX disabled for message publishing on durable queue. Message will not be published.");
                            return;
                        }
                        this.rabbitConnectionHolders[connectionIndex].getChannel().txCommit();
                        return;
                    }
                } else {
                    this.rabbitConnectionHolders[connectionIndex].getChannel().basicPublish(rabbitMQConfiguration.getExchangeName(), rabbitMQConfiguration.getRoutingKey(), msgProps, body);
                }
                return;
            }
            catch (Exception e) {
                this.rabbitConnectionHolders[connectionIndex] = null;
                LOGGER.error("Error while publishing message into queue. Will try other configurations. Error is : " + e.getMessage(), (Throwable)e);
            }
            finally {
                ++attempt;
                ++this.totNoOfMessagesQueued;
            }
        }
        throw new MessagingException("Error while publishing message into queue. All configurations failed!. Last failed configuration : " + lastUsedConfiguration);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closeConnections() throws MessagingException {
        for (int i = 0; i < this.rabbitConnectionHolders.length; ++i) {
            if (this.rabbitConnectionHolders[i] == null || !this.rabbitConnectionHolders[i].isValid()) continue;
            if (this.rabbitMQConfigurations.get(i).isDurable() && !this.rabbitMQConfigurations.get(i).isDisableTX()) {
                try {
                    Channel channel = this.rabbitConnectionHolders[i].getChannel();
                    synchronized (channel) {
                        this.rabbitConnectionHolders[i].getChannel().txCommit();
                    }
                }
                catch (IOException e) {
                    LOGGER.error("Error committing remaining durable messages. Messages will be lost. Continuing to close connection for this configuration : " + this.rabbitMQConfigurations.get(i));
                }
            }
            this.rabbitConnectionHolders[i].closeConnection();
            this.rabbitConnectionHolders[i] = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getQueueDepth() throws MessagingException {
        int noOfQueues = this.rabbitMQConfigurations.size();
        int attempt = 0;
        RabbitMQConfiguration lastUsedConfiguration = null;
        while (attempt < noOfQueues) {
            int connectionIndex = (int)(this.totNoOfMessagesQueued % (long)noOfQueues);
            RabbitMQConfiguration RabbitMQConfiguration2 = lastUsedConfiguration = this.rabbitMQConfigurations.get(connectionIndex);
            try {
                int count;
                if (this.rabbitConnectionHolders[connectionIndex] == null || !this.rabbitConnectionHolders[connectionIndex].isValid()) {
                    RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration2;
                    synchronized (rabbitMQConfiguration) {
                        if (this.rabbitConnectionHolders[connectionIndex] == null) {
                            this.rabbitConnectionHolders[connectionIndex] = new RabbitConnectionHolder(RabbitMQConfiguration2);
                            this.rabbitConnectionHolders[connectionIndex].createConnection();
                        }
                    }
                }
                int n = count = this.rabbitConnectionHolders[connectionIndex].getMessageCount();
                return n;
            }
            catch (Exception e) {
                LOGGER.error("Error while initializing Rabbit connection / getting message count. Will try others. Error is : " + e.getMessage(), (Throwable)e);
                this.rabbitConnectionHolders[connectionIndex] = null;
            }
            finally {
                ++attempt;
                ++this.totNoOfMessagesQueued;
            }
        }
        throw new MessagingException("Error while getting queue depth. All configurations failed!. Last failed configuration : " + lastUsedConfiguration);
    }

    public void destroy() throws Exception {
        this.closeConnections();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RabbitConnectionHolder[] getRabbitConnectionHolders() {
        for (int i = 0; i < this.rabbitConnectionHolders.length; ++i) {
            if (this.rabbitConnectionHolders[i] != null && this.rabbitConnectionHolders[i].isValid()) continue;
            try {
                RabbitMQConfiguration rabbitMQConfiguration = this.rabbitMQConfigurations.get(i);
                synchronized (rabbitMQConfiguration) {
                    if (this.rabbitConnectionHolders[i] == null) {
                        this.rabbitConnectionHolders[i] = new RabbitConnectionHolder(this.rabbitMQConfigurations.get(i));
                        this.rabbitConnectionHolders[i].createConnection();
                    }
                    continue;
                }
            }
            catch (MessagingException e) {
                LOGGER.error("Error initiazlizing Rabbit connection. Connection not available for configuration : " + this.rabbitMQConfigurations.get(i), (Throwable)e);
            }
        }
        return this.rabbitConnectionHolders;
    }
}

