/*
 * Decompiled with CFR 0.152.
 */
package com.mdn.tools.sparketl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.mdn.tools.sparketl.Config;
import com.mdn.tools.sparketl.models.Plan;
import com.mdn.tools.sparketl.models.transform.Transformation;
import com.mdn.tools.sparketl.models.transform.impl.LookupTransformation;
import com.mdn.tools.sparketl.models.transform.impl.SqlTransformation;
import com.mdn.tools.sparketl.utils.SparkUtils;
import java.io.File;
import java.io.IOException;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PlanExecutor {
    SparkUtils sparkUtils = null;

    public void executePlan(String planFilePath) throws IOException, AnalysisException {
        this.sparkUtils = SparkUtils.getSparkUtils();
        File planFile = new File(planFilePath);
        Plan plan = (Plan)Config.objectMapper.readValue(planFile, Plan.class);
        this.executePlan(plan);
    }

    public void executePlan(Plan plan) throws AnalysisException, JsonProcessingException {
        SparkUtils sparkUtils = SparkUtils.getSparkUtils();
        SparkSession session = sparkUtils.getSparkSession();
        Dataset<Row> transformedDataset = this.transform(session, plan, plan.getTransformation());
        sparkUtils.writeDataset(transformedDataset, plan.getTarget());
    }

    private Dataset<Row> transform(SparkSession session, Plan plan, Transformation transformation) throws AnalysisException, JsonProcessingException {
        Dataset sourceDataset = this.sparkUtils.readDataset(plan.getSource(), session);
        Dataset transformedDataset = null;
        if (transformation == null) {
            return sourceDataset;
        }
        switch (transformation.getType()) {
            case SQL: {
                SqlTransformation sqlTransformation = (SqlTransformation)transformation;
                sourceDataset.createGlobalTempView("source");
                for (String sql : sqlTransformation.getSqlList()) {
                    sourceDataset = transformedDataset = session.sql(sql.replace("source", "global_temp.source"));
                    sourceDataset.createOrReplaceGlobalTempView("source");
                }
                return transformedDataset;
            }
            case FIELD_MAPPING: {
                return transformedDataset;
            }
            case LOOKUP: {
                LookupTransformation lookupTransformation = (LookupTransformation)transformation;
                Dataset lookupDataset = this.sparkUtils.readDataset(plan.getLookup(), session);
                sourceDataset.createGlobalTempView("source");
                lookupDataset.createGlobalTempView("lookup");
                String sqlString = lookupTransformation.getSql().replace("from source left join lookup", "from global_temp.source as source left join global_temp.lookup as lookup");
                transformedDataset = session.sql(sqlString);
                return transformedDataset;
            }
        }
        return null;
    }
}

