/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.entity.parser;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.parser.CrossEntityValidations;
import org.apache.falcon.entity.parser.EntityParser;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.ACL;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.process.SparkAttributes;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.HadoopQueueUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessEntityParser
extends EntityParser<Process> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);

    public ProcessEntityParser() {
        super(EntityType.PROCESS);
    }

    @Override
    public void validate(Process process) throws FalconException {
        if (process.getTimezone() == null) {
            process.setTimezone(TimeZone.getTimeZone("UTC"));
        }
        this.validateACL(process);
        HashSet<String> clusters = new HashSet<String>();
        for (Cluster cluster : process.getClusters().getClusters()) {
            Feed feed;
            String clusterName = cluster.getName();
            if (!clusters.add(cluster.getName())) {
                throw new ValidationException("Cluster: " + cluster.getName() + " is defined more than once for process: " + process.getName());
            }
            this.validateEntityExists(EntityType.CLUSTER, clusterName);
            if (cluster.getValidity().getEnd() == null) {
                cluster.getValidity().setEnd(DateUtil.NEVER);
            }
            int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
            if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
                throw new ValidationException("Process should not set cluster to a version that does not exist");
            }
            cluster.setVersion(clusterVersion);
            this.validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
            this.validateHDFSPaths(process, clusterName);
            this.validateProperties(process);
            if (process.getInputs() != null) {
                for (Input input : process.getInputs().getInputs()) {
                    this.validateEntityExists(EntityType.FEED, input.getFeed());
                    feed = (Feed)ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
                    CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
                    CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
                    CrossEntityValidations.validateInstanceRange(process, input, feed);
                    this.validateInputPartition(input, feed);
                    this.validateOptionalInputsForTableStorage(feed, input);
                }
            }
            if (process.getOutputs() == null) continue;
            for (Output output : process.getOutputs().getOutputs()) {
                this.validateEntityExists(EntityType.FEED, output.getFeed());
                feed = (Feed)ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
                CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
                CrossEntityValidations.validateInstance(process, output, feed);
            }
        }
        this.validateDatasetName(process.getInputs(), process.getOutputs());
        this.validateLateInputs(process);
        this.validateProcessSLA(process);
        this.validateHadoopQueue(process);
        this.validateProcessEntity(process);
    }

    private void validateProcessSLA(Process process) throws FalconException {
        if (process.getSla() != null) {
            ExpressionHelper evaluator = ExpressionHelper.get();
            ExpressionHelper.setReferenceDate(new Date());
            Frequency shouldStartExpression = process.getSla().getShouldStartIn();
            Frequency shouldEndExpression = process.getSla().getShouldEndIn();
            Frequency timeoutExpression = process.getTimeout();
            if (shouldStartExpression != null) {
                Date timeout;
                Date shouldEnd;
                Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class));
                if (shouldEndExpression != null && shouldStart.after(shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class)))) {
                    throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression + "is greater than shouldEndIn: " + shouldEndExpression);
                }
                if (timeoutExpression != null && (timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class))).before(shouldStart)) {
                    throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression + " is greater than timeout: " + process.getTimeout());
                }
            }
        }
    }

    private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
        org.apache.falcon.entity.v0.cluster.Cluster cluster = (org.apache.falcon.entity.v0.cluster.Cluster)ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
        if (!EntityUtil.responsibleFor(cluster.getColo())) {
            return;
        }
        String workflowPath = process.getWorkflow().getPath();
        String libPath = process.getWorkflow().getLib();
        String nameNode = this.getNameNode(cluster);
        try {
            Configuration configuration = ClusterHelper.getConfiguration(cluster);
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
            if (!fs.exists(new Path(workflowPath))) {
                throw new ValidationException("Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
            }
            if (StringUtils.isNotBlank(libPath)) {
                String[] libPaths;
                for (String path : libPaths = libPath.split(",")) {
                    if (fs.exists(new Path(path))) continue;
                    throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode);
                }
            }
        }
        catch (IOException e) {
            throw new FalconException("Error validating workflow path " + workflowPath, e);
        }
    }

    private String getNameNode(org.apache.falcon.entity.v0.cluster.Cluster cluster) throws ValidationException {
        if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
            throw new ValidationException("Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
        }
        return ClusterHelper.getStorageUrl(cluster);
    }

    private void validateProcessValidity(Date start, Date end) throws FalconException {
        try {
            if (!start.before(end)) {
                throw new ValidationException("Process start time: " + start + " should be before process end time: " + end);
            }
        }
        catch (ValidationException e) {
            throw new ValidationException(e);
        }
        catch (Exception e) {
            throw new FalconException(e);
        }
    }

    private void validateInputPartition(Input input, Feed feed) throws FalconException {
        if (input.getPartition() == null) {
            return;
        }
        Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
            CrossEntityValidations.validateInputPartition(input, feed);
        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
        }
    }

    private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
        HashSet<String> datasetNames = new HashSet<String>();
        if (inputs != null) {
            for (Input input : inputs.getInputs()) {
                if (datasetNames.add(input.getName())) continue;
                throw new ValidationException("Input name: " + input.getName() + " is already used");
            }
        }
        if (outputs != null) {
            for (Output output : outputs.getOutputs()) {
                if (datasetNames.add(output.getName())) continue;
                throw new ValidationException("Output name: " + output.getName() + " is already used");
            }
        }
    }

    private void validateLateInputs(Process process) throws ValidationException {
        if (process.getLateProcess() == null) {
            return;
        }
        HashMap<String, String> feeds = new HashMap<String, String>();
        if (process.getInputs() != null) {
            for (Input in : process.getInputs().getInputs()) {
                feeds.put(in.getName(), in.getFeed());
            }
        }
        for (LateInput lp : process.getLateProcess().getLateInputs()) {
            if (!feeds.keySet().contains(lp.getInput())) {
                throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
            }
            try {
                Feed feed = (Feed)ConfigurationStore.get().get(EntityType.FEED, (String)feeds.get(lp.getInput()));
                if (feed.getLateArrival() != null) continue;
                throw new ValidationException("Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
            }
            catch (FalconException e) {
                throw new ValidationException(e);
            }
        }
    }

    private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
        if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
            throw new ValidationException("Optional Input is not supported for feeds with table storage! " + input.getName());
        }
    }

    protected void validateACL(Process process) throws FalconException {
        if (this.isAuthorizationDisabled) {
            return;
        }
        ACL processACL = process.getACL();
        if (processACL == null) {
            throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
        }
        this.validateACLOwnerAndGroup(processACL);
        try {
            this.authorize(process.getName(), processACL);
        }
        catch (AuthorizationException e) {
            throw new ValidationException((Exception)((Object)e));
        }
    }

    protected void validateProperties(Process process) throws ValidationException {
        org.apache.falcon.entity.v0.process.Properties properties = process.getProperties();
        if (properties == null) {
            return;
        }
        List<Property> propertyList = process.getProperties().getProperties();
        HashSet<String> propertyKeys = new HashSet<String>();
        for (Property prop : propertyList) {
            if (StringUtils.isBlank(prop.getName())) {
                throw new ValidationException("Property name and value cannot be empty for Process : " + process.getName());
            }
            if (propertyKeys.add(prop.getName())) continue;
            throw new ValidationException("Multiple properties with same name found for Process : " + process.getName());
        }
    }

    private void validateHadoopQueue(Process process) throws FalconException {
        String processQueueName = null;
        Properties props = EntityUtil.getEntityProperties(process);
        if (props == null || !props.containsKey("queueName")) {
            return;
        }
        processQueueName = props.getProperty("queueName");
        for (Cluster cluster : process.getClusters().getClusters()) {
            String clusterName = cluster.getName();
            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = (org.apache.falcon.entity.v0.cluster.Cluster)ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
            if (rmURL == null) {
                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
            }
            if (rmURL == null) continue;
            LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", (Object)cluster.getName(), (Object)rmURL);
            Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
            if (queueNames.contains(processQueueName)) {
                LOG.info("Validated presence of queue {} specified in process entity for cluster {}", (Object)processQueueName, (Object)clusterName);
                continue;
            }
            String strMsg = String.format("The hadoop queue name %s specified in process entity for cluster %s is invalid.", processQueueName, cluster.getName());
            LOG.info(strMsg);
            throw new FalconException(strMsg);
        }
    }

    protected void validateProcessEntity(Process process) throws FalconException {
        this.validateSparkProcessEntity(process, process.getSparkAttributes());
    }

    private void validateSparkProcessEntity(Process process, SparkAttributes sparkAttributes) throws FalconException {
        Workflow workflow = process.getWorkflow();
        if (workflow.getEngine() == EngineType.SPARK) {
            String sparkMaster;
            if (sparkAttributes == null) {
                throw new ValidationException("For Spark Workflow engine Spark Attributes in Process Entity can't be null");
            }
            String clusterName = process.getClusters().getClusters().get(0).getName();
            org.apache.falcon.entity.v0.cluster.Cluster cluster = (org.apache.falcon.entity.v0.cluster.Cluster)ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
            String clusterEntitySparkMaster = ClusterHelper.getSparkMasterEndPoint(cluster);
            String processEntitySparkMaster = sparkAttributes.getMaster();
            String string = sparkMaster = processEntitySparkMaster == null ? clusterEntitySparkMaster : processEntitySparkMaster;
            if (StringUtils.isEmpty(sparkMaster) || StringUtils.isEmpty(sparkAttributes.getJar())) {
                throw new ValidationException("Spark master and jar/python file can't be null");
            }
        }
    }
}

