package org.apache.flink.runtime.execution;

import akka.actor.ActorRef;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/execution/RuntimeEnvironment.class */
public class RuntimeEnvironment implements Environment, Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
    private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
    private final ActorRef jobManager;
    private final Task owner;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final ClassLoader userCodeClassLoader;
    private final AbstractInvokable invokable;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final InputSplitProvider inputSplitProvider;
    private Thread executingThread;
    private final BroadcastVariableManager broadcastVariableManager;
    private final ResultPartition[] producedPartitions;
    private final ResultPartitionWriter[] writers;
    private final SingleInputGate[] inputGates;
    private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap();
    private final AtomicBoolean canceled = new AtomicBoolean();
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById = new HashMap();

    public RuntimeEnvironment(ActorRef actorRef, Task task, TaskDeploymentDescriptor taskDeploymentDescriptor, ClassLoader classLoader, MemoryManager memoryManager, IOManager iOManager, InputSplitProvider inputSplitProvider, BroadcastVariableManager broadcastVariableManager, NetworkEnvironment networkEnvironment) throws Exception {
        this.owner = (Task) Preconditions.checkNotNull(task);
        this.memoryManager = (MemoryManager) Preconditions.checkNotNull(memoryManager);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.inputSplitProvider = (InputSplitProvider) Preconditions.checkNotNull(inputSplitProvider);
        this.jobManager = (ActorRef) Preconditions.checkNotNull(actorRef);
        this.broadcastVariableManager = (BroadcastVariableManager) Preconditions.checkNotNull(broadcastVariableManager);
        try {
            List<ResultPartitionDeploymentDescriptor> producedPartitions = taskDeploymentDescriptor.getProducedPartitions();
            this.producedPartitions = new ResultPartition[producedPartitions.size()];
            this.writers = new ResultPartitionWriter[producedPartitions.size()];
            for (int i = 0; i < this.producedPartitions.length; i++) {
                ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = producedPartitions.get(i);
                this.producedPartitions[i] = new ResultPartition(task.getJobID(), new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), task.getExecutionId()), resultPartitionDeploymentDescriptor.getPartitionType(), resultPartitionDeploymentDescriptor.getNumberOfSubpartitions(), networkEnvironment.getPartitionManager(), networkEnvironment.getPartitionConsumableNotifier(), iOManager, networkEnvironment.getDefaultIOMode());
                this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
            }
            List<InputGateDeploymentDescriptor> inputGates = taskDeploymentDescriptor.getInputGates();
            this.inputGates = new SingleInputGate[inputGates.size()];
            for (int i2 = 0; i2 < this.inputGates.length; i2++) {
                this.inputGates[i2] = SingleInputGate.create(inputGates.get(i2), networkEnvironment);
                this.inputGatesById.put(this.inputGates[i2].getConsumedResultId(), this.inputGates[i2]);
            }
            this.jobConfiguration = taskDeploymentDescriptor.getJobConfiguration();
            this.taskConfiguration = taskDeploymentDescriptor.getTaskConfiguration();
            this.userCodeClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
            try {
                try {
                    this.invokable = (AbstractInvokable) Class.forName(taskDeploymentDescriptor.getInvokableClassName(), true, classLoader).asSubclass(AbstractInvokable.class).newInstance();
                    this.invokable.setEnvironment(this);
                    this.invokable.registerInputOutput();
                } catch (Throwable th) {
                    throw new Exception("Could not instantiate the invokable class.", th);
                }
            } catch (Throwable th2) {
                throw new Exception("Could not load invokable class.", th2);
            }
        } catch (Throwable th3) {
            throw new Exception("Error setting up runtime environment: " + th3.getMessage(), th3);
        }
    }

    public AbstractInvokable getInvokable() {
        return this.invokable;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public JobID getJobID() {
        return this.owner.getJobID();
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public JobVertexID getJobVertexId() {
        return this.owner.getVertexID();
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.owner.isCanceledOrFailed()) {
            this.owner.cancelingDone();
            return;
        }
        try {
            Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
            this.invokable.invoke();
            if (this.owner.isCanceledOrFailed()) {
                throw new CancelTaskException("Task has been canceled or failed");
            }
            if (this.producedPartitions != null) {
                for (ResultPartition resultPartition : this.producedPartitions) {
                    if (resultPartition != null) {
                        resultPartition.finish();
                    }
                }
            }
            if (this.owner.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            if (!this.owner.markAsFinished()) {
                throw new Exception("Could *not* notify job manager that the task is finished.");
            }
        } catch (Throwable th) {
            if (!this.owner.isCanceledOrFailed()) {
                try {
                    this.invokable.cancel();
                } catch (Throwable th2) {
                    LOG.error("Error while canceling the task", th2);
                }
            }
            if (this.owner.isCanceledOrFailed() || (th instanceof CancelTaskException)) {
                this.owner.cancelingDone();
            } else {
                this.owner.markFailed(th);
            }
        }
    }

    public Thread getExecutingThread() {
        Thread thread;
        synchronized (this) {
            if (this.executingThread == null) {
                String taskNameWithSubtasks = this.owner.getTaskNameWithSubtasks();
                if (LOG.isDebugEnabled()) {
                    taskNameWithSubtasks = taskNameWithSubtasks + " (" + this.owner.getExecutionId() + ")";
                }
                this.executingThread = new Thread(TASK_THREADS, this, taskNameWithSubtasks);
            }
            thread = this.executingThread;
        }
        return thread;
    }

    public void cancelExecution() {
        if (this.canceled.compareAndSet(false, true)) {
            LOG.info("Canceling {} ({}).", this.owner.getTaskNameWithSubtasks(), this.owner.getExecutionId());
            if (this.invokable != null) {
                try {
                    this.invokable.cancel();
                } catch (Throwable th) {
                    LOG.error("Error while canceling the task.", th);
                }
            }
            Thread thread = this.executingThread;
            if (thread != null) {
                thread.interrupt();
                try {
                    thread.join(5000L);
                } catch (InterruptedException e) {
                }
                if (thread.isAlive()) {
                    while (thread != null && thread.isAlive()) {
                        LOG.warn("Task " + this.owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
                        if (LOG.isDebugEnabled()) {
                            StringBuilder append = new StringBuilder("Task ").append(this.owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
                            for (StackTraceElement stackTraceElement : thread.getStackTrace()) {
                                append.append(stackTraceElement).append('\n');
                            }
                            LOG.debug(append.toString());
                        }
                        thread.interrupt();
                        try {
                            thread.join(1000L);
                        } catch (InterruptedException e2) {
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public ActorRef getJobManager() {
        return this.jobManager;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public IOManager getIOManager() {
        return this.ioManager;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public BroadcastVariableManager getBroadcastVariableManager() {
        return this.broadcastVariableManager;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public ResultPartitionWriter getWriter(int i) {
        Preconditions.checkElementIndex(i, this.writers.length, "Illegal environment writer request.");
        return this.writers[Preconditions.checkElementIndex(i, this.writers.length)];
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public ResultPartitionWriter[] getAllWriters() {
        return this.writers;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public InputGate getInputGate(int i) {
        Preconditions.checkElementIndex(i, this.inputGates.length);
        return this.inputGates[i];
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public SingleInputGate[] getAllInputGates() {
        return this.inputGates;
    }

    public ResultPartition[] getProducedPartitions() {
        return this.producedPartitions;
    }

    public SingleInputGate getInputGateById(IntermediateDataSetID intermediateDataSetID) {
        return this.inputGatesById.get(intermediateDataSetID);
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public int getNumberOfSubtasks() {
        return this.owner.getNumberOfSubtasks();
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public int getIndexInSubtaskGroup() {
        return this.owner.getSubtaskIndex();
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public String getTaskName() {
        return this.owner.getTaskName();
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public InputSplitProvider getInputSplitProvider() {
        return this.inputSplitProvider;
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public String getTaskNameWithSubtasks() {
        return this.owner.getTaskNameWithSubtasks();
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public ClassLoader getUserClassLoader() {
        return this.userCodeClassLoader;
    }

    public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> map) {
        this.cacheCopyTasks.putAll(map);
    }

    public void addCopyTaskForCacheFile(String str, FutureTask<Path> futureTask) {
        this.cacheCopyTasks.put(str, futureTask);
    }

    @Override // org.apache.flink.runtime.execution.Environment
    public Map<String, FutureTask<Path>> getCopyTask() {
        return this.cacheCopyTasks;
    }
}
