package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandlerTest.class */
/**
 * Tests for {@link AlternatingCheckpointBarrierHandler}, which is built in every test from a
 * {@link CheckpointBarrierAligner} and a {@link CheckpointBarrierUnaligner} sharing one input gate.
 */
public class AlternatingCheckpointBarrierHandlerTest {

    /** Task name handed to every barrier handler created by these tests. */
    private static final String TASK_NAME = "test";

    /**
     * Minimal invokable that records the id of every checkpoint triggered on it and runs
     * task-thread actions synchronously in the calling thread.
     */
    public static class TestInvokable extends AbstractInvokable {
        private final List<Long> triggeredCheckpoints = new ArrayList<>();

        TestInvokable() {
            super(new DummyEnvironment());
        }

        @Override
        public void invoke() {
            // Nothing to run; the tests only drive the checkpoint callbacks.
        }

        /** Runs the given action immediately; the description arguments are ignored. */
        @Override
        public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> throwingRunnable, String str, Object... objArr) throws E {
            throwingRunnable.run();
        }

        /** Remembers the checkpoint id so tests can assert on what was triggered, and in which order. */
        @Override
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
        }

        @Override
        public void abortCheckpointOnBarrier(long j, Throwable th) {
            // Aborts are not tracked by these tests.
        }
    }

    @Test
    public void testCheckpointHandling() throws Exception {
        testBarrierHandling(CheckpointType.CHECKPOINT);
    }

    @Test
    public void testSavepointHandling() throws Exception {
        testBarrierHandling(CheckpointType.SAVEPOINT);
    }

    /**
     * Sends barriers with ids 0..122 on every channel, alternating between checkpoint (even ids)
     * and savepoint (odd ids), and expects each id to be triggered exactly once, in order.
     */
    @Test
    public void testAlternation() throws Exception {
        final int numChannels = 123;
        final int numBarriers = 123;
        TestInvokable target = new TestInvokable();
        CheckpointedInputGate gate = buildGate(target, numChannels);
        List<Long> expectedCheckpoints = new ArrayList<>();

        for (long barrierId = 0; barrierId < numBarriers; barrierId++) {
            expectedCheckpoints.add(barrierId);
            CheckpointType type = barrierId % 2 == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
            for (int channel = 0; channel < numChannels; channel++) {
                sendBarrier(barrierId, type, (TestInputChannel) gate.getChannel(channel), gate);
            }
        }

        Assert.assertEquals(expectedCheckpoints, target.triggeredCheckpoints);
    }

    /**
     * Feeds a single barrier per checkpoint id (alternating checkpoint/savepoint) and asserts that
     * exactly one of the two delegates reports a pending checkpoint afterwards: the aligner for
     * savepoints, the unaligner for checkpoints.
     */
    @Test
    public void testPreviousHandlerReset() throws Exception {
        SingleInputGate inputGate = buildTwoChannelGate();
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner(TASK_NAME, target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, TASK_NAME, target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        for (int checkpointId = 0; checkpointId < 4; checkpointId++) {
            int channel = checkpointId % 2;
            CheckpointType type = channel == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
            barrierHandler.processBarrier(plainBarrier(checkpointId, type), new InputChannelInfo(0, channel));

            Assert.assertEquals(type.isSavepoint(), alignedHandler.isCheckpointPending());
            Assert.assertNotEquals(alignedHandler.isCheckpointPending(), unalignedHandler.isCheckpointPending());

            if (!type.isSavepoint()) {
                // Only one of two channels delivered its barrier, so the checkpoint cannot be complete.
                Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(checkpointId).isDone());
                assertInflightDataEquals(unalignedHandler, barrierHandler, checkpointId, inputGate.getNumberOfInputChannels());
            }
        }
    }

    /**
     * Asserts that both handlers give the same {@code hasInflightData} answer for the given
     * checkpoint on every channel {@code 0..i-1} of gate 0.
     */
    private static void assertInflightDataEquals(CheckpointBarrierHandler checkpointBarrierHandler, CheckpointBarrierHandler checkpointBarrierHandler2, long j, int i) {
        for (int channel = 0; channel < i; channel++) {
            InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
            Assert.assertEquals(
                    checkpointBarrierHandler.hasInflightData(j, channelInfo),
                    checkpointBarrierHandler2.hasInflightData(j, channelInfo));
        }
    }

    /**
     * Processes a checkpoint barrier on the unaligner directly (bypassing the alternating handler)
     * and checks that the alternating handler still reports the same in-flight data and an
     * incomplete all-barriers-received future for that checkpoint.
     */
    @Test
    public void testHasInflightDataBeforeProcessBarrier() throws Exception {
        SingleInputGate inputGate = buildTwoChannelGate();
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner(TASK_NAME, target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, TASK_NAME, target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        final long checkpointId = 1L;
        unalignedHandler.processBarrier(plainBarrier(checkpointId, CheckpointType.CHECKPOINT), new InputChannelInfo(0, 0));

        assertInflightDataEquals(unalignedHandler, barrierHandler, checkpointId, inputGate.getNumberOfInputChannels());
        Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(checkpointId).isDone());
    }

    /**
     * Sends checkpoint barrier 10 followed by savepoint barrier 5 and asserts that the older
     * barrier changes neither the alternating handler's latest checkpoint id nor the aligner's.
     */
    @Test
    public void testOutOfOrderBarrier() throws Exception {
        SingleInputGate inputGate = buildTwoChannelGate();
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner(TASK_NAME, target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, TASK_NAME, target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        final long checkpointId = 10L;
        final long outdatedSavepointId = 5L;
        long alignerCheckpointIdBefore = alignedHandler.getLatestCheckpointId();

        barrierHandler.processBarrier(plainBarrier(checkpointId, CheckpointType.CHECKPOINT), new InputChannelInfo(0, 0));
        barrierHandler.processBarrier(plainBarrier(outdatedSavepointId, CheckpointType.SAVEPOINT), new InputChannelInfo(0, 1));

        Assert.assertEquals(checkpointId, barrierHandler.getLatestCheckpointId());
        assertInflightDataEquals(unalignedHandler, barrierHandler, checkpointId, inputGate.getNumberOfInputChannels());
        Assert.assertEquals(alignerCheckpointIdBefore, alignedHandler.getLatestCheckpointId());
    }

    /**
     * Signals end-of-partition twice on a five-channel gate and checks the closed/open channel
     * bookkeeping of the delegates.
     */
    @Test
    public void testEndOfPartition() throws Exception {
        final int totalChannels = 5;
        final int closedChannels = 2;
        SingleInputGate inputGate = new SingleInputGateBuilder()
                .setNumberOfChannels(totalChannels)
                .setChannelFactory((channelBuilder, gate) -> channelBuilder.buildLocalChannel(gate))
                .build();
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner(TASK_NAME, target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, TASK_NAME, target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        for (int i = 0; i < closedChannels; i++) {
            barrierHandler.processEndOfPartition();
        }

        // NOTE(review): the decompiled source asserted on an undeclared register ("r0"), so the
        // intended receivers are not recoverable from this file. Presumably the closed count is
        // read from the aligner and the open count from the unaligner, which also verifies that
        // end-of-partition reached both delegates - confirm against the handlers' test accessors.
        Assert.assertEquals(closedChannels, alignedHandler.getNumClosedChannels());
        Assert.assertEquals(totalChannels - closedChannels, unalignedHandler.getNumOpenChannels());
    }

    /**
     * Sends barrier 123 of the given type on both channels of a two-channel gate.
     *
     * <p>After the first barrier the test expects, for a savepoint, that nothing was triggered yet
     * and the first channel is blocked; for a checkpoint, that the checkpoint was already triggered
     * and no channel is blocked. After the second barrier the checkpoint must have been triggered
     * exactly once, no channel may be blocked, and channels must have been resumed only in the
     * savepoint case.
     */
    private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
        final long barrierId = 123L;
        TestInvokable target = new TestInvokable();
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel fastChannel = new TestInputChannel(inputGate, 0, false, true);
        TestInputChannel slowChannel = new TestInputChannel(inputGate, 1, false, true);
        inputGate.setInputChannels(new InputChannel[]{fastChannel, slowChannel});
        AlternatingCheckpointBarrierHandler handler = barrierHandler(inputGate, target);
        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(inputGate, handler);
        boolean savepoint = checkpointType.isSavepoint();

        sendBarrier(barrierId, checkpointType, fastChannel, checkpointedGate);

        Assert.assertEquals(savepoint, target.triggeredCheckpoints.isEmpty());
        Assert.assertEquals(savepoint, handler.isBlocked(fastChannel.getChannelInfo()));
        Assert.assertFalse(handler.isBlocked(slowChannel.getChannelInfo()));

        sendBarrier(barrierId, checkpointType, slowChannel, checkpointedGate);

        Assert.assertEquals(Collections.singletonList(barrierId), target.triggeredCheckpoints);
        for (InputChannel channel : inputGate.getInputChannels().values()) {
            // The gate stores plain InputChannels; every channel registered above is a TestInputChannel.
            TestInputChannel testChannel = (TestInputChannel) channel;
            Assert.assertFalse(handler.isBlocked(testChannel.getChannelInfo()));
            Assert.assertEquals(
                    String.format("channel %d should be resumed", testChannel.getChannelIndex()),
                    savepoint,
                    testChannel.isResumed());
        }
    }

    /**
     * Enqueues a serialized barrier on the given channel and then polls the gate until it reports
     * nothing more to read, so the barrier is processed before this method returns.
     */
    private void sendBarrier(long j, CheckpointType checkpointType, TestInputChannel testInputChannel, CheckpointedInputGate checkpointedInputGate) throws Exception {
        testInputChannel.read(barrier(j, checkpointType).retainBuffer());
        while (checkpointedInputGate.pollNext().isPresent()) {
            // Drain; the polled elements themselves are irrelevant to the tests.
        }
    }

    /** Creates an alternating handler whose aligner and unaligner both observe the given gate. */
    private static AlternatingCheckpointBarrierHandler barrierHandler(SingleInputGate singleInputGate, AbstractInvokable abstractInvokable) {
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner(TASK_NAME, abstractInvokable, new InputGate[]{singleInputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, TASK_NAME, abstractInvokable, new InputGate[]{singleInputGate});
        return new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, abstractInvokable);
    }

    /**
     * Serializes a barrier with the given id and type, timestamped with the current wall-clock
     * time, into a network buffer.
     */
    private Buffer barrier(long j, CheckpointType checkpointType) throws IOException {
        CheckpointOptions options = new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault(), true, true);
        return EventSerializer.toBuffer(new CheckpointBarrier(j, System.currentTimeMillis(), options));
    }

    /** Builds an unserialized barrier with timestamp 0 and default-location options of the given type. */
    private static CheckpointBarrier plainBarrier(long checkpointId, CheckpointType type) {
        return new CheckpointBarrier(checkpointId, 0L, new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault()));
    }

    /** Builds a gate with two default {@link TestInputChannel}s at channel indexes 0 and 1. */
    private static SingleInputGate buildTwoChannelGate() {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        inputGate.setInputChannels(new InputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)});
        return inputGate;
    }

    /**
     * Builds a checkpointed gate with {@code i} test channels, wired to a fresh alternating
     * handler that reports to the given invokable.
     */
    private static CheckpointedInputGate buildGate(TestInvokable testInvokable, int i) {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(i).build();
        TestInputChannel[] channels = new TestInputChannel[i];
        for (int index = 0; index < i; index++) {
            channels[index] = new TestInputChannel(inputGate, index, false, true);
        }
        inputGate.setInputChannels(channels);
        return new CheckpointedInputGate(inputGate, barrierHandler(inputGate, testInvokable));
    }
}
