/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.impl.redriver;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.remote.RemoteActorRefProvider;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;
import akka.routing.RoutingLogic;
import com.flipkart.flux.impl.message.TaskRedriverDetails;
import com.flipkart.flux.impl.redriver.AkkaRedriverWorker;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.flux.redriver.scheduler.MessageScheduler;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;

public class AkkaRedriverTask
extends UntypedActor {
    private LoggingAdapter logger = Logging.getLogger((ActorSystem)this.getContext().system(), (Object)((Object)this));
    private AtomicBoolean isLeader = new AtomicBoolean(false);
    private Cluster cluster = Cluster.get((ActorSystem)this.getContext().system());
    private Router router;
    @Inject
    private static MessageScheduler messageScheduler;

    public AkkaRedriverTask(int noOfRedriverWorkers) {
        ArrayList<ActorRefRoutee> routees = new ArrayList<ActorRefRoutee>();
        for (int i = 0; i < noOfRedriverWorkers; ++i) {
            ActorRef r = this.getContext().actorOf(Props.create(AkkaRedriverWorker.class, (Object[])new Object[0]));
            this.getContext().watch(r);
            routees.add(new ActorRefRoutee(r));
        }
        this.router = new Router((RoutingLogic)new RoundRobinRoutingLogic(), routees);
    }

    public void preStart() {
        this.cluster.subscribe(this.getSelf(), new Class[]{ClusterEvent.LeaderChanged.class});
    }

    public void postStop() {
        this.cluster.unsubscribe(this.getSelf());
    }

    public void onReceive(Object message) throws Exception {
        if (message instanceof ClusterEvent.LeaderChanged) {
            this.checkAndSetAsLeader(((ClusterEvent.LeaderChanged)message).getLeader());
        } else if (message instanceof ClusterEvent.CurrentClusterState) {
            this.checkAndSetAsLeader(((ClusterEvent.CurrentClusterState)message).getLeader());
        } else if (message instanceof Terminated) {
            this.router = this.router.removeRoutee(((Terminated)message).actor());
            ActorRef r = this.getContext().actorOf(Props.create(AkkaRedriverWorker.class, (Object[])new Object[0]));
            this.getContext().watch(r);
            this.router = this.router.addRoutee((Routee)new ActorRefRoutee(r));
        } else if (message instanceof TaskRedriverDetails) {
            TaskRedriverDetails redriverDetails = (TaskRedriverDetails)message;
            switch (redriverDetails.getAction()) {
                case Register: {
                    this.logger.debug("Register task : {} for redriver with time : {}", (Object)redriverDetails.getTaskId(), (Object)redriverDetails.getRedriverDelay());
                    messageScheduler.addMessage(new ScheduledMessage(redriverDetails.getTaskId(), Long.valueOf(System.currentTimeMillis() + redriverDetails.getRedriverDelay())));
                    break;
                }
                case Deregister: {
                    this.logger.debug("DeRegister task : {} with redriver", (Object)redriverDetails.getTaskId());
                    messageScheduler.removeMessage(redriverDetails.getTaskId());
                    break;
                }
                case Redrive: {
                    this.logger.debug("Redrive task with Id : {} ", (Object)redriverDetails.getTaskId());
                    this.router.route(message, this.getSelf());
                }
            }
        } else {
            this.logger.error("Redriver Task received a message that it cannot process. Message type received is : {}", (Object)message.getClass().getName());
            this.unhandled(message);
        }
    }

    private void checkAndSetAsLeader(Address leaderAddress) {
        Address selfAddress = ((RemoteActorRefProvider)this.getContext().system().provider()).transport().defaultAddress();
        if (leaderAddress.equals((Object)selfAddress)) {
            if (!this.isLeader.get()) {
                this.isLeader.set(true);
                this.logger.info("Promoting Actor at address : {} as the leader for cluster-wide redriver Scheduler", (Object)leaderAddress);
                messageScheduler.start();
            }
        } else {
            this.isLeader.set(false);
            this.logger.info("Demoting Actor at address : {} as the leader for cluster-wide redriver Scheduler", (Object)leaderAddress);
            messageScheduler.stop();
        }
    }
}

