package org.apache.zeppelin.spark;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.codehaus.plexus.util.xml.pull.XmlPullParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;

/* loaded from: input_file:org/apache/zeppelin/spark/PySparkInterpreter.class */
public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler {
    /** Logger for this interpreter; assigned in the constructor. */
    Logger logger;
    /** Py4J gateway created with this object as its entry point. */
    private GatewayServer gatewayServer;
    /** commons-exec executor that launches and watches the python process. */
    private DefaultExecutor executor;
    /** Port the Py4J gateway listens on; also passed to the python script as an argument. */
    private int port;
    /** Receives the python process' stdout and stderr. */
    private InterpreterOutputStream outputStream;
    /** Writer wrapping the pipe connected to the python process' stdin. */
    private BufferedWriter ins;
    /** Read side of the stdin pipe handed to the process stream handler. */
    private PipedInputStream in;
    /** In-memory buffer written once during startup. */
    private ByteArrayOutputStream input;
    /** Absolute path of the temporary python bootstrap script. */
    private String scriptPath;
    /**
     * Whether the python process is believed to be alive. It is set by the thread that starts
     * the process and cleared by the {@link ExecuteResultHandler} callbacks, which commons-exec
     * invokes from its own thread without holding any of this class' monitors; it is therefore
     * volatile so that readers polling it observe the change.
     */
    volatile boolean pythonscriptRunning;
    /** Upper bound, in seconds, for waits on the python side. */
    private static final int MAX_TIMEOUT_SEC = 10;
    /** Pending request for the python side; null when there is none. */
    PythonInterpretRequest pythonInterpretRequest;
    /** Monitor used to signal that {@link #pythonInterpretRequest} has been set. */
    Integer statementSetNotifier;
    /** Output reported for the last statement; null while a statement is still running. */
    String statementOutput;
    /** Whether the last statement was reported as failed. */
    boolean statementError;
    /** Monitor used to signal that {@link #statementOutput} has been set. */
    Integer statementFinishedNotifier;
    /** Whether the python script has reported that it finished initializing. */
    boolean pythonScriptInitialized;
    /** Monitor used to signal that {@link #pythonScriptInitialized} became true. */
    Integer pythonScriptInitializeNotifier;

    /**
     * A unit of work handed to the python side: the statements to run together with the
     * job group they belong to.
     *
     * <p>Decompiled from {@code org/apache/zeppelin/spark/PySparkInterpreter$PythonInterpretRequest.class}.
     */
    public class PythonInterpretRequest {
        // Source text of the statements to execute.
        public String statements;
        // Job group associated with this request.
        public String jobGroup;

        /**
         * @param str the statements to execute
         * @param str2 the job group for this request
         */
        public PythonInterpretRequest(String str, String str2) {
            this.statements = str;
            this.jobGroup = str2;
        }

        /** @return the statements to execute */
        public String statements() {
            return this.statements;
        }

        /** @return the job group for this request */
        public String jobGroup() {
            return this.jobGroup;
        }
    }

    public PySparkInterpreter(Properties properties) {
        super(properties);
        this.logger = LoggerFactory.getLogger((Class<?>) PySparkInterpreter.class);
        this.pythonscriptRunning = false;
        this.pythonInterpretRequest = null;
        this.statementSetNotifier = new Integer(0);
        this.statementOutput = null;
        this.statementError = false;
        this.statementFinishedNotifier = new Integer(0);
        this.pythonScriptInitialized = false;
        this.pythonScriptInitializeNotifier = new Integer(0);
        try {
            this.scriptPath = File.createTempFile("zeppelin_pyspark-", ".py").getAbsolutePath();
        } catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    /**
     * Copies the bundled {@code python/zeppelin_pyspark.py} classpath resource to
     * {@link #scriptPath}.
     *
     * <p>Both the resource stream and the file stream are closed even when the copy fails.
     *
     * @throws InterpreterException if the target path is a directory, the resource is not on
     *     the classpath, or the copy fails
     */
    private void createPythonScript() {
        File file = new File(this.scriptPath);
        if (file.exists() && file.isDirectory()) {
            throw new InterpreterException("Can't create python script " + file.getAbsolutePath());
        }
        ClassLoader classLoader = getClass().getClassLoader();
        try (InputStream resource = classLoader.getResourceAsStream("python/zeppelin_pyspark.py")) {
            // getResourceAsStream signals "not found" with null rather than an exception.
            if (resource == null) {
                throw new InterpreterException(
                        "Can't find resource python/zeppelin_pyspark.py on the classpath");
            }
            try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
                IOUtils.copy(resource, fileOutputStream);
            }
            this.logger.info("File {} created", this.scriptPath);
        } catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    /**
     * Opens the interpreter: registers the display hook when a hook registry is available,
     * collects dependency and local-repo files as URLs, and starts the gateway server and
     * python script with a context class loader that can see those URLs. The previous
     * context class loader is always restored.
     *
     * @throws InterpreterException if starting the gateway server or the script fails
     */
    @Override
    public void open() {
        InterpreterGroup group = getInterpreterGroup();
        if (group != null && group.getInterpreterHookRegistry() != null) {
            registerHook(InterpreterHookRegistry.HookType.POST_EXEC_DEV, "z._displayhook()");
        }

        List<URL> extraUrls = new LinkedList<>();

        // Files contributed by the dependency interpreter, if there is one.
        DepInterpreter depInterpreter = getDepInterpreter();
        if (depInterpreter != null) {
            SparkDependencyContext depContext = depInterpreter.getDependencyContext();
            if (depContext != null) {
                List<File> depFiles = depContext.getFiles();
                if (depFiles != null) {
                    for (File depFile : depFiles) {
                        try {
                            extraUrls.add(depFile.toURI().toURL());
                        } catch (MalformedURLException badUrl) {
                            this.logger.error("Error", badUrl);
                        }
                    }
                }
            }
        }

        // Every entry of the configured local repository directory.
        String localRepo = getProperty("zeppelin.interpreter.localRepo");
        if (localRepo != null) {
            File repoDir = new File(localRepo);
            File[] repoEntries = repoDir.exists() ? repoDir.listFiles() : null;
            if (repoEntries != null) {
                for (int idx = 0; idx < repoEntries.length; idx++) {
                    try {
                        extraUrls.add(repoEntries[idx].toURI().toURL());
                    } catch (MalformedURLException badUrl) {
                        this.logger.error("Error", badUrl);
                    }
                }
            }
        }

        URL[] urls = extraUrls.toArray(new URL[0]);
        Thread current = Thread.currentThread();
        ClassLoader previousLoader = current.getContextClassLoader();
        try {
            current.setContextClassLoader(new URLClassLoader(urls, previousLoader));
            createGatewayServerAndStartScript();
        } catch (Exception failure) {
            this.logger.error("Error", failure);
            throw new InterpreterException(failure);
        } finally {
            current.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Builds the environment for the python process.
     *
     * <p>Starts from the current process environment. When {@code PYTHONPATH} is not already
     * set, it is built from {@code spark.submit.pyFiles} (commas turned into {@code :}) plus
     * {@code ../interpreter/lib/python}. When spark-submit is in use and the Spark interpreter
     * is not in yarn mode, the entries of {@code spark.jars} are appended as further
     * {@code PYTHONPATH} entries.
     *
     * @return the environment map to launch the python process with
     * @throws IOException if the current process environment cannot be read
     */
    private Map setupPySparkEnv() throws IOException {
        Map<String, String> env = EnvironmentUtils.getProcEnvironment();
        if (!env.containsKey("PYTHONPATH")) {
            String pyFiles = getSparkConf().get("spark.submit.pyFiles").replace(",", ":");
            env.put("PYTHONPATH", pyFiles + ":../interpreter/lib/python");
        }
        if (SparkInterpreter.useSparkSubmit() && !getSparkInterpreter().isYarnMode()) {
            String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":");
            if (!sparkSubmitJars.isEmpty()) {
                // The separator is required: without it the last existing entry and the
                // first jar path are fused into a single, non-existent path.
                env.put("PYTHONPATH", env.get("PYTHONPATH") + ":" + sparkSubmitJars);
            }
        }
        return env;
    }

    /**
     * Writes the python script to disk, starts a Py4J gateway on a free port and launches
     * the configured python executable asynchronously with the script path, the gateway port
     * and the Spark version number as arguments.
     *
     * @throws InterpreterException if the pipes cannot be created or the process cannot be
     *     started
     */
    private void createGatewayServerAndStartScript() {
        createPythonScript();

        this.port = findRandomOpenPortOnAllLocalInterfaces();
        this.gatewayServer = new GatewayServer(this, this.port);
        this.gatewayServer.start();

        CommandLine command = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
        command.addArgument(this.scriptPath, false);
        command.addArgument(Integer.toString(this.port), false);
        command.addArgument(
                Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);

        this.executor = new DefaultExecutor();
        this.outputStream = new InterpreterOutputStream(this.logger);
        PipedOutputStream stdinSink = new PipedOutputStream();
        this.in = null;
        try {
            this.in = new PipedInputStream(stdinSink);
            this.ins = new BufferedWriter(new OutputStreamWriter(stdinSink));
            this.input = new ByteArrayOutputStream();

            // stdout and stderr share one sink; stdin is fed from the pipe.
            this.executor.setStreamHandler(
                    new PumpStreamHandler(this.outputStream, this.outputStream, this.in));
            this.executor.setWatchdog(new ExecuteWatchdog(-1L));

            // Passing this object as the result handler makes the launch asynchronous.
            this.executor.execute(command, setupPySparkEnv(), this);
            this.pythonscriptRunning = true;

            // NOTE(review): this line goes into the in-memory buffer, not the stdin pipe;
            // confirm whether it was meant to reach the process.
            this.input.write("import sys, getopt\n".getBytes());
            this.ins.flush();
        } catch (IOException ioe) {
            throw new InterpreterException(ioe);
        }
    }

    /**
     * Asks the operating system for a currently free TCP port by binding a throw-away
     * server socket to port 0 and reading back the port it was given.
     *
     * <p>The socket is closed before returning, so the port is only free at the time of the
     * call; another process may take it before the caller binds to it.
     *
     * @return a port number that was free when probed
     * @throws InterpreterException if the probe socket cannot be opened or closed
     */
    private int findRandomOpenPortOnAllLocalInterfaces() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    /**
     * Releases what {@link #open()} acquired: destroys the python process, deletes the
     * temporary script and shuts the gateway server down.
     *
     * <p>Each step is skipped when the corresponding resource was never created, so closing
     * an interpreter that was not (fully) opened does not fail with a
     * {@link NullPointerException}.
     */
    @Override
    public void close() {
        if (this.executor != null && this.executor.getWatchdog() != null) {
            this.executor.getWatchdog().destroyProcess();
        }

        File script = new File(this.scriptPath);
        if (!script.delete() && script.exists()) {
            this.logger.warn("Could not delete python script {}", this.scriptPath);
        }

        if (this.gatewayServer != null) {
            this.gatewayServer.shutdown();
        }
    }

    /**
     * Blocks until a request is pending, then takes it and clears the pending slot.
     *
     * @return the request that was pending
     */
    public PythonInterpretRequest getStatements() {
        synchronized (this.statementSetNotifier) {
            // Timed wait: the slot is re-checked at least once per second.
            while (this.pythonInterpretRequest == null) {
                try {
                    this.statementSetNotifier.wait(1000L);
                } catch (InterruptedException ignored) {
                    // Interruptions are deliberately ignored; keep waiting for a request.
                }
            }
            PythonInterpretRequest next = this.pythonInterpretRequest;
            this.pythonInterpretRequest = null;
            return next;
        }
    }

    /**
     * Records the outcome of the current statement and wakes one thread waiting on the
     * finished-notifier.
     *
     * @param str output of the statement
     * @param z whether the statement failed
     */
    public void setStatementsFinished(String str, boolean z) {
        final Object finishedLock = this.statementFinishedNotifier;
        synchronized (finishedLock) {
            this.statementOutput = str;
            this.statementError = z;
            finishedLock.notify();
        }
    }

    /**
     * Marks the python script as initialized and wakes every thread waiting on the
     * initialize-notifier.
     */
    public void onPythonScriptInitialized() {
        final Object initLock = this.pythonScriptInitializeNotifier;
        synchronized (initLock) {
            this.pythonScriptInitialized = true;
            initLock.notifyAll();
        }
    }

    /**
     * Writes the given text to the interpreter output currently attached to the process
     * output stream.
     *
     * @param str text to append
     * @throws IOException if the underlying write fails
     */
    public void appendOutput(String str) throws IOException {
        this.outputStream.getInterpreterOutput().write(str);
    }

    /**
     * Runs the given statements in the python process and waits for the outcome.
     *
     * <p>Returns an error result without running anything when the Spark version is
     * unsupported, the python process is not running, the script did not finish initializing
     * within {@link #MAX_TIMEOUT_SEC} seconds, or pyspark is not supported for this Spark
     * version. Otherwise the statements are handed to the python side and this method blocks
     * until a result is reported or the python process terminates.
     *
     * @param str statements to execute
     * @param interpreterContext context of the paragraph being run; its output receives the
     *     process output
     * @return success, or an error result carrying the failure text
     * @throws InterpreterException if flushing or reading the context output fails
     */
    @Override
    public InterpreterResult interpret(String str, InterpreterContext interpreterContext) {
        SparkInterpreter sparkInterpreter = getSparkInterpreter();
        sparkInterpreter.populateSparkWebUrl(interpreterContext);
        if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported");
        }
        if (!this.pythonscriptRunning) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, "python process not running" + this.outputStream.toString());
        }
        this.outputStream.setInterpreterOutput(interpreterContext.out);

        // Give the script a bounded amount of time to report that it is initialized.
        final long initTimeoutMillis = MAX_TIMEOUT_SEC * 1000L;
        synchronized (this.pythonScriptInitializeNotifier) {
            long waitStart = System.currentTimeMillis();
            while (!this.pythonScriptInitialized && this.pythonscriptRunning
                    && System.currentTimeMillis() - waitStart < initTimeoutMillis) {
                try {
                    this.pythonScriptInitializeNotifier.wait(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored; the loop condition bounds the wait.
                }
            }
        }

        try {
            interpreterContext.out.flush();
            List<InterpreterResultMessage> interpreterResultMessage = interpreterContext.out.toInterpreterResultMessage();
            if (!this.pythonscriptRunning) {
                interpreterResultMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "failed to start pyspark"));
                return new InterpreterResult(InterpreterResult.Code.ERROR, interpreterResultMessage);
            }
            if (!this.pythonScriptInitialized) {
                interpreterResultMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "pyspark is not responding"));
                return new InterpreterResult(InterpreterResult.Code.ERROR, interpreterResultMessage);
            }
            if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
                interpreterResultMessage.add(new InterpreterResultMessage(InterpreterResult.Type.TEXT, "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
                return new InterpreterResult(InterpreterResult.Code.ERROR, interpreterResultMessage);
            }

            String jobGroup = sparkInterpreter.getJobGroup(interpreterContext);
            ZeppelinContext zeppelinContext = sparkInterpreter.getZeppelinContext();
            zeppelinContext.setInterpreterContext(interpreterContext);
            zeppelinContext.setGui(interpreterContext.getGui());

            // Clear the previous result BEFORE the request becomes visible; clearing it
            // afterwards could wipe a result that the python side has already reported.
            this.statementOutput = null;
            synchronized (this.statementSetNotifier) {
                // Publish under the monitor the consumer waits on so the wake-up is not lost.
                this.pythonInterpretRequest = new PythonInterpretRequest(str, jobGroup);
                this.statementSetNotifier.notify();
            }

            synchronized (this.statementFinishedNotifier) {
                // Stop waiting when the process is gone: nobody is left to report a result.
                while (this.statementOutput == null && this.pythonscriptRunning) {
                    try {
                        this.statementFinishedNotifier.wait(1000L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored; keep waiting for the result.
                    }
                }
            }

            if (this.statementOutput == null) {
                return new InterpreterResult(InterpreterResult.Code.ERROR, "python process terminated before the statement finished");
            }
            if (this.statementError) {
                return new InterpreterResult(InterpreterResult.Code.ERROR, this.statementOutput);
            }
            interpreterContext.out.flush();
            return new InterpreterResult(InterpreterResult.Code.SUCCESS);
        } catch (IOException e) {
            throw new InterpreterException(e);
        }
    }

    /**
     * Cancels the running job by delegating to the Spark interpreter of this session.
     *
     * @param interpreterContext context of the paragraph to cancel
     */
    @Override
    public void cancel(InterpreterContext interpreterContext) {
        SparkInterpreter spark = getSparkInterpreter();
        spark.cancel(interpreterContext);
    }

    /**
     * @return always {@code Interpreter.FormType.NATIVE}
     */
    @Override // org.apache.zeppelin.interpreter.Interpreter
    public Interpreter.FormType getFormType() {
        return Interpreter.FormType.NATIVE;
    }

    /**
     * Reports progress by delegating to the Spark interpreter of this session.
     *
     * @param interpreterContext context of the paragraph being run
     * @return the progress value reported by the Spark interpreter
     */
    @Override
    public int getProgress(InterpreterContext interpreterContext) {
        SparkInterpreter spark = getSparkInterpreter();
        return spark.getProgress(interpreterContext);
    }

    /*
     * NOTE(review): the decompiler could not reconstruct this method. The body below is a
     * placeholder that always throws UnsupportedOperationException; the real logic has to be
     * recovered from the original sources or from the bytecode dump before this class is used.
     * The only recovered fragment is the log statement quoted in the JADX warning that follows.
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00ab, code lost:
    
        r7.logger.error("pyspark completion didn't have response for {}sec.", (java.lang.Object) 10);
     */
    @Override // org.apache.zeppelin.interpreter.Interpreter
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.util.List<org.apache.zeppelin.interpreter.thrift.InterpreterCompletion> completion(java.lang.String r8, int r9) {
        /*
            Method dump skipped, instructions count: 360
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; see the review note above this method.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.zeppelin.spark.PySparkInterpreter.completion(java.lang.String, int):java.util.List");
    }

    /**
     * Extracts the token to complete: the part of {@code str} before position {@code i}
     * that follows the nearest preceding space, newline or tab.
     *
     * <p>A separator sitting directly before the cursor is not treated as a boundary. When
     * no boundary is found, the whole text before the cursor is returned.
     *
     * @param str text being edited
     * @param i cursor position within {@code str}
     * @return the completion target, or {@code null} if any exception occurs (for example
     *     when the cursor lies outside the text)
     */
    private String getCompletionTargetString(String str, int i) {
        final String[] separators = {" ", "\n", "\t"};
        int distanceFromEnd = i;
        try {
            String head = str.substring(0, i);
            int headLength = head.length();
            // Searching the reversed text finds the separator closest to the cursor.
            String reversedHead = new StringBuilder(head).reverse().toString();

            for (int k = 0; k < separators.length; k++) {
                int hit = reversedHead.indexOf(separators[k]);
                if (hit > 0 && hit < distanceFromEnd) {
                    distanceFromEnd = hit;
                }
            }

            int start;
            if (distanceFromEnd == headLength) {
                start = 0;
            } else {
                start = headLength - distanceFromEnd;
            }
            return head.substring(start, headLength);
        } catch (Exception failure) {
            this.logger.error(failure.toString());
            return null;
        }
    }

    /**
     * Looks up the Spark interpreter of the current session, unwrapping any wrapper layers.
     * If one of the wrappers is a {@link LazyOpenInterpreter}, the last such wrapper is
     * opened before returning.
     *
     * @return the unwrapped Spark interpreter, or {@code null} if the session has none
     */
    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazyWrapper = null;
        Interpreter current = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        while (current instanceof WrappedInterpreter) {
            if (current instanceof LazyOpenInterpreter) {
                lazyWrapper = (LazyOpenInterpreter) current;
            }
            current = ((WrappedInterpreter) current).getInnerInterpreter();
        }

        // Cast first so a wrong type fails before anything is opened.
        SparkInterpreter spark = (SparkInterpreter) current;
        if (lazyWrapper != null) {
            lazyWrapper.open();
        }
        return spark;
    }

    /**
     * Returns the Zeppelin context of the session's Spark interpreter.
     *
     * <p>The Spark interpreter is resolved once, so the null check and the call are made on
     * the same instance.
     *
     * @return the Zeppelin context, or {@code null} if there is no Spark interpreter
     */
    public ZeppelinContext getZeppelinContext() {
        SparkInterpreter sparkInterpreter = getSparkInterpreter();
        if (sparkInterpreter == null) {
            return null;
        }
        return sparkInterpreter.getZeppelinContext();
    }

    /**
     * Wraps the Spark interpreter's Spark context in a new {@link JavaSparkContext}.
     *
     * @return a new Java Spark context, or {@code null} if there is no Spark interpreter
     */
    public JavaSparkContext getJavaSparkContext() {
        SparkInterpreter spark = getSparkInterpreter();
        return spark == null ? null : new JavaSparkContext(spark.getSparkContext());
    }

    /**
     * Returns the Spark session held by the session's Spark interpreter.
     *
     * @return the Spark session object, or {@code null} if there is no Spark interpreter
     */
    public Object getSparkSession() {
        SparkInterpreter spark = getSparkInterpreter();
        return spark == null ? null : spark.getSparkSession();
    }

    /**
     * Returns the configuration of the session's Spark context.
     *
     * <p>The Java Spark context is obtained once; each call to
     * {@link #getJavaSparkContext()} builds a new wrapper, so asking twice would create two.
     *
     * @return the Spark configuration, or {@code null} if no Spark context is available
     */
    public SparkConf getSparkConf() {
        JavaSparkContext javaSparkContext = getJavaSparkContext();
        if (javaSparkContext == null) {
            return null;
        }
        return javaSparkContext.getConf();
    }

    /**
     * Returns the SQL context held by the session's Spark interpreter.
     *
     * @return the SQL context, or {@code null} if there is no Spark interpreter
     */
    public SQLContext getSQLContext() {
        SparkInterpreter spark = getSparkInterpreter();
        return spark == null ? null : spark.getSQLContext();
    }

    /**
     * Looks up the dependency interpreter of the current session, unwrapping any wrapper
     * layers.
     *
     * @return the unwrapped dependency interpreter, or {@code null} if the session has none
     */
    private DepInterpreter getDepInterpreter() {
        Interpreter candidate = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
        // A null lookup result skips the loop and passes through the cast unchanged.
        while (candidate instanceof WrappedInterpreter) {
            WrappedInterpreter wrapper = (WrappedInterpreter) candidate;
            candidate = wrapper.getInnerInterpreter();
        }
        return (DepInterpreter) candidate;
    }

    /**
     * Process-exit callback: marks the python process as stopped and logs its exit code.
     *
     * @param i exit code of the python process
     */
    @Override
    public void onProcessComplete(int i) {
        this.pythonscriptRunning = false;
        String message = "python process terminated. exit code " + i;
        this.logger.info(message);
    }

    /**
     * Process-failure callback: marks the python process as stopped and logs the failure.
     *
     * @param executeException the failure reported by the executor
     */
    @Override
    public void onProcessFailed(ExecuteException executeException) {
        this.pythonscriptRunning = false;
        final Throwable cause = executeException;
        this.logger.error("python process failed", cause);
    }
}
