package org.apache.flink.streaming.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.class */
public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
    /** Operator under test; {@code null} until {@code setup} creates it when only a factory was supplied. */
    protected StreamOperator<OUT> operator;
    /** Factory that {@code setup} uses to create the operator when {@link #operator} is {@code null}. */
    protected final StreamOperatorFactory<OUT> factory;
    /** Everything emitted on the main output (records, watermarks, latency markers), in the order it was added. */
    protected final ConcurrentLinkedQueue<Object> outputList;
    /** Records emitted to side outputs, one queue per {@link OutputTag}; a queue is created on first emission. */
    protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
    /** Stream configuration built on top of the environment's task configuration. */
    protected final StreamConfig config;
    /** Execution configuration taken from the environment. */
    protected final ExecutionConfig executionConfig;
    /** Test processing-time service handed to the mock task and, where supported, to the operator. */
    protected final TestProcessingTimeService processingTimeService;
    /** Mock time provider handed to the state initializer; advanced through {@code setStateTtlProcessingTime}. */
    protected final MockTtlTimeProvider ttlTimeProvider;
    /** Mock task that hosts the operator. */
    protected final MockStreamTask<OUT, ?> mockTask;
    /** Task state manager obtained from the environment; {@code initializeState} registers the state to restore here. */
    protected final TestTaskStateManager taskStateManager;
    /** Environment the harness runs in. */
    final MockEnvironment environment;
    /** Holds {@link #environment} only when the harness built it itself, in which case {@code close} also closes it. */
    private final Optional<MockEnvironment> internalEnvironment;
    /** State initializer handed to the mock task; re-created by {@code setup} from the current state backend. */
    protected StreamTaskStateInitializer streamTaskStateInitializer;
    /** Mailbox handed to the mock task. */
    private final TaskMailbox taskMailbox;
    /** State backend used to create the state initializer and the checkpoint storage. */
    protected StateBackend stateBackend;
    /** Checkpoint storage created from {@link #stateBackend}; used to resolve snapshot locations. */
    private CheckpointStorage checkpointStorage;
    /** Lock object handed to the mock task as its checkpoint lock. */
    private final Object checkpointLock;
    /** Repartitioner used by {@code repartitionOperatorState} to redistribute operator state handles. */
    private static final OperatorStateRepartitioner<OperatorStateHandle> operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
    /** Whether {@code setup} has run since construction or the last {@code close}. */
    private boolean setupCalled;
    /** Whether the operator's state has been initialized. */
    private boolean initializeCalled;
    /** Set to {@code true} by the async-exception handler registered with the mock task. */
    private volatile boolean wasFailedExternally;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness$MockOutput.class */
    public class MockOutput implements Output<StreamRecord<OUT>> {
        private TypeSerializer<OUT> outputSerializer;
        private TypeSerializer sideOutputSerializer;

        MockOutput(AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness) {
            this(null);
        }

        MockOutput(TypeSerializer<OUT> typeSerializer) {
            this.outputSerializer = typeSerializer;
        }

        public void emitWatermark(Watermark watermark) {
            AbstractStreamOperatorTestHarness.this.outputList.add(watermark);
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            AbstractStreamOperatorTestHarness.this.outputList.add(latencyMarker);
        }

        public void collect(StreamRecord<OUT> streamRecord) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject(streamRecord.getValue()).createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            }
            if (streamRecord.hasTimestamp()) {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(streamRecord.getValue()), streamRecord.getTimestamp()));
            } else {
                AbstractStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(streamRecord.getValue())));
            }
        }

        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
            this.sideOutputSerializer = outputTag.getTypeInfo().createSerializer(AbstractStreamOperatorTestHarness.this.executionConfig);
            ConcurrentLinkedQueue<Object> concurrentLinkedQueue = AbstractStreamOperatorTestHarness.this.sideOutputLists.get(outputTag);
            if (concurrentLinkedQueue == null) {
                concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
                AbstractStreamOperatorTestHarness.this.sideOutputLists.put(outputTag, concurrentLinkedQueue);
            }
            if (streamRecord.hasTimestamp()) {
                concurrentLinkedQueue.add(new StreamRecord(this.sideOutputSerializer.copy(streamRecord.getValue()), streamRecord.getTimestamp()));
            } else {
                concurrentLinkedQueue.add(new StreamRecord(this.sideOutputSerializer.copy(streamRecord.getValue())));
            }
        }

        public void close() {
        }
    }

    /**
     * Creates a harness for the given operator with a freshly generated {@link OperatorID}.
     *
     * @param streamOperator operator under test
     * @param i value passed to the environment builder as max parallelism
     * @param i2 value passed to the environment builder as parallelism
     * @param i3 value passed to the environment builder as subtask index
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, int i, int i2, int i3) throws Exception {
        this(streamOperator, i, i2, i3, new OperatorID());
    }

    /**
     * Creates a harness for the given operator. The harness builds its own {@link MockEnvironment}
     * and closes it again in {@link #close()}.
     *
     * @param streamOperator operator under test; also wrapped in a {@link SimpleOperatorFactory}
     * @param i max parallelism of the mock environment
     * @param i2 parallelism of the mock environment
     * @param i3 subtask index of the mock environment
     * @param operatorID operator id written into the stream configuration
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, int i, int i2, int i3, OperatorID operatorID) throws Exception {
        this(
                streamOperator,
                SimpleOperatorFactory.of(streamOperator),
                new MockEnvironmentBuilder()
                        .setTaskName("MockTask")
                        .setManagedMemorySize(3L * 1024L * 1024L) // 3145728 bytes
                        .setInputSplitProvider(new MockInputSplitProvider())
                        .setBufferSize(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE)
                        .setMaxParallelism(i)
                        .setParallelism(i2)
                        .setSubtaskIndex(i3)
                        .build(),
                true,
                operatorID);
    }

    /**
     * Creates a harness that builds its operator from the given factory during setup and runs in
     * a caller-supplied environment. The environment is not closed by {@link #close()}.
     *
     * @param streamOperatorFactory factory for the operator under test
     * @param mockEnvironment environment to run in
     */
    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> streamOperatorFactory, MockEnvironment mockEnvironment) throws Exception {
        this(
                (StreamOperator<OUT>) null,
                streamOperatorFactory,
                mockEnvironment,
                false,
                new OperatorID());
    }

    /**
     * Creates a harness for an operator built from the given factory, with a freshly generated
     * {@link OperatorID}.
     *
     * @param streamOperatorFactory factory for the operator under test
     * @param i value passed to the environment builder as max parallelism
     * @param i2 value passed to the environment builder as parallelism
     * @param i3 value passed to the environment builder as subtask index
     */
    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> streamOperatorFactory, int i, int i2, int i3) throws Exception {
        this(streamOperatorFactory, i, i2, i3, new OperatorID());
    }

    /**
     * Creates a harness that builds its operator from the given factory during setup. The harness
     * builds its own {@link MockEnvironment} and closes it again in {@link #close()}.
     *
     * @param streamOperatorFactory factory for the operator under test
     * @param i max parallelism of the mock environment
     * @param i2 parallelism of the mock environment
     * @param i3 subtask index of the mock environment
     * @param operatorID operator id written into the stream configuration
     */
    public AbstractStreamOperatorTestHarness(StreamOperatorFactory<OUT> streamOperatorFactory, int i, int i2, int i3, OperatorID operatorID) throws Exception {
        this(
                (StreamOperator<OUT>) null,
                streamOperatorFactory,
                new MockEnvironmentBuilder()
                        .setTaskName("MockTask")
                        .setManagedMemorySize(3L * 1024L * 1024L) // 3145728 bytes
                        .setInputSplitProvider(new MockInputSplitProvider())
                        .setBufferSize(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE)
                        .setMaxParallelism(i)
                        .setParallelism(i2)
                        .setSubtaskIndex(i3)
                        .build(),
                true,
                operatorID);
    }

    /**
     * Creates a harness for the given operator running in a caller-supplied environment. The
     * environment is not closed by {@link #close()}.
     *
     * @param streamOperator operator under test; also wrapped in a {@link SimpleOperatorFactory}
     * @param mockEnvironment environment to run in
     */
    public AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, MockEnvironment mockEnvironment) throws Exception {
        this(
                streamOperator,
                SimpleOperatorFactory.of(streamOperator),
                mockEnvironment,
                false,
                new OperatorID());
    }

    /**
     * Common constructor: wires the operator (or its factory) to a mock task with a memory state
     * backend, test time services and a fresh mailbox.
     *
     * @param streamOperator operator under test, or {@code null} to create it from the factory during setup
     * @param streamOperatorFactory factory for the operator
     * @param mockEnvironment environment to run in; must not be {@code null}
     * @param z {@code true} if the harness owns the environment and must close it in {@link #close()}
     * @param operatorID operator id written into the stream configuration
     * @throws NullPointerException if {@code mockEnvironment} is {@code null}
     */
    private AbstractStreamOperatorTestHarness(StreamOperator<OUT> streamOperator, StreamOperatorFactory<OUT> streamOperatorFactory, MockEnvironment mockEnvironment, boolean z, OperatorID operatorID) throws Exception {
        // Validate before the first dereference; checking afterwards could never fire.
        this.environment = Preconditions.checkNotNull(mockEnvironment);
        this.internalEnvironment = z ? Optional.of(this.environment) : Optional.empty();

        this.stateBackend = new MemoryStateBackend();
        this.checkpointStorage = this.stateBackend.createCheckpointStorage(new JobID());
        this.setupCalled = false;
        this.initializeCalled = false;
        this.wasFailedExternally = false;

        this.operator = streamOperator;
        this.factory = streamOperatorFactory;
        this.outputList = new ConcurrentLinkedQueue<>();
        this.sideOutputLists = new HashMap<>();

        this.config = new StreamConfig(this.environment.getTaskConfiguration());
        this.config.setCheckpointingEnabled(true);
        this.config.setOperatorID(operatorID);
        this.executionConfig = this.environment.getExecutionConfig();
        this.checkpointLock = new Object();
        this.taskStateManager = this.environment.getTaskStateManager();

        // Both clocks start at 0 and only move when the test advances them.
        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);
        this.ttlTimeProvider = new MockTtlTimeProvider();
        this.ttlTimeProvider.setCurrentTimestamp(0L);

        this.streamTaskStateInitializer = createStreamTaskStateManager(this.environment, this.stateBackend, this.ttlTimeProvider);

        // Async exceptions reported by the task are only recorded, see wasFailedExternally().
        BiConsumer<String, Throwable> asyncExceptionHandler = (message, cause) -> {
            this.wasFailedExternally = true;
        };
        this.taskMailbox = new TaskMailboxImpl();
        this.mockTask = new MockStreamTaskBuilder(this.environment)
                .setCheckpointLock(this.checkpointLock)
                .setConfig(this.config)
                .setExecutionConfig(this.executionConfig)
                .setStreamTaskStateInitializer(this.streamTaskStateInitializer)
                .setCheckpointStorage(this.checkpointStorage)
                .setTimerService(this.processingTimeService)
                .setHandleAsyncException(asyncExceptionHandler)
                .setTaskMailbox(this.taskMailbox)
                .build();
    }

    /**
     * Creates the state initializer handed to the mock task. Subclasses may override this to
     * supply a different implementation. Note that it is also invoked from the constructor.
     *
     * @param environment environment the initializer operates in
     * @param stateBackend state backend the initializer uses
     * @param ttlTimeProvider time provider the initializer uses
     * @return a new {@link StreamTaskStateInitializerImpl}
     */
    protected StreamTaskStateInitializer createStreamTaskStateManager(Environment environment, StateBackend stateBackend, TtlTimeProvider ttlTimeProvider) {
        StreamTaskStateInitializer initializer =
                new StreamTaskStateInitializerImpl(environment, stateBackend, ttlTimeProvider);
        return initializer;
    }

    /**
     * Replaces the state backend and the checkpoint storage derived from it. The new backend is
     * picked up by the state initializer the next time {@code setup} runs.
     *
     * <p>Both fields are updated only after the checkpoint storage was created successfully, so a
     * failure leaves the harness with its previous, consistent backend/storage pair.
     *
     * @param stateBackend the backend to use; must not be {@code null}
     * @throws NullPointerException if {@code stateBackend} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if the storage cannot be created
     */
    public void setStateBackend(StateBackend stateBackend) {
        Preconditions.checkNotNull(stateBackend, "stateBackend must not be null");
        final CheckpointStorage newCheckpointStorage;
        try {
            newCheckpointStorage = stateBackend.createCheckpointStorage(new JobID());
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        this.stateBackend = stateBackend;
        this.checkpointStorage = newCheckpointStorage;
    }

    /**
     * Returns the checkpoint lock of the mock task.
     *
     * @return the lock object reported by the mock task
     */
    @Deprecated
    public Object getCheckpointLock() {
        return mockTask.getCheckpointLock();
    }

    /** Returns the environment this harness runs in. */
    public MockEnvironment getEnvironment() {
        return environment;
    }

    /** Returns the execution configuration taken from the environment. */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /** Returns the stream configuration used for the mock task and the operator. */
    public StreamConfig getStreamConfig() {
        return config;
    }

    /**
     * Returns the live main-output queue containing the records, watermarks and latency markers
     * emitted so far.
     */
    public ConcurrentLinkedQueue<Object> getOutput() {
        return outputList;
    }

    /**
     * Returns the queue of records emitted to the given side output.
     *
     * @param outputTag tag identifying the side output
     * @return the live queue for that tag, or {@code null} if nothing was emitted to it yet
     */
    @SuppressWarnings("unchecked") // queues are stored as Queue<Object>; elements are the StreamRecords added by MockOutput
    public <X> ConcurrentLinkedQueue<StreamRecord<X>> getSideOutput(OutputTag<X> outputTag) {
        ConcurrentLinkedQueue<?> sideOutput = sideOutputLists.get(outputTag);
        return (ConcurrentLinkedQueue<StreamRecord<X>>) sideOutput;
    }

    /**
     * Collects the stream records currently in the main output, skipping everything else
     * (watermarks, latency markers).
     *
     * @return a new list of the emitted records, in output order
     */
    @SuppressWarnings("unchecked") // main-output records are created by MockOutput from StreamRecord<OUT>
    public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
        List<StreamRecord<? extends OUT>> records = new LinkedList<>();
        for (Object element : getOutput()) {
            if (element instanceof StreamRecord) {
                records.add((StreamRecord<? extends OUT>) element);
            }
        }
        return records;
    }

    /**
     * Collects the values of all stream records currently in the main output.
     *
     * @return a new list of the record values, in output order
     */
    public List<OUT> extractOutputValues() {
        List<StreamRecord<? extends OUT>> records = extractOutputStreamRecords();
        List<OUT> values = new ArrayList<>();
        for (StreamRecord<? extends OUT> record : records) {
            values.add(record.getValue());
        }
        return values;
    }

    /**
     * Sets up the operator without an explicit output serializer; the output then derives one
     * from the first emitted record.
     */
    public void setup() {
        setup(null);
    }

    /**
     * Sets up the operator and initializes the mock task. Does nothing if setup already ran.
     *
     * <p>The state initializer is re-created first so that a state backend set after construction
     * takes effect. If no operator instance was supplied, it is created from the factory;
     * otherwise the supplied instance is given the test processing-time service (if it is an
     * {@link AbstractStreamOperator}) and set up (if it is a {@link SetupableStreamOperator}).
     *
     * @param typeSerializer serializer used to copy emitted records, or {@code null} to derive
     *     one from the first emitted record
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setup(TypeSerializer<OUT> typeSerializer) {
        if (setupCalled) {
            return;
        }
        streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, ttlTimeProvider);
        mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
        if (operator == null) {
            operator = (StreamOperator) StreamOperatorFactoryUtil.createOperator(factory, mockTask, config, new MockOutput(typeSerializer), (OperatorEventDispatcher) null).f0;
        } else {
            // These methods are not declared on StreamOperator, so the checked downcasts are required.
            if (operator instanceof AbstractStreamOperator) {
                ((AbstractStreamOperator<OUT>) operator).setProcessingTimeService(processingTimeService);
            }
            if (operator instanceof SetupableStreamOperator) {
                ((SetupableStreamOperator<OUT>) operator).setup(mockTask, config, new MockOutput(typeSerializer));
            }
        }
        setupCalled = true;
        mockTask.init();
    }

    /**
     * Initializes the operator from the given state without any task-manager-side state.
     *
     * @param operatorSubtaskState state to restore, or {@code null} to start empty
     */
    public void initializeState(OperatorSubtaskState operatorSubtaskState) throws Exception {
        initializeState(operatorSubtaskState, null);
    }

    /**
     * Initializes the operator from a state handle read via {@code OperatorSnapshotUtil}.
     *
     * @param str location passed to {@code OperatorSnapshotUtil.readStateHandle}
     */
    public void initializeState(String str) throws Exception {
        OperatorSubtaskState restoredState = OperatorSnapshotUtil.readStateHandle(str);
        initializeState(restoredState);
    }

    /** Initializes the operator without any state to restore. */
    public void initializeEmptyState() throws Exception {
        // The cast selects the OperatorSubtaskState overload rather than the String one.
        initializeState((OperatorSubtaskState) null);
    }

    /**
     * Computes the state that one subtask receives when the given state is redistributed.
     *
     * <p>Keyed state (managed and raw) is restricted to the handles intersecting the key-group
     * range of subtask {@code i4}, taken from the key-group partitions created for {@code i} and
     * {@code i3}. Operator state (managed and raw) is redistributed with the round-robin
     * repartitioner from {@code i2} to {@code i3} subtasks, and the share of subtask {@code i4}
     * is selected.
     *
     * @param operatorSubtaskState state to redistribute; must not be {@code null}
     * @param i first argument to {@code createKeyGroupPartitions} (presumably the number of key
     *     groups / max parallelism)
     * @param i2 parallelism the operator state was written with
     * @param i3 parallelism to redistribute to
     * @param i4 index of the subtask whose share is returned
     * @return a new {@link OperatorSubtaskState} holding the share of subtask {@code i4}
     * @throws NullPointerException if {@code operatorSubtaskState} is {@code null}
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // keyed-state lists stay raw: their handle type is not imported in this file
    public static OperatorSubtaskState repartitionOperatorState(OperatorSubtaskState operatorSubtaskState, int i, int i2, int i3, int i4) {
        Preconditions.checkNotNull(operatorSubtaskState, "the previous operatorStateHandles should not be null.");

        KeyGroupRange localKeyGroupRange = (KeyGroupRange) StateAssignmentOperation.createKeyGroupPartitions(i, i3).get(i4);

        ArrayList managedKeyedState = new ArrayList();
        StateAssignmentOperation.extractIntersectingState(operatorSubtaskState.getManagedKeyedState(), localKeyGroupRange, managedKeyedState);
        ArrayList rawKeyedState = new ArrayList();
        StateAssignmentOperation.extractIntersectingState(operatorSubtaskState.getRawKeyedState(), localKeyGroupRange, rawKeyedState);

        Collection<OperatorStateHandle> managedOperatorState =
                repartitionOperatorStateHandles(operatorSubtaskState.getManagedOperatorState(), i2, i3, i4);
        Collection<OperatorStateHandle> rawOperatorState =
                repartitionOperatorStateHandles(operatorSubtaskState.getRawOperatorState(), i2, i3, i4);

        return new OperatorSubtaskState(
                new StateObjectCollection(nullToEmptyCollection(managedOperatorState)),
                new StateObjectCollection(nullToEmptyCollection(rawOperatorState)),
                new StateObjectCollection(nullToEmptyCollection(managedKeyedState)),
                new StateObjectCollection(nullToEmptyCollection(rawKeyedState)));
    }

    /** Redistributes the handles (one list per handle) and returns the share of {@code subtaskIndex}; empty input yields an empty list. */
    private static Collection<OperatorStateHandle> repartitionOperatorStateHandles(StateObjectCollection<OperatorStateHandle> handles, int oldParallelism, int newParallelism, int subtaskIndex) {
        if (handles.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<OperatorStateHandle>> handlesPerOldSubtask = handles.stream()
                .map(Collections::singletonList)
                .collect(Collectors.toList());
        return operatorStateRepartitioner.repartitionState(handlesPerOldSubtask, oldParallelism, newParallelism).get(subtaskIndex);
    }

    /**
     * Initializes the operator's state, running {@code setup()} first if that has not happened.
     *
     * <p>When {@code operatorSubtaskState} is non-null it is registered with the task state
     * manager as the job-manager-side snapshot of checkpoint 0; {@code operatorSubtaskState2},
     * if also non-null, is registered as the task-manager-side snapshot of the same checkpoint.
     * The second argument is ignored when the first one is {@code null}.
     *
     * @param operatorSubtaskState job-manager-side state to restore, or {@code null} for none
     * @param operatorSubtaskState2 task-manager-side state to restore, or {@code null} for none
     * @throws IllegalStateException if the harness was already initialized
     */
    public void initializeState(OperatorSubtaskState operatorSubtaskState, OperatorSubtaskState operatorSubtaskState2) throws Exception {
        Preconditions.checkState(!initializeCalled, "TestHarness has already been initialized. Have you opened this harness before initializing it?");
        if (!setupCalled) {
            setup();
        }

        if (operatorSubtaskState != null) {
            final long restoreCheckpointId = 0L;

            TaskStateSnapshot jobManagerSnapshot = new TaskStateSnapshot();
            jobManagerSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), operatorSubtaskState);
            taskStateManager.setReportedCheckpointId(restoreCheckpointId);
            taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
                    Collections.singletonMap(restoreCheckpointId, jobManagerSnapshot));

            if (operatorSubtaskState2 != null) {
                TaskStateSnapshot taskManagerSnapshot = new TaskStateSnapshot();
                taskManagerSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), operatorSubtaskState2);
                taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
                        Collections.singletonMap(restoreCheckpointId, taskManagerSnapshot));
            }
        }

        operator.initializeState(mockTask.createStreamTaskStateInitializer());
        initializeCalled = true;
    }

    /**
     * Substitutes an empty collection for {@code null}.
     *
     * @param collection the collection, possibly {@code null}
     * @return {@code collection} itself if non-null, otherwise an immutable empty list
     */
    private static <T> Collection<T> nullToEmptyCollection(Collection<T> collection) {
        if (collection == null) {
            return Collections.emptyList();
        }
        return collection;
    }

    /**
     * Merges several subtask states into one by concatenating their managed/raw operator state
     * and managed/raw keyed state collections, in argument order.
     *
     * @param operatorSubtaskStateArr states to merge
     * @return {@code null} for no arguments, the argument itself for a single one, otherwise a
     *     new state holding the handles of all arguments
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // handle element types are not imported in this file
    public static OperatorSubtaskState repackageState(OperatorSubtaskState... operatorSubtaskStateArr) throws Exception {
        final int stateCount = operatorSubtaskStateArr.length;
        if (stateCount == 0) {
            return null;
        }
        if (stateCount == 1) {
            return operatorSubtaskStateArr[0];
        }

        List mergedManagedOperatorState = new ArrayList(stateCount);
        List mergedRawOperatorState = new ArrayList(stateCount);
        List mergedManagedKeyedState = new ArrayList(stateCount);
        List mergedRawKeyedState = new ArrayList(stateCount);

        for (int index = 0; index < stateCount; index++) {
            OperatorSubtaskState state = operatorSubtaskStateArr[index];
            StateObjectCollection managedOperator = state.getManagedOperatorState();
            StateObjectCollection rawOperator = state.getRawOperatorState();
            StateObjectCollection managedKeyed = state.getManagedKeyedState();
            StateObjectCollection rawKeyed = state.getRawKeyedState();
            mergedManagedOperatorState.addAll(managedOperator);
            mergedRawOperatorState.addAll(rawOperator);
            mergedManagedKeyedState.addAll(managedKeyed);
            mergedRawKeyedState.addAll(rawKeyed);
        }

        return new OperatorSubtaskState(
                new StateObjectCollection(mergedManagedOperatorState),
                new StateObjectCollection(mergedRawOperatorState),
                new StateObjectCollection(mergedManagedKeyedState),
                new StateObjectCollection(mergedRawKeyedState));
    }

    /**
     * Opens the operator. If its state has not been initialized yet, it is first initialized
     * empty (which in turn runs setup when needed).
     */
    public void open() throws Exception {
        boolean needsInitialization = !initializeCalled;
        if (needsInitialization) {
            initializeEmptyState();
        }
        operator.open();
    }

    /**
     * Forwards to the operator's {@code prepareSnapshotPreBarrier}.
     *
     * @param j value passed through to the operator (presumably the checkpoint id)
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        operator.prepareSnapshotPreBarrier(j);
    }

    /**
     * Snapshots the operator and returns the job-manager-owned part of the result.
     *
     * @param j checkpoint id, also used to resolve the checkpoint storage location
     * @param j2 second argument to the operator's {@code snapshotState} (presumably the timestamp)
     * @return the job-manager-owned state of the finalized snapshot
     */
    public OperatorSubtaskState snapshot(long j, long j2) throws Exception {
        OperatorSnapshotFinalizer finalizedSnapshot = snapshotWithLocalState(j, j2);
        return finalizedSnapshot.getJobManagerOwnedState();
    }

    /**
     * Snapshots the operator with default checkpoint options into the default storage location
     * resolved from the harness's checkpoint storage, and wraps the result in a finalizer.
     *
     * @param j checkpoint id, also used to resolve the checkpoint storage location
     * @param j2 second argument to the operator's {@code snapshotState} (presumably the timestamp)
     * @return a finalizer over the operator's snapshot
     */
    public OperatorSnapshotFinalizer snapshotWithLocalState(long j, long j2) throws Exception {
        CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
        CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
        return new OperatorSnapshotFinalizer(
                operator.snapshotState(
                        j,
                        j2,
                        checkpointOptions,
                        checkpointStorage.resolveCheckpointStorageLocation(j, locationReference)));
    }

    /**
     * Notifies the operator that a checkpoint completed.
     *
     * @param j id passed to the operator's {@code notifyCheckpointComplete}
     */
    public void notifyOfCompletedCheckpoint(long j) throws Exception {
        operator.notifyCheckpointComplete(j);
    }

    /**
     * Closes and disposes the operator, shuts down the processing-time service, closes the
     * environment if the harness created it, and cleans up the mock task.
     *
     * <p>Every step runs even if an earlier one fails, so one failure no longer leaks the
     * remaining resources. The first failure is rethrown at the end with any later failures
     * attached as suppressed exceptions. Closing a harness whose operator was never created
     * skips the operator steps instead of failing with a {@link NullPointerException}.
     *
     * @throws Exception the first failure raised by any cleanup step
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Exception failure = null;

        if (operator != null) {
            try {
                operator.close();
            } catch (Exception e) {
                failure = mergeCloseFailure(failure, e);
            }
            try {
                operator.dispose();
            } catch (Exception e) {
                failure = mergeCloseFailure(failure, e);
            }
        }

        try {
            // The service is assigned unconditionally in the constructor, so no null check is needed.
            processingTimeService.shutdownService();
        } catch (Exception e) {
            failure = mergeCloseFailure(failure, e);
        }

        setupCalled = false;

        // Only an environment the harness built itself is closed here.
        if (internalEnvironment.isPresent()) {
            try {
                internalEnvironment.get().close();
            } catch (Exception e) {
                failure = mergeCloseFailure(failure, e);
            }
        }

        try {
            mockTask.cleanup();
        } catch (Exception e) {
            failure = mergeCloseFailure(failure, e);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first failure as primary and attaches later ones as suppressed. */
    private static Exception mergeCloseFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        if (first != next) { // addSuppressed rejects self-suppression
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Returns the operator under test as an {@link AbstractStreamOperator}.
     *
     * @return the operator, or {@code null} if it has not been created yet
     * @throws ClassCastException if the operator is not an {@link AbstractStreamOperator}
     */
    public AbstractStreamOperator<OUT> getOperator() {
        // The field is typed StreamOperator, so the downcast is required for the declared return type.
        return (AbstractStreamOperator<OUT>) operator;
    }

    /** Returns the factory associated with the operator under test. */
    public StreamOperatorFactory<OUT> getOperatorFactory() {
        return factory;
    }

    /**
     * Sets the current time of the test processing-time service.
     *
     * @param j the new current processing time
     */
    public void setProcessingTime(long j) throws Exception {
        processingTimeService.setCurrentTime(j);
    }

    /**
     * Sets the current timestamp of the mock TTL time provider.
     *
     * @param j the new current timestamp
     */
    public void setStateTtlProcessingTime(long j) {
        ttlTimeProvider.setCurrentTimestamp(j);
    }

    /** Returns the current time of the test processing-time service. */
    public long getProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }

    /**
     * Stores the time characteristic in the stream configuration.
     *
     * @param timeCharacteristic the characteristic to store
     */
    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        config.setTimeCharacteristic(timeCharacteristic);
    }

    /** Returns the time characteristic stored in the stream configuration. */
    public TimeCharacteristic getTimeCharacteristic() {
        return config.getTimeCharacteristic();
    }

    /**
     * Reports whether the mock task's async-exception handler was invoked.
     *
     * @return {@code true} once an async exception was reported to the handler
     */
    public boolean wasFailedExternally() {
        return wasFailedExternally;
    }

    /**
     * Returns the number of processing-time timers reported by the operator.
     *
     * @return the operator's processing-time timer count
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        if (operator instanceof AbstractStreamOperator) {
            // Not declared on StreamOperator, so the downcast is required.
            return ((AbstractStreamOperator<OUT>) operator).numProcessingTimeTimers();
        }
        throw new UnsupportedOperationException("numProcessingTimeTimers() requires an AbstractStreamOperator");
    }

    /**
     * Returns the number of event-time timers reported by the operator.
     *
     * @return the operator's event-time timer count
     * @throws UnsupportedOperationException if the operator is not an {@link AbstractStreamOperator}
     */
    @VisibleForTesting
    public int numEventTimeTimers() {
        if (operator instanceof AbstractStreamOperator) {
            // Not declared on StreamOperator, so the downcast is required.
            return ((AbstractStreamOperator<OUT>) operator).numEventTimeTimers();
        }
        throw new UnsupportedOperationException("numEventTimeTimers() requires an AbstractStreamOperator");
    }

    /** Returns the test processing-time service used by the harness. */
    @VisibleForTesting
    public TestProcessingTimeService getProcessingTimeService() {
        return processingTimeService;
    }

    /** Returns the stream status currently reported by the mock task's stream status maintainer. */
    @VisibleForTesting
    public StreamStatus getStreamStatus() {
        StreamStatus currentStatus = mockTask.getStreamStatusMaintainer().getStreamStatus();
        return currentStatus;
    }

    /** Returns the mailbox that was handed to the mock task. */
    @VisibleForTesting
    public TaskMailbox getTaskMailbox() {
        return taskMailbox;
    }
}
