/*
 * Decompiled with CFR 0.152.
 */
package com.mdn.tools.sparketl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mdn.tools.sparketl.models.Plan;
import com.mdn.tools.sparketl.models.datastore.DataStore;
import com.mdn.tools.sparketl.models.datastore.impl.Csv;
import com.mdn.tools.sparketl.models.datastore.impl.Jdbc;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

/**
 * Scratch driver for running a {@link Plan} against a local Spark session.
 *
 * <p>{@link #main(String[])} deserializes a plan from
 * {@code plans/csv_jdbc_transform.json} and hands it to
 * {@link #executePlan(Plan)}, which loads the plan's source, joins it with a
 * gender lookup CSV and prints the result.
 */
public class Trial {
    /**
     * UDF registered under the name {@code "toUpper"}.
     *
     * <p>NOTE(review): this field is never assigned inside this class, so it is
     * {@code null} at registration time unless other code in the package sets
     * it first. Confirm where the implementation is supposed to come from.
     */
    static UDF1 toUpper;

    /**
     * UDF registered under the name {@code "suffix"}.
     *
     * <p>NOTE(review): like {@link #toUpper}, this field is never assigned
     * inside this class.
     */
    static UDF2 suffix;

    /** JDBC URL used by {@link #testSpark()} only. */
    private static final String POSTGRES_CONNECTION_URL = "jdbc:postgresql://localhost:5432/testdb";

    /** Sample plan JSON: CSV source, JDBC target, one plain field mapping. */
    public static String planJson;

    /**
     * Sample plan JSON: same as {@link #planJson} but with a {@code null}
     * target table and a {@code SUFFIX} transformation on the mapped field.
     */
    public static String planJson2;

    /**
     * Reads the plan from {@code plans/csv_jdbc_transform.json} (relative to
     * the working directory) and executes it.
     *
     * @param args ignored
     * @throws IOException if the plan file cannot be read or parsed
     * @throws AnalysisException if Spark rejects the generated view or query
     */
    public static void main(String[] args) throws IOException, AnalysisException {
        ObjectMapper mapper = new ObjectMapper();
        Plan plan = mapper.readValue(new File("plans/csv_jdbc_transform.json"), Plan.class);
        Trial.executePlan(plan);
    }

    /**
     * Smoke test: starts a local session, registers {@link #toUpper} and loads
     * {@code sample.csv} plus the {@code test} table of the local Postgres
     * database. The loaded datasets are not used any further.
     */
    public static void testSpark() {
        Properties prop = new Properties();
        prop.setProperty("driver", "org.postgresql.Driver");
        prop.setProperty("user", "user01");
        prop.setProperty("password", "123");

        SparkConf conf = localConf();
        conf.set("spark.driver.allowMultipleContexts", "true");
        SparkSession session = SparkSession.builder().config(conf).getOrCreate();
        session.udf().register("toUpper", toUpper, DataTypes.StringType);

        // Results are intentionally discarded; the calls only prove that both
        // sources can be opened.
        session.read().format("csv").option("header", "true").load("sample.csv");
        session.read().jdbc(POSTGRES_CONNECTION_URL, "test", prop);
    }

    /**
     * Loads the plan's source, left-joins it with {@code gender-lookup.csv} on
     * the {@code gender} column and prints the query and its result to stdout.
     *
     * <p>Only {@code plan.getSource()} is consulted here; nothing is written to
     * a target by this method.
     *
     * @param plan plan whose source data store is read
     * @throws AnalysisException if creating the global temp views fails
     * @throws IllegalArgumentException if the source data store type is not
     *         supported by {@link #readDataset(DataStore, SparkSession)}
     */
    public static void executePlan(Plan plan) throws AnalysisException {
        SparkSession session = SparkSession.builder().config(localConf()).getOrCreate();
        session.udf().register("toUpper", toUpper, DataTypes.StringType);
        session.udf().register("suffix", suffix, DataTypes.StringType);

        Dataset source = Trial.readDataset(plan.getSource(), session);
        Dataset lookup = session.read().format("csv").option("header", "true").load("gender-lookup.csv");

        // The query below addresses both views through the global_temp database.
        source.createOrReplaceGlobalTempView("source");
        lookup.createOrReplaceGlobalTempView("lookup");

        String sqlString = "select source.id,source.name,lookup.desc as gender from global_temp.source as source left join global_temp.lookup as lookup on source.gender==lookup.gender";
        System.out.println(sqlString);
        session.sql(sqlString).show();
    }

    /** Builds the Spark configuration shared by every session in this class. */
    private static SparkConf localConf() {
        return new SparkConf().setMaster("local").setAppName("Word Count");
    }

    /** Collects the driver and credentials of a JDBC store as connection properties. */
    private static Properties jdbcProperties(Jdbc jdbc) {
        Properties prop = new Properties();
        prop.setProperty("driver", jdbc.getDriver());
        prop.setProperty("user", jdbc.getUserName());
        prop.setProperty("password", jdbc.getPassword());
        return prop;
    }

    /**
     * Loads the given data store as a dataset.
     *
     * @param dataStore store to read; CSV stores are read with a header row,
     *        JDBC stores are read from the store's own URL and table
     * @param session session used for reading
     * @return the loaded dataset, never {@code null}
     * @throws IllegalArgumentException if the store type is neither CSV nor JDBC
     */
    private static Dataset readDataset(DataStore dataStore, SparkSession session) {
        switch (dataStore.getDataStoreType()) {
            case CSV: {
                Csv csv = (Csv)dataStore;
                return session.read().format("csv").option("header", "true").load(csv.getFilePath());
            }
            case JDBC: {
                Jdbc jdbc = (Jdbc)dataStore;
                return session.read().jdbc(jdbc.getUrl(), jdbc.getTable(), jdbcProperties(jdbc));
            }
            default: {
                // Fail here with a clear message instead of returning null and
                // letting the caller hit a NullPointerException later.
                throw new IllegalArgumentException(
                        "Unsupported source data store type: " + dataStore.getDataStoreType());
            }
        }
    }

    /**
     * Appends the dataset to the given target.
     *
     * <p>JDBC targets are written to the URL and table configured on the
     * target itself, mirroring how {@link #readDataset(DataStore, SparkSession)}
     * reads them.
     *
     * @param ds dataset to write
     * @param target destination data store
     * @return {@code true} once the target has been handled
     * @throws IllegalArgumentException if the target type is unsupported or a
     *         JDBC target has no table configured
     */
    private static Boolean writeDataset(Dataset ds, DataStore target) {
        switch (target.getDataStoreType()) {
            case CSV: {
                // NOTE(review): CSV output is not implemented; nothing is
                // written even though success is reported.
                return true;
            }
            case JDBC: {
                Jdbc jdbc = (Jdbc)target;
                if (jdbc.getTable() == null) {
                    throw new IllegalArgumentException("JDBC target has no table configured");
                }
                ds.write().mode("append").jdbc(jdbc.getUrl(), jdbc.getTable(), jdbcProperties(jdbc));
                return true;
            }
            default: {
                throw new IllegalArgumentException(
                        "Unsupported target data store type: " + target.getDataStoreType());
            }
        }
    }

    static {
        planJson = "{\n  \"source\" : {\n    \"dataStoreType\" : \"CSV\",\n    \"filePath\" : \"sample.csv\"\n  },\n  \"target\" : {\n    \"dataStoreType\" : \"JDBC\",\n    \"url\" : \"jdbc:postgresql://localhost:5432/testdb\",\n    \"userName\" : \"user01\",\n    \"password\" : \"123\",\n    \"driver\" : \"org.postgresql.Driver\",\n    \"hostName\" : \"localhost\",\n    \"table\" : \"test\"\n  },\n  \"fieldMappingList\" : [ {\n    \"sourceFieldName\" : \"name\",\n    \"targetFieldName\" : \"name\"\n  } ]\n}";
        planJson2 = "{\n  \"source\": {\n    \"dataStoreType\": \"CSV\",\n    \"filePath\": \"sample.csv\"\n  },\n  \"target\": {\n    \"dataStoreType\": \"JDBC\",\n    \"url\": \"jdbc:postgresql://localhost:5432/testdb\",\n    \"userName\": \"user01\",\n    \"password\": \"123\",\n    \"driver\": \"org.postgresql.Driver\",\n    \"hostName\": \"localhost\",\n    \"table\": null\n  },\n  \"fieldMappingList\": [{\n    \"sourceFieldName\": \"name\",\n    \"targetFieldName\": \"name\",\n    \"transformation\": {\n      \"type\": \"SUFFIX\",\n      \"args\": [\"_abc\"]\n    }\n  }]\n}";
    }
}

