package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher.class */
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway, LeaderContender {
    public static final String DISPATCHER_NAME = "dispatcher";
    private final Configuration configuration;
    private final SubmittedJobGraphStore submittedJobGraphStore;
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    private final ResourceManagerGateway resourceManagerGateway;
    private final JobManagerServices jobManagerServices;
    private final HeartbeatServices heartbeatServices;
    private final MetricRegistry metricRegistry;
    private final FatalErrorHandler fatalErrorHandler;
    private final Map<JobID, JobManagerRunner> jobManagerRunners;
    private final LeaderElectionService leaderElectionService;
    private final CompletableFuture<String> restAddressFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$DispatcherOnCompleteActions.class */
    public class DispatcherOnCompleteActions implements OnCompletionActions {
        private final JobID jobId;

        private DispatcherOnCompleteActions(JobID jobID) {
            this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        }

        @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
        public void jobFinished(JobExecutionResult jobExecutionResult) {
            Dispatcher.this.log.info("Job {} finished.", this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, true);
                } catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", this.jobId, e);
                }
            });
        }

        @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
        public void jobFailed(Throwable th) {
            Dispatcher.this.log.info("Job {} failed.", this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, true);
                } catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", this.jobId, e);
                }
            });
        }

        @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
        public void jobFinishedByOther() {
            Dispatcher.this.log.info("Job {} was finished by other JobManager.", this.jobId);
            Dispatcher.this.runAsync(() -> {
                try {
                    Dispatcher.this.removeJob(this.jobId, false);
                } catch (Exception e) {
                    Dispatcher.this.log.warn("Could not properly remove job {} from the dispatcher.", this.jobId, e);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Dispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, Optional<String> optional) throws Exception {
        super(rpcService, str);
        this.configuration = (Configuration) Preconditions.checkNotNull(configuration);
        this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.resourceManagerGateway = (ResourceManagerGateway) Preconditions.checkNotNull(resourceManagerGateway);
        this.jobManagerServices = JobManagerServices.fromConfiguration(configuration, (BlobServer) Preconditions.checkNotNull(blobServer));
        this.heartbeatServices = (HeartbeatServices) Preconditions.checkNotNull(heartbeatServices);
        this.metricRegistry = (MetricRegistry) Preconditions.checkNotNull(metricRegistry);
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
        this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
        this.jobManagerRunners = new HashMap(16);
        this.leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
        this.restAddressFuture = (CompletableFuture) optional.map((v0) -> {
            return CompletableFuture.completedFuture(v0);
        }).orElse(FutureUtils.completedExceptionally(new DispatcherException("The Dispatcher has not been started with a REST endpoint.")));
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void postStop() throws Exception {
        Throwable th = null;
        clearState();
        try {
            this.jobManagerServices.shutdown();
        } catch (Throwable th2) {
            th = ExceptionUtils.firstOrSuppressed(th2, (Throwable) null);
        }
        try {
            this.submittedJobGraphStore.stop();
        } catch (Exception e) {
            th = ExceptionUtils.firstOrSuppressed(e, th);
        }
        try {
            this.leaderElectionService.stop();
        } catch (Exception e2) {
            th = ExceptionUtils.firstOrSuppressed(e2, th);
        }
        try {
            super.postStop();
        } catch (Exception e3) {
            th = ExceptionUtils.firstOrSuppressed(e3, th);
        }
        if (th != null) {
            throw new FlinkException("Could not properly terminate the Dispatcher.", th);
        }
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void start() throws Exception {
        super.start();
        this.leaderElectionService.start(this);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time time) {
        JobID jobID = jobGraph.getJobID();
        this.log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        try {
            RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(jobID);
            if (jobSchedulingStatus != RunningJobsRegistry.JobSchedulingStatus.PENDING) {
                return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, "Job has already been submitted and is currently in state " + jobSchedulingStatus + '.'));
            }
            try {
                this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
                try {
                    JobManagerRunner createJobManagerRunner = createJobManagerRunner(ResourceID.generate(), jobGraph, this.configuration, getRpcService(), this.highAvailabilityServices, this.heartbeatServices, this.jobManagerServices, this.metricRegistry, new DispatcherOnCompleteActions(jobGraph.getJobID()), this.fatalErrorHandler);
                    createJobManagerRunner.start();
                    this.jobManagerRunners.put(jobID, createJobManagerRunner);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (Exception e) {
                    try {
                        this.submittedJobGraphStore.removeJobGraph(jobID);
                    } catch (Throwable th) {
                        this.log.warn("Cannot remove job graph from submitted job graph store.", th);
                        e.addSuppressed(th);
                    }
                    return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, "Could not start JobManager.", e));
                }
            } catch (Exception e2) {
                this.log.warn("Cannot persist JobGraph.", e2);
                return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, "Could not persist JobGraph.", e2));
            }
        } catch (IOException e3) {
            this.log.warn("Cannot retrieve job status for {}.", jobID, e3);
            return FutureUtils.completedExceptionally(new JobSubmissionException(jobID, "Could not retrieve the job status.", e3));
        }
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Collection<JobID>> listJobs(Time time) {
        return CompletableFuture.completedFuture(this.jobManagerRunners.keySet());
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID, Time time) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobID);
        return jobManagerRunner == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : jobManagerRunner.getJobManagerGateway().cancel(time);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> stopJob(JobID jobID, Time time) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobID);
        return jobManagerRunner == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : jobManagerRunner.getJobManagerGateway().stop(time);
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> requestRestAddress(Time time) {
        return this.restAddressFuture;
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        CompletableFuture<ResourceOverview> requestResourceOverview = this.resourceManagerGateway.requestResourceOverview(time);
        ArrayList arrayList = new ArrayList(this.jobManagerRunners.size());
        Iterator<Map.Entry<JobID, JobManagerRunner>> it = this.jobManagerRunners.entrySet().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getValue().getJobManagerGateway().requestJobStatus(time));
        }
        return FutureUtils.combineAll(arrayList).thenCombine((CompletionStage) requestResourceOverview, (collection, resourceOverview) -> {
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            int i4 = 0;
            Iterator it2 = collection.iterator();
            while (it2.hasNext()) {
                switch ((JobStatus) it2.next()) {
                    case FINISHED:
                        i2++;
                        break;
                    case FAILED:
                        i4++;
                        break;
                    case CANCELED:
                        i3++;
                        break;
                    default:
                        i++;
                        break;
                }
            }
            return new ClusterOverview(resourceOverview.getNumberTaskManagers(), resourceOverview.getNumberRegisteredSlots(), resourceOverview.getNumberFreeSlots(), i, i2, i3, i4);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean z, boolean z2, Time time) {
        ArrayList arrayList = new ArrayList(this.jobManagerRunners.size());
        Iterator<JobManagerRunner> it = this.jobManagerRunners.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getJobManagerGateway().requestJobDetails(time));
        }
        return FutureUtils.combineAll(arrayList).thenApply(collection -> {
            return new MultipleJobsDetails(collection, null);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobID, Time time) {
        JobManagerRunner jobManagerRunner = this.jobManagerRunners.get(jobID);
        return jobManagerRunner == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(time);
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time time) {
        String metricQueryServicePath = this.metricRegistry.getMetricQueryServicePath();
        return metricQueryServicePath != null ? CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath)) : CompletableFuture.completedFuture(Collections.emptyList());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time time) {
        return this.resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(time);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Integer> getBlobServerPort(Time time) {
        return CompletableFuture.completedFuture(Integer.valueOf(this.jobManagerServices.blobServer.getPort()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeJob(JobID jobID, boolean z) throws Exception {
        JobManagerRunner remove = this.jobManagerRunners.remove(jobID);
        if (remove != null) {
            remove.shutdown();
        }
        if (z) {
            this.submittedJobGraphStore.removeJobGraph(jobID);
        }
    }

    private void clearState() throws Exception {
        Exception exc = null;
        Iterator<JobManagerRunner> it = this.jobManagerRunners.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().shutdown();
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        this.jobManagerRunners.clear();
        if (exc != null) {
            throw exc;
        }
    }

    private void recoverJobs() {
        this.log.info("Recovering all persisted jobs.");
        getRpcService().execute(() -> {
            try {
                for (JobID jobID : this.submittedJobGraphStore.getJobIds()) {
                    try {
                        SubmittedJobGraph recoverJobGraph = this.submittedJobGraphStore.recoverJobGraph(jobID);
                        runAsync(() -> {
                            submitJob(recoverJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
                        });
                    } catch (Exception e) {
                        this.log.error("Could not recover the job graph for " + jobID + '.', e);
                    }
                }
            } catch (Exception e2) {
                this.log.error("Could not recover job ids from the submitted job graph store. Aborting recovery.", e2);
            }
        });
    }

    private void onFatalError(Throwable th) {
        this.log.error("Fatal error occurred in dispatcher {}.", getAddress(), th);
        this.fatalErrorHandler.onFatalError(th);
    }

    protected abstract JobManagerRunner createJobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws Exception;

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        runAsyncWithoutFencing(() -> {
            DispatcherId dispatcherId = new DispatcherId(uuid);
            this.log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
            if (getFencingToken() != null) {
                try {
                    clearState();
                } catch (Exception e) {
                    this.log.warn("Could not properly clear the Dispatcher state while granting leadership.", e);
                }
            }
            setFencingToken(dispatcherId);
            getRpcService().execute(() -> {
                this.leaderElectionService.confirmLeaderSessionID(uuid);
            });
            recoverJobs();
        });
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        runAsyncWithoutFencing(() -> {
            this.log.info("Dispatcher {} was revoked leadership.", getAddress());
            try {
                clearState();
            } catch (Exception e) {
                this.log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e);
            }
            setFencingToken(null);
        });
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exc));
    }
}
