package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.class */
/**
 * Tests for {@link StreamOperatorWrapper}: closing a chain of wrapped operators, propagation of
 * an exception thrown by an operator's {@code close()}, and traversal of the chain with
 * {@code StreamOperatorWrapper.ReadIterator}.
 */
public class StreamOperatorWrapperTest extends TestLogger {
    /** Timer service shared by all tests; created in {@code startTimeService()} and shut down in {@code shutdownTimeService()}. */
    private static SystemProcessingTimeService timerService;
    /** Number of operators (and therefore wrappers) that {@code setup()} creates for each test. */
    private static final int numOperators = 3;
    /** Wrappers created by {@code setup()}, stored in creation order and linked to their neighbours via setNext/setPrevious. */
    private List<StreamOperatorWrapper<?, ?>> operatorWrappers;
    /** Messages appended by the test operators, their timers and their mails; {@code testClose()} compares it with the expected sequence. */
    private ConcurrentLinkedQueue<Object> output;
    /** Mock task built in {@code setup()}; it supplies the mailbox executors and the action executor used by the tests. */
    private volatile StreamTask<?, ?> containingTask;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest$TestOneInputStreamOperator.class */
    /**
     * Test operator that writes a trace of its lifecycle into a shared output queue.
     *
     * <p>Every message has the form {@code "[<name>]: <text>"}. The operator registers timers and
     * submits a mail from {@code endInput()} and {@code close()} so that a test can check which of
     * them end up being executed and in which order.
     */
    public static class TestOneInputStreamOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, BoundedOneInput {
        private static final long serialVersionUID = 1;

        private final String name;
        private final ConcurrentLinkedQueue<Object> output;
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final TimerMailController timerMailController;

        /**
         * Stores the collaborators and registers a timer for {@code Long.MAX_VALUE} whose callback
         * would append a "Timer not triggered" message to the output queue if it ever ran.
         */
        TestOneInputStreamOperator(
                String operatorName,
                ConcurrentLinkedQueue<Object> sharedOutput,
                ProcessingTimeService timeService,
                MailboxExecutor executor,
                TimerMailController controller) {
            this.name = operatorName;
            this.output = sharedOutput;
            this.processingTimeService = timeService;
            this.mailboxExecutor = executor;
            this.timerMailController = controller;

            timeService.registerTimer(Long.MAX_VALUE, timestamp -> {
                sharedOutput.add("[" + operatorName + "]: Timer not triggered");
            });
        }

        /** Returns the name given at construction time. */
        public String getName() {
            return name;
        }

        /** Intentionally empty: the tests never look at processed records. */
        public void processElement(StreamRecord<String> streamRecord) {
        }

        /**
         * Records the end of input, registers an immediately-due timer and then blocks until the
         * controller's "in mailbox" latch for that timer has been triggered.
         */
        public void endInput() throws InterruptedException {
            output.add(message("End of input"));

            ProcessingTimeCallback timerBeforeClose = timestamp -> {
                output.add(message("Timer that was in mailbox before closing operator"));
            };
            processingTimeService.registerTimer(0L, timerBeforeClose);

            // The latch is triggered by the wrapped callback once the timer was handed to the mailbox.
            timerMailController.getInMailboxLatch(timerBeforeClose).await();
        }

        /**
         * Registers one more timer, submits a mail and records the "Bye" message.
         *
         * <p>The timer registration must return a non-null handle, while the controller must hold
         * no "putting" latch for the callback, i.e. the callback was never passed through
         * {@code TimerMailController#wrapCallback}.
         */
        public void close() throws Exception {
            ProcessingTimeCallback timerWhileClosing = timestamp -> {
                output.add(message("Timer to put in mailbox when closing operator"));
            };
            Assert.assertNotNull(processingTimeService.registerTimer(0L, timerWhileClosing));
            Assert.assertNull(timerMailController.getPuttingLatch(timerWhileClosing));

            // Block lambda with a return value, so the value-returning submit overload is selected.
            mailboxExecutor.submit(() -> {
                return Boolean.valueOf(output.add(message("Mail to put in mailbox when closing operator")));
            }, "");

            output.add(message("Bye"));
        }

        /** Prefixes {@code text} with this operator's {@code "[<name>]: "} tag. */
        private String message(String text) {
            return "[" + name + "]: " + text;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest$TimerMailController.class */
    /**
     * Keeps two latches per wrapped timer callback so that tests can synchronize with the moment
     * a timer firing is handed over to the mailbox.
     *
     * <p>The "putting" latch is triggered when the wrapped callback starts running; the
     * "in mailbox" latch is triggered after the deferred callback returned by
     * {@code deferCallbackToMailbox} has been invoked.
     */
    private static class TimerMailController {
        private final StreamTask<?, ?> containingTask;
        private final MailboxExecutor mailboxExecutor;
        private final ConcurrentHashMap<ProcessingTimeCallback, OneShotLatch> puttingLatches = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<ProcessingTimeCallback, OneShotLatch> inMailboxLatches = new ConcurrentHashMap<>();

        TimerMailController(StreamTask<?, ?> task, MailboxExecutor executor) {
            this.containingTask = task;
            this.mailboxExecutor = executor;
        }

        /** Returns the "putting" latch of {@code callback}, or {@code null} if it was never wrapped. */
        OneShotLatch getPuttingLatch(ProcessingTimeCallback callback) {
            return puttingLatches.get(callback);
        }

        /** Returns the "in mailbox" latch of {@code callback}, or {@code null} if it was never wrapped. */
        OneShotLatch getInMailboxLatch(ProcessingTimeCallback callback) {
            return inMailboxLatches.get(callback);
        }

        /**
         * Creates fresh latches for {@code callback} (keyed by the unwrapped callback) and returns
         * a callback that triggers them around deferring the original to the mailbox.
         */
        ProcessingTimeCallback wrapCallback(ProcessingTimeCallback callback) {
            puttingLatches.put(callback, new OneShotLatch());
            inMailboxLatches.put(callback, new OneShotLatch());

            // Latches are looked up at firing time, not captured here.
            return timestamp -> {
                puttingLatches.get(callback).trigger();
                containingTask.deferCallbackToMailbox(mailboxExecutor, callback).onProcessingTime(timestamp);
                inMailboxLatches.get(callback).trigger();
            };
        }
    }

    /**
     * Creates the timer service shared by every test in this class.
     *
     * <p>The service is given {@code errorFuture::complete} as its error handler, so a reported
     * error completes the future. Nothing in this class inspects the future afterwards.
     */
    @BeforeClass
    public static void startTimeService() {
        CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    /** Shuts down the shared timer service after all tests of the class have run. */
    @AfterClass
    public static void shutdownTimeService() {
        final SystemProcessingTimeService service = timerService;
        service.shutdownService();
    }

    /**
     * Builds a fresh mock task and a chain of {@code numOperators} wrapped test operators.
     *
     * <p>Each operator gets its own mailbox executor, a {@code TimerMailController} and a
     * processing-time service that routes every registered callback through
     * {@code TimerMailController#wrapCallback}. The wrappers are then linked to their neighbours
     * in creation order. The mock environment is closed when this method exits, whether it
     * succeeds or fails.
     *
     * @throws Exception if building the task, creating an operator or closing the environment fails
     */
    @Before
    public void setup() throws Exception {
        this.operatorWrappers = new ArrayList<>();
        this.output = new ConcurrentLinkedQueue<>();

        try (MockEnvironment environment = MockEnvironment.builder().build()) {
            this.containingTask = new MockStreamTaskBuilder(environment).build();

            // Create one wrapped operator per index.
            for (int i = 0; i < numOperators; i++) {
                MailboxExecutor mailboxExecutor = this.containingTask.getMailboxExecutorFactory().createExecutor(i);
                TimerMailController timerMailController = new TimerMailController(this.containingTask, mailboxExecutor);
                ProcessingTimeService processingTimeService = new ProcessingTimeServiceImpl(timerService, timerMailController::wrapCallback);

                TestOneInputStreamOperator operator = new TestOneInputStreamOperator("Operator" + i, this.output, processingTimeService, mailboxExecutor, timerMailController);
                operator.setProcessingTimeService(processingTimeService);

                this.operatorWrappers.add(new StreamOperatorWrapper<>(operator, Optional.ofNullable(operator.getProcessingTimeService()), mailboxExecutor));
            }

            // Link each wrapper to its neighbours; the first wrapper gets a null predecessor.
            StreamOperatorWrapper<?, ?> previous = null;
            for (StreamOperatorWrapper<?, ?> current : this.operatorWrappers) {
                if (previous != null) {
                    previous.setNext(current);
                }
                current.setPrevious(previous);
                previous = current;
            }
        }
    }

    /**
     * Cleans up the mock task created for the test that just ran.
     *
     * @throws Exception if the task's cleanup fails
     */
    @After
    public void teardown() throws Exception {
        final StreamTask<?, ?> task = this.containingTask;
        task.cleanup();
    }

    /**
     * Closes the first wrapper of the chain and checks the exact order of the messages that the
     * operators, their timers and their mails wrote to the output queue.
     *
     * @throws Exception if closing the wrapper fails
     */
    @Test
    public void testClose() throws Exception {
        this.output.clear();
        this.operatorWrappers.get(0).close(this.containingTask.getActionExecutor());

        // Full per-operator sequence: end of input, pre-close timer, bye, mail submitted on close.
        List<String> expected = new ArrayList<>();
        for (int i = 0; i < this.operatorWrappers.size(); i++) {
            String prefix = "[Operator" + i + "]";
            Collections.addAll(
                    expected,
                    prefix + ": End of input",
                    prefix + ": Timer that was in mailbox before closing operator",
                    prefix + ": Bye",
                    prefix + ": Mail to put in mailbox when closing operator");
        }

        // The first two entries (Operator0's end-of-input message and its pre-close timer) are
        // not expected. NOTE(review): presumably close() does not call endInput() on the wrapper
        // it is invoked on -- confirm against StreamOperatorWrapper#close.
        Assert.assertArrayEquals(
                "Output was not correct.",
                expected.subList(2, expected.size()).toArray(),
                this.output.toArray());
    }

    /**
     * Verifies that an exception thrown by an operator's {@code close()} surfaces from
     * {@code StreamOperatorWrapper#close}, either directly or somewhere in the cause chain.
     *
     * <p>The throwable is captured first and asserted on afterwards, so a missing exception is
     * reported with its own message and not swallowed by the catch block.
     */
    @Test
    public void testClosingOperatorWithException() {
        AbstractStreamOperator<Void> failingOperator = new AbstractStreamOperator<Void>() {
            public void close() throws Exception {
                throw new Exception("test exception at closing");
            }
        };

        Throwable thrown = null;
        try {
            StreamOperatorWrapper<?, ?> wrapper = new StreamOperatorWrapper<>(
                    failingOperator,
                    Optional.ofNullable(failingOperator.getProcessingTimeService()),
                    this.containingTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE - 1));
            wrapper.close(this.containingTask.getActionExecutor());
        } catch (Throwable t) {
            thrown = t;
        }

        Assert.assertNotNull("should throw an exception", thrown);
        Assert.assertTrue(
                "unexpected failure while closing: " + thrown,
                ExceptionUtils.findThrowableWithMessage(thrown, "test exception at closing").isPresent());
    }

    /**
     * Walks the wrapper chain with {@code StreamOperatorWrapper.ReadIterator} twice: once from
     * the first wrapper with the flag set to {@code false}, expecting Operator0..OperatorN, and
     * once from the last wrapper with the flag set to {@code true}, expecting the opposite order.
     * Each pass must yield exactly as many wrappers as were created.
     */
    @Test
    public void testReadIterator() {
        final int count = this.operatorWrappers.size();

        // First pass: start at the head, names must appear in ascending index order.
        StreamOperatorWrapper.ReadIterator ascending = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(0), false);
        int expectedIndex = 0;
        while (expectedIndex < count) {
            Assert.assertTrue(ascending.hasNext());
            StreamOperatorWrapper<?, ?> wrapper = (StreamOperatorWrapper<?, ?>) ascending.next();
            Assert.assertNotNull(wrapper);
            Assert.assertEquals("Operator" + expectedIndex, getStreamOperatorFromWrapper(wrapper).getName());
            expectedIndex++;
        }
        Assert.assertFalse(ascending.hasNext());

        // Second pass: start at the tail, names must appear in descending index order.
        StreamOperatorWrapper.ReadIterator descending = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(this.operatorWrappers.size() - 1), true);
        expectedIndex = this.operatorWrappers.size() - 1;
        while (expectedIndex >= 0) {
            Assert.assertTrue(descending.hasNext());
            StreamOperatorWrapper<?, ?> wrapper = (StreamOperatorWrapper<?, ?>) descending.next();
            Assert.assertNotNull(wrapper);
            Assert.assertEquals("Operator" + expectedIndex, getStreamOperatorFromWrapper(wrapper).getName());
            expectedIndex--;
        }
        Assert.assertFalse(descending.hasNext());
    }

    /**
     * Extracts the wrapped operator and casts it to {@code TestOneInputStreamOperator}.
     *
     * @param streamOperatorWrapper the wrapper to unwrap
     * @return the wrapped test operator
     * @throws NullPointerException if the wrapper holds no operator
     * @throws ClassCastException if the wrapped operator is of a different type
     */
    private TestOneInputStreamOperator getStreamOperatorFromWrapper(StreamOperatorWrapper<?, ?> streamOperatorWrapper) {
        final Object wrapped = Objects.requireNonNull(streamOperatorWrapper.getStreamOperator());
        return (TestOneInputStreamOperator) wrapped;
    }
}
