package org.apache.flink.client.python;

import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.util.ResourceUtil;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/client/python/PythonDriverEnvUtils.class */
public final class PythonDriverEnvUtils {
    private static final Logger LOG = LoggerFactory.getLogger(PythonDriverEnvUtils.class);

    @VisibleForTesting
    public static final String PYFLINK_PY_FILES = "PYFLINK_PY_FILES";

    @VisibleForTesting
    public static final String PYFLINK_PY_REQUIREMENTS = "PYFLINK_PY_REQUIREMENTS";

    @VisibleForTesting
    public static final String PYFLINK_PY_EXECUTABLE = "PYFLINK_PY_EXECUTABLE";

    @VisibleForTesting
    public static final String PYFLINK_PY_ARCHIVES = "PYFLINK_PY_ARCHIVES";

    /* loaded from: input_file:org/apache/flink/client/python/PythonDriverEnvUtils$PythonEnvironment.class */
    public static class PythonEnvironment {
        public String tempDirectory;
        public String pythonPath;
        public String pythonExec = "python";
        Map<String, String> systemEnv = new HashMap();
    }

    /* loaded from: input_file:org/apache/flink/client/python/PythonDriverEnvUtils$ShutDownPythonHook.class */
    private static class ShutDownPythonHook extends Thread {
        private Process p;
        private String pyFileDir;

        public ShutDownPythonHook(Process process, String str) {
            this.p = process;
            this.pyFileDir = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.p.destroyForcibly();
            if (this.pyFileDir != null) {
                FileUtils.deleteDirectoryQuietly(new File(this.pyFileDir));
            }
        }
    }

    public static PythonEnvironment preparePythonEnvironment(PythonDriverOptions pythonDriverOptions, String str) throws IOException, InterruptedException {
        PythonEnvironment pythonEnvironment = new PythonEnvironment();
        String absolutePath = new File(str).getAbsolutePath();
        Path path = new Path(absolutePath);
        path.getFileSystem().mkdirs(path);
        pythonEnvironment.tempDirectory = absolutePath;
        ArrayList arrayList = new ArrayList();
        for (File file : ResourceUtil.extractBuiltInDependencies(absolutePath, UUID.randomUUID().toString(), true)) {
            arrayList.add(file.getAbsolutePath());
            file.deleteOnExit();
        }
        for (Path path2 : pythonDriverOptions.getPythonLibFiles()) {
            String name = path2.getName();
            Path path3 = new Path(path, String.join(File.separator, UUID.randomUUID().toString(), name));
            if (path2.getFileSystem().isDistributedFS()) {
                FileUtils.copy(path2, path3, true);
            } else {
                new File(path3.getParent().toString()).mkdir();
                createSymbolicLinkForPyflinkLib(Paths.get(new File(path2.getPath()).getAbsolutePath(), new String[0]), Paths.get(path3.toString(), new String[0]));
            }
            if (Files.isRegularFile(Paths.get(path3.toString(), new String[0]).toRealPath(new LinkOption[0]), new LinkOption[0]) && name.endsWith(".py")) {
                arrayList.add(path3.getParent().toString());
            } else {
                arrayList.add(path3.toString());
            }
        }
        pythonEnvironment.pythonPath = String.join(File.pathSeparator, arrayList);
        if (!pythonDriverOptions.getPyFiles().isEmpty()) {
            pythonEnvironment.systemEnv.put(PYFLINK_PY_FILES, String.join("\n", pythonDriverOptions.getPyFiles()));
        }
        if (!pythonDriverOptions.getPyArchives().isEmpty()) {
            pythonEnvironment.systemEnv.put(PYFLINK_PY_ARCHIVES, joinTuples(pythonDriverOptions.getPyArchives()));
        }
        pythonDriverOptions.getPyRequirements().ifPresent(tuple2 -> {
            pythonEnvironment.systemEnv.put(PYFLINK_PY_REQUIREMENTS, joinTuples(Collections.singleton(tuple2)));
        });
        pythonDriverOptions.getPyExecutable().ifPresent(str2 -> {
            pythonEnvironment.systemEnv.put(PYFLINK_PY_EXECUTABLE, pythonDriverOptions.getPyExecutable().get());
        });
        return pythonEnvironment;
    }

    private static String joinTuples(Collection<Tuple2<String, String>> collection) {
        ArrayList arrayList = new ArrayList();
        for (Tuple2<String, String> tuple2 : collection) {
            arrayList.add(String.join("\n", tuple2.f0 == null ? "" : (String) tuple2.f0, tuple2.f1 == null ? "" : (String) tuple2.f1));
        }
        return String.join("\n", arrayList);
    }

    public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path path, java.nio.file.Path path2) throws IOException {
        try {
            Files.createSymbolicLink(path2, path, new FileAttribute[0]);
        } catch (IOException e) {
            LOG.error("Create symbol link for pyflink lib failed.", e);
            LOG.info("Try to copy pyflink lib to working directory");
            Files.copy(path, path2, new CopyOption[0]);
        }
    }

    public static Process startPythonProcess(PythonEnvironment pythonEnvironment, List<String> list) throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        Map<String, String> environment = processBuilder.environment();
        environment.put("PYTHONPATH", pythonEnvironment.pythonPath);
        Map<String, String> map = pythonEnvironment.systemEnv;
        environment.getClass();
        map.forEach((v1, v2) -> {
            r1.put(v1, v2);
        });
        list.add(0, pythonEnvironment.pythonExec);
        processBuilder.command(list);
        processBuilder.redirectErrorStream(true);
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        Process start = processBuilder.start();
        if (!start.isAlive()) {
            throw new RuntimeException("Failed to start Python process. ");
        }
        Runtime.getRuntime().addShutdownHook(new ShutDownPythonHook(start, pythonEnvironment.tempDirectory));
        return start;
    }
}
