package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.class */
public class StreamSourceOperatorWatermarksTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest$FiniteSource.class */
    private static final class FiniteSource extends RichSourceFunction<String> {
        private volatile transient boolean canceled;
        private transient SourceFunction.SourceContext<String> context;
        private final boolean outputingARecordWhenClosing;

        public FiniteSource() {
            this(false);
        }

        public FiniteSource(boolean z) {
            this.canceled = false;
            this.outputingARecordWhenClosing = z;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) {
            this.context = sourceContext;
        }

        public void close() {
            if (this.canceled || !this.outputingARecordWhenClosing) {
                return;
            }
            this.context.collect("Hello");
        }

        public void cancel() {
            this.canceled = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest$InfiniteSource.class */
    private static final class InfiniteSource<T> implements SourceFunction<T> {
        private volatile boolean running;

        private InfiniteSource() {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    @Test
    public void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamTaskTestHarness streamTaskTestHarness = setupSourceStreamTask(new StreamSource(new FiniteSource()), BasicTypeInfo.STRING_TYPE_INFO);
        streamTaskTestHarness.invoke();
        streamTaskTestHarness.waitForTaskCompletion();
        Assert.assertEquals(1L, streamTaskTestHarness.getOutput().size());
        Assert.assertEquals(Watermark.MAX_WATERMARK, streamTaskTestHarness.getOutput().peek());
    }

    @Test
    public void testMaxWatermarkIsForwardedLastForFiniteSource() throws Exception {
        StreamTaskTestHarness streamTaskTestHarness = setupSourceStreamTask(new StreamSource(new FiniteSource(true)), BasicTypeInfo.STRING_TYPE_INFO);
        streamTaskTestHarness.invoke();
        streamTaskTestHarness.waitForTaskCompletion();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord("Hello"));
        concurrentLinkedQueue.add(Watermark.MAX_WATERMARK);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, streamTaskTestHarness.getOutput());
    }

    @Test
    public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        boolean isPresent;
        StreamTaskTestHarness streamTaskTestHarness = setupSourceStreamTask(new StreamSource(new InfiniteSource()), BasicTypeInfo.STRING_TYPE_INFO, true);
        streamTaskTestHarness.invoke();
        try {
            streamTaskTestHarness.waitForTaskCompletion();
            Assert.fail("should throw an exception");
        } finally {
            if (!isPresent) {
            }
            Assert.assertTrue(streamTaskTestHarness.getOutput().isEmpty());
        }
        Assert.assertTrue(streamTaskTestHarness.getOutput().isEmpty());
    }

    @Test
    public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        boolean isPresent;
        StreamTaskTestHarness streamTaskTestHarness = setupSourceStreamTask(new StreamSource(new InfiniteSource()), BasicTypeInfo.STRING_TYPE_INFO);
        streamTaskTestHarness.invoke();
        streamTaskTestHarness.waitForTaskRunning();
        Thread.sleep(200L);
        streamTaskTestHarness.mo115getTask().cancel();
        try {
            streamTaskTestHarness.waitForTaskCompletion();
        } finally {
            if (!isPresent) {
            }
            Assert.assertTrue(streamTaskTestHarness.getOutput().isEmpty());
        }
        Assert.assertTrue(streamTaskTestHarness.getOutput().isEmpty());
    }

    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        StreamSource streamSource = new StreamSource(new InfiniteSource());
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        testProcessingTimeService.setCurrentTime(0L);
        MockStreamTask mockStreamTask = setupSourceOperator(streamSource, TimeCharacteristic.IngestionTime, 10L, testProcessingTimeService);
        ArrayList arrayList = new ArrayList();
        StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, testProcessingTimeService, mockStreamTask.getCheckpointLock(), streamSource.getContainingTask().getStreamStatusMaintainer(), new CollectorOutput(arrayList), streamSource.getExecutionConfig().getAutoWatermarkInterval(), -1L);
        long j = 1;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                break;
            }
            testProcessingTimeService.setCurrentTime(j2);
            j = j2 + 10;
        }
        Assert.assertEquals(9L, arrayList.size());
        long j3 = 0;
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            j3 += 10;
            Assert.assertEquals(((StreamElement) it.next()).getTimestamp(), j3);
        }
    }

    private static <T> MockStreamTask setupSourceOperator(StreamSource<T, ?> streamSource, TimeCharacteristic timeCharacteristic, long j, TimerService timerService) throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(j);
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateBackend(new MemoryStateBackend());
        streamConfig.setTimeCharacteristic(timeCharacteristic);
        streamConfig.setOperatorID(new OperatorID());
        DummyEnvironment dummyEnvironment = new DummyEnvironment("MockTwoInputTask", 1, 0);
        StreamStatusMaintainer streamStatusMaintainer = (StreamStatusMaintainer) Mockito.mock(StreamStatusMaintainer.class);
        Mockito.when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
        MockStreamTask build = new MockStreamTaskBuilder(dummyEnvironment).setConfig(streamConfig).setExecutionConfig(executionConfig).setStreamStatusMaintainer(streamStatusMaintainer).setTimerService(timerService).build();
        streamSource.setup(build, streamConfig, (Output) Mockito.mock(Output.class));
        return build;
    }

    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(StreamSource<T, ?> streamSource, TypeInformation<T> typeInformation) {
        return setupSourceStreamTask(streamSource, typeInformation, false);
    }

    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(StreamSource<T, ?> streamSource, TypeInformation<T> typeInformation, boolean z) {
        StreamTaskTestHarness<T> streamTaskTestHarness = new StreamTaskTestHarness<>(environment -> {
            SourceStreamTask sourceStreamTask = new SourceStreamTask(environment);
            if (z) {
                try {
                    sourceStreamTask.cancel();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return sourceStreamTask;
        }, typeInformation);
        streamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = streamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(streamSource);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        return streamTaskTestHarness;
    }
}
