package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.class */
public class StreamTaskTerminationTest extends TestLogger {
    private static final OneShotLatch RUN_LATCH = new OneShotLatch();
    private static final AtomicBoolean SNAPSHOT_HAS_STARTED = new AtomicBoolean();
    private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch();

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(10);

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest$BlockingCallable.class */
    static class BlockingCallable implements Callable<SnapshotResult<OperatorStateHandle>> {
        BlockingCallable() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public SnapshotResult<OperatorStateHandle> call() throws Exception {
            StreamTaskTerminationTest.SNAPSHOT_HAS_STARTED.set(true);
            StreamTaskTerminationTest.CLEANUP_LATCH.await();
            throw new FlinkException("Checkpointing operation failed");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest$BlockingStateBackend.class */
    static class BlockingStateBackend implements StateBackend {
        private static final long serialVersionUID = -5053068148933314100L;

        BlockingStateBackend() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) {
            throw new UnsupportedOperationException();
        }

        public CheckpointStorage createCheckpointStorage(JobID jobID) throws IOException {
            return new MemoryBackendCheckpointStorage(jobID, (Path) null, (Path) null, Integer.MAX_VALUE);
        }

        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, @Nonnull Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
            OperatorStateBackend operatorStateBackend = (OperatorStateBackend) Mockito.mock(OperatorStateBackend.class);
            Mockito.when(operatorStateBackend.snapshot(Matchers.anyLong(), Matchers.anyLong(), (CheckpointStreamFactory) Matchers.any(CheckpointStreamFactory.class), (CheckpointOptions) Matchers.any(CheckpointOptions.class))).thenReturn(new FutureTask(new BlockingCallable()));
            return operatorStateBackend;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest$BlockingStreamTask.class */
    public static class BlockingStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
        private boolean isRunning;

        public BlockingStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.isRunning) {
                this.isRunning = true;
                StreamTaskTerminationTest.RUN_LATCH.trigger();
            }
            if (isCanceled() || StreamTaskTerminationTest.SNAPSHOT_HAS_STARTED.get()) {
                controller.allActionsCompleted();
            }
        }

        protected void cleanup() throws Exception {
            StreamTaskTerminationTest.CLEANUP_LATCH.trigger();
            Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest$NoOpStreamOperator.class */
    private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
        private static final long serialVersionUID = 4517845269225218312L;

        private NoOpStreamOperator() {
        }
    }

    @Test
    public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws Exception {
        Configuration configuration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        NoOpStreamOperator noOpStreamOperator = new NoOpStreamOperator();
        BlockingStateBackend blockingStateBackend = new BlockingStateBackend();
        streamConfig.setStreamOperator(noOpStreamOperator);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setStateBackend(blockingStateBackend);
        JobInformation jobInformation = new JobInformation(new JobID(), "Test Job", new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, BlockingStreamTask.class.getName(), configuration);
        TestingTaskManagerRuntimeInfo testingTaskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
        Task task = new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, MemoryManagerBuilder.newBuilder().setMemorySize(32768L).build(), new IOManagerAsync(), new NettyShuffleEnvironmentBuilder().build(), new KvStateService(new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null), (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, new TestTaskStateManager(), (TaskManagerActions) Mockito.mock(TaskManagerActions.class), (InputSplitProvider) Mockito.mock(InputSplitProvider.class), (CheckpointResponder) Mockito.mock(CheckpointResponder.class), new NoOpTaskOperatorEventGateway(), new TestGlobalAggregateManager(), TestingClassLoaderLease.newBuilder().build(), (FileCache) Mockito.mock(FileCache.class), testingTaskManagerRuntimeInfo, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), new NoOpResultPartitionConsumableNotifier(), (PartitionProducerStateChecker) Mockito.mock(PartitionProducerStateChecker.class), Executors.directExecutor());
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            task.run();
        }, TestingUtils.defaultExecutor());
        RUN_LATCH.await();
        task.triggerCheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
        runAsync.get();
        if (task.getFailureCause() != null) {
            throw new Exception("Task failed", task.getFailureCause());
        }
        Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
    }
}
