/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.impl.task;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.DiagnosticLoggingAdapter;
import akka.event.Logging;
import akka.routing.ActorRefRoutee;
import akka.routing.Routee;
import akka.routing.Router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.flipkart.flux.api.EventData;
import com.flipkart.flux.api.EventDefinition;
import com.flipkart.flux.api.ExecutionUpdateData;
import com.flipkart.flux.api.Status;
import com.flipkart.flux.api.core.FluxError;
import com.flipkart.flux.client.exception.FluxRetriableException;
import com.flipkart.flux.client.runtime.FluxRuntimeConnector;
import com.flipkart.flux.client.runtime.RuntimeCommunicationException;
import com.flipkart.flux.domain.Event;
import com.flipkart.flux.impl.message.HookAndEvents;
import com.flipkart.flux.impl.message.TaskAndEvents;
import com.flipkart.flux.impl.task.AbstractHook;
import com.flipkart.flux.impl.task.AbstractTask;
import com.flipkart.flux.impl.task.AkkaHook;
import com.flipkart.flux.impl.task.HookExecutor;
import com.flipkart.flux.impl.task.TaskExecutor;
import com.flipkart.flux.impl.task.TaskRegistry;
import com.flipkart.flux.metrics.iface.MetricsClient;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public class AkkaTask
extends UntypedActor {
    private final DiagnosticLoggingAdapter logger = Logging.getLogger((UntypedActor)this);
    @Inject
    private static TaskRegistry taskRegistry;
    @Inject
    private static FluxRuntimeConnector fluxRuntimeConnector;
    @Inject
    private static MetricsClient metricsClient;
    @Inject
    @Named(value="HookRouter")
    private Router hookRouter;
    private static final ObjectMapper objectMapper;

    public void onReceive(Object message) throws Exception {
        block22: {
            if (TaskAndEvents.class.isAssignableFrom(message.getClass())) {
                try {
                    AbstractTask task;
                    TaskAndEvents taskAndEvent = (TaskAndEvents)message;
                    metricsClient.decCounter("stateMachine." + taskAndEvent.getStateMachineName() + ".task." + taskAndEvent.getTaskName() + ".queueSize");
                    HashMap<String, Object> mdc = new HashMap<String, Object>();
                    mdc.put("stateMachineId", taskAndEvent.getStateMachineId().toString());
                    mdc.put("taskId", taskAndEvent.getTaskId());
                    this.logger.setMDC(mdc);
                    this.logger.info("Akka task processing state machine: {} task: {}", (Object)taskAndEvent.getStateMachineId(), (Object)taskAndEvent.getTaskId());
                    this.logger.debug("Actor {} received directive {}", (Object)this.getSelf(), (Object)taskAndEvent);
                    if (!taskAndEvent.getIsFirstTimeExecution()) {
                        taskAndEvent.setCurrentRetryCount(taskAndEvent.getCurrentRetryCount() + 1L);
                        fluxRuntimeConnector.incrementExecutionRetries(taskAndEvent.getStateMachineId(), taskAndEvent.getTaskId());
                    }
                    if ((task = taskRegistry.retrieveTask(taskAndEvent.getTaskIdentifier())) != null) {
                        try {
                            this.updateExecutionStatus(taskAndEvent, Status.running, null, false);
                        }
                        catch (RuntimeCommunicationException e) {
                            this.logger.error("Error occurred while updating task: {} status to running. Error: {}", (Object)taskAndEvent.getTaskId(), (Object)e.getMessage());
                            throw new FluxError(FluxError.ErrorType.retriable, e.getMessage(), e, false, new FluxError.ExecutionContextMeta(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount()));
                        }
                        String outputEventName = this.getOutputEventName(taskAndEvent);
                        TaskExecutor taskExecutor = new TaskExecutor(task, taskAndEvent.getEvents(), taskAndEvent.getStateMachineId(), outputEventName);
                        Event outputEvent = null;
                        try {
                            long startTime = System.currentTimeMillis();
                            outputEvent = (Event)taskExecutor.execute();
                            long endTime = System.currentTimeMillis();
                            if (outputEvent != null) {
                                fluxRuntimeConnector.submitEventAndUpdateStatus(new EventData(outputEvent.getName(), outputEvent.getType(), outputEvent.getEventData(), outputEvent.getEventSource()), outputEvent.getStateMachineInstanceId(), new ExecutionUpdateData(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), Status.completed, taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount(), null, true));
                            } else {
                                this.updateExecutionStatus(taskAndEvent, Status.completed, null, true);
                            }
                            this.logger.info("State machine: {} task: {} execution time: {}ms event/status submission time: {}ms", (Object)taskAndEvent.getStateMachineId(), (Object)taskAndEvent.getTaskId(), (Object)(endTime - startTime), (Object)(System.currentTimeMillis() - endTime));
                            break block22;
                        }
                        catch (HystrixRuntimeException hre) {
                            HystrixRuntimeException.FailureType ft = hre.getFailureType();
                            if (ft.equals((Object)HystrixRuntimeException.FailureType.REJECTED_THREAD_EXECUTION) || ft.equals((Object)HystrixRuntimeException.FailureType.SHORTCIRCUIT) || ft.equals((Object)HystrixRuntimeException.FailureType.TIMEOUT)) {
                                this.updateExecutionStatus(taskAndEvent, Status.errored, ft.toString().toLowerCase(), false);
                                throw new FluxError(FluxError.ErrorType.timeout, ft.toString().toLowerCase(), null, false, new FluxError.ExecutionContextMeta(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount()));
                            }
                            boolean isFluxRetriableException = false;
                            Throwable cause = hre;
                            while (cause.getCause() != null || cause.getClass().getName().equals(FluxRetriableException.class.getName())) {
                                if (cause.getClass().getName().equals(FluxRetriableException.class.getName())) {
                                    isFluxRetriableException = true;
                                    break;
                                }
                                cause = cause.getCause();
                            }
                            if (isFluxRetriableException) {
                                metricsClient.incCounter("stateMachine." + taskAndEvent.getStateMachineName() + ".task." + taskAndEvent.getTaskName() + ".queueSize");
                                this.updateExecutionStatus(taskAndEvent, Status.errored, cause.getClass().getName() + " : " + cause.getMessage(), false);
                                throw new FluxError(FluxError.ErrorType.retriable, cause.getClass().getName() + " : " + cause.getMessage(), null, false, new FluxError.ExecutionContextMeta(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount()));
                            }
                            this.updateExecutionStatus(taskAndEvent, Status.errored, cause.getClass().getName() + " : " + cause.getMessage(), true);
                            break block22;
                        }
                        catch (RuntimeCommunicationException e) {
                            this.logger.error("Task completed but updateStatus/submit failed. State machine: {} task: {} ErrorMsg: {}", (Object)taskAndEvent.getStateMachineId(), (Object)taskAndEvent.getTaskId(), (Object)e.getMessage());
                            this.updateExecutionStatus(taskAndEvent, Status.errored, e.getMessage(), false);
                            throw new FluxError(FluxError.ErrorType.retriable, e.getMessage(), e, false, new FluxError.ExecutionContextMeta(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount()));
                        }
                        catch (Exception e) {
                            this.updateExecutionStatus(taskAndEvent, Status.errored, e.getMessage(), true);
                            break block22;
                        }
                    }
                    this.logger.error("Task received EventS that it cannot process. State machine: {} task: {} Events received are : {}", (Object)taskAndEvent.getStateMachineId(), (Object)taskAndEvent.getTaskId(), (Object)TaskRegistry.getEventsKey(taskAndEvent.getEvents()));
                }
                catch (FluxError fe) {
                    if (!fe.getType().equals((Object)FluxError.ErrorType.timeout) && !fe.getType().equals((Object)FluxError.ErrorType.retriable)) break block22;
                    if (fe.getExecutionContextMeta().getAttemptedNoOfRetries() < fe.getExecutionContextMeta().getMaxRetries()) {
                        this.logger.info("Scheduling Task Id: {} for retry. Current retry count = {}, Cause = {} ", (Object)fe.getExecutionContextMeta().getTaskId(), (Object)fe.getExecutionContextMeta().getAttemptedNoOfRetries(), (Object)fe.getMessage());
                        ((TaskAndEvents)message).setFirstTimeExecution(false);
                        this.getContext().system().scheduler().scheduleOnce(FiniteDuration.create((long)((int)Math.pow(2.0, fe.getExecutionContextMeta().getAttemptedNoOfRetries() + 1L)), (TimeUnit)TimeUnit.SECONDS), this.getSelf(), message, (ExecutionContext)this.getContext().system().dispatcher(), null);
                        break block22;
                    }
                    this.logger.warning("Aborting retries for Task Id : {}. Retry count exceeded : {}", (Object)fe.getExecutionContextMeta().getTaskId(), (Object)fe.getExecutionContextMeta().getAttemptedNoOfRetries());
                    fluxRuntimeConnector.updateExecutionStatus(new ExecutionUpdateData(fe.getExecutionContextMeta().getStateMachineId(), fe.getExecutionContextMeta().getStateMachineName(), fe.getExecutionContextMeta().getTaskName(), fe.getExecutionContextMeta().getTaskId(), Status.sidelined, fe.getExecutionContextMeta().getMaxRetries().longValue(), fe.getExecutionContextMeta().getAttemptedNoOfRetries().longValue(), fe.getMessage(), true));
                }
            } else if (!HookExecutor.STATUS.class.isAssignableFrom(message.getClass())) {
                if (message instanceof Terminated) {
                    this.hookRouter = this.hookRouter.removeRoutee(((Terminated)message).actor());
                    ActorRef r = this.getContext().actorOf(Props.create(AkkaHook.class, (Object[])new Object[0]));
                    this.getContext().watch(r);
                    this.hookRouter = this.hookRouter.addRoutee((Routee)new ActorRefRoutee(r));
                } else {
                    this.logger.error("Task received a message that it cannot process. Only com.flipkart.flux.impl.message.TaskAndEvents is supported. Message type received is : {}", (Object)message.getClass().getName());
                    this.unhandled(message);
                }
            }
        }
    }

    private void executeHooks(List<AbstractHook> hooks, EventData[] events) {
        if (hooks != null) {
            for (AbstractHook hook : hooks) {
                HookAndEvents hookAndEvents = new HookAndEvents(hook, events);
                this.hookRouter.route((Object)hookAndEvents, this.getSelf());
            }
        }
    }

    private String getOutputEventName(TaskAndEvents taskAndEvent) throws IOException {
        String outputEvent = taskAndEvent.getOutputEvent();
        return outputEvent != null ? ((EventDefinition)objectMapper.readValue(outputEvent, EventDefinition.class)).getName() : null;
    }

    private void updateExecutionStatus(TaskAndEvents taskAndEvent, Status status, String errorMsg, boolean deleteFromRedriver) {
        fluxRuntimeConnector.updateExecutionStatus(new ExecutionUpdateData(taskAndEvent.getStateMachineId(), taskAndEvent.getStateMachineName(), taskAndEvent.getTaskName(), taskAndEvent.getTaskId(), status, taskAndEvent.getRetryCount(), taskAndEvent.getCurrentRetryCount(), errorMsg, deleteFromRedriver));
    }

    static {
        objectMapper = new ObjectMapper();
    }
}

