package org.apache.flink.client.python;

import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.CopyOption;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.api.python.shaded.py4j.CallbackClient;
import org.apache.flink.api.python.shaded.py4j.Gateway;
import org.apache.flink.api.python.shaded.py4j.GatewayServer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.PythonDependencyUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils.class */
final class PythonEnvUtils {
    /** Name of the environment variable used as a fallback when no Python client executable is configured. */
    static final String PYFLINK_CLIENT_EXECUTABLE = "PYFLINK_CLIENT_EXECUTABLE";
    private static final Logger LOG = LoggerFactory.getLogger(PythonEnvUtils.class);
    /** Gateway server exposed through the static getter/setter of this class; {@code null} until one is set. */
    private static GatewayServer gatewayServer = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils$PythonEnvironment.class */
    /**
     * Mutable holder describing how the Python client process is launched: the interpreter to run,
     * the {@code PYTHONPATH} to expose, extra environment variables, and the temporary directory
     * that holds staged files.
     */
    public static class PythonEnvironment {
        /** Absolute path of the temporary directory that staged Python files are placed under. */
        String tempDirectory;
        /** Interpreter command; defaults to {@code python.exe} on Windows and {@code python} otherwise. */
        String pythonExec;
        /** Value exported as {@code PYTHONPATH}; stays {@code null} when nothing was added. */
        String pythonPath;
        /** Additional environment variables copied into the Python process environment. */
        Map<String, String> systemEnv;

        PythonEnvironment() {
            this.pythonExec = OperatingSystem.isWindows() ? "python.exe" : "python";
            // Diamond instead of a raw HashMap so the assignment is type-checked.
            this.systemEnv = new HashMap<>();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/python/PythonEnvUtils$ShutDownPythonHook.class */
    /**
     * Shutdown hook thread that forcibly destroys a process and then, if a directory was given,
     * deletes that directory without reporting failures.
     */
    public static class ShutDownPythonHook extends Thread {
        /** Process to destroy when the hook runs. */
        private final Process trackedProcess;
        /** Directory to delete when the hook runs; {@code null} means nothing is deleted. */
        private final String directoryToDelete;

        ShutDownPythonHook(Process process, String str) {
            trackedProcess = process;
            directoryToDelete = str;
        }

        @Override
        public void run() {
            trackedProcess.destroyForcibly();
            if (directoryToDelete == null) {
                return;
            }
            File directory = new File(directoryToDelete);
            FileUtils.deleteDirectoryQuietly(directory);
        }
    }

    // Package-private no-arg constructor with an empty body; the methods of this class are all static.
    PythonEnvUtils() {
    }

    /**
     * Builds the {@link PythonEnvironment} used to launch the Python client process.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>pick the interpreter: configured option first, then the {@code PYFLINK_CLIENT_EXECUTABLE}
     *       environment variable, otherwise the platform default;</li>
     *   <li>create the temporary directory;</li>
     *   <li>seed the Python path with every {@code .zip} found under {@code $FLINK_OPT_DIR/python};</li>
     *   <li>stage the configured Python files and prepend them to the Python path;</li>
     *   <li>stage the entry point script, if any, and prepend it as well.</li>
     * </ol>
     *
     * @param readableConfig configuration providing the optional executable and Python files
     * @param str entry point script to add to the Python path, or {@code null} for none
     * @param str2 temporary directory; converted to an absolute path and created
     * @return the populated environment
     * @throws IOException propagated from the file system operations used for staging
     */
    static PythonEnvironment preparePythonEnvironment(ReadableConfig readableConfig, String str, String str2) throws IOException {
        PythonEnvironment env = new PythonEnvironment();

        // 1. Interpreter: configuration wins over the environment variable; null keeps the default.
        String configuredExec = readableConfig.getOptional(PythonOptions.PYTHON_CLIENT_EXECUTABLE)
                .orElse(System.getenv(PYFLINK_CLIENT_EXECUTABLE));
        if (configuredExec != null) {
            env.pythonExec = configuredExec;
        }

        // 2. Temporary directory for staged files.
        String tmpDir = new File(str2).getAbsolutePath();
        Path tmpDirPath = new Path(tmpDir);
        tmpDirPath.getFileSystem().mkdirs(tmpDirPath);
        env.tempDirectory = tmpDir;

        // 3. Bundled libraries: read the variable once so the null check and the use cannot disagree.
        String flinkOptDir = System.getenv("FLINK_OPT_DIR");
        if (flinkOptDir != null) {
            env.pythonPath = getLibFiles(flinkOptDir + File.separator + "python").stream()
                    .map(lib -> lib.toFile().getAbsolutePath())
                    .collect(Collectors.joining(File.pathSeparator));
        }

        // 4. User supplied Python files.
        if (readableConfig.getOptional(PythonOptions.PYTHON_FILES).isPresent()) {
            List<Path> pythonFiles = Arrays.stream(readableConfig.get(PythonOptions.PYTHON_FILES).split(PythonDependencyUtils.FILE_DELIMITER))
                    .map(Path::new)
                    .collect(Collectors.toList());
            addToPythonPath(env, pythonFiles);
        }

        // 5. Entry point script.
        if (str != null) {
            addToPythonPath(env, Collections.singletonList(new Path(str)));
        }
        return env;
    }

    /**
     * Makes {@code path2} refer to {@code path}: first by creating a symbolic link and, if that
     * fails, by copying.
     *
     * <p>{@link Files#copy(java.nio.file.Path, java.nio.file.Path, java.nio.file.CopyOption...)}
     * does not copy the entries of a directory, so a directory source is copied recursively in the
     * fallback path; otherwise the result would be an empty directory.
     *
     * @param path existing file or directory that the link should point to
     * @param path2 location at which the link (or the copy) is created
     * @throws IOException if the link cannot be created and the fallback copy fails as well
     */
    private static void createSymbolicLink(java.nio.file.Path path, java.nio.file.Path path2) throws IOException {
        try {
            Files.createSymbolicLink(path2, path);
        } catch (IOException e) {
            LOG.warn("Create symbol link from {} to {} failed and copy instead.", path2, path, e);
            if (!Files.isDirectory(path)) {
                Files.copy(path, path2);
                return;
            }
            // Directory source: replicate the whole tree below path2.
            Files.walkFileTree(path, new SimpleFileVisitor<java.nio.file.Path>() {
                @Override
                public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs) throws IOException {
                    // For the root itself relativize yields an empty path, so this creates path2.
                    Files.copy(dir, path2.resolve(path.relativize(dir)));
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
                    Files.copy(file, path2.resolve(path.relativize(file)));
                    return FileVisitResult.CONTINUE;
                }
            });
        }
    }

    /**
     * Recursively collects every file under {@code str} whose path ends with {@code .zip}.
     *
     * <p>This is best-effort: an {@link IOException} raised while walking is logged and whatever
     * was collected up to that point is returned.
     *
     * @param str root directory to search
     * @return the matching files in visit order; possibly empty, never {@code null}
     */
    private static List<java.nio.file.Path> getLibFiles(String str) {
        // Typed list instead of a raw ArrayList so the return needs no unchecked conversion.
        final List<java.nio.file.Path> zipFiles = new ArrayList<>();
        try {
            Files.walkFileTree(Paths.get(str), new SimpleFileVisitor<java.nio.file.Path>() {
                @Override
                public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) {
                    if (file.toString().endsWith(".zip")) {
                        zipFiles.add(file);
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            LOG.error("Gets pyflink dependent libs failed.", e);
        }
        return zipFiles;
    }

    /**
     * Stages the given files under the environment's temporary directory and prepends the
     * resulting locations to {@code pythonEnvironment.pythonPath}.
     *
     * <p>Files on a distributed file system are copied; other files are symlinked (with a copy
     * fallback). A staged regular file ending in {@code .py} contributes its parent directory to
     * the path; anything else contributes the staged path itself.
     *
     * @param pythonEnvironment environment whose {@code pythonPath} is updated in place
     * @param list files or directories to stage
     * @throws IOException if linking fails or a staged path cannot be resolved
     */
    private static void addToPythonPath(PythonEnvironment pythonEnvironment, List<Path> list) throws IOException {
        List<String> pythonPathEntries = new ArrayList<>();
        Path tmpDirPath = new Path(pythonEnvironment.tempDirectory);
        for (Path pythonFile : list) {
            String sourceFileName = pythonFile.getName();
            // Each file gets its own random UUID parent directory below the temporary directory.
            Path targetPath = new Path(tmpDirPath, String.join(File.separator, UUID.randomUUID().toString(), sourceFileName));
            if (pythonFile.getFileSystem().isDistributedFS()) {
                try {
                    FileUtils.copy(pythonFile, targetPath, true);
                } catch (Exception e) {
                    LOG.error("Error occurred when copying {} to {}, skipping...", pythonFile, targetPath, e);
                    // Really skip: the target was not created, so resolving it below would throw.
                    continue;
                }
            } else {
                new File(targetPath.getParent().toString()).mkdir();
                createSymbolicLink(
                        Paths.get(new File(pythonFile.getPath()).getAbsolutePath()),
                        Paths.get(targetPath.toString()));
            }
            boolean isSinglePyFile =
                    Files.isRegularFile(Paths.get(targetPath.toString()).toRealPath()) && sourceFileName.endsWith(".py");
            if (isSinglePyFile) {
                pythonPathEntries.add(targetPath.getParent().toString());
            } else {
                pythonPathEntries.add(targetPath.toString());
            }
        }
        // Entries already on the path go last, after the newly staged ones.
        if (pythonEnvironment.pythonPath != null && !pythonEnvironment.pythonPath.isEmpty()) {
            pythonPathEntries.add(pythonEnvironment.pythonPath);
        }
        pythonEnvironment.pythonPath = String.join(File.pathSeparator, pythonPathEntries);
    }

    /**
     * Starts the Python process described by {@code pythonEnvironment}.
     *
     * <p>The child inherits this JVM's standard output, with stderr merged into it. A shutdown hook
     * is registered that destroys the process and deletes the environment's temporary directory.
     *
     * @param pythonEnvironment interpreter, {@code PYTHONPATH} and extra environment variables
     * @param list command arguments; the interpreter is inserted at index 0 of this very list, so
     *     it must be mutable
     * @return the started process
     * @throws IOException if the process cannot be started
     * @throws RuntimeException if the process is not alive right after starting
     */
    static Process startPythonProcess(PythonEnvironment pythonEnvironment, List<String> list) throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder();
        Map<String, String> environment = processBuilder.environment();
        if (pythonEnvironment.pythonPath != null) {
            environment.put("PYTHONPATH", pythonEnvironment.pythonPath);
        }
        // Applied after PYTHONPATH, so an explicit entry in systemEnv overrides it.
        pythonEnvironment.systemEnv.forEach(environment::put);

        list.add(0, pythonEnvironment.pythonExec);
        processBuilder.command(list);
        processBuilder.redirectErrorStream(true);
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);

        Process process = processBuilder.start();
        if (!process.isAlive()) {
            throw new RuntimeException("Failed to start Python process. ");
        }
        Runtime.getRuntime().addShutdownHook(new ShutDownPythonHook(process, pythonEnvironment.tempDirectory));
        return process;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds and starts a gateway server on a dedicated daemon thread and waits for that thread
     * to finish.
     *
     * <p>The result is published only after {@code start(true)} has returned, so a failure while
     * starting reaches the caller as an {@link ExecutionException} instead of being swallowed.
     *
     * @return the started gateway server
     * @throws ExecutionException if building or starting the server failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static GatewayServer startGatewayServer() throws ExecutionException, InterruptedException {
        CompletableFuture<GatewayServer> serverFuture = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                GatewayServer server = new GatewayServer.GatewayServerBuilder()
                        .gateway(new Gateway(new ConcurrentHashMap<String, Object>(), new CallbackClient(NetUtils.getAvailablePort())))
                        .javaPort(0)
                        .build();
                resetCallbackClientExecutorService(server);
                server.start(true);
                // Complete last: completing before start() would make a later
                // completeExceptionally a no-op and hide the startup failure.
                serverFuture.complete(server);
            } catch (Throwable th) {
                serverFuture.completeExceptionally(th);
            }
        });
        thread.setName("org.apache.flink.api.python.shaded.py4j-gateway");
        thread.setDaemon(true);
        thread.start();
        thread.join();
        return serverFuture.get();
    }

    /**
     * Swaps the scheduled executor held by the server's callback client via reflection.
     *
     * <p>The current value of the private {@code executor} field is shut down and replaced by a
     * single-thread scheduled pool whose threads are created with {@code Thread::new}; afterwards
     * the private {@code setupCleaner} method is invoked on the client.
     *
     * @param gatewayServer2 server whose callback client is modified
     * @throws NoSuchFieldException if the {@code executor} field does not exist
     * @throws IllegalAccessException if the field or method cannot be accessed
     * @throws NoSuchMethodException if the {@code setupCleaner} method does not exist
     * @throws InvocationTargetException if {@code setupCleaner} itself throws
     */
    private static void resetCallbackClientExecutorService(GatewayServer gatewayServer2) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
        final CallbackClient client = (CallbackClient) gatewayServer2.getCallbackClient();

        final Field executorField = CallbackClient.class.getDeclaredField("executor");
        executorField.setAccessible(true);
        final ScheduledExecutorService previousExecutor = (ScheduledExecutorService) executorField.get(client);
        previousExecutor.shutdown();
        executorField.set(client, Executors.newScheduledThreadPool(1, Thread::new));

        final Method setupCleaner = CallbackClient.class.getDeclaredMethod("setupCleaner");
        setupCleaner.setAccessible(true);
        setupCleaner.invoke(client);
    }

    /**
     * Points the registered gateway server's callback client at a new address and port, then
     * replaces that client's executor service.
     *
     * @param str host name or address of the callback server
     * @param i port of the callback server
     * @throws UnknownHostException if {@code str} cannot be resolved
     * @throws IllegalStateException if no gateway server has been registered
     * @throws InvocationTargetException propagated from the reflective executor reset
     * @throws NoSuchMethodException propagated from the reflective executor reset
     * @throws IllegalAccessException propagated from the reflective executor reset
     * @throws NoSuchFieldException propagated from the reflective executor reset
     */
    public static void resetCallbackClient(String str, int i) throws UnknownHostException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
        // Read the shared field once into a local instead of assigning it back to itself.
        GatewayServer server = getGatewayServer();
        Preconditions.checkState(server != null, "No gateway server has been set, cannot reset the callback client.");
        server.resetCallbackClient(InetAddress.getByName(str), i);
        resetCallbackClientExecutorService(server);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the gateway server currently stored in the static field of this class.
     *
     * @return the stored gateway server, or {@code null} if none is set
     */
    public static GatewayServer getGatewayServer() {
        return gatewayServer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the given gateway server in the static field of this class.
     *
     * <p>A stored server cannot be replaced by another non-null one: either the argument is
     * {@code null} (clearing) or nothing is stored yet.
     *
     * @param gatewayServer2 server to store, or {@code null} to clear the stored one
     * @throws IllegalArgumentException if a server is already stored and the argument is non-null
     */
    public static void setGatewayServer(GatewayServer gatewayServer2) {
        final boolean clearing = gatewayServer2 == null;
        final boolean nothingStored = gatewayServer == null;
        Preconditions.checkArgument(clearing || nothingStored);
        gatewayServer = gatewayServer2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prepares the Python environment, passes the gateway's listening port to it through the
     * {@code PYFLINK_GATEWAY_PORT} environment variable, and starts the Python process.
     *
     * @param gatewayServer2 gateway server whose listening port is handed to the Python process
     * @param readableConfig configuration used to prepare the Python environment
     * @param list command arguments for the Python process
     * @param str entry point script, or {@code null}
     * @param str2 temporary directory for staged files
     * @return the started Python process
     * @throws IOException if preparing the environment or starting the process fails
     */
    public static Process launchPy4jPythonClient(GatewayServer gatewayServer2, ReadableConfig readableConfig, List<String> list, String str, String str2) throws IOException {
        final PythonEnvironment env = preparePythonEnvironment(readableConfig, str, str2);
        final String gatewayPort = String.valueOf(gatewayServer2.getListeningPort());
        env.systemEnv.put("PYFLINK_GATEWAY_PORT", gatewayPort);
        return startPythonProcess(env, list);
    }
}
