package org.apache.beam.runners.direct.repackaged.runners.core;

import java.io.Serializable;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/repackaged/runners/core/WatermarkHold.class */
public class WatermarkHold<W extends BoundedWindow> implements Serializable {

    @VisibleForTesting
    public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));
    private final TimerInternals timerInternals;
    private final WindowingStrategy<?, W> windowingStrategy;
    private final StateTag<WatermarkHoldState> elementHoldTag;

    /* loaded from: input_file:org/apache/beam/runners/direct/repackaged/runners/core/WatermarkHold$OldAndNewHolds.class */
    public static class OldAndNewHolds {
        public final Instant oldHold;

        @Nullable
        public final Instant newHold;

        public OldAndNewHolds(Instant instant, @Nullable Instant instant2) {
            this.oldHold = instant;
            this.newHold = instant2;
        }
    }

    public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(TimestampCombiner timestampCombiner) {
        return StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", timestampCombiner));
    }

    public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
        this.timerInternals = timerInternals;
        this.windowingStrategy = windowingStrategy;
        this.elementHoldTag = watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
    }

    @Nullable
    public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext processValueContext) {
        Instant addElementHold = addElementHold(processValueContext);
        if (addElementHold == null) {
            addElementHold = addEndOfWindowOrGarbageCollectionHolds(processValueContext, false);
        }
        return addElementHold;
    }

    private Instant shift(Instant instant, W w) {
        Instant assign = this.windowingStrategy.getTimestampCombiner().assign(w, this.windowingStrategy.getWindowFn().getOutputTime(instant, w));
        Preconditions.checkState(!assign.isBefore(instant), "TimestampCombiner moved element from %s to earlier time %s for window %s", BoundedWindow.formatTimestamp(instant), BoundedWindow.formatTimestamp(assign), w);
        Preconditions.checkState(instant.isAfter(w.maxTimestamp()) || !assign.isAfter(w.maxTimestamp()), "TimestampCombiner moved element from %s to %s which is beyond end of window %s", instant, assign, w);
        return assign;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.beam.sdk.transforms.windowing.BoundedWindow] */
    @Nullable
    private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext processValueContext) {
        Object obj;
        boolean z;
        Instant shift = shift(processValueContext.timestamp(), processValueContext.window());
        Instant currentOutputWatermarkTime = this.timerInternals.currentOutputWatermarkTime();
        Instant currentInputWatermarkTime = this.timerInternals.currentInputWatermarkTime();
        if (currentOutputWatermarkTime != null && shift.isBefore(currentOutputWatermarkTime)) {
            obj = "too late to effect output watermark";
            z = true;
        } else if (processValueContext.window().maxTimestamp().isBefore(currentInputWatermarkTime)) {
            obj = "too late for end-of-window timer";
            z = true;
        } else {
            obj = "on time";
            z = false;
            Preconditions.checkState(!shift.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Element hold %s is beyond end-of-time", shift);
            processValueContext.state().access(this.elementHoldTag).add(shift);
        }
        WindowTracing.trace("WatermarkHold.addHolds: element hold at {} is {} for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{shift, obj, processValueContext.key(), processValueContext.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
        if (z) {
            return null;
        }
        return shift;
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nullable
    public Instant addEndOfWindowOrGarbageCollectionHolds(ReduceFn<?, ?, ?, W>.Context context, boolean z) {
        Instant addEndOfWindowHold = addEndOfWindowHold(context, z);
        if (addEndOfWindowHold == null) {
            addEndOfWindowHold = addGarbageCollectionHold(context, z);
        }
        return addEndOfWindowHold;
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.beam.sdk.transforms.windowing.BoundedWindow] */
    @Nullable
    private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean z) {
        Instant currentOutputWatermarkTime = this.timerInternals.currentOutputWatermarkTime();
        Instant currentInputWatermarkTime = this.timerInternals.currentInputWatermarkTime();
        Instant maxTimestamp = context.window().maxTimestamp();
        if (maxTimestamp.isBefore(currentInputWatermarkTime)) {
            WindowTracing.trace("WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{maxTimestamp, context.key(), context.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
            return null;
        }
        Preconditions.checkState(currentOutputWatermarkTime == null || !maxTimestamp.isBefore(currentOutputWatermarkTime), "End-of-window hold %s cannot be before output watermark %s", maxTimestamp, currentOutputWatermarkTime);
        Preconditions.checkState(!maxTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "End-of-window hold %s is beyond end-of-time", maxTimestamp);
        context.state().access(EXTRA_HOLD_TAG).add(maxTimestamp);
        WindowTracing.trace("WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{maxTimestamp, context.key(), context.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
        return maxTimestamp;
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.beam.sdk.transforms.windowing.BoundedWindow] */
    @Nullable
    private Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context, boolean z) {
        Instant currentOutputWatermarkTime = this.timerInternals.currentOutputWatermarkTime();
        Instant currentInputWatermarkTime = this.timerInternals.currentInputWatermarkTime();
        Instant garbageCollectionTime = LateDataUtils.garbageCollectionTime((BoundedWindow) context.window(), this.windowingStrategy);
        if (!this.windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
            WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since no allowed lateness for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{garbageCollectionTime, context.key(), context.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
            return null;
        }
        if (z && context.windowingStrategy().getClosingBehavior() == Window.ClosingBehavior.FIRE_IF_NON_EMPTY) {
            WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{garbageCollectionTime, context.key(), context.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
            return null;
        }
        if (!garbageCollectionTime.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            garbageCollectionTime = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
        }
        Preconditions.checkState(!garbageCollectionTime.isBefore(currentInputWatermarkTime), "Garbage collection hold %s cannot be before input watermark %s", garbageCollectionTime, currentInputWatermarkTime);
        Preconditions.checkState(!garbageCollectionTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Garbage collection hold %s is beyond end-of-time", garbageCollectionTime);
        context.state().access(EXTRA_HOLD_TAG).add(garbageCollectionTime);
        WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{garbageCollectionTime, context.key(), context.window(), currentInputWatermarkTime, currentOutputWatermarkTime});
        return garbageCollectionTime;
    }

    public void prefetchOnMerge(MergingStateAccessor<?, W> mergingStateAccessor) {
        StateMerging.prefetchWatermarks(mergingStateAccessor, this.elementHoldTag);
    }

    /* JADX WARN: Type inference failed for: r2v5, types: [org.apache.beam.sdk.transforms.windowing.BoundedWindow] */
    public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext onMergeContext) {
        WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{onMergeContext.key(), onMergeContext.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
        StateMerging.mergeWatermarks((MergingStateAccessor<K, ??>) onMergeContext.state(), this.elementHoldTag, onMergeContext.window());
        StateMerging.clear(onMergeContext.state(), EXTRA_HOLD_TAG);
        addEndOfWindowOrGarbageCollectionHolds(onMergeContext, false);
    }

    public void prefetchExtract(ReduceFn<?, ?, ?, W>.Context context) {
        context.state().access(this.elementHoldTag).readLater();
        context.state().access(EXTRA_HOLD_TAG).readLater();
    }

    public ReadableState<OldAndNewHolds> extractAndRelease(final ReduceFn<?, ?, ?, W>.Context context, final boolean z) {
        WindowTracing.debug("WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
        final WatermarkHoldState access = context.state().access(this.elementHoldTag);
        final WatermarkHoldState access2 = context.state().access(EXTRA_HOLD_TAG);
        return new ReadableState<OldAndNewHolds>() { // from class: org.apache.beam.runners.direct.repackaged.runners.core.WatermarkHold.1
            public ReadableState<OldAndNewHolds> readLater() {
                access.readLater();
                access2.readLater();
                return this;
            }

            /* renamed from: read, reason: merged with bridge method [inline-methods] */
            public OldAndNewHolds m451read() {
                Instant instant = (Instant) access.read();
                Instant instant2 = (Instant) access2.read();
                Instant instant3 = instant == null ? instant2 : instant2 == null ? instant : instant.isBefore(instant2) ? instant : instant2;
                if (instant3 == null || instant3.isAfter(context.window().maxTimestamp())) {
                    WindowTracing.debug("WatermarkHold.extractAndRelease.read: clipping from {} to end of window for key:{}; window:{}", new Object[]{instant3, context.key(), context.window()});
                    instant3 = context.window().maxTimestamp();
                }
                WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", new Object[]{context.key(), context.window()});
                access.clear();
                access2.clear();
                Instant instant4 = null;
                if (!z) {
                    instant4 = WatermarkHold.this.addEndOfWindowOrGarbageCollectionHolds(context, true);
                }
                return new OldAndNewHolds(instant3, instant4);
            }
        };
    }

    public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
        WindowTracing.debug("WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", new Object[]{context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
        context.state().access(this.elementHoldTag).clear();
        context.state().access(EXTRA_HOLD_TAG).clear();
    }

    @Nullable
    public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
        return (Instant) context.state().access(this.elementHoldTag).read();
    }
}
