package org.apache.beam.sdk.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ActiveWindowSet;
import org.apache.beam.sdk.util.ReduceFn;
import org.apache.beam.sdk.util.ReduceFnContextFactory;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WatermarkHold;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/util/ReduceFnRunner.class */
/**
 * Drives a {@link ReduceFn} for a single key.
 *
 * <p>For every batch of incoming elements ({@link #processElements}) the runner merges windows
 * through the {@link ActiveWindowSet}, hands each value to the {@link ReduceFn}, updates trigger
 * state, schedules an end-of-window or garbage-collection timer and places watermark holds. When a
 * trigger is ready to fire, or when a timer is delivered ({@link #onTimer}), it emits a pane
 * through the supplied {@code WindowingInternals} and clears whatever state is no longer needed.
 *
 * @param <K> type of the key being processed
 * @param <InputT> type of the values handed to the {@link ReduceFn}
 * @param <OutputT> type of the values the {@link ReduceFn} produces
 * @param <W> type of the windows this runner operates on
 */
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final OutputWindowedValue<KV<K, OutputT>> outputter;
    private final StateInternals<K> stateInternals;
    private final Aggregator<Long, Long> droppedDueToClosedWindow;
    private final K key;
    // Assigned in the constructor: building it reads windowingStrategy and stateInternals, so it
    // must not be created by a field initializer (those run before the constructor body).
    private final ActiveWindowSet<W> activeWindows;
    private final ReduceFn<K, InputT, OutputT, W> reduceFn;
    private final TimerInternals timerInternals;
    private final TriggerRunner<W> triggerRunner;
    private final WatermarkHold<W> watermarkHold;
    private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
    private final PaneInfoTracker paneInfoTracker;
    private final NonEmptyPanes<K, W> nonEmptyPanes;

    /**
     * Callback handed to {@link ActiveWindowSet#merge}; it records which window each merged
     * window ended up in and merges the per-window state kept by the enclosing runner.
     */
    public class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
        /** Output map, filled by {@link #onMerge}: merged window -> window it was merged into. */
        private final Map<W, W> windowToMergeResult;

        OnMergeCallback(Map<W, W> map) {
            this.windowToMergeResult = map;
        }

        /** Returns the subset of {@code iterable} that the active window set reports as active. */
        private List<W> activeWindows(Iterable<W> iterable) {
            List<W> active = new ArrayList<>();
            for (W candidate : iterable) {
                if (ReduceFnRunner.this.activeWindows.isActive(candidate)) {
                    active.add(candidate);
                }
            }
            return active;
        }

        /**
         * Issues the state prefetches needed before {@code collection} is merged into {@code w}.
         *
         * @param collection the windows about to be merged
         * @param w the window the merge will produce
         */
        public void prefetchOnMerge(Collection<W> collection, W w) throws Exception {
            List<W> activeToBeMerged = activeWindows(collection);
            ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, w, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, w, ReduceFnContextFactory.StateStyle.RENAMED);
            // The trigger runner is given DIRECT state; the other components are given RENAMED state.
            ReduceFnRunner.this.triggerRunner.prefetchForMerge(w, activeToBeMerged, directMergeContext.mo245state());
            ReduceFnRunner.this.reduceFn.prefetchOnMerge(renamedMergeContext.mo245state());
            ReduceFnRunner.this.watermarkHold.prefetchOnMerge(renamedMergeContext.mo245state());
            ReduceFnRunner.this.nonEmptyPanes.prefetchOnMerge(renamedMergeContext.mo245state());
        }

        /**
         * Merges {@code collection} into {@code w}: records the mapping, merges reduce-fn,
         * watermark-hold, non-empty-pane and trigger state, then cancels timers and clears pane
         * info for every active source window other than {@code w} itself.
         *
         * @param collection the windows being merged
         * @param w the window the merge produces
         */
        public void onMerge(Collection<W> collection, W w) throws Exception {
            // Remember the merge result for every source window, active or not.
            for (W merged : collection) {
                this.windowToMergeResult.put(merged, w);
            }
            List<W> activeToBeMerged = activeWindows(collection);
            ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, w, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, w, ReduceFnContextFactory.StateStyle.RENAMED);
            ReduceFnRunner.this.reduceFn.onMerge(renamedMergeContext);
            ReduceFnRunner.this.watermarkHold.onMerge(renamedMergeContext);
            ReduceFnRunner.this.nonEmptyPanes.onMerge(renamedMergeContext.mo245state());
            ReduceFnRunner.this.triggerRunner.onMerge(directMergeContext.window(), directMergeContext.timers(), directMergeContext.mo245state());
            for (W active : activeToBeMerged) {
                if (active.equals(w)) {
                    // The merge result itself is not merged away; keep its timers and pane info.
                    continue;
                }
                WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", new Object[]{active, w});
                ReduceFn<K, InputT, OutputT, W>.Context directClearContext = ReduceFnRunner.this.contextFactory.base(active, ReduceFnContextFactory.StateStyle.DIRECT);
                ReduceFnRunner.this.cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
                ReduceFnRunner.this.paneInfoTracker.clear(directClearContext.mo245state());
            }
        }
    }

    /** {@link OutputWindowedValue} that forwards every output to a {@code WindowingInternals}. */
    private static class OutputViaWindowingInternals<OutputT> implements OutputWindowedValue<OutputT> {
        private final WindowingInternals<?, OutputT> windowingInternals;

        public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
            this.windowingInternals = windowingInternals;
        }

        @Override // org.apache.beam.sdk.util.ReduceFnRunner.OutputWindowedValue
        public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.windowingInternals.outputWindowedValue(outputt, instant, collection, paneInfo);
        }
    }

    /** Sink for the panes produced by this runner. */
    private interface OutputWindowedValue<OutputT> {
        /** Outputs {@code outputt} with the given timestamp, windows and pane info. */
        void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo);
    }

    /**
     * Creates a runner for key {@code k}.
     *
     * @param k the key whose elements this runner processes
     * @param windowingStrategy supplies the window fn, trigger, accumulation mode, allowed
     *     lateness and closing behavior used throughout the runner
     * @param stateInternals state access for the key
     * @param timerInternals timer access and the source of the current watermarks
     * @param windowingInternals receives the emitted panes
     * @param aggregator incremented once for every element dropped because its window's trigger
     *     is already closed
     * @param reduceFn the reduce function to drive
     * @param pipelineOptions passed through to the {@link ReduceFnContextFactory}
     */
    public ReduceFnRunner(K k, WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals, TimerInternals timerInternals, WindowingInternals<?, KV<K, OutputT>> windowingInternals, Aggregator<Long, Long> aggregator, ReduceFn<K, InputT, OutputT, W> reduceFn, PipelineOptions pipelineOptions) {
        this.key = k;
        this.timerInternals = timerInternals;
        this.paneInfoTracker = new PaneInfoTracker(timerInternals);
        this.stateInternals = stateInternals;
        this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
        this.droppedDueToClosedWindow = aggregator;
        this.reduceFn = reduceFn;

        // The element type of the strategy is never inspected here, so widening it is safe.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> objectWindowingStrategy = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = objectWindowingStrategy;

        this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);

        // Must come after windowingStrategy and stateInternals are assigned (both are read by
        // createActiveWindowSet) and before the factories below, which capture activeWindows.
        this.activeWindows = createActiveWindowSet();

        this.contextFactory = new ReduceFnContextFactory<>(k, reduceFn, this.windowingStrategy, stateInternals, this.activeWindows, timerInternals, windowingInternals, pipelineOptions);
        this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
        this.triggerRunner = new TriggerRunner<>(windowingStrategy.getTrigger(), new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, this.activeWindows));
    }

    /**
     * Picks the active-window-set implementation that matches the window fn: a non-merging set
     * when the window fn is non-merging, otherwise a merging set backed by {@code stateInternals}.
     * Requires {@code windowingStrategy} and {@code stateInternals} to be assigned already.
     */
    private ActiveWindowSet<W> createActiveWindowSet() {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return new NonMergingActiveWindowSet<W>();
        }
        return new MergingActiveWindowSet<W>(this.windowingStrategy.getWindowFn(), this.stateInternals);
    }

    /** Returns whether the trigger runner reports the trigger for window {@code w} as closed. */
    @VisibleForTesting
    boolean isFinished(W w) {
        return this.triggerRunner.isClosed(this.contextFactory.base(w, ReduceFnContextFactory.StateStyle.DIRECT).mo245state());
    }

    /** Returns whether the active window set currently holds no active or new windows. */
    @VisibleForTesting
    boolean hasNoActiveWindows() {
        return this.activeWindows.getActiveAndNewWindows().isEmpty();
    }

    /**
     * Processes a batch of elements for this key.
     *
     * <p>Windows are merged first, then every element is processed against the merged windows,
     * then each window that accepted at least one element is given the chance to fire, and
     * finally temporary windows are removed from the active window set.
     *
     * @param iterable the elements to process; iterated twice (once for merging, once for
     *     processing)
     */
    public void processElements(Iterable<WindowedValue<InputT>> iterable) throws Exception {
        Map<W, W> windowToMergeResult = collectAndMergeWindows(iterable);

        // Windows that accepted at least one element and so may now be ready to fire.
        Collection<W> windowsToConsider = new HashSet<>();
        for (WindowedValue<InputT> value : iterable) {
            windowsToConsider.addAll(processElement(windowToMergeResult, value));
        }

        for (W window : windowsToConsider) {
            ReduceFn<K, InputT, OutputT, W>.Context directContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn<K, InputT, OutputT, W>.Context renamedContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.RENAMED);
            this.triggerRunner.prefetchShouldFire(window, directContext.mo245state());
            emitIfAppropriate(directContext, renamedContext);
        }

        this.activeWindows.cleanupTemporaryWindows();
    }

    /** Persists the active window set. */
    public void persist() {
        this.activeWindows.persist();
    }

    /**
     * Registers the windows of all {@code iterable} elements with the active window set and
     * merges them.
     *
     * @return a map from each merged window to the window it was merged into; empty when the
     *     window fn is non-merging
     */
    private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> iterable) throws Exception {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return ImmutableMap.of();
        }

        for (WindowedValue<InputT> value : iterable) {
            for (BoundedWindow untypedWindow : value.getWindows()) {
                @SuppressWarnings("unchecked")
                W window = (W) untypedWindow;

                // An active window with more than one state address still has state that was
                // never merged; merge it now so later code only ever sees one address.
                if (this.activeWindows.isActive(window) && this.activeWindows.readStateAddresses(window).size() > 1) {
                    ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext = this.contextFactory.forPremerge(window);
                    this.reduceFn.onMerge(premergeContext);
                    this.watermarkHold.onMerge(premergeContext);
                    this.activeWindows.merged(window);
                }

                this.activeWindows.ensureWindowExists(window);
            }
        }

        Map<W, W> windowToMergeResult = new HashMap<>();
        this.activeWindows.merge(new OnMergeCallback(windowToMergeResult));
        return windowToMergeResult;
    }

    /**
     * Processes one element in each of its (post-merge) windows.
     *
     * @param map merged window -> merge result, as returned by {@link #collectAndMergeWindows}
     * @param windowedValue the element to process
     * @return the windows that accepted the element, i.e. whose trigger was not already closed
     */
    private Collection<W> processElement(Map<W, W> map, WindowedValue<InputT> windowedValue) throws Exception {
        // Redirect each of the element's windows to the window it was merged into, if any.
        List<W> windows = new ArrayList<>();
        for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
            @SuppressWarnings("unchecked")
            W window = (W) untypedWindow;
            W mergeResult = map.get(window);
            windows.add(mergeResult == null ? window : mergeResult);
        }

        // Issue all prefetches before touching any state.
        for (W window : windows) {
            this.triggerRunner.prefetchForValue(window, this.contextFactory.forValue(window, windowedValue.getValue(), windowedValue.getTimestamp(), ReduceFnContextFactory.StateStyle.DIRECT).mo245state());
        }

        List<W> triggerableWindows = new ArrayList<>(windows.size());
        for (W window : windows) {
            ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = this.contextFactory.forValue(window, windowedValue.getValue(), windowedValue.getTimestamp(), ReduceFnContextFactory.StateStyle.DIRECT);
            if (this.triggerRunner.isClosed(directContext.mo245state())) {
                // The trigger for this window is closed: count and drop the element.
                this.droppedDueToClosedWindow.addValue(1L);
                WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since window is no longer active at inputWatermark:{}; outputWatermark:{}", new Object[]{windowedValue.getTimestamp(), this.key, window, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
                continue;
            }

            triggerableWindows.add(window);
            this.activeWindows.ensureWindowIsActive(window);
            ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = this.contextFactory.forValue(window, windowedValue.getValue(), windowedValue.getTimestamp(), ReduceFnContextFactory.StateStyle.RENAMED);
            this.nonEmptyPanes.recordContent(renamedContext.mo245state());

            Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
            Instant hold = this.watermarkHold.addHolds(renamedContext);
            if (hold != null) {
                // The hold and the timer must agree on whether they fall inside the window.
                boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
                boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
                Preconditions.checkState(holdInWindow == timerInWindow, "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s", hold, timer, directContext.window());
            }

            this.reduceFn.processValue(renamedContext);
            this.triggerRunner.processValue(directContext.window(), directContext.timestamp(), directContext.timers(), directContext.mo245state());
        }
        return triggerableWindows;
    }

    /**
     * Handles a fired timer.
     *
     * <p>An event-time timer at or after the window's garbage-collection time emits a final pane
     * (if the window is still active) and clears all state for the window. Any other timer gives
     * the trigger a chance to fire; if it is the end-of-window timer, the garbage-collection
     * timer is scheduled as well.
     *
     * @param timerData the timer that fired; its namespace must be a window namespace
     * @throws IllegalArgumentException if the timer is not in a window namespace
     */
    public void onTimer(TimerInternals.TimerData timerData) throws Exception {
        Preconditions.checkArgument(timerData.getNamespace() instanceof StateNamespaces.WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timerData.getNamespace());
        @SuppressWarnings("unchecked")
        StateNamespaces.WindowNamespace<W> windowNamespace = (StateNamespaces.WindowNamespace<W>) timerData.getNamespace();
        W window = windowNamespace.getWindow();
        ReduceFn<K, InputT, OutputT, W>.Context directContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.DIRECT);
        ReduceFn<K, InputT, OutputT, W>.Context renamedContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.RENAMED);

        boolean windowIsActive = this.activeWindows.isActive(window) && !this.triggerRunner.isClosed(directContext.mo245state());
        if (!windowIsActive) {
            WindowTracing.debug("ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", new Object[]{timerData, window});
        }

        boolean isEndOfWindow = TimeDomain.EVENT_TIME == timerData.getDomain() && timerData.getTimestamp().equals(window.maxTimestamp());
        Instant cleanupTime = garbageCollectionTime(window);
        boolean isGarbageCollection = TimeDomain.EVENT_TIME == timerData.getDomain() && !timerData.getTimestamp().isBefore(cleanupTime);

        if (isGarbageCollection) {
            WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", new Object[]{this.key, window, timerData.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
            if (windowIsActive) {
                // Emit the final pane if one is required; no new hold may result from it.
                Instant newHold = onTrigger(directContext, renamedContext, true, isEndOfWindow);
                Preconditions.checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
            }
            clearAllState(directContext, renamedContext, windowIsActive);
            return;
        }

        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", new Object[]{this.key, window, timerData.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
        if (windowIsActive) {
            emitIfAppropriate(directContext, renamedContext);
        }

        if (isEndOfWindow) {
            // With zero allowed lateness the end-of-window timer is also the garbage-collection
            // timer and would have taken the branch above, so lateness must be positive here.
            Preconditions.checkState(this.windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), "Unexpected zero getAllowedLateness");
            WindowTracing.debug("ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", new Object[]{this.key, directContext.window(), cleanupTime, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
            Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Cleanup time %s is beyond end-of-time", cleanupTime);
            directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
        }
    }

    /**
     * Clears the state kept for a window at garbage-collection time.
     *
     * @param context DIRECT-style context for the window
     * @param context2 RENAMED-style context for the window
     * @param windowIsActive whether the window was active with an open trigger; if so all
     *     reduce-fn, hold, non-empty-pane, trigger and pane-info state is cleared
     */
    private void clearAllState(ReduceFn<K, InputT, OutputT, W>.Context context, ReduceFn<K, InputT, OutputT, W>.Context context2, boolean windowIsActive) throws Exception {
        if (windowIsActive) {
            this.reduceFn.clearState(context2);
            this.watermarkHold.clearHolds(context2);
            this.nonEmptyPanes.clearPane(context2.mo245state());
            this.triggerRunner.clearState(context.window(), context.timers(), context.mo245state());
            this.paneInfoTracker.clear(context.mo245state());
        } else if (this.windowingStrategy.getMode() == WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES && !this.windowingStrategy.getWindowFn().isNonMerging()) {
            // Merging windows in discarding mode: clear holds reached through the DIRECT context.
            this.watermarkHold.clearHolds(context);
        }
        this.activeWindows.remove(context.window());
        this.triggerRunner.clearFinished(context.mo245state());
    }

    /**
     * Returns whether reduce-fn state should be cleared after a firing: always when the trigger
     * is finished, otherwise only in {@code DISCARDING_FIRED_PANES} mode.
     */
    private boolean shouldDiscardAfterFiring(boolean isFinished) {
        return isFinished || this.windowingStrategy.getMode() == WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
    }

    /**
     * Fires the trigger for a window if it is ready, emits the pane and cleans up afterwards.
     *
     * @param context DIRECT-style context for the window
     * @param context2 RENAMED-style context for the window
     */
    private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context context, ReduceFn<K, InputT, OutputT, W>.Context context2) throws Exception {
        if (!this.triggerRunner.shouldFire(context.window(), context.timers(), context.mo245state())) {
            return;
        }

        this.triggerRunner.onFire(context.window(), context.timers(), context.mo245state());
        boolean isFinished = this.triggerRunner.isClosed(context.mo245state());
        boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);

        onTrigger(context, context2, isFinished, false);

        // The pane has been emitted (or skipped), so it is empty from here on.
        this.nonEmptyPanes.clearPane(context2.mo245state());

        if (shouldDiscard) {
            this.reduceFn.clearState(context2);
        }

        if (isFinished) {
            // Nothing more will fire for this window: drop trigger state, pane info and the
            // window's entry in the active window set.
            this.triggerRunner.clearState(context.window(), context.timers(), context.mo245state());
            this.paneInfoTracker.clear(context.mo245state());
            this.activeWindows.remove(context.window());
        }
    }

    /**
     * Decides whether a pane should be emitted.
     *
     * @param isEmpty whether the pane has no content
     * @param isFinished whether this is the final pane for the window
     * @param timing the timing of the pane
     * @return {@code true} for any non-empty pane, for an {@code ON_TIME} pane, and for an empty
     *     final pane when the closing behavior is {@code FIRE_ALWAYS}
     */
    private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
        if (!isEmpty) {
            return true;
        }
        if (timing == PaneInfo.Timing.ON_TIME) {
            return true;
        }
        return isFinished && this.windowingStrategy.getClosingBehavior() == Window.ClosingBehavior.FIRE_ALWAYS;
    }

    /**
     * Extracts and releases the watermark hold for a window and, if a pane needs to be emitted,
     * runs the reduce fn's {@code onTrigger} so it can output the pane.
     *
     * @param context DIRECT-style context for the window
     * @param context2 RENAMED-style context for the window
     * @param isFinished whether the trigger is finished, i.e. this is the final pane
     * @param isEndOfWindow whether this call is caused by the end-of-window timer
     * @return the new hold reported by the watermark hold, or {@code null} if there is none
     * @throws IllegalStateException if the new hold is inconsistent with {@code isFinished},
     *     {@code isEndOfWindow}, the input watermark or the window's end / garbage-collection time
     */
    @Nullable
    private Instant onTrigger(final ReduceFn<K, InputT, OutputT, W>.Context context, ReduceFn<K, InputT, OutputT, W>.Context context2, boolean isFinished, boolean isEndOfWindow) throws Exception {
        Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();

        // Request every state read up front, then read the results.
        ReadableState<WatermarkHold.OldAndNewHolds> holdsFuture = this.watermarkHold.extractAndRelease(context2, isFinished).readLater();
        ReadableState<PaneInfo> paneFuture = this.paneInfoTracker.getNextPaneInfo(context, isFinished).readLater();
        ReadableState<Boolean> isEmptyFuture = this.nonEmptyPanes.isEmpty(context2.mo245state()).readLater();
        this.reduceFn.prefetchOnTrigger(context.mo245state());
        this.triggerRunner.prefetchOnFire(context.window(), context.mo245state());

        final PaneInfo pane = paneFuture.read();
        WatermarkHold.OldAndNewHolds holds = holdsFuture.read();
        // The old hold becomes the timestamp of the emitted pane.
        final Instant outputTimestamp = holds.oldHold;
        Instant newHold = holds.newHold;

        if (newHold != null) {
            Preconditions.checkState(!isFinished, "new hold at %s but finished %s", newHold, context.window());
            Preconditions.checkState(!newHold.isBefore(inputWatermark), "new hold %s is before input watermark %s", newHold, inputWatermark);
            if (newHold.isAfter(context.window().maxTimestamp())) {
                // A hold past the end of the window can only be the garbage-collection hold.
                Preconditions.checkState(newHold.isEqual(garbageCollectionTime(context.window())), "new hold %s should be at garbage collection for window %s plus %s", newHold, context.window(), this.windowingStrategy.getAllowedLateness());
            } else {
                // Otherwise it must be the end-of-window hold, which cannot exist once the
                // end-of-window timer itself is being processed.
                Preconditions.checkState(newHold.isEqual(context.window().maxTimestamp()), "new hold %s should be at end of window %s", newHold, context.window());
                Preconditions.checkState(!isEndOfWindow, "new hold at %s for %s but this is the watermark trigger", newHold, context.window());
            }
        }

        if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
            final List<W> windows = Collections.singletonList(context.window());
            this.reduceFn.onTrigger(this.contextFactory.forTrigger(context.window(), paneFuture, ReduceFnContextFactory.StateStyle.RENAMED, new ReduceFnContextFactory.OnTriggerCallbacks<OutputT>() { // from class: org.apache.beam.sdk.util.ReduceFnRunner.1
                @Override // org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks
                public void output(OutputT outputt) {
                    // Store the pane info only once output actually happens, then emit.
                    ReduceFnRunner.this.paneInfoTracker.storeCurrentPaneInfo(context, pane);
                    ReduceFnRunner.this.outputter.outputWindowedValue(KV.of(ReduceFnRunner.this.key, outputt), outputTimestamp, windows, pane);
                }
            }));
        }
        return newHold;
    }

    /**
     * Sets an event-time timer for the window: at the end of the window if the input watermark
     * has not passed it yet, otherwise at the window's garbage-collection time.
     *
     * @param context DIRECT-style context for the window
     * @return the timestamp the timer was set for
     * @throws IllegalStateException if that timestamp is after
     *     {@code BoundedWindow.TIMESTAMP_MAX_VALUE}
     */
    private Instant scheduleEndOfWindowOrGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context context) {
        Instant inputWatermark = this.timerInternals.currentInputWatermarkTime();
        Instant endOfWindow = context.window().maxTimestamp();
        Instant timer;
        String which;
        if (endOfWindow.isBefore(inputWatermark)) {
            timer = garbageCollectionTime(context.window());
            which = "garbage collection";
        } else {
            timer = endOfWindow;
            which = "end-of-window";
        }
        WindowTracing.trace("ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", new Object[]{which, timer, this.key, context.window(), inputWatermark, this.timerInternals.currentOutputWatermarkTime()});
        Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Timer %s is beyond end-of-time", timer);
        context.timers().setTimer(timer, TimeDomain.EVENT_TIME);
        return timer;
    }

    /**
     * Deletes the end-of-window timer of the window and, when the garbage-collection time is
     * later than the end of the window, the garbage-collection timer as well.
     *
     * @param context DIRECT-style context for the window whose timers are cancelled
     */
    public void cancelEndOfWindowAndGarbageCollectionTimers(ReduceFn<?, ?, ?, W>.Context context) {
        WindowTracing.debug("ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", new Object[]{this.key, context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
        Instant endOfWindow = context.window().maxTimestamp();
        context.timers().deleteTimer(endOfWindow, TimeDomain.EVENT_TIME);
        Instant cleanupTime = garbageCollectionTime(context.window());
        if (cleanupTime.isAfter(endOfWindow)) {
            // Delete the timer at the garbage-collection time, not the end-of-window one again.
            context.timers().deleteTimer(cleanupTime, TimeDomain.EVENT_TIME);
        }
    }

    /**
     * Returns the time at which state for window {@code w} may be garbage collected: the end of
     * the window plus the allowed lateness, capped at the end of the global window.
     */
    private Instant garbageCollectionTime(W w) {
        Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
        Duration allowedLateness = this.windowingStrategy.getAllowedLateness();
        // Compare by subtracting from the end of time rather than adding to the end of the
        // window, so a timestamp beyond the end of the global window is never computed.
        if (endOfTime.minus(allowedLateness).isBefore(w.maxTimestamp())) {
            return endOfTime;
        }
        return w.maxTimestamp().plus(allowedLateness);
    }
}
