package org.apache.zeppelin.spark;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.codehaus.plexus.classworlds.launcher.ConfigurationParser;
import org.codehaus.plexus.util.xml.pull.XmlPullParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/spark/ZeppelinR.class */
/**
 * Launches an external R process running the bundled {@code R/zeppelin_sparkr.R} script and
 * exchanges requests and responses with it through a wait/notify hand-off.
 *
 * <p>The JVM side publishes a {@link Request} via {@link #eval}, {@link #set}, {@link #get} or
 * {@link #getS0}; the request is collected through {@link #getRequest()} and answered through
 * {@link #setResponse(Object, boolean)}. Those two methods and {@link #onScriptInitialized()} are
 * presumably invoked from the R side, which is handed this object's {@code hashCode()} on its
 * command line so that it can look the instance up with {@link #getZeppelinR(int)} -- confirm
 * against the R script.
 *
 * <p>Requests are serialized on this object's monitor, so only one request is in flight at a time.
 */
public class ZeppelinR implements ExecuteResultHandler {
    /** Classpath location of the R script that is copied to a temp file and executed. */
    private static final String R_SCRIPT_RESOURCE = "R/zeppelin_sparkr.R";
    /** Upper bound (10 seconds, in nanoseconds) on waiting for the R script to report readiness. */
    private static final long INIT_TIMEOUT_NANOS = 10000000000L;
    /** Interval at which waiting threads re-check the state of the R process. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private final String rCmdPath;
    private final SparkVersion sparkVersion;
    private DefaultExecutor executor;
    private InterpreterOutputStream outputStream;
    private PipedOutputStream input;
    private final String scriptPath;
    private final String libPath;
    /** Registry of opened instances keyed by {@code hashCode()}; see {@link #getZeppelinR(int)}. */
    static Map<Integer, ZeppelinR> zeppelinR =
            Collections.synchronizedMap(new HashMap<Integer, ZeppelinR>());
    private InterpreterOutput initialOutput;
    private final int port;
    // Written by the process-result callbacks and polled by waiting threads, hence volatile.
    private volatile boolean rScriptRunning;
    Logger logger = LoggerFactory.getLogger(ZeppelinR.class);
    // Read outside of its monitor in request()/waitForRScriptInitialized(), hence volatile.
    volatile boolean rScriptInitialized = false;
    // Each notifier must be its own distinct object: sharing one monitor (for example through a
    // cached boxed value) would mix up the three independent wait/notify channels.
    final Object rScriptInitializeNotifier = new Object();
    /** Pending request; guarded by {@link #rRequestNotifier}. */
    Request rRequestObject = null;
    final Object rRequestNotifier = new Object();
    /** Value of the latest response; guarded by {@link #rResponseNotifier}. */
    Object rResponseValue = null;
    /** Whether the latest response reported an error; guarded by {@link #rResponseNotifier}. */
    boolean rResponseError = false;
    /** Whether a response has arrived for the in-flight request; guarded by {@link #rResponseNotifier}. */
    private boolean rResponseReady = false;
    final Object rResponseNotifier = new Object();

    /** A single request handed over to the consumer of {@link ZeppelinR#getRequest()}. */
    public static class Request {
        String type;
        String stmt;
        Object value;

        /**
         * @param type request kind; this class emits "eval", "set", "get" and "getS"
         * @param stmt statement or name the request refers to
         * @param value optional payload, {@code null} when the request carries none
         */
        public Request(String type, String stmt, Object value) {
            this.type = type;
            this.stmt = stmt;
            this.value = value;
        }

        public String getType() {
            return this.type;
        }

        public String getStmt() {
            return this.stmt;
        }

        public Object getValue() {
            return this.value;
        }
    }

    /**
     * Creates a bridge and reserves a temporary file that will later hold the R script.
     *
     * @param rCmdPath command line used to start R
     * @param libPath passed through to the R script as an argument
     * @param port passed through to the R script as an argument
     * @param sparkVersion Spark version whose numeric form is passed to the R script
     * @throws InterpreterException if the temporary script file cannot be created
     */
    public ZeppelinR(String rCmdPath, String libPath, int port, SparkVersion sparkVersion) {
        this.rCmdPath = rCmdPath;
        this.libPath = libPath;
        this.sparkVersion = sparkVersion;
        this.port = port;
        try {
            this.scriptPath = File.createTempFile("zeppelin_sparkr-", ".R").getAbsolutePath();
        } catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    /**
     * Writes the R script to disk, registers this instance, starts the R process asynchronously
     * and issues a no-op {@code eval} that blocks until the script has reported readiness.
     *
     * @throws IOException if the process environment cannot be read or the process cannot start
     * @throws InterpreterException if the script cannot be written or R does not become ready
     */
    public void open() throws IOException {
        createRScript();
        Integer key = Integer.valueOf(hashCode());
        zeppelinR.put(key, this);

        boolean launched = false;
        try {
            CommandLine cmd = CommandLine.parse(this.rCmdPath);
            cmd.addArgument("--no-save");
            cmd.addArgument("--no-restore");
            cmd.addArgument("-f");
            cmd.addArgument(this.scriptPath);
            cmd.addArgument("--args");
            cmd.addArgument(Integer.toString(hashCode()));
            cmd.addArgument(Integer.toString(this.port));
            cmd.addArgument(this.libPath);
            cmd.addArgument(Integer.toString(this.sparkVersion.toNumber()));
            this.logger.debug(cmd.toString());

            this.executor = new DefaultExecutor();
            this.outputStream = new InterpreterOutputStream(this.logger);
            this.input = new PipedOutputStream();
            PumpStreamHandler streams = new PumpStreamHandler(
                    this.outputStream, this.outputStream, new PipedInputStream(this.input));
            this.executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
            this.executor.setStreamHandler(streams);
            Map<String, String> env = EnvironmentUtils.getProcEnvironment();

            // Capture everything printed during start-up so it can be reported on failure.
            this.initialOutput = new InterpreterOutput(null);
            this.outputStream.setInterpreterOutput(this.initialOutput);

            // Flag the process as running BEFORE the asynchronous launch: the result handler may
            // fire right away, and its "false" must not be overwritten afterwards.
            this.rScriptRunning = true;
            this.executor.execute(cmd, env, this);
            launched = true;
        } finally {
            if (!launched) {
                // Do not leave a dead instance behind in the registry.
                this.rScriptRunning = false;
                zeppelinR.remove(key);
            }
        }
        eval("cat('')");
    }

    /**
     * Sends an "eval" request for the given statement and waits for the response.
     *
     * @param stmt statement to evaluate
     * @return the response value; {@code null} if the R process exited before answering
     * @throws RuntimeException if R is not running or the response is flagged as an error
     */
    public Object eval(String stmt) {
        return request(new Request("eval", stmt, null));
    }

    /**
     * Sends a "set" request carrying {@code value} and waits for the response.
     *
     * @param name name transmitted as the request statement
     * @param value payload transmitted with the request
     * @throws RuntimeException if R is not running or the response is flagged as an error
     */
    public void set(String name, Object value) {
        request(new Request("set", name, value));
    }

    /**
     * Sends a "get" request and waits for the response.
     *
     * @param name name transmitted as the request statement
     * @return the response value; {@code null} if the R process exited before answering
     * @throws RuntimeException if R is not running or the response is flagged as an error
     */
    public Object get(String name) {
        return request(new Request("get", name, null));
    }

    /**
     * Sends a "getS" request and returns the response cast to {@link String}.
     *
     * @param name name transmitted as the request statement
     * @return the response as a string; {@code null} if the R process exited before answering
     * @throws RuntimeException if R is not running or the response is flagged as an error
     * @throws ClassCastException if the response value is not a {@link String}
     */
    public String getS0(String name) {
        return (String) request(new Request("getS", name, null));
    }

    /**
     * Publishes {@code req} and blocks until it is answered or the R process stops.
     * Holding this object's monitor serializes concurrent callers.
     */
    private synchronized Object request(Request req) throws RuntimeException {
        if (!this.rScriptRunning) {
            throw new RuntimeException("r repl is not running");
        }
        if (!this.rScriptInitialized) {
            waitForRScriptInitialized();
        }

        // Reset the response slot BEFORE the request becomes visible; clearing it afterwards
        // could wipe a response that arrived in between and leave this thread waiting forever.
        synchronized (this.rResponseNotifier) {
            this.rResponseValue = null;
            this.rResponseError = false;
            this.rResponseReady = false;
        }
        synchronized (this.rRequestNotifier) {
            this.rRequestObject = req;
            this.rRequestNotifier.notify();
        }

        Object response = null;
        boolean failed = false;
        boolean answered = false;
        boolean interrupted = false;
        synchronized (this.rResponseNotifier) {
            while (!this.rResponseReady && this.rScriptRunning) {
                try {
                    this.rResponseNotifier.wait(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep waiting for the answer, but remember the interrupt for the caller.
                    interrupted = true;
                    this.logger.error(e.getMessage(), e);
                }
            }
            answered = this.rResponseReady;
            response = this.rResponseValue;
            failed = this.rResponseError;
            this.rResponseValue = null;
            this.rResponseReady = false;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        if (!answered) {
            this.logger.warn("R process stopped before answering a '{}' request", req.getType());
            return null;
        }
        if (failed) {
            throw new RuntimeException(String.valueOf(response));
        }
        return response;
    }

    /**
     * Blocks until the script reports readiness, the process stops, or the timeout elapses.
     *
     * @throws InterpreterException if the script is still not initialized afterwards; the message
     *     includes whatever the process printed during start-up
     */
    private void waitForRScriptInitialized() throws InterpreterException {
        boolean interrupted = false;
        synchronized (this.rScriptInitializeNotifier) {
            long start = System.nanoTime();
            while (!this.rScriptInitialized
                    && this.rScriptRunning
                    && System.nanoTime() - start < INIT_TIMEOUT_NANOS) {
                try {
                    this.rScriptInitializeNotifier.wait(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    this.logger.error(e.getMessage(), e);
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (this.rScriptInitialized) {
            return;
        }

        String startupOutput = "";
        try {
            this.initialOutput.flush();
            startupOutput = new String(this.initialOutput.toByteArray());
        } catch (IOException e) {
            this.logger.error("Unable to read the start-up output of the R process", e);
        }
        throw new InterpreterException("sparkr is not responding " + startupOutput);
    }

    /**
     * Blocks until a request has been published and then hands it over, clearing the slot.
     *
     * @return the pending request, never {@code null}
     */
    public Request getRequest() {
        Request next;
        boolean interrupted = false;
        synchronized (this.rRequestNotifier) {
            while (this.rRequestObject == null) {
                try {
                    this.rRequestNotifier.wait(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    this.logger.error(e.getMessage(), e);
                }
            }
            next = this.rRequestObject;
            this.rRequestObject = null;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return next;
    }

    /**
     * Delivers the response for the in-flight request and wakes the waiting requester.
     *
     * @param value response value; may be {@code null}
     * @param isError {@code true} if {@code value} describes a failure
     */
    public void setResponse(Object value, boolean isError) {
        synchronized (this.rResponseNotifier) {
            this.rResponseValue = value;
            this.rResponseError = isError;
            this.rResponseReady = true;
            this.rResponseNotifier.notify();
        }
    }

    /** Marks the R script as initialized and releases every thread waiting for that event. */
    public void onScriptInitialized() {
        synchronized (this.rScriptInitializeNotifier) {
            this.rScriptInitialized = true;
            this.rScriptInitializeNotifier.notifyAll();
        }
    }

    /**
     * Copies the bundled R script from the classpath to {@link #scriptPath}.
     *
     * @throws InterpreterException if the target is a directory, the resource is missing, or the
     *     copy fails
     */
    private void createRScript() {
        File script = new File(this.scriptPath);
        if (script.exists() && script.isDirectory()) {
            throw new InterpreterException("Can't create r script " + script.getAbsolutePath());
        }

        InputStream resource = getClass().getClassLoader().getResourceAsStream(R_SCRIPT_RESOURCE);
        if (resource == null) {
            throw new InterpreterException("Can't find resource " + R_SCRIPT_RESOURCE);
        }

        FileOutputStream out = null;
        try {
            out = new FileOutputStream(script);
            IOUtils.copy(resource, out);
            // Close explicitly on the success path so that a failing close is reported.
            out.close();
            this.logger.info("File {} created", this.scriptPath);
        } catch (IOException e) {
            throw new InterpreterException(e);
        } finally {
            IOUtils.closeQuietly(out);
            IOUtils.closeQuietly(resource);
        }
    }

    /**
     * Destroys the R process (if one was started), deletes the temporary script and removes this
     * instance from the registry. Safe to call even if {@link #open()} never ran.
     */
    public void close() {
        if (this.executor != null) {
            ExecuteWatchdog watchdog = this.executor.getWatchdog();
            if (watchdog != null) {
                watchdog.destroyProcess();
            }
        }
        File script = new File(this.scriptPath);
        if (script.exists() && !script.delete()) {
            this.logger.warn("Unable to delete r script {}", this.scriptPath);
        }
        zeppelinR.remove(Integer.valueOf(hashCode()));
    }

    /**
     * Looks up an opened instance by the {@code hashCode()} it registered under.
     *
     * @param hashCode registry key
     * @return the registered instance, or {@code null} if there is none
     */
    public static ZeppelinR getZeppelinR(int hashCode) {
        return zeppelinR.get(Integer.valueOf(hashCode));
    }

    /**
     * Redirects the output of the R process to {@code interpreterOutput}.
     * Must only be called after {@link #open()} has created the output stream.
     */
    public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
        this.outputStream.setInterpreterOutput(interpreterOutput);
    }

    @Override // org.apache.commons.exec.ExecuteResultHandler
    public void onProcessComplete(int exitValue) {
        this.logger.info("process complete {}", Integer.valueOf(exitValue));
        markProcessStopped();
    }

    @Override // org.apache.commons.exec.ExecuteResultHandler
    public void onProcessFailed(ExecuteException executeException) {
        this.logger.error(executeException.getMessage(), executeException);
        markProcessStopped();
    }

    /** Records that the R process is gone and wakes waiters so they notice without polling delay. */
    private void markProcessStopped() {
        this.rScriptRunning = false;
        synchronized (this.rScriptInitializeNotifier) {
            this.rScriptInitializeNotifier.notifyAll();
        }
        synchronized (this.rResponseNotifier) {
            this.rResponseNotifier.notifyAll();
        }
    }
}
