package org.apache.livy.rsc;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Promise;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.Thread;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.livy.client.common.TestUtils;
import org.apache.livy.rsc.BaseProtocol;
import org.apache.livy.rsc.RSCConf;
import org.apache.livy.rsc.driver.RSCDriverBootstrapper;
import org.apache.livy.rsc.rpc.Rpc;
import org.apache.livy.rsc.rpc.RpcDispatcher;
import org.apache.livy.rsc.rpc.RpcServer;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/livy/rsc/ContextLauncher.class */
public class ContextLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(ContextLauncher.class);
    private static final AtomicInteger CHILD_IDS = new AtomicInteger();
    private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
    private static final String SPARK_JARS_KEY = "spark.jars";
    private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
    private static final String SPARK_HOME_ENV = "SPARK_HOME";
    private final Promise<ContextInfo> promise;
    private final ScheduledFuture<?> timeout;
    private final String clientId = UUID.randomUUID().toString();
    private final String secret;
    private final ChildProcess child;
    private final RSCConf conf;
    private final RSCClientFactory factory;
    static Process mockSparkSubmit;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/livy/rsc/ContextLauncher$ChildProcess.class */
    public static class ChildProcess {
        private final RSCConf conf;
        private final Promise<?> promise;
        private final Process child;
        private final Thread monitor;
        private final File confFile;

        public ChildProcess(RSCConf rSCConf, Promise<?> promise, Runnable runnable, File file) {
            this.conf = rSCConf;
            this.promise = promise;
            this.monitor = monitor(runnable, ContextLauncher.CHILD_IDS.incrementAndGet());
            this.child = null;
            this.confFile = file;
        }

        public ChildProcess(RSCConf rSCConf, Promise<?> promise, Process process, File file) {
            int incrementAndGet = ContextLauncher.CHILD_IDS.incrementAndGet();
            this.conf = rSCConf;
            this.promise = promise;
            this.child = process;
            this.confFile = file;
            this.monitor = monitor(new Runnable() { // from class: org.apache.livy.rsc.ContextLauncher.ChildProcess.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        int waitFor = ChildProcess.this.child.waitFor();
                        if (waitFor != 0) {
                            ContextLauncher.LOG.warn("Child process exited with code {}.", Integer.valueOf(waitFor));
                            ChildProcess.this.fail(new IOException(String.format("Child process exited with code %d.", Integer.valueOf(waitFor))));
                        }
                    } catch (InterruptedException e) {
                        ContextLauncher.LOG.warn("Waiting thread interrupted, killing child process.");
                        Thread.interrupted();
                        ChildProcess.this.child.destroy();
                    } catch (Exception e2) {
                        ContextLauncher.LOG.warn("Exception while waiting for child process.", e2);
                    }
                }
            }, incrementAndGet);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fail(Throwable th) {
            this.promise.tryFailure(th);
        }

        public void kill() {
            if (this.child != null) {
                this.child.destroy();
            }
            this.monitor.interrupt();
            detach();
            if (this.monitor.isAlive() && this.monitor.isAlive()) {
                ContextLauncher.LOG.warn("Timed out shutting down remote driver, interrupting...");
                this.monitor.interrupt();
            }
        }

        public void detach() {
            try {
                this.monitor.join(this.conf.getTimeAsMs(RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT));
            } catch (InterruptedException e) {
                ContextLauncher.LOG.debug("Interrupted before driver thread was finished.");
            }
        }

        private Thread monitor(final Runnable runnable, int i) {
            Thread thread = new Thread(new Runnable() { // from class: org.apache.livy.rsc.ContextLauncher.ChildProcess.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        runnable.run();
                        ChildProcess.this.confFile.delete();
                    } catch (Throwable th) {
                        ChildProcess.this.confFile.delete();
                        throw th;
                    }
                }
            });
            thread.setDaemon(true);
            thread.setName("ContextLauncher-" + i);
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.livy.rsc.ContextLauncher.ChildProcess.3
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread2, Throwable th) {
                    ContextLauncher.LOG.warn("Child task threw exception.", th);
                    ChildProcess.this.fail(th);
                }
            });
            thread.start();
            return thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/livy/rsc/ContextLauncher$RegistrationHandler.class */
    public class RegistrationHandler extends BaseProtocol implements RpcServer.ClientCallback {
        volatile BaseProtocol.RemoteDriverAddress driverAddress;
        private Rpc client;

        private RegistrationHandler() {
        }

        @Override // org.apache.livy.rsc.rpc.RpcServer.ClientCallback
        public RpcDispatcher onNewClient(Rpc rpc) {
            ContextLauncher.LOG.debug("New RPC client connected from {}.", rpc.getChannel());
            this.client = rpc;
            return this;
        }

        @Override // org.apache.livy.rsc.rpc.RpcServer.ClientCallback
        public void onSaslComplete(Rpc rpc) {
        }

        void dispose() {
            if (this.client != null) {
                this.client.close();
            }
        }

        private void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.RemoteDriverAddress remoteDriverAddress) {
            if (ContextLauncher.this.promise.trySuccess(new ContextInfo(remoteDriverAddress.host, remoteDriverAddress.port, ContextLauncher.this.clientId, ContextLauncher.this.secret))) {
                ContextLauncher.this.timeout.cancel(true);
                ContextLauncher.LOG.debug("Received driver info for client {}: {}/{}.", new Object[]{this.client.getChannel(), remoteDriverAddress.host, Integer.valueOf(remoteDriverAddress.port)});
            } else {
                ContextLauncher.LOG.warn("Connection established but promise is already finalized.");
            }
            channelHandlerContext.executor().submit(new Runnable() { // from class: org.apache.livy.rsc.ContextLauncher.RegistrationHandler.1
                @Override // java.lang.Runnable
                public void run() {
                    RegistrationHandler.this.dispose();
                    ContextLauncher.this.dispose(false);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static DriverProcessInfo create(RSCClientFactory rSCClientFactory, RSCConf rSCConf) throws IOException {
        ContextLauncher contextLauncher = new ContextLauncher(rSCClientFactory, rSCConf);
        return new DriverProcessInfo(contextLauncher.promise, contextLauncher.child.child);
    }

    private ContextLauncher(RSCClientFactory rSCClientFactory, RSCConf rSCConf) throws IOException {
        this.promise = rSCClientFactory.getServer().getEventLoopGroup().next().newPromise();
        this.secret = rSCClientFactory.getServer().createSecret();
        this.conf = rSCConf;
        this.factory = rSCClientFactory;
        final RegistrationHandler registrationHandler = new RegistrationHandler();
        try {
            rSCClientFactory.getServer().registerClient(this.clientId, this.secret, registrationHandler);
            String str = rSCConf.get("repl");
            boolean z = str != null && str.equals("true");
            rSCConf.set(RSCConf.Entry.LAUNCHER_ADDRESS, rSCClientFactory.getServer().getAddress());
            rSCConf.set(RSCConf.Entry.LAUNCHER_PORT, Integer.valueOf(rSCClientFactory.getServer().getPort()));
            rSCConf.set(RSCConf.Entry.CLIENT_ID, this.clientId);
            rSCConf.set(RSCConf.Entry.CLIENT_SECRET, this.secret);
            Utils.addListener(this.promise, new FutureListener<ContextInfo>() { // from class: org.apache.livy.rsc.ContextLauncher.1
                @Override // org.apache.livy.rsc.FutureListener
                public void onFailure(Throwable th) throws Exception {
                    if (ContextLauncher.this.child != null) {
                        ContextLauncher.this.child.kill();
                    }
                }
            });
            this.child = startDriver(rSCConf, this.promise);
            this.timeout = rSCClientFactory.getServer().getEventLoopGroup().schedule(new Runnable() { // from class: org.apache.livy.rsc.ContextLauncher.2
                @Override // java.lang.Runnable
                public void run() {
                    ContextLauncher.this.connectTimeout(registrationHandler);
                }
            }, rSCConf.getTimeAsMs(RSCConf.Entry.RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            dispose(true);
            throw Utils.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connectTimeout(RegistrationHandler registrationHandler) {
        if (this.promise.tryFailure(new TimeoutException("Timed out waiting for context to start."))) {
            registrationHandler.dispose();
        }
        dispose(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void dispose(boolean z) {
        this.factory.getServer().unregisterClient(this.clientId);
        try {
            if (this.child != null) {
                if (z) {
                    this.child.kill();
                } else {
                    this.child.detach();
                }
            }
        } finally {
            this.factory.unref();
        }
    }

    private static ChildProcess startDriver(RSCConf rSCConf, Promise<?> promise) throws IOException {
        String jacocoArgs;
        String str = rSCConf.get(RSCConf.Entry.LIVY_JARS);
        if (str == null) {
            String str2 = System.getenv("LIVY_HOME");
            Utils.checkState(str2 != null, "Need one of LIVY_HOME or %s set.", RSCConf.Entry.LIVY_JARS.key());
            File file = new File(str2, "rsc-jars");
            if (!file.isDirectory()) {
                file = new File(str2, "rsc/target/jars");
            }
            Utils.checkState(file.isDirectory(), "Cannot find 'client-jars' directory under LIVY_HOME.", new Object[0]);
            ArrayList arrayList = new ArrayList();
            for (File file2 : file.listFiles()) {
                arrayList.add(file2.getAbsolutePath());
            }
            str = Utils.join(arrayList, ",");
        }
        merge(rSCConf, SPARK_JARS_KEY, str, ",");
        String str3 = rSCConf.get(RSCConf.Entry.SESSION_KIND);
        if ("sparkr".equals(str3)) {
            merge(rSCConf, SPARK_ARCHIVES_KEY, rSCConf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
        } else if ("pyspark".equals(str3)) {
            merge(rSCConf, "spark.submit.pyFiles", rSCConf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
        }
        rSCConf.set("spark.yarn.maxAppAttempts", "1");
        rSCConf.set("spark.yarn.submit.waitAppCompletion", "false");
        if (!rSCConf.getBoolean(RSCConf.Entry.CLIENT_IN_PROCESS) && !rSCConf.getBoolean(RSCConf.Entry.TEST_STUCK_END_SESSION) && (jacocoArgs = TestUtils.getJacocoArgs()) != null) {
            merge(rSCConf, "spark.driver.extraJavaOptions", jacocoArgs, " ");
        }
        final File writeConfToFile = writeConfToFile(rSCConf);
        if (mockSparkSubmit != null) {
            LOG.warn("!!!! Using mock spark-submit. !!!!");
            return new ChildProcess(rSCConf, promise, mockSparkSubmit, writeConfToFile);
        }
        if (rSCConf.getBoolean(RSCConf.Entry.CLIENT_IN_PROCESS)) {
            LOG.warn("!!!! Running remote driver in-process. !!!!");
            return new ChildProcess(rSCConf, promise, new Runnable() { // from class: org.apache.livy.rsc.ContextLauncher.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        RSCDriverBootstrapper.main(new String[]{writeConfToFile.getAbsolutePath()});
                    } catch (Exception e) {
                        throw Utils.propagate(e);
                    }
                }
            }, writeConfToFile);
        }
        SparkLauncher sparkLauncher = new SparkLauncher();
        String str4 = rSCConf.get(SPARK_DEPLOY_MODE);
        if (str4 != null) {
            sparkLauncher.setDeployMode(str4);
        }
        sparkLauncher.setSparkHome(System.getenv(SPARK_HOME_ENV));
        sparkLauncher.setAppResource("spark-internal");
        sparkLauncher.setPropertiesFile(writeConfToFile.getAbsolutePath());
        sparkLauncher.setMainClass(RSCDriverBootstrapper.class.getName());
        if (rSCConf.get(RSCConf.Entry.PROXY_USER) != null) {
            sparkLauncher.addSparkArg("--proxy-user", rSCConf.get(RSCConf.Entry.PROXY_USER));
        }
        return new ChildProcess(rSCConf, promise, sparkLauncher.launch(), writeConfToFile);
    }

    private static void merge(RSCConf rSCConf, String str, String str2, String str3) {
        rSCConf.set(str, Utils.join(Arrays.asList(str2, rSCConf.get(str)), str3));
    }

    private static File writeConfToFile(RSCConf rSCConf) throws IOException {
        Properties properties = new Properties();
        Iterator it = rSCConf.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String str = (String) entry.getKey();
            if (!str.startsWith(RSCConf.SPARK_CONF_PREFIX)) {
                str = RSCConf.LIVY_SPARK_PREFIX + str;
            }
            properties.setProperty(str, (String) entry.getValue());
        }
        String str2 = System.getenv("SPARK_CONF_DIR");
        if (str2 == null && System.getenv(SPARK_HOME_ENV) != null) {
            str2 = System.getenv(SPARK_HOME_ENV) + File.separator + "conf";
        }
        if (str2 != null) {
            File file = new File(str2 + File.separator + "spark-defaults.conf");
            if (file.isFile()) {
                Properties properties2 = new Properties();
                InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8);
                try {
                    properties2.load(inputStreamReader);
                    inputStreamReader.close();
                    for (String str3 : properties2.stringPropertyNames()) {
                        if (!properties.containsKey(str3)) {
                            properties.put(str3, properties2.getProperty(str3));
                        }
                    }
                } catch (Throwable th) {
                    inputStreamReader.close();
                    throw th;
                }
            }
        }
        File createTempFile = File.createTempFile("livyConf", ".properties");
        Files.setPosixFilePermissions(createTempFile.toPath(), EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE));
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(new FileOutputStream(createTempFile), StandardCharsets.UTF_8);
        try {
            properties.store(outputStreamWriter, "Livy App Context Configuration");
            outputStreamWriter.close();
            return createTempFile;
        } catch (Throwable th2) {
            outputStreamWriter.close();
            throw th2;
        }
    }
}
