package org.apache.flink.client.program;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/client/program/Client.class */
/**
 * Client-side entry point for compiling programs and submitting the resulting
 * job graphs to a JobManager.
 *
 * <p>The client owns an actor system that is started in the constructor and must be
 * released with {@link #shutdown()}. It offers "blocking" and "detached" submission
 * variants at several abstraction levels ({@link PackagedProgram}, {@link JobWithJars},
 * {@link OptimizedPlan}, {@link JobGraph}), each of which delegates to the next lower level.
 */
public class Client {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    /** Optimizer used to turn plans into optimized plans. */
    final Optimizer compiler;

    /** Actor system used for all communication with the JobManager. */
    private final ActorSystem actorSystem;

    /** Configuration this client was created with. */
    private final Configuration config;

    /** Timeout applied to JobManager requests (ask/await, upload, submit). */
    private final FiniteDuration timeout;

    /** Timeout applied when looking up the leading JobManager. */
    private final FiniteDuration lookupTimeout;

    /** Value handed to the constructor; only exposed through {@link #getMaxSlots()}. */
    private final int maxSlots;

    /** Flag forwarded to the blocking job submission; defaults to {@code true}. */
    private boolean printStatusDuringExecution;

    /** ID of the job graph most recently handed to a JobGraph-level run method. */
    private JobID lastJobID;

    /**
     * Creates a client with {@code maxSlots} set to {@code -1}.
     *
     * @param configuration the configuration; must not be {@code null}
     * @throws IOException if the client actor system cannot be started
     */
    public Client(Configuration configuration) throws IOException {
        this(configuration, -1);
    }

    /**
     * Creates a client, its optimizer and its actor system.
     *
     * @param configuration the configuration; must not be {@code null}
     * @param maxSlots value later returned by {@link #getMaxSlots()}
     * @throws IOException if the client actor system cannot be started
     */
    public Client(Configuration configuration, int maxSlots) throws IOException {
        this.printStatusDuringExecution = true;
        this.config = Preconditions.checkNotNull(configuration);
        this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
        this.maxSlots = maxSlots;
        LOG.info("Starting client actor system");
        try {
            this.actorSystem = JobClient.startJobClientActorSystem(configuration);
            this.timeout = AkkaUtils.getTimeout(configuration);
            // NOTE(review): the lookup timeout is read from the same setting as the
            // general timeout - confirm whether a dedicated lookup setting should be used.
            this.lookupTimeout = AkkaUtils.getTimeout(configuration);
        } catch (Exception e) {
            throw new IOException("Could not start client actor system.", e);
        }
    }

    /**
     * Shuts the actor system down and waits for its termination.
     * Does nothing if the actor system is already terminated.
     */
    public void shutdown() {
        if (!this.actorSystem.isTerminated()) {
            this.actorSystem.shutdown();
            this.actorSystem.awaitTermination();
        }
    }

    /**
     * Sets the flag that is forwarded to blocking job submissions.
     *
     * @param print the new flag value
     */
    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    /**
     * @return the flag that is forwarded to blocking job submissions
     */
    public boolean getPrintStatusDuringExecution() {
        return this.printStatusDuringExecution;
    }

    /**
     * @return the {@code maxSlots} value given at construction time ({@code -1} if none was given)
     */
    public int getMaxSlots() {
        return this.maxSlots;
    }

    /**
     * Optimizes the given program and renders the resulting plan as JSON.
     *
     * @param optimizer the optimizer to use
     * @param packagedProgram the program to optimize
     * @param parallelism the parallelism to apply; values {@code <= 0} are ignored
     * @return the JSON dump of the optimized plan
     * @throws CompilerException declared by the underlying optimization
     * @throws ProgramInvocationException declared by the underlying optimization
     */
    public static String getOptimizedPlanAsJson(Optimizer optimizer, PackagedProgram packagedProgram, int parallelism) throws CompilerException, ProgramInvocationException {
        FlinkPlan plan = getOptimizedPlan(optimizer, packagedProgram, parallelism);
        return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(plan);
    }

    /**
     * Optimizes the given packaged program.
     *
     * <p>Side effect: sets the current thread's context class loader to the program's
     * user-code class loader (it is not restored afterwards).
     *
     * @param optimizer the optimizer to use
     * @param packagedProgram the program to optimize
     * @param parallelism the parallelism to apply; values {@code <= 0} are ignored
     * @return the optimized plan
     * @throws CompilerException declared by the underlying optimization
     * @throws ProgramInvocationException declared by the underlying optimization
     * @throws RuntimeException if the program uses neither the program entry point
     *         nor the interactive mode
     */
    public static FlinkPlan getOptimizedPlan(Optimizer optimizer, PackagedProgram packagedProgram, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return getOptimizedPlan(optimizer, packagedProgram.getPlanWithJars(), parallelism);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException("Couldn't determine program mode.");
        }
        OptimizerPlanEnvironment environment = new OptimizerPlanEnvironment(optimizer);
        if (parallelism > 0) {
            environment.setParallelism(parallelism);
        }
        return environment.getOptimizedPlan(packagedProgram);
    }

    /**
     * Optimizes the given plan.
     *
     * <p>The given parallelism only becomes the plan's default parallelism when it is
     * positive and the plan does not already carry a positive default parallelism.
     *
     * @param optimizer the optimizer to use
     * @param plan the plan to optimize; its default parallelism may be modified
     * @param parallelism the fallback default parallelism; values {@code <= 0} are ignored
     * @return the optimized plan
     * @throws CompilerException declared by the underlying compilation
     */
    public static OptimizedPlan getOptimizedPlan(Optimizer optimizer, Plan plan, int parallelism) throws CompilerException {
        if (parallelism > 0 && plan.getDefaultParallelism() <= 0) {
            LOG.debug("Changing plan default parallelism from {} to {}", plan.getDefaultParallelism(), parallelism);
            plan.setDefaultParallelism(parallelism);
        }
        LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, plan.getDefaultParallelism());
        return optimizer.compile(plan);
    }

    /**
     * Runs the given packaged program through the blocking submission path.
     *
     * <p>Side effect: sets the current thread's context class loader to the program's
     * user-code class loader.
     *
     * @param packagedProgram the program to run
     * @param parallelism the parallelism to apply
     * @return the submission result; in interactive mode it carries the ID of the job
     *         graph most recently submitted through this client
     * @throws ProgramInvocationException if the program invocation or the execution fails
     * @throws RuntimeException if the program has no valid invocation mode
     */
    public JobSubmissionResult runBlocking(PackagedProgram packagedProgram, int parallelism) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return runBlocking(packagedProgram.getPlanWithJars(), parallelism);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
        }
        return runInteractive(packagedProgram, parallelism, true);
    }

    /**
     * Runs the given packaged program through the detached submission path.
     *
     * <p>Side effect: sets the current thread's context class loader to the program's
     * user-code class loader.
     *
     * @param packagedProgram the program to run
     * @param parallelism the parallelism to apply
     * @return the submission result; in interactive mode it carries the ID of the job
     *         graph most recently submitted through this client
     * @throws ProgramInvocationException if the program invocation or the submission fails
     * @throws RuntimeException if the program has no valid invocation mode
     */
    public JobSubmissionResult runDetached(PackagedProgram packagedProgram, int parallelism) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        if (packagedProgram.isUsingProgramEntryPoint()) {
            return runDetached(packagedProgram.getPlanWithJars(), parallelism);
        }
        if (!packagedProgram.isUsingInteractiveMode()) {
            throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
        }
        return runInteractive(packagedProgram, parallelism, false);
    }

    /**
     * Installs this client as the context environment, invokes the program's
     * interactive entry point and always removes the context again afterwards.
     *
     * @param packagedProgram the program to invoke
     * @param parallelism the parallelism forwarded to the context environment
     * @param blocking flag forwarded to the context environment: {@code true} when
     *        called from the blocking path, {@code false} from the detached path
     * @return a result carrying {@link #lastJobID} as it is after the invocation
     * @throws ProgramInvocationException if the program invocation fails
     */
    private JobSubmissionResult runInteractive(PackagedProgram packagedProgram, int parallelism, boolean blocking) throws ProgramInvocationException {
        LOG.info("Starting program in interactive mode");
        ContextEnvironment.setAsContext(this, packagedProgram.getAllLibraries(), packagedProgram.getClasspaths(), packagedProgram.getUserCodeClassLoader(), parallelism, blocking);
        try {
            packagedProgram.invokeInteractiveModeForExecution();
        } finally {
            // The context must be removed on success and on every kind of failure.
            ContextEnvironment.unsetContext();
        }
        return new JobSubmissionResult(this.lastJobID);
    }

    /**
     * Optimizes the plan of the given job and runs it through the blocking path.
     *
     * @param jobWithJars the job, its JAR files and its classpaths
     * @param parallelism the parallelism to apply
     * @return the execution result
     * @throws IllegalArgumentException if the job provides no user-code class loader
     * @throws CompilerException declared by the underlying optimization
     * @throws ProgramInvocationException if the execution fails
     */
    public JobExecutionResult runBlocking(JobWithJars jobWithJars, int parallelism) throws CompilerException, ProgramInvocationException {
        ClassLoader userCodeClassLoader = jobWithJars.getUserCodeClassLoader();
        if (userCodeClassLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        OptimizedPlan optimizedPlan = getOptimizedPlan(this.compiler, jobWithJars, parallelism);
        return runBlocking(optimizedPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), userCodeClassLoader);
    }

    /**
     * Optimizes the plan of the given job and runs it through the detached path.
     *
     * @param jobWithJars the job, its JAR files and its classpaths
     * @param parallelism the parallelism to apply
     * @return the submission result
     * @throws IllegalArgumentException if the job provides no user-code class loader
     * @throws CompilerException declared by the underlying optimization
     * @throws ProgramInvocationException if the submission fails
     */
    public JobSubmissionResult runDetached(JobWithJars jobWithJars, int parallelism) throws CompilerException, ProgramInvocationException {
        ClassLoader userCodeClassLoader = jobWithJars.getUserCodeClassLoader();
        if (userCodeClassLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        OptimizedPlan optimizedPlan = getOptimizedPlan(this.compiler, jobWithJars, parallelism);
        return runDetached(optimizedPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), userCodeClassLoader);
    }

    /**
     * Translates the optimized plan into a job graph and runs it through the blocking path.
     *
     * @param optimizedPlan the plan to translate
     * @param jarFiles URLs of the JAR files to attach to the job graph
     * @param classpaths classpath URLs to set on the job graph
     * @param classLoader class loader forwarded to the job submission
     * @return the execution result
     * @throws ProgramInvocationException if the execution fails
     */
    public JobExecutionResult runBlocking(OptimizedPlan optimizedPlan, List<URL> jarFiles, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
        JobGraph jobGraph = getJobGraph(optimizedPlan, jarFiles, classpaths);
        return runBlocking(jobGraph, classLoader);
    }

    /**
     * Translates the optimized plan into a job graph and runs it through the detached path.
     *
     * @param optimizedPlan the plan to translate
     * @param jarFiles URLs of the JAR files to attach to the job graph
     * @param classpaths classpath URLs to set on the job graph
     * @param classLoader class loader forwarded to the job submission
     * @return the submission result
     * @throws ProgramInvocationException if the submission fails
     */
    public JobSubmissionResult runDetached(OptimizedPlan optimizedPlan, List<URL> jarFiles, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
        JobGraph jobGraph = getJobGraph(optimizedPlan, jarFiles, classpaths);
        return runDetached(jobGraph, classLoader);
    }

    /**
     * Submits the job graph via {@code JobClient.submitJobAndWait} and returns its result.
     *
     * <p>Each failure is reported with the message of the step that actually failed;
     * unchecked exceptions raised during submission propagate unchanged.
     *
     * @param jobGraph the job graph to run; its ID is remembered as the last job ID
     * @param classLoader class loader forwarded to the job submission
     * @return the execution result
     * @throws ProgramInvocationException if the leader retrieval service cannot be
     *         created or the job execution fails
     */
    public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        final LeaderRetrievalService leaderRetrievalService;
        try {
            leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(this.config);
        } catch (Exception e) {
            throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
        }

        // Kept outside the try above so that execution failures are not mislabelled
        // as leader-retrieval failures.
        try {
            this.lastJobID = jobGraph.getJobID();
            return JobClient.submitJobAndWait(this.actorSystem, leaderRetrievalService, jobGraph, this.timeout, this.printStatusDuringExecution, classLoader);
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    /**
     * Uploads the job's JAR files and submits the job graph via
     * {@code JobClient.submitJobDetached}.
     *
     * <p>Each failure is reported with the message of the step that actually failed;
     * unchecked exceptions raised during upload or submission propagate unchanged.
     *
     * @param jobGraph the job graph to submit; its ID is remembered as the last job ID
     * @param classLoader class loader forwarded to the job submission
     * @return a result carrying the ID of the submitted job graph
     * @throws ProgramInvocationException if the JobManager gateway cannot be retrieved,
     *         the JAR upload fails, or the submission fails
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        final ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }

        LOG.info("Checking and uploading JAR files");
        try {
            JobClient.uploadJarFiles(jobGraph, jobManagerGateway, this.timeout);
        } catch (IOException e) {
            throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
        }

        try {
            this.lastJobID = jobGraph.getJobID();
            JobClient.submitJobDetached(jobManagerGateway, jobGraph, this.timeout, classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    /**
     * Asks the JobManager to cancel the given job and waits for the answer.
     *
     * @param jobID the ID of the job to cancel
     * @throws ProgramInvocationException if the JobManager cannot be reached or does
     *         not answer in time
     * @throws Exception if the JobManager reports a cancellation failure (the reported
     *         cause is attached) or answers with an unexpected message
     */
    public void cancel(JobID jobID) throws Exception {
        final Object result;
        try {
            result = Await.result(getJobManagerGateway().ask(new JobManagerMessages.CancelJob(jobID), this.timeout), this.timeout);
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
        }

        // The answer is evaluated outside the try above so that a reported
        // cancellation failure is not relabelled as a gateway query failure.
        if (result instanceof JobManagerMessages.CancellationSuccess) {
            LOG.debug("Job cancellation with ID {} succeeded.", jobID);
        } else if (result instanceof JobManagerMessages.CancellationFailure) {
            Throwable cause = ((JobManagerMessages.CancellationFailure) result).cause();
            LOG.debug("Job cancellation with ID {} failed.", jobID, cause);
            throw new Exception("Failed to cancel the job because of \n" + cause.getMessage(), cause);
        } else {
            throw new Exception("Unknown message received while cancelling.");
        }
    }

    /**
     * Fetches the accumulators of the given job, deserializing them with the
     * system class loader.
     *
     * @param jobID the ID of the job
     * @return the deserialized accumulator results by name
     * @throws Exception see {@link #getAccumulators(JobID, ClassLoader)}
     */
    public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
        return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    }

    /**
     * Fetches the accumulators of the given job from the JobManager.
     *
     * @param jobID the ID of the job
     * @param classLoader class loader used to deserialize the accumulator results
     * @return the deserialized accumulator results by name
     * @throws Exception if the JobManager cannot be queried, if it reports an error
     *         (the reported cause is thrown as is), if it answers with an unexpected
     *         message, or if deserialization fails
     */
    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader classLoader) throws Exception {
        final Object result;
        try {
            result = Await.result(getJobManagerGateway().ask(new RequestAccumulatorResults(jobID), this.timeout), this.timeout);
        } catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }

        // Evaluated outside the try above so that errors reported by the JobManager
        // are not relabelled as gateway query failures.
        if (result instanceof AccumulatorResultsFound) {
            return AccumulatorHelper.deserializeAccumulators(((AccumulatorResultsFound) result).result(), classLoader);
        }
        if (result instanceof AccumulatorResultsErroneous) {
            throw ((AccumulatorResultsErroneous) result).cause();
        }
        throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
    }

    /**
     * Tells the JobManager to end the session of the given job.
     *
     * @param jobID the ID of the job; must not be {@code null}
     * @throws IllegalArgumentException if {@code jobID} is {@code null}
     * @throws Exception if the JobManager gateway cannot be retrieved
     */
    public void endSession(JobID jobID) throws Exception {
        if (jobID == null) {
            throw new IllegalArgumentException("The JobID must not be null.");
        }
        endSessions(Collections.singletonList(jobID));
    }

    /**
     * Tells the JobManager to end the sessions of the given jobs by sending one
     * {@code RemoveCachedJob} message per non-null ID. No answer is awaited.
     *
     * @param jobIDs the IDs of the jobs; must not be {@code null}, {@code null}
     *        elements are skipped
     * @throws IllegalArgumentException if {@code jobIDs} is {@code null}
     * @throws Exception if the JobManager gateway cannot be retrieved
     */
    public void endSessions(List<JobID> jobIDs) throws Exception {
        if (jobIDs == null) {
            throw new IllegalArgumentException("The JobIDs must not be null");
        }
        ActorGateway jobManagerGateway = getJobManagerGateway();
        for (JobID jobID : jobIDs) {
            if (jobID == null) {
                continue;
            }
            LOG.info("Telling job manager to end the session {}.", jobID);
            jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
        }
    }

    /**
     * Optimizes the plan contained in the given job.
     *
     * @param optimizer the optimizer to use
     * @param jobWithJars the job providing the plan
     * @param parallelism the fallback default parallelism; values {@code <= 0} are ignored
     * @return the optimized plan
     */
    private static OptimizedPlan getOptimizedPlan(Optimizer optimizer, JobWithJars jobWithJars, int parallelism) throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(optimizer, jobWithJars.getPlan(), parallelism);
    }

    /**
     * Builds the job graph for the given plan, attaching the program's libraries and classpaths.
     *
     * @param packagedProgram the program providing libraries and classpaths
     * @param flinkPlan the plan to translate
     * @return the job graph
     * @throws ProgramInvocationException declared for callers; not thrown by the code visible here
     */
    public JobGraph getJobGraph(PackagedProgram packagedProgram, FlinkPlan flinkPlan) throws ProgramInvocationException {
        return getJobGraph(flinkPlan, packagedProgram.getAllLibraries(), packagedProgram.getClasspaths());
    }

    /**
     * Builds the job graph for the given plan and attaches JAR files and classpaths.
     *
     * <p>A {@link StreamingPlan} supplies its own job graph; any other plan is cast to
     * {@link OptimizedPlan} and compiled with a {@link JobGraphGenerator}.
     *
     * @param flinkPlan the plan to translate
     * @param jarFiles URLs of the JAR files to add to the job graph
     * @param classpaths classpath URLs to set on the job graph
     * @return the job graph
     * @throws RuntimeException if a JAR URL cannot be converted to a URI
     */
    private JobGraph getJobGraph(FlinkPlan flinkPlan, List<URL> jarFiles, List<URL> classpaths) {
        final JobGraph jobGraph;
        if (flinkPlan instanceof StreamingPlan) {
            jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
        } else {
            jobGraph = new JobGraphGenerator(this.config).compileJobGraph((OptimizedPlan) flinkPlan);
        }
        for (URL jarFile : jarFiles) {
            try {
                jobGraph.addJar(new Path(jarFile.toURI()));
            } catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
        jobGraph.setClasspaths(classpaths);
        return jobGraph;
    }

    /**
     * Looks up the gateway of the leading JobManager, using a freshly created
     * leader retrieval service and the lookup timeout.
     *
     * @return the JobManager gateway
     * @throws Exception if the retrieval service cannot be created or the lookup fails
     */
    private ActorGateway getJobManagerGateway() throws Exception {
        LOG.info("Looking up JobManager");
        LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(this.config);
        return LeaderRetrievalUtils.retrieveLeaderGateway(leaderRetrievalService, this.actorSystem, this.lookupTimeout);
    }
}
