package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobManagerRunner.class */
public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
    private final Object lock;
    private final JobGraph jobGraph;
    private final OnCompletionActions toNotifyOnComplete;
    private final FatalErrorHandler errorHandler;
    private final RunningJobsRegistry runningJobsRegistry;
    private final LeaderElectionService leaderElectionService;
    private final JobManagerServices jobManagerServices;
    private final JobMaster jobManager;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private volatile boolean shutdown;

    public JobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws Exception {
        this(resourceID, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)), onCompletionActions, fatalErrorHandler);
    }

    public JobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws Exception {
        this(resourceID, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, JobManagerServices.fromConfiguration(configuration, highAvailabilityServices), metricRegistry, onCompletionActions, fatalErrorHandler);
    }

    public JobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws Exception {
        this.lock = new Object();
        JobManagerMetricGroup jobManagerMetricGroup = null;
        try {
            this.jobGraph = (JobGraph) Preconditions.checkNotNull(jobGraph);
            this.toNotifyOnComplete = (OnCompletionActions) Preconditions.checkNotNull(onCompletionActions);
            this.errorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
            this.jobManagerServices = (JobManagerServices) Preconditions.checkNotNull(jobManagerServices);
            Preconditions.checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
            jobManagerMetricGroup = new JobManagerMetricGroup(metricRegistry, rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress());
            this.jobManagerMetricGroup = jobManagerMetricGroup;
            BlobLibraryCacheManager blobLibraryCacheManager = jobManagerServices.libraryCacheManager;
            try {
                blobLibraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
                ClassLoader classLoader = blobLibraryCacheManager.getClassLoader(jobGraph.getJobID());
                if (classLoader == null) {
                    throw new Exception("The user code class loader could not be initialized.");
                }
                this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
                this.leaderElectionService = highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
                this.jobManager = new JobMaster(rpcService, resourceID, jobGraph, configuration, highAvailabilityServices, heartbeatServices, jobManagerServices.executorService, jobManagerServices.libraryCacheManager, jobManagerServices.restartStrategyFactory, jobManagerServices.rpcAskTimeout, jobManagerMetricGroup, this, this, classLoader);
            } catch (IOException e) {
                throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
            }
        } catch (Throwable th) {
            try {
                jobManagerServices.shutdown();
            } catch (Throwable th2) {
                log.error("Error while shutting down JobManager services", th2);
            }
            if (jobManagerMetricGroup != null) {
                jobManagerMetricGroup.close();
            }
            throw new JobExecutionException(jobGraph.getJobID(), "Could not set up JobManager", th);
        }
    }

    public void start() throws Exception {
        try {
            this.leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    public void shutdown() {
        shutdownInternally();
    }

    private void shutdownInternally() {
        synchronized (this.lock) {
            this.shutdown = true;
            if (this.leaderElectionService != null) {
                try {
                    this.leaderElectionService.stop();
                } catch (Throwable th) {
                    log.error("Could not properly shutdown the leader election service", th);
                }
            }
            try {
                this.jobManager.shutDown();
            } catch (Throwable th2) {
                log.error("Error shutting down JobManager", th2);
            }
            try {
                this.jobManagerServices.shutdown();
            } catch (Throwable th3) {
                log.error("Error shutting down JobManager services", th3);
            }
            try {
                this.jobManagerMetricGroup.close();
            } catch (Throwable th4) {
                log.error("Error while unregistering metrics", th4);
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
    public void jobFinished(JobExecutionResult jobExecutionResult) {
        try {
            unregisterJobFromHighAvailability();
            shutdownInternally();
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinished(jobExecutionResult);
            }
        } catch (Throwable th) {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinished(jobExecutionResult);
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
    public void jobFailed(Throwable th) {
        try {
            unregisterJobFromHighAvailability();
            shutdownInternally();
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFailed(th);
            }
        } catch (Throwable th2) {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFailed(th);
            }
            throw th2;
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.OnCompletionActions
    public void jobFinishedByOther() {
        try {
            shutdownInternally();
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinishedByOther();
            }
        } catch (Throwable th) {
            if (this.toNotifyOnComplete != null) {
                this.toNotifyOnComplete.jobFinishedByOther();
            }
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
    public void onFatalError(Throwable th) {
        try {
            log.error("JobManager runner encountered a fatal error.", th);
        } catch (Throwable th2) {
        }
        try {
            if (this.errorHandler != null) {
                this.errorHandler.onFatalError(th);
            }
        } finally {
            shutdownInternally();
        }
    }

    private void unregisterJobFromHighAvailability() {
        try {
            this.runningJobsRegistry.setJobFinished(this.jobGraph.getJobID());
        } catch (Throwable th) {
            log.error("Could not un-register from high-availability services job {} ({}).Other JobManager's may attempt to recover it and re-execute it.", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), th});
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        synchronized (this.lock) {
            if (this.shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }
            log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), uuid, getAddress()});
            this.leaderElectionService.confirmLeaderSessionID(uuid);
            try {
                RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = this.runningJobsRegistry.getJobSchedulingStatus(this.jobGraph.getJobID());
                if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
                    log.info("Granted leader ship but job {} has been finished. ", this.jobGraph.getJobID());
                    jobFinishedByOther();
                    return;
                }
                if (this.leaderElectionService.hasLeadership()) {
                    try {
                        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
                            this.runningJobsRegistry.setJobRunning(this.jobGraph.getJobID());
                        }
                        this.jobManager.start(uuid);
                    } catch (Exception e) {
                        onFatalError(new Exception("Could not start the job manager.", e));
                    }
                }
            } catch (Throwable th) {
                log.error("Could not access status (running/finished) of job {}. ", this.jobGraph.getJobID(), th);
                onFatalError(th);
            }
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        synchronized (this.lock) {
            if (this.shutdown) {
                log.info("JobManagerRunner already shutdown.");
            } else {
                log.info("JobManager for job {} ({}) was revoked leadership at {}.", new Object[]{this.jobGraph.getName(), this.jobGraph.getJobID(), getAddress()});
                this.jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader."));
            }
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public String getAddress() {
        return this.jobManager.getAddress();
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        log.error("Leader Election Service encountered a fatal error.", exc);
        onFatalError(exc);
    }

    @VisibleForTesting
    boolean isShutdown() {
        return this.shutdown;
    }
}
