package org.apache.pulsar.functions.runtime.process;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback;
import org.apache.pulsar.shade.com.google.common.util.concurrent.Futures;
import org.apache.pulsar.shade.com.google.common.util.concurrent.MoreExecutors;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.protobuf.Empty;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/process/ProcessRuntime.class */
class ProcessRuntime implements Runtime {
    private static final Logger log = LoggerFactory.getLogger(ProcessRuntime.class);
    private Process process;
    private List<String> processArgs;
    private int instancePort;
    private int metricsPort;
    private Throwable deathException;
    private ManagedChannel channel;
    private InstanceControlGrpc.InstanceControlFutureStub stub;
    private ScheduledFuture timer;
    private InstanceConfig instanceConfig;
    private final Long expectedHealthCheckInterval;
    private final SecretsProviderConfigurator secretsProviderConfigurator;
    private final String extraDependenciesDir;
    private final String narExtractionDirectory;
    private static final long GRPC_TIMEOUT_SECS = 5;
    private final String funcLogDir;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProcessRuntime(InstanceConfig instanceConfig, String str, String str2, String str3, String str4, String str5, String str6, String str7, AuthenticationConfig authenticationConfig, SecretsProviderConfigurator secretsProviderConfigurator, Long l, String str8) throws Exception {
        this.instanceConfig = instanceConfig;
        this.instancePort = instanceConfig.getPort();
        this.metricsPort = instanceConfig.getMetricsPort();
        this.expectedHealthCheckInterval = l;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.funcLogDir = RuntimeUtils.genFunctionLogFolder(str4, instanceConfig);
        String str9 = null;
        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
        String json = secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null ? new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails())) : null;
        switch (instanceConfig.getFunctionDetails().getRuntime()) {
            case JAVA:
                String property = System.getProperty("pulsar.functions.log.conf");
                if (log.isDebugEnabled()) {
                    log.debug("The loaded value of pulsar.functions.log.conf is {}", property);
                }
                if (property != null && Files.exists(Paths.get(property, new String[0]), new LinkOption[0])) {
                    str9 = property;
                    break;
                } else {
                    str9 = "java_instance_log4j2.xml";
                    break;
                }
                break;
            case PYTHON:
                str9 = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
                break;
        }
        this.extraDependenciesDir = str2;
        this.narExtractionDirectory = str3;
        this.processArgs = RuntimeUtils.composeCmd(instanceConfig, str, Function.FunctionDetails.Runtime.JAVA == instanceConfig.getFunctionDetails().getRuntime() ? str2 : null, str4, str5, str6, str7, authenticationConfig, instanceConfig.getInstanceName(), Integer.valueOf(instanceConfig.getPort()), l, str9, secretsProviderClassName, json, false, null, null, str3, null, str8);
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.process.destroy();
        }));
        log.info("Creating function log directory {}", this.funcLogDir);
        try {
            Files.createDirectories(Paths.get(this.funcLogDir, new String[0]), new FileAttribute[0]);
            log.info("Created or found function log directory {}", this.funcLogDir);
            startProcess();
            if (this.channel == null && this.stub == null) {
                this.channel = ManagedChannelBuilder.forAddress("127.0.0.1", this.instancePort).usePlaintext().build();
                this.stub = InstanceControlGrpc.newFutureStub(this.channel);
                this.timer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
                    try {
                        healthCheck().get();
                    } catch (Exception e) {
                        log.error("Health check failed for {}-{}", new Object[]{this.instanceConfig.getFunctionDetails().getName(), Integer.valueOf(this.instanceConfig.getInstanceId()), e});
                    }
                }, this.expectedHealthCheckInterval.longValue(), this.expectedHealthCheckInterval.longValue(), TimeUnit.SECONDS);
            }
        } catch (IOException e) {
            log.info("Exception when creating log folder : {}", this.funcLogDir, e);
            throw new RuntimeException("Log folder creation error");
        }
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void join() throws Exception {
        this.process.waitFor();
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void stop() throws InterruptedException {
        if (this.timer != null) {
            this.timer.cancel(false);
        }
        if (this.channel != null) {
            this.channel.shutdown();
        }
        this.channel = null;
        this.stub = null;
        if (this.process != null) {
            this.process.destroy();
            int i = 0;
            while (this.process.isAlive()) {
                Thread.sleep(100L);
                if (i > 100) {
                    break;
                } else {
                    i++;
                }
            }
            if (this.process.isAlive()) {
                log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...", FunctionCommon.getFullyQualifiedInstanceId(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName(), this.instanceConfig.getInstanceId()));
                this.process.destroyForcibly();
            }
        }
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int i) {
        final CompletableFuture<InstanceCommunication.FunctionStatus> completableFuture = new CompletableFuture<>();
        if (this.stub == null) {
            completableFuture.completeExceptionally(new RuntimeException("Not alive"));
            return completableFuture;
        }
        Futures.addCallback(this.stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getFunctionStatus(Empty.newBuilder().build()), new FutureCallback<InstanceCommunication.FunctionStatus>() { // from class: org.apache.pulsar.functions.runtime.process.ProcessRuntime.1
            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                InstanceCommunication.FunctionStatus.Builder newBuilder = InstanceCommunication.FunctionStatus.newBuilder();
                newBuilder.setRunning(false);
                if (ProcessRuntime.this.deathException != null) {
                    newBuilder.setFailureException(ProcessRuntime.this.deathException.getMessage());
                } else {
                    newBuilder.setFailureException(th.getMessage());
                }
                completableFuture.complete(newBuilder.build());
            }

            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(InstanceCommunication.FunctionStatus functionStatus) {
                completableFuture.complete(functionStatus);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
        final CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<>();
        if (this.stub == null) {
            completableFuture.completeExceptionally(new RuntimeException("Not alive"));
            return completableFuture;
        }
        Futures.addCallback(this.stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build()), new FutureCallback<InstanceCommunication.MetricsData>() { // from class: org.apache.pulsar.functions.runtime.process.ProcessRuntime.2
            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(InstanceCommunication.MetricsData metricsData) {
                completableFuture.complete(metricsData);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<Void> resetMetrics() {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (this.stub == null) {
            completableFuture.completeExceptionally(new RuntimeException("Not alive"));
            return completableFuture;
        }
        Futures.addCallback(this.stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).resetMetrics(Empty.newBuilder().build()), new FutureCallback<Empty>() { // from class: org.apache.pulsar.functions.runtime.process.ProcessRuntime.3
            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Empty empty) {
                completableFuture.complete(null);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int i) {
        final CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<>();
        if (this.stub == null) {
            completableFuture.completeExceptionally(new RuntimeException("Not alive"));
            return completableFuture;
        }
        Futures.addCallback(this.stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()), new FutureCallback<InstanceCommunication.MetricsData>() { // from class: org.apache.pulsar.functions.runtime.process.ProcessRuntime.4
            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(InstanceCommunication.MetricsData metricsData) {
                completableFuture.complete(metricsData);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public String getPrometheusMetrics() throws IOException {
        return RuntimeUtils.getPrometheusMetrics(this.metricsPort);
    }

    public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
        final CompletableFuture<InstanceCommunication.HealthCheckResult> completableFuture = new CompletableFuture<>();
        if (this.stub == null) {
            completableFuture.completeExceptionally(new RuntimeException("Not alive"));
            return completableFuture;
        }
        Futures.addCallback(this.stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).healthCheck(Empty.newBuilder().build()), new FutureCallback<InstanceCommunication.HealthCheckResult>() { // from class: org.apache.pulsar.functions.runtime.process.ProcessRuntime.5
            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            @Override // org.apache.pulsar.shade.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(InstanceCommunication.HealthCheckResult healthCheckResult) {
                completableFuture.complete(healthCheckResult);
            }
        }, MoreExecutors.directExecutor());
        return completableFuture;
    }

    private void startProcess() {
        this.deathException = null;
        try {
            ProcessBuilder inheritIO = new ProcessBuilder(this.processArgs).inheritIO();
            if (StringUtils.isNotEmpty(this.extraDependenciesDir)) {
                inheritIO.environment().put("PYTHONPATH", "${PYTHONPATH}:" + this.extraDependenciesDir);
            }
            this.secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(inheritIO, this.instanceConfig.getFunctionDetails());
            log.info("ProcessBuilder starting the process with args {}", String.join(StringUtils.SPACE, inheritIO.command()));
            this.process = inheritIO.start();
            try {
                log.error("Instance Process quit unexpectedly with return value " + this.process.exitValue());
                tryExtractingDeathException();
            } catch (IllegalThreadStateException e) {
                log.info("Started process successfully");
            }
        } catch (Exception e2) {
            log.error("Starting process failed", e2);
            this.deathException = e2;
        }
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public boolean isAlive() {
        if (this.process == null) {
            return false;
        }
        if (this.process.isAlive()) {
            return true;
        }
        if (this.deathException != null) {
            return false;
        }
        tryExtractingDeathException();
        return false;
    }

    private void tryExtractingDeathException() {
        InputStream errorStream = this.process.getErrorStream();
        try {
            byte[] bArr = new byte[errorStream.available()];
            errorStream.read(bArr);
            this.deathException = new RuntimeException(new String(bArr));
            log.error("Extracted Process death exception", this.deathException);
        } catch (Exception e) {
            this.deathException = e;
            log.error("Error extracting Process death exception", this.deathException);
        }
    }

    public Process getProcess() {
        return this.process;
    }

    public List<String> getProcessArgs() {
        return this.processArgs;
    }

    @Override // org.apache.pulsar.functions.runtime.Runtime
    public Throwable getDeathException() {
        return this.deathException;
    }
}
