/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.spark;

import com.google.gson.Gson;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.spark.DepInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.apache.zeppelin.spark.ZeppelinContext;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;

public class PySparkInterpreter
extends Interpreter
implements ExecuteResultHandler {
    Logger logger = LoggerFactory.getLogger(PySparkInterpreter.class);
    private GatewayServer gatewayServer;
    private DefaultExecutor executor;
    private int port;
    private InterpreterOutputStream outputStream;
    private BufferedWriter ins;
    private PipedInputStream in;
    private ByteArrayOutputStream input;
    private String scriptPath;
    boolean pythonscriptRunning = false;
    private static final int MAX_TIMEOUT_SEC = 10;
    private long pythonPid = -1L;
    PythonInterpretRequest pythonInterpretRequest = null;
    Integer statementSetNotifier = new Integer(0);
    String statementOutput = null;
    boolean statementError = false;
    Integer statementFinishedNotifier = new Integer(0);
    boolean pythonScriptInitialized = false;
    Integer pythonScriptInitializeNotifier = new Integer(0);

    public PySparkInterpreter(Properties property) {
        super(property);
        try {
            File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
            this.scriptPath = scriptFile.getAbsolutePath();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    private void createPythonScript() {
        ClassLoader classLoader = this.getClass().getClassLoader();
        File out = new File(this.scriptPath);
        if (out.exists() && out.isDirectory()) {
            throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
        }
        try {
            FileOutputStream outStream = new FileOutputStream(out);
            IOUtils.copy((InputStream)classLoader.getResourceAsStream("python/zeppelin_pyspark.py"), (OutputStream)outStream);
            outStream.close();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
        this.logger.info("File {} created", (Object)this.scriptPath);
    }

    @Override
    public void open() {
        File[] files;
        File localRepoDir;
        String localRepo;
        List<File> files2;
        SparkDependencyContext depc;
        InterpreterGroup intpGroup = this.getInterpreterGroup();
        if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
            this.registerHook("post_exec_dev", "__zeppelin__._displayhook()");
        }
        DepInterpreter depInterpreter = this.getDepInterpreter();
        URL[] urls = new URL[]{};
        LinkedList<URL> urlList = new LinkedList<URL>();
        if (depInterpreter != null && (depc = depInterpreter.getDependencyContext()) != null && (files2 = depc.getFiles()) != null) {
            for (File f : files2) {
                try {
                    urlList.add(f.toURI().toURL());
                }
                catch (MalformedURLException e) {
                    this.logger.error("Error", e);
                }
            }
        }
        if ((localRepo = this.getProperty("zeppelin.interpreter.localRepo")) != null && (localRepoDir = new File(localRepo)).exists() && (files = localRepoDir.listFiles()) != null) {
            for (File f : files) {
                try {
                    urlList.add(f.toURI().toURL());
                }
                catch (MalformedURLException e) {
                    this.logger.error("Error", e);
                }
            }
        }
        urls = urlList.toArray(urls);
        ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
        try {
            URLClassLoader newCl = new URLClassLoader(urls, oldCl);
            Thread.currentThread().setContextClassLoader(newCl);
            this.createGatewayServerAndStartScript();
        }
        catch (Exception e) {
            this.logger.error("Error", e);
            throw new InterpreterException(e);
        }
        finally {
            Thread.currentThread().setContextClassLoader(oldCl);
        }
    }

    private Map setupPySparkEnv() throws IOException {
        String sparkSubmitJars;
        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
        SparkConf conf = this.getSparkConf();
        if (!env.containsKey("PYTHONPATH")) {
            env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + ":../interpreter/lib/python");
        }
        if (SparkInterpreter.useSparkSubmit() && !this.getSparkInterpreter().isYarnMode() && !"".equals(sparkSubmitJars = conf.get("spark.jars").replace(",", ":"))) {
            env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars);
        }
        this.logger.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
        return env;
    }

    private void createGatewayServerAndStartScript() {
        this.createPythonScript();
        this.port = this.findRandomOpenPortOnAllLocalInterfaces();
        this.gatewayServer = new GatewayServer((Object)this, this.port);
        this.gatewayServer.start();
        CommandLine cmd = CommandLine.parse(this.getProperty("zeppelin.pyspark.python"));
        cmd.addArgument(this.scriptPath, false);
        cmd.addArgument(Integer.toString(this.port), false);
        cmd.addArgument(Integer.toString(this.getSparkInterpreter().getSparkVersion().toNumber()), false);
        this.executor = new DefaultExecutor();
        this.outputStream = new InterpreterOutputStream(this.logger);
        PipedOutputStream ps = new PipedOutputStream();
        this.in = null;
        try {
            this.in = new PipedInputStream(ps);
        }
        catch (IOException e1) {
            throw new InterpreterException(e1);
        }
        this.ins = new BufferedWriter(new OutputStreamWriter(ps));
        this.input = new ByteArrayOutputStream();
        PumpStreamHandler streamHandler = new PumpStreamHandler(this.outputStream, this.outputStream, this.in);
        this.executor.setStreamHandler(streamHandler);
        this.executor.setWatchdog(new ExecuteWatchdog(-1L));
        try {
            Map env = this.setupPySparkEnv();
            this.executor.execute(cmd, env, this);
            this.pythonscriptRunning = true;
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
        try {
            this.input.write("import sys, getopt\n".getBytes());
            this.ins.flush();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    private int findRandomOpenPortOnAllLocalInterfaces() {
        int port;
        try (ServerSocket socket = new ServerSocket(0);){
            port = socket.getLocalPort();
            socket.close();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
        return port;
    }

    @Override
    public void close() {
        this.executor.getWatchdog().destroyProcess();
        new File(this.scriptPath).delete();
        this.gatewayServer.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PythonInterpretRequest getStatements() {
        Integer n = this.statementSetNotifier;
        synchronized (n) {
            while (this.pythonInterpretRequest == null) {
                try {
                    this.statementSetNotifier.wait(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
            PythonInterpretRequest req = this.pythonInterpretRequest;
            this.pythonInterpretRequest = null;
            return req;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setStatementsFinished(String out, boolean error) {
        Integer n = this.statementFinishedNotifier;
        synchronized (n) {
            this.statementOutput = out;
            this.statementError = error;
            this.statementFinishedNotifier.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPythonScriptInitialized(long pid) {
        this.pythonPid = pid;
        Integer n = this.pythonScriptInitializeNotifier;
        synchronized (n) {
            this.pythonScriptInitialized = true;
            this.pythonScriptInitializeNotifier.notifyAll();
        }
    }

    public void appendOutput(String message) throws IOException {
        this.outputStream.getInterpreterOutput().write(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public InterpreterResult interpret(String st, InterpreterContext context) {
        List<InterpreterResultMessage> errorMessage;
        SparkInterpreter sparkInterpreter = this.getSparkInterpreter();
        sparkInterpreter.populateSparkWebUrl(context);
        if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported");
        }
        if (!this.pythonscriptRunning) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, "python process not running" + this.outputStream.toString());
        }
        this.outputStream.setInterpreterOutput(context.out);
        Integer n = this.pythonScriptInitializeNotifier;
        synchronized (n) {
            long startTime = System.currentTimeMillis();
            while (!this.pythonScriptInitialized && this.pythonscriptRunning && System.currentTimeMillis() - startTime < 10000L) {
                try {
                    this.pythonScriptInitializeNotifier.wait(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        try {
            context.out.flush();
            errorMessage = context.out.toInterpreterResultMessage();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
        if (!this.pythonscriptRunning) {
            errorMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "failed to start pyspark"));
            return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage);
        }
        if (!this.pythonScriptInitialized) {
            errorMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "pyspark is not responding"));
            return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage);
        }
        if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
            errorMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
            return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage);
        }
        String jobGroup = sparkInterpreter.getJobGroup(context);
        ZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
        __zeppelin__.setInterpreterContext(context);
        __zeppelin__.setGui(context.getGui());
        this.pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
        this.statementOutput = null;
        Integer n2 = this.statementSetNotifier;
        synchronized (n2) {
            this.statementSetNotifier.notify();
        }
        n2 = this.statementFinishedNotifier;
        synchronized (n2) {
            while (this.statementOutput == null) {
                try {
                    this.statementFinishedNotifier.wait(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        if (this.statementError) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, this.statementOutput);
        }
        try {
            context.out.flush();
        }
        catch (IOException e) {
            throw new InterpreterException(e);
        }
        return new InterpreterResult(InterpreterResult.Code.SUCCESS);
    }

    public void interrupt() throws IOException {
        if (this.pythonPid > -1L) {
            this.logger.info("Sending SIGINT signal to PID : " + this.pythonPid);
            Runtime.getRuntime().exec("kill -SIGINT " + this.pythonPid);
        } else {
            this.logger.warn("Non UNIX/Linux system, close the interpreter");
            this.close();
        }
    }

    @Override
    public void cancel(InterpreterContext context) {
        SparkInterpreter sparkInterpreter = this.getSparkInterpreter();
        sparkInterpreter.cancel(context);
        try {
            this.interrupt();
        }
        catch (IOException e) {
            this.logger.error("Error", e);
        }
    }

    @Override
    public Interpreter.FormType getFormType() {
        return Interpreter.FormType.NATIVE;
    }

    @Override
    public int getProgress(InterpreterContext context) {
        SparkInterpreter sparkInterpreter = this.getSparkInterpreter();
        return sparkInterpreter.getProgress(context);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<InterpreterCompletion> completion(String buf, int cursor, InterpreterContext interpreterContext) {
        if (buf.length() < cursor) {
            cursor = buf.length();
        }
        String completionString = this.getCompletionTargetString(buf, cursor);
        String completionCommand = "completion.getCompletion('" + completionString + "')";
        SparkInterpreter sparkInterpreter = this.getSparkInterpreter();
        if (!sparkInterpreter.getSparkVersion().isUnsupportedVersion() && !this.pythonscriptRunning) {
            return new LinkedList<InterpreterCompletion>();
        }
        this.pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "");
        this.statementOutput = null;
        Integer n = this.statementSetNotifier;
        synchronized (n) {
            this.statementSetNotifier.notify();
        }
        String[] completionList = null;
        Integer n2 = this.statementFinishedNotifier;
        synchronized (n2) {
            long startTime = System.currentTimeMillis();
            while (this.statementOutput == null && this.pythonscriptRunning) {
                try {
                    if (System.currentTimeMillis() - startTime > 10000L) {
                        this.logger.error("pyspark completion didn't have response for {}sec.", (Object)10);
                        break;
                    }
                    this.statementFinishedNotifier.wait(1000L);
                }
                catch (InterruptedException e) {
                    this.logger.info("wait drop");
                    return new LinkedList<InterpreterCompletion>();
                }
            }
            if (this.statementError) {
                return new LinkedList<InterpreterCompletion>();
            }
            Gson gson = new Gson();
            completionList = gson.fromJson(this.statementOutput, String[].class);
        }
        if (completionList == null) {
            return new LinkedList<InterpreterCompletion>();
        }
        LinkedList<InterpreterCompletion> results = new LinkedList<InterpreterCompletion>();
        for (String name : completionList) {
            results.add(new InterpreterCompletion(name, name, ""));
        }
        return results;
    }

    private String getCompletionTargetString(String text, int cursor) {
        String[] completionSeqCharaters = new String[]{" ", "\n", "\t"};
        int completionEndPosition = cursor;
        int completionStartPosition = cursor;
        int indexOfReverseSeqPostion = cursor;
        String resultCompletionText = "";
        String completionScriptText = "";
        try {
            completionScriptText = text.substring(0, cursor);
        }
        catch (Exception e) {
            this.logger.error(e.toString());
            return null;
        }
        completionEndPosition = completionScriptText.length();
        String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
        for (String seqCharacter : completionSeqCharaters) {
            indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter);
            if (indexOfReverseSeqPostion >= completionStartPosition || indexOfReverseSeqPostion <= 0) continue;
            completionStartPosition = indexOfReverseSeqPostion;
        }
        completionStartPosition = completionStartPosition == completionEndPosition ? 0 : completionEndPosition - completionStartPosition;
        resultCompletionText = completionScriptText.substring(completionStartPosition, completionEndPosition);
        return resultCompletionText;
    }

    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazy = null;
        SparkInterpreter spark = null;
        Interpreter p = this.getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        while (p instanceof WrappedInterpreter) {
            if (p instanceof LazyOpenInterpreter) {
                lazy = (LazyOpenInterpreter)p;
            }
            p = ((WrappedInterpreter)((Object)p)).getInnerInterpreter();
        }
        spark = (SparkInterpreter)p;
        if (lazy != null) {
            lazy.open();
        }
        return spark;
    }

    public ZeppelinContext getZeppelinContext() {
        SparkInterpreter sparkIntp = this.getSparkInterpreter();
        if (sparkIntp != null) {
            return this.getSparkInterpreter().getZeppelinContext();
        }
        return null;
    }

    public JavaSparkContext getJavaSparkContext() {
        SparkInterpreter intp = this.getSparkInterpreter();
        if (intp == null) {
            return null;
        }
        return new JavaSparkContext(intp.getSparkContext());
    }

    public Object getSparkSession() {
        SparkInterpreter intp = this.getSparkInterpreter();
        if (intp == null) {
            return null;
        }
        return intp.getSparkSession();
    }

    public SparkConf getSparkConf() {
        JavaSparkContext sc = this.getJavaSparkContext();
        if (sc == null) {
            return null;
        }
        return this.getJavaSparkContext().getConf();
    }

    public SQLContext getSQLContext() {
        SparkInterpreter intp = this.getSparkInterpreter();
        if (intp == null) {
            return null;
        }
        return intp.getSQLContext();
    }

    private DepInterpreter getDepInterpreter() {
        Interpreter p = this.getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
        if (p == null) {
            return null;
        }
        while (p instanceof WrappedInterpreter) {
            p = ((WrappedInterpreter)((Object)p)).getInnerInterpreter();
        }
        return (DepInterpreter)p;
    }

    @Override
    public void onProcessComplete(int exitValue) {
        this.pythonscriptRunning = false;
        this.logger.info("python process terminated. exit code " + exitValue);
    }

    @Override
    public void onProcessFailed(ExecuteException e) {
        this.pythonscriptRunning = false;
        this.logger.error("python process failed", e);
    }

    public class PythonInterpretRequest {
        public String statements;
        public String jobGroup;

        public PythonInterpretRequest(String statements, String jobGroup) {
            this.statements = statements;
            this.jobGroup = jobGroup;
        }

        public String statements() {
            return this.statements;
        }

        public String jobGroup() {
            return this.jobGroup;
        }
    }
}

