package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.class */
public class MultipleInputStreamTaskTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$DuplicatingOperator.class */
    static class DuplicatingOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$DuplicatingOperator$DuplicatingInput.class */
        class DuplicatingInput extends AbstractInput<String, String> {
            public DuplicatingInput(AbstractStreamOperatorV2<String> abstractStreamOperatorV2, int i) {
                super(abstractStreamOperatorV2, i);
            }

            public void processElement(StreamRecord<String> streamRecord) throws Exception {
                this.output.collect(streamRecord);
                this.output.collect(streamRecord);
            }
        }

        public DuplicatingOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new DuplicatingInput(this, 1), new DuplicatingInput(this, 2), new DuplicatingInput(this, 3));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$DuplicatingOperatorFactory.class */
    private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private DuplicatingOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new DuplicatingOperator(streamOperatorParameters);
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return DuplicatingOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperator.class */
    private static class MapToStringMultipleInputOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
        private static final long serialVersionUID = 1;
        private boolean openCalled;
        private boolean closeCalled;

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperator$MapToStringInput.class */
        public class MapToStringInput<T> extends AbstractInput<T, String> {
            public MapToStringInput(AbstractStreamOperatorV2<String> abstractStreamOperatorV2, int i) {
                super(abstractStreamOperatorV2, i);
            }

            public void processElement(StreamRecord<T> streamRecord) throws Exception {
                if (!MapToStringMultipleInputOperator.this.openCalled) {
                    Assert.fail("Open was not called before run.");
                }
                if (streamRecord.hasTimestamp()) {
                    this.output.collect(new StreamRecord(streamRecord.getValue().toString(), streamRecord.getTimestamp()));
                } else {
                    this.output.collect(new StreamRecord(streamRecord.getValue().toString()));
                }
            }
        }

        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        public void open() throws Exception {
            super.open();
            if (this.closeCalled) {
                Assert.fail("Close called before open.");
            }
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!this.openCalled) {
                Assert.fail("Open was not called before close.");
            }
            this.closeCalled = true;
        }

        public List<Input> getInputs() {
            return Arrays.asList(new MapToStringInput(this, 1), new MapToStringInput(this, 2), new MapToStringInput(this, 3));
        }

        public boolean wasCloseCalled() {
            return this.closeCalled;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperatorFactory.class */
    private static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private MapToStringMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new MapToStringMultipleInputOperator(streamOperatorParameters);
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return MapToStringMultipleInputOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$TestBoundedMultipleInputOperatorFactory.class */
    private static class TestBoundedMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private TestBoundedMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new TestBoundedMultipleInputOperator("Operator0", streamOperatorParameters);
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return TestBoundedMultipleInputOperator.class;
        }
    }

    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processElement(new StreamRecord("Hello", 0 + 1), 0);
            arrayDeque.add(new StreamRecord("Hello", 0 + 1));
            build.processElement(new StreamRecord(1337, 0 + 2), 1);
            arrayDeque.add(new StreamRecord("1337", 0 + 2));
            build.processElement(new StreamRecord(Double.valueOf(42.44d), 0 + 3), 2);
            arrayDeque.add(new StreamRecord("42.44", 0 + 3));
            build.endInput();
            build.waitForTaskCompletion();
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            build.processElement(new StreamRecord("Ciao-0-0", 0L), 0, 1);
            arrayDeque.add(new StreamRecord("Ciao-0-0", 0L));
            build.processElement(new StreamRecord(11, 0L), 1, 1);
            build.processElement(new StreamRecord(Double.valueOf(1.0d), 0L), 2, 0);
            arrayDeque.add(new StreamRecord("11", 0L));
            arrayDeque.add(new StreamRecord("1.0", 0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            arrayDeque.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            build.processElement(new StreamRecord("Witam-0-1", 0L), 0, 1);
            build.processElement(new StreamRecord(42, 0L), 1, 1);
            build.processElement(new StreamRecord(Double.valueOf(1.0d), 0L), 2, 1);
            arrayDeque.add(new StreamRecord("Witam-0-1", 0L));
            arrayDeque.add(new StreamRecord("42", 0L));
            arrayDeque.add(new StreamRecord("1.0", 0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            arrayDeque.add(new CancelCheckpointMarker(0L));
            arrayDeque.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            build.waitForTaskCompletion();
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOperatorMetricReuse() throws Exception {
        TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.1
            public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String str) {
                return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, str);
            }
        };
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>) new DuplicatingOperatorFactory()).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).setTaskMetricGroup(taskMetricGroup).build();
        Throwable th = null;
        try {
            try {
                Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
                Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
                for (int i = 0; i < 5; i++) {
                    build.processElement(new StreamRecord("hello"), 0, 0);
                }
                for (int i2 = 0; i2 < 3; i2++) {
                    build.processElement(new StreamRecord("hello"), 1, 0);
                }
                for (int i3 = 0; i3 < 2; i3++) {
                    build.processElement(new StreamRecord("hello"), 2, 0);
                }
                Assert.assertEquals(5 + 3 + 2, numRecordsInCounter.getCount());
                Assert.assertEquals(r0 * 2 * 2 * 2, numRecordsOutCounter.getCount());
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>) new TestBoundedMultipleInputOperatorFactory()).chain(new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).build();
        ArrayDeque arrayDeque = new ArrayDeque();
        try {
            build.processElement(new StreamRecord("Hello-1"), 0);
            build.endInput(0);
            build.process();
            build.processElement(new StreamRecord("Hello-2"), 1);
            build.processElement(new StreamRecord("Hello-3"), 2);
            build.endInput(1);
            build.process();
            build.endInput(2);
            build.process();
            Assert.assertEquals(true, Boolean.valueOf(build.getStreamTask().getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).isDone()));
            build.waitForTaskCompletion();
            arrayDeque.add(new StreamRecord("[Operator0-1]: Hello-1"));
            arrayDeque.add(new StreamRecord("[Operator0-1]: End of input"));
            arrayDeque.add(new StreamRecord("[Operator0-2]: Hello-2"));
            arrayDeque.add(new StreamRecord("[Operator0-3]: Hello-3"));
            arrayDeque.add(new StreamRecord("[Operator0-2]: End of input"));
            arrayDeque.add(new StreamRecord("[Operator0-3]: End of input"));
            arrayDeque.add(new StreamRecord("[Operator0]: Bye"));
            arrayDeque.add(new StreamRecord("[Operator1]: End of input"));
            arrayDeque.add(new StreamRecord("[Operator1]: Bye"));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
        } finally {
            build.close();
        }
    }

    @Test
    public void testInputFairness() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.setAutoProcess(false);
            build.processElement(new StreamRecord("0"), 0);
            build.processElement(new StreamRecord("1"), 0);
            build.processElement(new StreamRecord("2"), 0);
            build.processElement(new StreamRecord("3"), 0);
            build.processElement(new StreamRecord("0"), 2);
            build.processElement(new StreamRecord("1"), 2);
            build.process();
            arrayDeque.add(new StreamRecord("0"));
            arrayDeque.add(new StreamRecord("0"));
            arrayDeque.add(new StreamRecord("1"));
            arrayDeque.add(new StreamRecord("1"));
            arrayDeque.add(new StreamRecord("2"));
            arrayDeque.add(new StreamRecord("3"));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermark() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processElement(new Watermark(0L), 0, 0);
            build.processElement(new Watermark(0L), 0, 1);
            build.processElement(new Watermark(0L), 1, 0);
            build.processElement(new Watermark(0L), 1, 1);
            build.processElement(new Watermark(0L), 2, 0);
            MatcherAssert.assertThat(build.getOutput(), IsEmptyCollection.empty());
            build.processElement(new Watermark(0L), 2, 1);
            arrayDeque.add(new Watermark(0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(new StreamRecord("Hello", 0L), 0, 0);
            build.processElement(new StreamRecord(42, 0L), 1, 1);
            arrayDeque.add(new StreamRecord("Hello", 0L));
            arrayDeque.add(new StreamRecord("42", 0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(new Watermark(0 + 4), 0, 0);
            build.processElement(new Watermark(0 + 3), 0, 1);
            build.processElement(new Watermark(0 + 3), 1, 0);
            build.processElement(new Watermark(0 + 4), 1, 1);
            build.processElement(new Watermark(0 + 3), 2, 0);
            build.processElement(new Watermark(0 + 2), 2, 1);
            arrayDeque.add(new Watermark(0 + 2));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(new Watermark(0 + 4), 2, 1);
            arrayDeque.add(new Watermark(0 + 3));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(new Watermark(0 + 4), 0, 1);
            build.processElement(new Watermark(0 + 4), 1, 0);
            build.processElement(new Watermark(0 + 4), 2, 0);
            arrayDeque.add(new Watermark(0 + 4));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            Assert.assertEquals(2L, TestHarnessUtil.getRawElementsFromOutput(build.getOutput()).size());
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processElement(StreamStatus.IDLE, 0, 1);
            build.processElement(StreamStatus.IDLE, 1, 1);
            build.processElement(StreamStatus.IDLE, 2, 0);
            build.processElement(new Watermark(0 + 6), 0, 0);
            build.processElement(new Watermark(0 + 6), 1, 0);
            build.processElement(new Watermark(0 + 5), 2, 1);
            build.processElement(StreamStatus.IDLE, 2, 1);
            arrayDeque.add(new Watermark(0 + 5));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(StreamStatus.IDLE, 0, 0);
            build.processElement(StreamStatus.IDLE, 1, 0);
            arrayDeque.add(StreamStatus.IDLE);
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processElement(StreamStatus.ACTIVE, 1, 0);
            build.processElement(StreamStatus.ACTIVE, 0, 1);
            arrayDeque.add(StreamStatus.ACTIVE);
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermarkMetrics() throws Exception {
        final OperatorID operatorID = new OperatorID();
        final OperatorID operatorID2 = new OperatorID();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup2 = new InterceptingOperatorMetricGroup();
        TaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.2
            public OperatorMetricGroup getOrAddOperator(OperatorID operatorID3, String str) {
                return operatorID3.equals(operatorID) ? interceptingOperatorMetricGroup : operatorID3.equals(operatorID2) ? interceptingOperatorMetricGroup2 : super.getOrAddOperator(operatorID3, str);
            }
        };
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain(operatorID, (StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).chain(operatorID2, new OneInputStreamTaskTest.WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).setTaskMetricGroup(taskMetricGroup).build();
        Throwable th = null;
        try {
            try {
                Gauge gauge = taskMetricGroup.get("currentInputWatermark");
                Gauge gauge2 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(1));
                Gauge gauge3 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(2));
                Gauge gauge4 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(3));
                Gauge gauge5 = interceptingOperatorMetricGroup.get("currentInputWatermark");
                Gauge gauge6 = interceptingOperatorMetricGroup.get("currentOutputWatermark");
                Gauge gauge7 = interceptingOperatorMetricGroup2.get("currentInputWatermark");
                Gauge gauge8 = interceptingOperatorMetricGroup2.get("currentOutputWatermark");
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge2.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
                build.processElement(new Watermark(1L), 0);
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
                build.processElement(new Watermark(2L), 1);
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
                Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
                build.processElement(new Watermark(2L), 2);
                Assert.assertEquals(1L, ((Long) gauge.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge5.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge4.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge6.getValue()).longValue());
                Assert.assertEquals(1L, ((Long) gauge7.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge8.getValue()).longValue());
                build.processElement(new Watermark(4L), 0);
                build.processElement(new Watermark(3L), 1);
                Assert.assertEquals(2L, ((Long) gauge.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge5.getValue()).longValue());
                Assert.assertEquals(4L, ((Long) gauge2.getValue()).longValue());
                Assert.assertEquals(3L, ((Long) gauge3.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge4.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge6.getValue()).longValue());
                Assert.assertEquals(2L, ((Long) gauge7.getValue()).longValue());
                Assert.assertEquals(4L, ((Long) gauge8.getValue()).longValue());
                build.endInput();
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).setTaskMetricGroup(new StreamTaskTestHarness.TestTaskMetricGroup(concurrentHashMap)).build();
        Throwable th = null;
        try {
            MatcherAssert.assertThat(concurrentHashMap, IsMapContaining.hasKey("checkpointAlignmentTime"));
            MatcherAssert.assertThat(concurrentHashMap, IsMapContaining.hasKey("checkpointStartDelayNanos"));
            build.endInput();
            build.waitForTaskCompletion();
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testLatencyMarker() throws Exception {
        StreamTaskMailboxTestHarness build = new MultipleInputStreamTaskTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory()).setTaskMetricGroup(new StreamTaskTestHarness.TestTaskMetricGroup(new ConcurrentHashMap())).build();
        Throwable th = null;
        try {
            try {
                ArrayDeque arrayDeque = new ArrayDeque();
                LatencyMarker latencyMarker = new LatencyMarker(42L, new OperatorID(), 0);
                build.processElement(latencyMarker);
                arrayDeque.add(latencyMarker);
                MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
                build.endInput();
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }
}
