/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.pipeline;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.olacabs.fabric.compute.comms.ChannelFactory;
import com.olacabs.fabric.compute.comms.CommsChannel;
import com.olacabs.fabric.compute.pipeline.MessageSource;
import com.olacabs.fabric.compute.pipeline.PipelineMessage;
import com.olacabs.fabric.compute.pipeline.PipelineStage;
import com.olacabs.fabric.compute.source.PipelineStreamSource;
import com.olacabs.fabric.compute.tracking.SimpleBitSet;
import com.olacabs.fabric.model.event.EventSet;
import java.lang.invoke.LambdaMetafactory;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBus {
    private static final Logger log = LoggerFactory.getLogger(NotificationBus.class);
    private final Map<Long, SimpleBitSet> tracker = Maps.newConcurrentMap();
    private Properties properties;
    private Map<Integer, Connection> connections = Maps.newHashMap();
    private Map<Integer, Communicator> comms = Maps.newHashMap();
    private Map<Integer, PipelineStreamSource> sources = Maps.newHashMap();

    public NotificationBus(Properties properties) {
        this.properties = properties;
        log.info("Notification bus created...");
    }

    public NotificationBus source(PipelineStreamSource streamSource) {
        this.sources.put(streamSource.communicationId(), streamSource);
        return this;
    }

    public NotificationBus connect(MessageSource to, PipelineStage ... pipelineStages) {
        if (!this.connections.containsKey(to.communicationId())) {
            this.connections.put(to.communicationId(), new Connection());
        }
        for (PipelineStage pipelineStage : pipelineStages) {
            this.connections.get(to.communicationId()).addConnection(pipelineStage.communicationId());
            if (this.comms.containsKey(pipelineStage.communicationId())) continue;
            this.comms.put(pipelineStage.communicationId(), Communicator.builder().commsChannel(ChannelFactory.create(this.properties, pipelineStage.name(), false, pipelineStage)).pipelineStage(pipelineStage).build());
        }
        return this;
    }

    public synchronized void publish(PipelineMessage message, int from) {
        this.publish(message, from, true);
    }

    /*
     * Unable to fully structure code
     */
    public synchronized void publish(PipelineMessage message, int from, boolean forward) {
        switch (1.$SwitchMap$com$olacabs$fabric$compute$pipeline$PipelineMessage$Type[message.getMessageType().ordinal()]) {
            case 1: {
                Communicator.access$000(this.comms.get(from)).publish(message);
                break;
            }
            case 2: {
                actionableMessage = message;
                ackCandidatesBuilder = ImmutableSet.builder();
                if (message.getMessages().isAggregate()) ** GOTO lbl32
                while (null != actionableMessage.getParent()) {
                    actionableMessage = actionableMessage.getParent();
                }
                if (!this.tracker.containsKey(actionableMessage.getMessages().getId())) {
                    this.tracker.put(actionableMessage.getMessages().getId(), new SimpleBitSet(64));
                }
                msgBitSet = this.tracker.get(actionableMessage.getMessages().getId());
                try {
                    if (!forward || !this.connections.containsKey(from)) ** GOTO lbl28
                    if (!this.comms.containsKey(from)) ** GOTO lbl23
                    if (!Communicator.access$100(this.comms.get(from)).sendsNormalMessage()) ** GOTO lbl28
lbl23:
                    // 2 sources

                    Connection.access$200(this.connections.get(from)).forEach((Consumer<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, set(int ), (Ljava/lang/Integer;)V)((SimpleBitSet)msgBitSet));
                }
                catch (Exception e) {
                    NotificationBus.log.error("Error setting tracking bits for generator: " + Integer.toString(from), (Throwable)e);
                }
lbl28:
                // 4 sources

                msgBitSet.unset(from);
                if (!msgBitSet.hasSetBits() && actionableMessage.getMessages().isSourceGenerated()) {
                    ackCandidatesBuilder.add((Object)actionableMessage.getMessages());
                }
lbl32:
                // 4 sources

                try {
                    if (forward && this.connections.containsKey(from)) {
                        Connection.access$200(this.connections.get(from)).forEach((Consumer<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$publish$0(com.olacabs.fabric.compute.pipeline.PipelineMessage java.lang.Integer ), (Ljava/lang/Integer;)V)((NotificationBus)this, (PipelineMessage)message));
                    }
                }
                catch (Throwable t) {
                    NotificationBus.log.error("Error sending event to children for " + Integer.toString(from), t);
                }
                ackSet = ackCandidatesBuilder.build();
                ackSet.forEach((Consumer<EventSet>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$publish$1(com.olacabs.fabric.model.event.EventSet ), (Lcom/olacabs/fabric/model/event/EventSet;)V)((NotificationBus)this));
                if (ackSet.isEmpty()) break;
                this.tracker.remove(actionableMessage.getMessages().getId());
                break;
            }
        }
    }

    public void start() {
        this.connections = ImmutableMap.copyOf(this.connections);
        this.comms.values().forEach(communicator -> ((Communicator)communicator).commsChannel.start());
    }

    public void stop() {
        this.comms.values().forEach(communicator -> ((Communicator)communicator).commsChannel.stop());
    }

    private /* synthetic */ void lambda$publish$1(EventSet eventSet) {
        this.sources.get(eventSet.getSourceId()).ackMessage(eventSet);
    }

    private /* synthetic */ void lambda$publish$0(PipelineMessage message, Integer pipeLineStage) {
        this.comms.get(pipeLineStage).commsChannel.publish(message);
    }

    public static class Communicator {
        private MessageSource pipelineStage;
        private CommsChannel<PipelineMessage> commsChannel;

        Communicator(MessageSource pipelineStage, CommsChannel<PipelineMessage> commsChannel) {
            this.pipelineStage = pipelineStage;
            this.commsChannel = commsChannel;
        }

        public static CommunicatorBuilder builder() {
            return new CommunicatorBuilder();
        }

        static /* synthetic */ MessageSource access$100(Communicator x0) {
            return x0.pipelineStage;
        }

        public static class CommunicatorBuilder {
            private MessageSource pipelineStage;
            private CommsChannel<PipelineMessage> commsChannel;

            CommunicatorBuilder() {
            }

            public CommunicatorBuilder pipelineStage(MessageSource pipelineStage) {
                this.pipelineStage = pipelineStage;
                return this;
            }

            public CommunicatorBuilder commsChannel(CommsChannel<PipelineMessage> commsChannel) {
                this.commsChannel = commsChannel;
                return this;
            }

            public Communicator build() {
                return new Communicator(this.pipelineStage, this.commsChannel);
            }

            public String toString() {
                return "NotificationBus.Communicator.CommunicatorBuilder(pipelineStage=" + this.pipelineStage + ", commsChannel=" + this.commsChannel + ")";
            }
        }
    }

    private static class Connection {
        private final Set<Integer> pipelineStages = Sets.newHashSet();

        Connection() {
        }

        Connection addConnection(int pipelineStage) {
            this.pipelineStages.add(pipelineStage);
            return this;
        }

        static /* synthetic */ Set access$200(Connection x0) {
            return x0.pipelineStages;
        }
    }
}

