package org.apache.flink.streaming.runtime.streamstatus;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.class */
public class StatusWatermarkValveTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest$StatusWatermarkOutput.class */
    private static class StatusWatermarkOutput implements PushingAsyncDataInput.DataOutput {
        private BlockingQueue<StreamElement> allOutputs;

        private StatusWatermarkOutput() {
            this.allOutputs = new LinkedBlockingQueue();
        }

        public void emitWatermark(Watermark watermark) {
            this.allOutputs.add(watermark);
        }

        public void emitStreamStatus(StreamStatus streamStatus) {
            this.allOutputs.add(streamStatus);
        }

        public void emitRecord(StreamRecord streamRecord) {
            throw new UnsupportedOperationException();
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            throw new UnsupportedOperationException();
        }

        public StreamElement popLastSeenOutput() {
            return this.allOutputs.poll();
        }
    }

    @Test
    public void testSingleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0);
        Assert.assertEquals(new Watermark(0L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        Assert.assertEquals(new Watermark(25L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testSingleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        Assert.assertEquals(new Watermark(25L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(18L), 0);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(42L), 0);
        Assert.assertEquals(new Watermark(42L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testSingleInputStreamStatusToggling() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1, statusWatermarkOutput);
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals(StreamStatus.IDLE, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals(StreamStatus.ACTIVE, statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testSingleInputWatermarksIntactDuringIdleness() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        Assert.assertEquals(new Watermark(25L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals(StreamStatus.IDLE, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(50L), 0);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals(25L, statusWatermarkValve.getInputChannelStatus(0).watermark);
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals(StreamStatus.ACTIVE, statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(50L), 0);
        Assert.assertEquals(new Watermark(50L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(0L), 2);
        Assert.assertEquals(new Watermark(0L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 2);
        Assert.assertEquals(new Watermark(0L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(12L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(8L), 2);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 2);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(15L), 1);
        Assert.assertEquals(new Watermark(10L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2);
        Assert.assertEquals(new Watermark(12L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(20L), 0);
        Assert.assertEquals(new Watermark(15L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2);
        Assert.assertEquals(new Watermark(10L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(12L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(8L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(15L), 2);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputStreamStatusToggling() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(2, statusWatermarkOutput);
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        Assert.assertEquals(StreamStatus.IDLE, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 1);
        Assert.assertEquals(StreamStatus.ACTIVE, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 0);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(15L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(10L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(18L), 1);
        Assert.assertEquals(new Watermark(15L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(20L), 0);
        Assert.assertEquals(new Watermark(18L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2);
        Assert.assertEquals(new Watermark(10L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals(new Watermark(17L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(25L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputFlushMaxWatermarkAndStreamStatusOnceAllInputsBecomeIdle() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(5L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(3L), 2);
        Assert.assertEquals(new Watermark(3L), statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(10L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals(StreamStatus.IDLE, statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testMultipleInputWatermarkRealignmentAfterResumeActive() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(7L), 1);
        statusWatermarkValve.inputWatermark(new Watermark(3L), 2);
        Assert.assertEquals(new Watermark(3L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(7L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 2);
        Assert.assertFalse(statusWatermarkValve.getInputChannelStatus(2).isWatermarkAligned);
        statusWatermarkValve.inputWatermark(new Watermark(5L), 2);
        Assert.assertEquals(5L, statusWatermarkValve.getInputChannelStatus(2).watermark);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(9L), 2);
        Assert.assertTrue(statusWatermarkValve.getInputChannelStatus(2).isWatermarkAligned);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputWatermark(new Watermark(12L), 1);
        Assert.assertEquals(new Watermark(9L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }

    @Test
    public void testNoOutputWhenAllActiveChannelsAreUnaligned() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0);
        statusWatermarkValve.inputWatermark(new Watermark(7L), 1);
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 2);
        Assert.assertEquals(new Watermark(7L), statusWatermarkOutput.popLastSeenOutput());
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.ACTIVE, 2);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 0);
        statusWatermarkValve.inputStreamStatus(StreamStatus.IDLE, 1);
        Assert.assertEquals((Object) null, statusWatermarkOutput.popLastSeenOutput());
    }
}
