package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/HeapInternalTimerService.class */
public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    private final ProcessingTimeService processingTimeService;
    private final KeyContext keyContext;
    private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;
    private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;
    private final KeyGroupsList localKeyGroupRange;
    private final int totalKeyGroups;
    private final int localKeyGroupRangeStartIdx;
    private long currentWatermark = Long.MIN_VALUE;
    private ScheduledFuture<?> nextTimer;
    private TypeSerializer<K> keySerializer;
    private TypeSerializer<N> namespaceSerializer;
    private InternalTimer.TimerSerializer<K, N> timerSerializer;
    private Triggerable<K, N> triggerTarget;
    private volatile boolean isInitialized;
    private TypeSerializer<K> keyDeserializer;
    private TypeSerializer<N> namespaceDeserializer;

    public HeapInternalTimerService(int i, KeyGroupsList keyGroupsList, KeyContext keyContext, ProcessingTimeService processingTimeService) {
        this.keyContext = (KeyContext) Preconditions.checkNotNull(keyContext);
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        this.totalKeyGroups = i;
        this.localKeyGroupRange = (KeyGroupsList) Preconditions.checkNotNull(keyGroupsList);
        int i2 = Integer.MAX_VALUE;
        Iterator it = keyGroupsList.iterator();
        while (it.hasNext()) {
            i2 = Math.min(((Integer) it.next()).intValue(), i2);
        }
        this.localKeyGroupRangeStartIdx = i2;
        int numberOfKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups();
        this.eventTimeTimersQueue = new PriorityQueue<>(100);
        this.eventTimeTimersByKeyGroup = new HashSet[numberOfKeyGroups];
        this.processingTimeTimersQueue = new PriorityQueue<>(100);
        this.processingTimeTimersByKeyGroup = new HashSet[numberOfKeyGroups];
    }

    public void startTimerService(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, Triggerable<K, N> triggerable) {
        if (this.isInitialized) {
            if (!this.keySerializer.equals(typeSerializer) || !this.namespaceSerializer.equals(typeSerializer2)) {
                throw new IllegalArgumentException("Already initialized Timer Service tried to be initialized with different key and namespace serializers.");
            }
            return;
        }
        if (typeSerializer == null || typeSerializer2 == null) {
            throw new IllegalArgumentException("The TimersService serializers cannot be null.");
        }
        if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
            throw new IllegalStateException("The TimerService has already been initialized.");
        }
        if ((this.keyDeserializer != null && !this.keyDeserializer.equals(typeSerializer)) || (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(typeSerializer2))) {
            throw new IllegalStateException("Tried to initialize restored TimerService with different serializers than those used to snapshot its state.");
        }
        this.keySerializer = typeSerializer;
        this.namespaceSerializer = typeSerializer2;
        this.keyDeserializer = null;
        this.namespaceDeserializer = null;
        this.triggerTarget = (Triggerable) Preconditions.checkNotNull(triggerable);
        this.timerSerializer = new InternalTimer.TimerSerializer<>(this.keySerializer, this.namespaceSerializer);
        if (this.processingTimeTimersQueue.size() > 0) {
            this.nextTimer = this.processingTimeService.registerTimer(this.processingTimeTimersQueue.peek().getTimestamp(), this);
        }
        this.isInitialized = true;
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public long currentWatermark() {
        return this.currentWatermark;
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerProcessingTimeTimer(N n, long j) {
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, this.keyContext.getCurrentKey(), n);
        if (getProcessingTimeTimerSetForTimer(internalTimer).add(internalTimer)) {
            InternalTimer<K, N> peek = this.processingTimeTimersQueue.peek();
            long timestamp = peek != null ? peek.getTimestamp() : Long.MAX_VALUE;
            this.processingTimeTimersQueue.add(internalTimer);
            if (j < timestamp) {
                if (this.nextTimer != null) {
                    this.nextTimer.cancel(false);
                }
                this.nextTimer = this.processingTimeService.registerTimer(j, this);
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void registerEventTimeTimer(N n, long j) {
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, this.keyContext.getCurrentKey(), n);
        if (getEventTimeTimerSetForTimer(internalTimer).add(internalTimer)) {
            this.eventTimeTimersQueue.add(internalTimer);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteProcessingTimeTimer(N n, long j) {
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, this.keyContext.getCurrentKey(), n);
        if (getProcessingTimeTimerSetForTimer(internalTimer).remove(internalTimer)) {
            this.processingTimeTimersQueue.remove(internalTimer);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerService
    public void deleteEventTimeTimer(N n, long j) {
        InternalTimer<K, N> internalTimer = new InternalTimer<>(j, this.keyContext.getCurrentKey(), n);
        if (getEventTimeTimerSetForTimer(internalTimer).remove(internalTimer)) {
            this.eventTimeTimersQueue.remove(internalTimer);
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
    public void onProcessingTime(long j) throws Exception {
        InternalTimer<K, N> peek;
        this.nextTimer = null;
        while (true) {
            peek = this.processingTimeTimersQueue.peek();
            if (peek == null || peek.getTimestamp() > j) {
                break;
            }
            getProcessingTimeTimerSetForTimer(peek).remove(peek);
            this.processingTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(peek.getKey());
            this.triggerTarget.onProcessingTime(peek);
        }
        if (peek == null || this.nextTimer != null) {
            return;
        }
        this.nextTimer = this.processingTimeService.registerTimer(peek.getTimestamp(), this);
    }

    public void advanceWatermark(long j) throws Exception {
        this.currentWatermark = j;
        while (true) {
            InternalTimer<K, N> peek = this.eventTimeTimersQueue.peek();
            if (peek == null || peek.getTimestamp() > j) {
                return;
            }
            getEventTimeTimerSetForTimer(peek).remove(peek);
            this.eventTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(peek.getKey());
            this.triggerTarget.onEventTime(peek);
        }
    }

    public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper dataOutputViewStreamWrapper, int i) throws Exception {
        InstantiationUtil.serializeObject(dataOutputViewStreamWrapper, this.keySerializer);
        InstantiationUtil.serializeObject(dataOutputViewStreamWrapper, this.namespaceSerializer);
        Set<InternalTimer<K, N>> eventTimeTimerSetForKeyGroup = getEventTimeTimerSetForKeyGroup(i);
        if (eventTimeTimerSetForKeyGroup != null) {
            dataOutputViewStreamWrapper.writeInt(eventTimeTimerSetForKeyGroup.size());
            Iterator<InternalTimer<K, N>> it = eventTimeTimerSetForKeyGroup.iterator();
            while (it.hasNext()) {
                this.timerSerializer.serialize((InternalTimer) it.next(), (DataOutputView) dataOutputViewStreamWrapper);
            }
        } else {
            dataOutputViewStreamWrapper.writeInt(0);
        }
        Set<InternalTimer<K, N>> processingTimeTimerSetForKeyGroup = getProcessingTimeTimerSetForKeyGroup(i);
        if (processingTimeTimerSetForKeyGroup == null) {
            dataOutputViewStreamWrapper.writeInt(0);
            return;
        }
        dataOutputViewStreamWrapper.writeInt(processingTimeTimerSetForKeyGroup.size());
        Iterator<InternalTimer<K, N>> it2 = processingTimeTimerSetForKeyGroup.iterator();
        while (it2.hasNext()) {
            this.timerSerializer.serialize((InternalTimer) it2.next(), (DataOutputView) dataOutputViewStreamWrapper);
        }
    }

    public void restoreTimersForKeyGroup(DataInputViewStreamWrapper dataInputViewStreamWrapper, int i, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        TypeSerializer<K> typeSerializer = (TypeSerializer) InstantiationUtil.deserializeObject(dataInputViewStreamWrapper, classLoader);
        TypeSerializer<N> typeSerializer2 = (TypeSerializer) InstantiationUtil.deserializeObject(dataInputViewStreamWrapper, classLoader);
        if ((this.keyDeserializer != null && !this.keyDeserializer.equals(typeSerializer)) || (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(typeSerializer2))) {
            throw new IllegalArgumentException("Tried to restore timers for the same service with different serializers.");
        }
        this.keyDeserializer = typeSerializer;
        this.namespaceDeserializer = typeSerializer2;
        InternalTimer.TimerSerializer timerSerializer = new InternalTimer.TimerSerializer(this.keyDeserializer, this.namespaceDeserializer);
        Preconditions.checkArgument(this.localKeyGroupRange.contains(i), "Key Group " + i + " does not belong to the local range.");
        int readInt = dataInputViewStreamWrapper.readInt();
        if (readInt > 0) {
            Set<InternalTimer<K, N>> eventTimeTimerSetForKeyGroup = getEventTimeTimerSetForKeyGroup(i);
            for (int i2 = 0; i2 < readInt; i2++) {
                InternalTimer<K, N> m40deserialize = timerSerializer.m40deserialize((DataInputView) dataInputViewStreamWrapper);
                eventTimeTimerSetForKeyGroup.add(m40deserialize);
                this.eventTimeTimersQueue.add(m40deserialize);
            }
        }
        int readInt2 = dataInputViewStreamWrapper.readInt();
        if (readInt2 > 0) {
            Set<InternalTimer<K, N>> processingTimeTimerSetForKeyGroup = getProcessingTimeTimerSetForKeyGroup(i);
            for (int i3 = 0; i3 < readInt2; i3++) {
                InternalTimer<K, N> m40deserialize2 = timerSerializer.m40deserialize((DataInputView) dataInputViewStreamWrapper);
                processingTimeTimerSetForKeyGroup.add(m40deserialize2);
                this.processingTimeTimersQueue.add(m40deserialize2);
            }
        }
    }

    private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> internalTimer) {
        Preconditions.checkArgument(this.localKeyGroupRange != null, "The operator has not been initialized.");
        return getEventTimeTimerSetForKeyGroup(KeyGroupRangeAssignment.assignToKeyGroup(internalTimer.getKey(), this.totalKeyGroups));
    }

    private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int i) {
        int indexForKeyGroup = getIndexForKeyGroup(i);
        Set<InternalTimer<K, N>> set = this.eventTimeTimersByKeyGroup[indexForKeyGroup];
        if (set == null) {
            set = new HashSet();
            this.eventTimeTimersByKeyGroup[indexForKeyGroup] = set;
        }
        return set;
    }

    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForTimer(InternalTimer<K, N> internalTimer) {
        Preconditions.checkArgument(this.localKeyGroupRange != null, "The operator has not been initialized.");
        return getProcessingTimeTimerSetForKeyGroup(KeyGroupRangeAssignment.assignToKeyGroup(internalTimer.getKey(), this.totalKeyGroups));
    }

    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForKeyGroup(int i) {
        int indexForKeyGroup = getIndexForKeyGroup(i);
        Set<InternalTimer<K, N>> set = this.processingTimeTimersByKeyGroup[indexForKeyGroup];
        if (set == null) {
            set = new HashSet();
            this.processingTimeTimersByKeyGroup[indexForKeyGroup] = set;
        }
        return set;
    }

    private int getIndexForKeyGroup(int i) {
        Preconditions.checkArgument(this.localKeyGroupRange.contains(i), "Key Group " + i + " does not belong to the local range.");
        return i - this.localKeyGroupRangeStartIdx;
    }

    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }

    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }

    public int numProcessingTimeTimers(N n) {
        int i = 0;
        Iterator<InternalTimer<K, N>> it = this.processingTimeTimersQueue.iterator();
        while (it.hasNext()) {
            if (it.next().getNamespace().equals(n)) {
                i++;
            }
        }
        return i;
    }

    public int numEventTimeTimers(N n) {
        int i = 0;
        Iterator<InternalTimer<K, N>> it = this.eventTimeTimersQueue.iterator();
        while (it.hasNext()) {
            if (it.next().getNamespace().equals(n)) {
                i++;
            }
        }
        return i;
    }

    @VisibleForTesting
    public int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }

    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getEventTimeTimersPerKeyGroup() {
        return this.eventTimeTimersByKeyGroup;
    }

    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getProcessingTimeTimersPerKeyGroup() {
        return this.processingTimeTimersByKeyGroup;
    }
}
