/*
 * Decompiled with CFR 0.152.
 */
package com.github.quintona;

import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONArray;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class RFunction
extends BaseFunction {
    Process process;
    DataOutputStream rInput;
    String rExecutable;
    List<String> libraries;
    String functionName;
    BufferedReader reader;
    static String ls = System.getProperty("line.separator");
    private String initCode = null;
    public static final String START_LINE = "<s>";
    public static final String END_LINE = "<e>";

    public RFunction(String rExecutable, List<String> libraries, String functionName) {
        this.rExecutable = rExecutable;
        this.functionName = functionName;
        this.libraries = libraries;
    }

    public RFunction(List<String> libraries, String functionName) {
        this.rExecutable = "/usr/bin/R";
        this.functionName = functionName;
        this.libraries = libraries;
    }

    public RFunction withInitCode(String rCode) {
        this.initCode = rCode;
        return this;
    }

    public RFunction withNamedInitCode(String name) {
        this.initCode = RFunction.readFile("/" + name + ".R");
        return this;
    }

    public void prepare(Map conf, TridentOperationContext context) {
        ProcessBuilder builder = new ProcessBuilder(this.rExecutable, "--vanilla", "-q", "--slave");
        try {
            this.process = builder.start();
            this.rInput = new DataOutputStream(this.process.getOutputStream());
            this.reader = new BufferedReader(new InputStreamReader(this.process.getInputStream()));
            this.loadLibraries();
            if (this.initCode != null) {
                this.rInput.writeBytes(this.initCode + "\n");
                this.rInput.flush();
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Could not start R, please check install and settings" + e);
        }
    }

    private void loadLibraries() throws IOException {
        this.rInput.writeBytes("library('rjson')\n");
        for (String lib : this.libraries) {
            this.rInput.writeBytes("library('" + lib + "')\n");
        }
        this.rInput.flush();
    }

    public static String trimOutput(String output) {
        output = output.replace("[1]", "");
        output = output.replace("\\", "");
        output = output.trim();
        return output.substring(1, output.length() - 1);
    }

    private JSONArray getResult() throws ParseException {
        StringBuilder stringBuilder = new StringBuilder();
        boolean awaitingStart = true;
        try {
            String line = this.reader.readLine();
            while (line != null) {
                System.out.println(line);
                if (line.equals(START_LINE)) {
                    awaitingStart = false;
                } else {
                    if (line.equals(END_LINE)) {
                        if (awaitingStart) {
                            throw new RuntimeException("Something went wrong. Received response ending before beginning!");
                        }
                        break;
                    }
                    if (!awaitingStart) {
                        stringBuilder.append(line);
                        stringBuilder.append(ls);
                    }
                }
                line = this.reader.readLine();
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        if (awaitingStart) {
            return null;
        }
        String trimmedContent = RFunction.trimOutput(stringBuilder.toString());
        if (trimmedContent == null) {
            return null;
        }
        if ("[]".equals(trimmedContent)) {
            return null;
        }
        return (JSONArray)JSONValue.parseWithException((String)trimmedContent);
    }

    public static String readFile(String file) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(RFunction.class.getResourceAsStream(file)));
        String line = null;
        StringBuilder stringBuilder = new StringBuilder();
        try {
            while ((line = reader.readLine()) != null) {
                stringBuilder.append(line);
                stringBuilder.append(ls);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Could not load resource: " + e);
        }
        return stringBuilder.toString();
    }

    public void cleanup() {
        this.process.destroy();
    }

    public JSONArray coerceTuple(TridentTuple tuple) {
        JSONArray array = new JSONArray();
        array.addAll((Collection)tuple);
        return array;
    }

    public Values coerceResponce(JSONArray array) {
        return new Values(array.toArray());
    }

    public JSONArray performFunction(JSONArray functionInput) {
        try {
            String input = functionInput.toJSONString();
            input = input.replace("\\", "");
            this.rInput.writeBytes("list <- fromJSON('" + input + "')\n");
            this.rInput.writeBytes("output <- " + this.functionName + "(list)\n");
            this.rInput.writeBytes("write('<s>', stdout())\n");
            this.rInput.writeBytes("toJSON(output)\n");
            this.rInput.writeBytes("write('<e>', stdout())\n");
            this.rInput.flush();
            return this.getResult();
        }
        catch (IOException | ParseException e) {
            throw new RuntimeException("Exception handling response from R" + e);
        }
    }

    public void execute(TridentTuple tuple, TridentCollector collector) {
        JSONArray functionInput = this.coerceTuple(tuple);
        JSONArray result = this.performFunction(functionInput);
        if (result != null) {
            collector.emit((List)this.coerceResponce(result));
        }
    }
}

