/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.workflow;

import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowJobEndNotificationService
implements FalconService {
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
    private Map<String, Properties> contextMap = new ConcurrentHashMap<String, Properties>();

    @Override
    public String getName() {
        return SERVICE_NAME;
    }

    Map<String, Properties> getContextMap() {
        return this.contextMap;
    }

    @Override
    public void init() throws FalconException {
        String listenerClassNames = StartupProperties.get().getProperty("workflow.execution.listeners");
        if (StringUtils.isEmpty(listenerClassNames)) {
            return;
        }
        for (String listenerClassName : listenerClassNames.split(",")) {
            if ((listenerClassName = listenerClassName.trim()).isEmpty()) continue;
            WorkflowExecutionListener listener = (WorkflowExecutionListener)ReflectionUtils.getInstanceByClassName(listenerClassName);
            this.registerListener(listener);
        }
    }

    @Override
    public void destroy() throws FalconException {
        this.listeners.clear();
    }

    public void registerListener(WorkflowExecutionListener listener) {
        this.listeners.add(listener);
    }

    public void unregisterListener(WorkflowExecutionListener listener) {
        this.listeners.remove(listener);
    }

    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
        this.notifyWorkflowEnd(context);
    }

    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
        this.notifyWorkflowEnd(context);
    }

    public void notifyStart(WorkflowExecutionContext context) throws FalconException {
        if (!this.updateContextFromWFConf(context)) {
            return;
        }
        LOG.debug("Sending workflow start notification to listeners with context : {} ", (Object)context);
        for (WorkflowExecutionListener listener : this.listeners) {
            try {
                listener.onStart(context);
            }
            catch (Throwable t) {
                LOG.error("Error in listener {}", (Object)listener.getClass().getName(), (Object)t);
            }
        }
    }

    public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
        if (!this.updateContextFromWFConf(context)) {
            return;
        }
        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", (Object)context);
        for (WorkflowExecutionListener listener : this.listeners) {
            try {
                listener.onSuspend(context);
            }
            catch (Throwable t) {
                LOG.error("Error in listener {}", (Object)listener.getClass().getName(), (Object)t);
            }
        }
        this.instrumentAlert(context);
        this.contextMap.remove(context.getWorkflowId());
    }

    public void notifyWait(WorkflowExecutionContext context) throws FalconException {
        LOG.debug("Sending workflow wait notification to listeners with context : {} ", (Object)context);
        for (WorkflowExecutionListener listener : this.listeners) {
            try {
                listener.onWait(context);
            }
            catch (Throwable t) {
                LOG.error("Error in listener {}", (Object)listener.getClass().getName(), (Object)t);
            }
        }
    }

    private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
        Properties wfProps = this.contextMap.get(context.getWorkflowId());
        if (wfProps == null) {
            Entity entity = null;
            try {
                entity = (Entity)EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
            }
            catch (EntityNotRegisteredException e) {
                LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.", (Object)context.getEntityName(), (Object)context.getEntityType());
                this.contextMap.remove(context.getWorkflowId());
                return false;
            }
            for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
                block8: {
                    try {
                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity).getJobDetails(cluster, context.getWorkflowId()).getInstances();
                        if (instances == null || instances.length <= 0) break block8;
                        wfProps = this.getWFProps(instances[0].getWfParams());
                        wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), Integer.toString(instances[0].getRunId()));
                    }
                    catch (FalconException e) {
                        continue;
                    }
                }
                this.contextMap.put(context.getWorkflowId(), wfProps);
            }
        }
        if (wfProps == null || wfProps.isEmpty()) {
            return true;
        }
        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
            if (!wfProps.containsKey(arg.getName())) continue;
            context.setValue(arg, wfProps.getProperty(arg.getName()));
        }
        return true;
    }

    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
        Properties props = new Properties();
        for (InstancesResult.KeyValuePair kv : wfParams) {
            props.put(kv.getKey(), kv.getValue());
        }
        return props;
    }

    private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
            boolean engineNotifEnabled = false;
            try {
                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine().isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
            }
            catch (FalconException e) {
                LOG.debug("Received error while checking if notification is enabled. Hence, assuming notification is not enabled.");
            }
            if (engineNotifEnabled) {
                LOG.info("Ignoring message from post processing as engine notification is enabled.");
                return;
            }
            this.updateContextWithTime(context);
        } else if (!this.updateContextFromWFConf(context)) {
            return;
        }
        LOG.debug("Sending workflow end notification to listeners with context : {} ", (Object)context);
        for (WorkflowExecutionListener listener : this.listeners) {
            try {
                if (context.hasWorkflowSucceeded()) {
                    listener.onSuccess(context);
                    this.instrumentAlert(context);
                    continue;
                }
                listener.onFailure(context);
                if (!context.hasWorkflowBeenKilled() && !context.hasWorkflowFailed()) continue;
                this.instrumentAlert(context);
            }
            catch (Throwable t) {
                LOG.error("Error in listener {}", (Object)listener.getClass().getName(), (Object)t);
            }
        }
        this.contextMap.remove(context.getWorkflowId());
    }

    private void updateContextWithTime(WorkflowExecutionContext context) {
        try {
            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine().getJobDetails(context.getClusterName(), context.getWorkflowId());
            Date startTime = result.getInstances()[0].startTime;
            Date endTime = result.getInstances()[0].endTime;
            Date now = new Date();
            if (startTime == null) {
                startTime = now;
            }
            if (endTime == null) {
                endTime = now;
            }
            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
        }
        catch (FalconException e) {
            LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster " + context.getClusterName(), (Throwable)e);
        }
    }

    private void instrumentAlert(WorkflowExecutionContext context) {
        String clusterName = context.getClusterName();
        String entityName = context.getEntityName();
        String entityType = context.getEntityType();
        String operation = context.getOperation().name();
        String workflowId = context.getWorkflowId();
        String workflowUser = context.getWorkflowUser();
        String nominalTime = context.getNominalTimeAsISO8601();
        String runId = String.valueOf(context.getWorkflowRunId());
        Date now = new Date();
        Date endTime = context.getWorkflowEndTime() == 0L ? now : new Date(context.getWorkflowEndTime());
        Date startTime = context.getWorkflowStartTime() == 0L ? now : new Date(context.getWorkflowStartTime());
        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000L;
        if (!context.hasWorkflowSucceeded()) {
            GenericAlert.instrumentFailedInstance(clusterName, entityType, entityName, nominalTime, workflowId, workflowUser, runId, operation, SchemaHelper.formatDateUTC(startTime), "", "", duration);
        } else {
            GenericAlert.instrumentSucceededInstance(clusterName, entityType, entityName, nominalTime, workflowId, workflowUser, runId, operation, SchemaHelper.formatDateUTC(startTime), duration);
        }
    }
}

