package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.base.Ascii;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.base.Joiner;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.class */
public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<WindowedValue<OutputT>> implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>, KeyGroupCheckpointedOperator, Triggerable<Object, TimerInternals.TimerData> {
    // ---- Configuration captured by the constructor ----

    /** The user function; assigned in the constructor and re-read through {@code getDoFn()} in {@code open()}. */
    protected DoFn<InputT, OutputT> doFn;
    /** Serializable wrapper around the {@code PipelineOptions} handed to the constructor. */
    protected final SerializablePipelineOptions serializedOptions;
    /** Tag of the main output; passed to the runner and the output manager. */
    protected final TupleTag<OutputT> mainOutputTag;
    /** Tags of the additional outputs; passed to {@code DoFnRunners.simpleRunner} in {@code open()}. */
    protected final List<TupleTag<?>> additionalOutputTags;
    /** Side-input views; when empty, no push-back state or side-input handler is created. */
    protected final Collection<PCollectionView<?>> sideInputs;
    /** Maps the union tag of an incoming {@code RawUnionValue} to its side-input view (see {@code processElement2}). */
    protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    /** Creates {@link #outputManager} in {@code open()}. */
    protected final OutputManagerFactory<OutputT> outputManagerFactory;

    // ---- Runtime objects built in open() (transient) ----

    protected transient DoFnRunner<InputT, OutputT> doFnRunner;
    /** Wraps {@link #doFnRunner}; bundle start/finish calls go through this runner. */
    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
    /** Only assigned in {@code open()} when {@link #sideInputs} is non-empty. */
    protected transient SideInputHandler sideInputHandler;
    protected transient SideInputReader sideInputReader;
    protected transient BufferedOutputManager<OutputT> outputManager;
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
    // The three watermarks are set to BoundedWindow.TIMESTAMP_MIN_VALUE in open() via their setters.
    protected transient long currentInputWatermark;
    protected transient long currentSideInputWatermark;
    protected transient long currentOutputWatermark;
    /** State tag "pushed-back-values"; only created in {@code open()} when side inputs exist. */
    private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
    /** Only created in {@code open()} when {@link #keyCoder} is non-null. */
    protected transient FlinkStateInternals<?> keyedStateInternals;
    /** Step name handed to the metrics-updating runner wrapper. */
    private final String stepName;
    /** Coder for the pushed-back input elements. */
    private final Coder<WindowedValue<InputT>> inputCoder;
    /** Key coder; {@code null} is treated as "not keyed" throughout this class. */
    private final Coder<?> keyCoder;
    /** Coder for timer data, built from the windowing strategy's window coder. */
    private final TimerInternals.TimerDataCoder timerCoder;
    /** Element-count threshold at which the current bundle is finished (from {@code FlinkPipelineOptions}). */
    private final long maxBundleSize;
    /** Elapsed-time threshold in milliseconds at which the current bundle is finished (from {@code FlinkPipelineOptions}). */
    private final long maxBundleTimeMills;
    /** Timer service "beam-timer"; only obtained in {@code open()} when {@link #keyCoder} is non-null. */
    protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
    protected transient DoFnOperator<InputT, OutputT>.FlinkTimerInternals timerInternals;
    /** Holds the buffered bundle output and the pushed-back elements. */
    private transient StateInternals nonKeyedStateInternals;
    /** Cached minimum timestamp of the pushed-back elements; absent until first computed. */
    private transient Optional<Long> pushedBackWatermark;

    // ---- Bundle bookkeeping ----

    /** True between {@code startBundle()} and {@code finishBundle()} on the runner. */
    private transient boolean bundleStarted = false;
    /** Number of elements counted since the last finished bundle. */
    private transient long elementCount;
    /** Processing time at which the last bundle was finished (or the operator was opened). */
    private transient long lastFinishBundleTime;
    /** Handle of the periodic bundle-timeout check scheduled in {@code open()}. */
    private transient ScheduledFuture<?> checkFinishBundleTimer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$1.class */
    /**
     * Lookup table that maps {@code TimeDomain.ordinal()} to a small dense case number.
     * This looks like the helper class a Java compiler emits for a {@code switch} over an enum,
     * surfaced here by the decompiler. It is also used as the marker parameter type of
     * {@code FlinkTimerInternals}' synthetic constructor.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by TimeDomain.ordinal(); entries never assigned below keep the default value 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$state$TimeDomain = new int[TimeDomain.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves its own slot unassigned.
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.EVENT_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.PROCESSING_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.SYNCHRONIZED_PROCESSING_TIME.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$BufferedOutputManager.class */
    /**
     * Output manager that either forwards elements straight to the Flink {@code Output} or, while
     * its buffer is open, parks them in a bag state ("bundle-buffer-tag") until
     * {@link #flushBuffer()} is called.
     *
     * <p>Buffered elements are stored together with the integer id of their output tag so that
     * they can be routed to the right output when flushed.
     */
    public static class BufferedOutputManager<OutputT> implements DoFnRunners.OutputManager {
        private TupleTag<OutputT> mainTag;
        private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private Map<TupleTag<?>, Integer> tagsToIds;
        protected Output<StreamRecord<WindowedValue<OutputT>>> output;
        private BagState<KV<Integer, WindowedValue<?>>> bufferState;
        private boolean openBuffer = false;
        private Map<Integer, TupleTag<?>> idsToTags = new HashMap<>();

        /**
         * @param output Flink output that receives emitted records
         * @param tupleTag tag of the main output
         * @param map output tag to Flink side-output tag
         * @param map2 output tag to the coder used when buffering that output's elements
         * @param map3 output tag to the integer id stored alongside buffered elements
         * @param stateInternals state in which the buffer bag is kept
         */
        BufferedOutputManager(Output<StreamRecord<WindowedValue<OutputT>>> output, TupleTag<OutputT> tupleTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> map, Map<TupleTag<?>, Coder<WindowedValue<?>>> map2, Map<TupleTag<?>, Integer> map3, StateInternals stateInternals) {
            this.output = output;
            this.mainTag = tupleTag;
            this.tagsToOutputTags = map;
            this.tagsToIds = map3;

            // One pass builds both derived views: id -> tag (for flushing) and id -> coder (for the buffer).
            ImmutableMap.Builder<Integer, Coder<WindowedValue<?>>> codersById = ImmutableMap.builder();
            for (Map.Entry<TupleTag<?>, Integer> tagAndId : map3.entrySet()) {
                TupleTag<?> tag = tagAndId.getKey();
                Integer id = tagAndId.getValue();
                this.idsToTags.put(id, tag);
                codersById.put(id, map2.get(tag));
            }

            TaggedKvCoder bufferCoder = new TaggedKvCoder(codersById.build());
            this.bufferState = stateInternals.state(StateNamespaces.global(), StateTags.bag("bundle-buffer-tag", bufferCoder));
        }

        /** Start diverting output into the buffer state. */
        void openBuffer() {
            this.openBuffer = true;
        }

        /** Stop buffering; subsequent output is emitted directly. */
        void closeBuffer() {
            this.openBuffer = false;
        }

        /** Buffers the element while the buffer is open, otherwise emits it immediately. */
        @Override
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (!this.openBuffer) {
                emit(tupleTag, windowedValue);
                return;
            }
            this.bufferState.add(KV.of(this.tagsToIds.get(tupleTag), windowedValue));
        }

        /** Emits every buffered element to its output, then clears the buffer state. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void flushBuffer() {
            for (KV<Integer, WindowedValue<?>> buffered : this.bufferState.read()) {
                TupleTag<?> tag = this.idsToTags.get(buffered.getKey());
                emit(tag, (WindowedValue) buffered.getValue());
            }
            this.bufferState.clear();
        }

        /** Routes the element to the main output or to the side output registered for its tag. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private <T> void emit(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (!tupleTag.equals(this.mainTag)) {
                OutputTag<WindowedValue<T>> sideOutput = (OutputTag) this.tagsToOutputTags.get(tupleTag);
                this.output.collect(sideOutput, new StreamRecord<>(windowedValue));
                return;
            }
            // The main tag's element type is OutputT by construction; the cast cannot be checked statically.
            WindowedValue<OutputT> mainValue = (WindowedValue<OutputT>) windowedValue;
            this.output.collect(new StreamRecord<>(mainValue));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$FlinkStepContext.class */
    protected class FlinkStepContext implements StepContext {
        protected FlinkStepContext() {
        }

        public StateInternals stateInternals() {
            return DoFnOperator.this.keyedStateInternals;
        }

        public TimerInternals timerInternals() {
            return DoFnOperator.this.timerInternals;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$FlinkTimerInternals.class */
    public class FlinkTimerInternals implements TimerInternals {
        private FlinkTimerInternals() {
        }

        public void setTimer(StateNamespace stateNamespace, String str, Instant instant, TimeDomain timeDomain) {
            setTimer(TimerInternals.TimerData.of(str, stateNamespace, instant, timeDomain));
        }

        @Deprecated
        public void setTimer(TimerInternals.TimerData timerData) {
            long millis = timerData.getTimestamp().getMillis();
            switch (AnonymousClass1.$SwitchMap$org$apache$beam$sdk$state$TimeDomain[timerData.getDomain().ordinal()]) {
                case Ascii.SOH /* 1 */:
                    DoFnOperator.this.timerService.registerEventTimeTimer(timerData, millis);
                    return;
                case 2:
                case Ascii.ETX /* 3 */:
                    DoFnOperator.this.timerService.registerProcessingTimeTimer(timerData, millis);
                    return;
                default:
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
            }
        }

        @Deprecated
        public void deleteTimer(StateNamespace stateNamespace, String str) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        public void deleteTimer(StateNamespace stateNamespace, String str, TimeDomain timeDomain) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        @Deprecated
        public void deleteTimer(TimerInternals.TimerData timerData) {
            long millis = timerData.getTimestamp().getMillis();
            switch (AnonymousClass1.$SwitchMap$org$apache$beam$sdk$state$TimeDomain[timerData.getDomain().ordinal()]) {
                case Ascii.SOH /* 1 */:
                    DoFnOperator.this.timerService.deleteEventTimeTimer(timerData, millis);
                    return;
                case 2:
                case Ascii.ETX /* 3 */:
                    DoFnOperator.this.timerService.deleteProcessingTimeTimer(timerData, millis);
                    return;
                default:
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
            }
        }

        public Instant currentProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        @Nullable
        public Instant currentSynchronizedProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        public Instant currentInputWatermarkTime() {
            return new Instant(Math.min(DoFnOperator.this.currentInputWatermark, DoFnOperator.this.getPushbackWatermarkHold()));
        }

        @Nullable
        public Instant currentOutputWatermarkTime() {
            return new Instant(DoFnOperator.this.currentOutputWatermark);
        }

        /* synthetic */ FlinkTimerInternals(DoFnOperator doFnOperator, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$MultiOutputOutputManagerFactory.class */
    /**
     * {@link OutputManagerFactory} that creates a {@link BufferedOutputManager} for a main output
     * plus any number of tagged side outputs.
     */
    public static class MultiOutputOutputManagerFactory<OutputT> implements OutputManagerFactory<OutputT> {
        private TupleTag<OutputT> mainTag;
        private Map<TupleTag<?>, Integer> tagsToIds;
        private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
        private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;

        /**
         * Convenience constructor for an operator with only a main output: no side-output tags,
         * the main tag mapped to {@code coder} and to id {@code 0}.
         *
         * @param tupleTag tag of the main output
         * @param coder coder for main-output elements
         */
        // The main coder is stored in a map keyed by wildcard tags, so its element type has to be
        // erased with a raw cast. Explicit type witnesses are needed because the builder calls
        // have no target type at the start of a method chain.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public MultiOutputOutputManagerFactory(TupleTag<OutputT> tupleTag, Coder<WindowedValue<OutputT>> coder) {
            this(
                    tupleTag,
                    new HashMap<>(),
                    ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder().put(tupleTag, (Coder) coder).build(),
                    ImmutableMap.<TupleTag<?>, Integer>builder().put(tupleTag, 0).build());
        }

        /**
         * @param tupleTag tag of the main output
         * @param map output tag to Flink side-output tag
         * @param map2 output tag to element coder
         * @param map3 output tag to integer id
         */
        public MultiOutputOutputManagerFactory(TupleTag<OutputT> tupleTag, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> map, Map<TupleTag<?>, Coder<WindowedValue<?>>> map2, Map<TupleTag<?>, Integer> map3) {
            this.mainTag = tupleTag;
            this.tagsToOutputTags = map;
            this.tagsToCoders = map2;
            this.tagsToIds = map3;
        }

        /** Creates a buffered output manager wired to {@code output}, keeping its buffer in {@code stateInternals}. */
        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output, StateInternals stateInternals) {
            return new BufferedOutputManager<>(output, this.mainTag, this.tagsToOutputTags, this.tagsToCoders, this.tagsToIds, stateInternals);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$OutputManagerFactory.class */
    /**
     * Serializable factory for the operator's {@link BufferedOutputManager}. The operator keeps
     * the factory as a field and calls {@link #create} from {@code open()}.
     */
    public interface OutputManagerFactory<OutputT> extends Serializable {
        /**
         * @param output the Flink output the manager emits to
         * @param stateInternals state internals handed to the created manager
         * @return a new output manager
         */
        BufferedOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output, StateInternals stateInternals);
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$TaggedKvCoder.class */
    private static class TaggedKvCoder extends StructuredCoder<KV<Integer, WindowedValue<?>>> {
        private Map<Integer, Coder<WindowedValue<?>>> idsToCoders;

        TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> map) {
            this.idsToCoders = map;
        }

        public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream outputStream) throws IOException {
            Coder<WindowedValue<?>> coder = this.idsToCoders.get(kv.getKey());
            VarIntCoder.of().encode((Integer) kv.getKey(), outputStream);
            coder.encode(kv.getValue(), outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public KV<Integer, WindowedValue<?>> m271decode(InputStream inputStream) throws IOException {
            Integer decode = VarIntCoder.of().decode(inputStream);
            return KV.of(decode, (WindowedValue) this.idsToCoders.get(decode).decode(inputStream));
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return new ArrayList(this.idsToCoders.values());
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            Iterator<Coder<WindowedValue<?>>> it = this.idsToCoders.values().iterator();
            while (it.hasNext()) {
                verifyDeterministic(this, "Coder must be deterministic", new Coder[]{it.next()});
            }
        }
    }

    /**
     * Captures the operator's configuration. The runtime objects (runners, state, timers) are
     * not created here but in {@code open()}.
     *
     * @param doFn the user function to run
     * @param str step name, used for the metrics wrapper
     * @param coder coder for input elements
     * @param tupleTag tag of the main output
     * @param list tags of the additional outputs
     * @param outputManagerFactory factory for the output manager
     * @param windowingStrategy windowing strategy; its window coder is used for the timer coder
     * @param map side-input union tag to view
     * @param collection side-input views
     * @param pipelineOptions pipeline options; read as {@code FlinkPipelineOptions} for bundle limits
     * @param coder2 key coder, or {@code null} for a non-keyed operator
     */
    public DoFnOperator(DoFn<InputT, OutputT> doFn, String str, Coder<WindowedValue<InputT>> coder, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, Coder<?> coder2) {
        // Plain field captures.
        this.doFn = doFn;
        this.stepName = str;
        this.inputCoder = coder;
        this.keyCoder = coder2;
        this.mainOutputTag = tupleTag;
        this.additionalOutputTags = list;
        this.sideInputTagMapping = map;
        this.sideInputs = collection;
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;

        // Derived configuration.
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        setChainingStrategy(ChainingStrategy.ALWAYS);
        this.timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());

        final FlinkPipelineOptions flinkOptions = pipelineOptions.as(FlinkPipelineOptions.class);
        this.maxBundleSize = flinkOptions.getMaxBundleSize().longValue();
        this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills().longValue();
    }

    /**
     * Returns the user function this operator runs. {@code open()} obtains the function through
     * this method before creating the invoker and runner.
     */
    protected DoFn<InputT, OutputT> getDoFn() {
        final DoFn<InputT, OutputT> userFn = this.doFn;
        return userFn;
    }

    /**
     * Registers this operator's pipeline options as the {@code FileSystems} defaults, then
     * delegates to the superclass setup.
     */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<WindowedValue<OutputT>>> output) {
        final FlinkPipelineOptions flinkOptions = this.serializedOptions.get().as(FlinkPipelineOptions.class);
        FileSystems.setDefaultPipelineOptions(flinkOptions);
        super.setup(streamTask, streamConfig, output);
    }

    /**
     * Builds all runtime objects: watermarks, state internals, side-input handling, output
     * manager, timers, the {@code DoFnRunner} chain and the periodic bundle-timeout check.
     *
     * @throws Exception if the superclass open, state creation or DoFn setup fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        FlinkPipelineOptions flinkPipelineOptions = (FlinkPipelineOptions) this.serializedOptions.get().as(FlinkPipelineOptions.class);
        // Default reader; replaced by the side-input handler below when side inputs exist.
        this.sideInputReader = NullSideInputReader.of(this.sideInputs);

        // Non-keyed state may already be present (the null check suggests it can be created
        // before open(), e.g. on restore); only create it when missing.
        if (this.nonKeyedStateInternals == null) {
            if (this.keyCoder != null) {
                this.nonKeyedStateInternals = new FlinkKeyGroupStateInternals(this.keyCoder, getKeyedStateBackend());
            } else {
                this.nonKeyedStateInternals = new FlinkSplitStateInternals(getOperatorStateBackend());
            }
        }

        if (!this.sideInputs.isEmpty()) {
            this.pushedBackTag = StateTags.bag("pushed-back-values", this.inputCoder);
            this.sideInputHandler = new SideInputHandler(this.sideInputs, new FlinkBroadcastStateInternals(getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend()));
            this.sideInputReader = this.sideInputHandler;
            // Computed lazily from the pushed-back state on first use.
            this.pushedBackWatermark = Optional.absent();
        }

        this.outputManager = this.outputManagerFactory.create(this.output, this.nonKeyedStateInternals);

        // Keyed state and timers only exist for keyed operators.
        if (this.keyCoder != null) {
            this.keyedStateInternals = new FlinkStateInternals<>(getKeyedStateBackend(), this.keyCoder);
            if (this.timerService == null) {
                this.timerService = getInternalTimerService("beam-timer", new CoderTypeSerializer(this.timerCoder), this);
            }
            this.timerInternals = new FlinkTimerInternals(this, null);
        }

        this.doFn = getDoFn();
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();

        // Runner chain: simple runner, then late-data dropping (group-also-by-window) or
        // stateful handling (keyed), then optionally metrics.
        FlinkStepContext flinkStepContext = new FlinkStepContext();
        this.doFnRunner = DoFnRunners.simpleRunner(flinkPipelineOptions, this.doFn, this.sideInputReader, this.outputManager, this.mainOutputTag, this.additionalOutputTags, flinkStepContext, this.windowingStrategy);
        if (this.doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
            this.doFnRunner = DoFnRunners.lateDataDroppingRunner(this.doFnRunner, flinkStepContext, this.windowingStrategy);
        } else if (this.keyCoder != null) {
            this.doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(this.doFn, this.doFnRunner, this.windowingStrategy, new StatefulDoFnRunner.TimeInternalsCleanupTimer(flinkStepContext.timerInternals(), this.windowingStrategy), new StatefulDoFnRunner.StateInternalsStateCleaner(this.doFn, flinkStepContext.stateInternals(), this.windowingStrategy.getWindowFn().windowCoder()));
        }
        if (flinkPipelineOptions.getEnableMetrics().booleanValue()) {
            this.doFnRunner = new DoFnRunnerWithMetricsUpdate(this.stepName, this.doFnRunner, getRuntimeContext());
        }

        this.elementCount = 0L;
        this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();

        // Check for bundle timeout at half the maximum bundle time, rounded up. The value is
        // computed without the overflow of (max + 1) / 2 and clamped to at least 1 ms, because a
        // fixed-rate schedule needs a strictly positive period and a configured value of 0 would
        // otherwise yield a period of 0.
        long bundleCheckPeriod = Math.max(1L, this.maxBundleTimeMills / 2 + this.maxBundleTimeMills % 2);
        this.checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(timestamp -> {
            checkInvokeFinishBundleByTime();
        }, bundleCheckPeriod, bundleCheckPeriod);

        this.pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(this.doFnRunner, this.sideInputs, this.sideInputHandler);
    }

    /**
     * Releases the operator's resources: disposes the superclass, cancels the periodic
     * bundle-timeout check and tears down the DoFn.
     *
     * <p>Each step runs even if an earlier one throws. The timer and the invoker are
     * null-checked because they are only assigned in {@code open()}, which may have failed
     * before reaching them.
     *
     * @throws Exception if the superclass dispose or the DoFn teardown fails
     */
    @Override
    public void dispose() throws Exception {
        try {
            super.dispose();
        } finally {
            try {
                if (this.checkFinishBundleTimer != null) {
                    this.checkFinishBundleTimer.cancel(true);
                }
            } finally {
                if (this.doFnInvoker != null) {
                    this.doFnInvoker.invokeTeardown();
                }
            }
        }
    }

    /**
     * Closes the operator and verifies that no pushed-back elements are left over.
     *
     * @throws RuntimeException if side inputs exist and the pushed-back state is non-empty
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        super.close();

        // The check only applies when there are side inputs and state has been created.
        if (this.sideInputs.isEmpty() || this.nonKeyedStateInternals == null) {
            return;
        }

        Iterable<?> leftovers = this.nonKeyedStateInternals.state(StateNamespaces.global(), this.pushedBackTag).read();
        if (leftovers == null || Iterables.isEmpty(leftovers)) {
            return;
        }

        String rendered = Joiner.on(",").join(leftovers);
        throw new RuntimeException("Leftover pushed-back data: " + rendered + ". This indicates a bug.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the watermark hold imposed by pushed-back elements.
     *
     * @return {@code Long.MAX_VALUE} when there are no side inputs, otherwise the cached
     *     pushed-back watermark (computed first if not yet present)
     * @throws RuntimeException wrapping any exception raised while computing or reading the hold
     */
    public long getPushbackWatermarkHold() {
        if (!this.sideInputs.isEmpty()) {
            try {
                checkInitPushedBackWatermark();
                Long hold = this.pushedBackWatermark.get();
                return hold.longValue();
            } catch (Exception e) {
                throw new RuntimeException("Error retrieving pushed back watermark state.", e);
            }
        }
        return Long.MAX_VALUE;
    }

    /**
     * Ensures {@code pushedBackWatermark} is present. If it is absent, it is set to the minimum
     * timestamp over all elements in the pushed-back state, or {@code Long.MAX_VALUE} when that
     * state is empty.
     */
    private void checkInitPushedBackWatermark() {
        if (!this.pushedBackWatermark.isPresent()) {
            BagState<WindowedValue<InputT>> pushedBack = this.nonKeyedStateInternals.state(StateNamespaces.global(), this.pushedBackTag);
            long earliest = Long.MAX_VALUE;
            for (WindowedValue<InputT> held : pushedBack.read()) {
                earliest = Math.min(earliest, held.getTimestamp().getMillis());
            }
            setPushedBackWatermark(earliest);
        }
    }

    /**
     * Single-input path: starts a bundle if none is open, runs the element through the
     * {@code doFnRunner}, then counts it towards the bundle size limit.
     */
    @Override
    public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        checkInvokeStartBundle();
        final WindowedValue<InputT> element = streamRecord.getValue();
        this.doFnRunner.processElement(element);
        checkInvokeFinishBundleByCount();
    }

    /** Caches {@code j} as the current pushed-back watermark. */
    private void setPushedBackWatermark(long j) {
        final Long boxed = Long.valueOf(j);
        // The boxed value is never null, so this is the same as Optional.fromNullable(boxed).
        this.pushedBackWatermark = Optional.of(boxed);
    }

    /**
     * Main-input path of the two-input operator. The element is processed in the windows whose
     * side inputs are ready. Whatever the push-back runner returns is appended to the
     * pushed-back state, and the pushed-back watermark is lowered to the earliest such timestamp.
     */
    @Override
    public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        checkInvokeStartBundle();

        Iterable<WindowedValue<InputT>> notReady = this.pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
        BagState<WindowedValue<InputT>> pushedBack = this.nonKeyedStateInternals.state(StateNamespaces.global(), this.pushedBackTag);

        checkInitPushedBackWatermark();
        long earliest = this.pushedBackWatermark.get().longValue();
        for (WindowedValue<InputT> held : notReady) {
            earliest = Math.min(earliest, held.getTimestamp().getMillis());
            pushedBack.add(held);
        }
        setPushedBackWatermark(earliest);

        checkInvokeFinishBundleByCount();
    }

    /**
     * Side-input path of the two-input operator. The new side-input value is registered, every
     * pushed-back element is retried, and the pushed-back state is rewritten with the elements
     * that are still not ready. Finally the current input watermark is re-processed.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
        checkInvokeStartBundle();

        // Resolve the view from the union tag, then hand the value to the side-input handler.
        PCollectionView<?> view = this.sideInputTagMapping.get(Integer.valueOf(streamRecord.getValue().getUnionTag()));
        WindowedValue sideInputValue = (WindowedValue) streamRecord.getValue().getValue();
        this.sideInputHandler.addSideInputValue(view, sideInputValue);

        BagState<WindowedValue<InputT>> pushedBack = this.nonKeyedStateInternals.state(StateNamespaces.global(), this.pushedBackTag);
        List<WindowedValue<InputT>> stillNotReady = new ArrayList<>();
        Iterable<WindowedValue<InputT>> previouslyHeld = pushedBack.read();
        if (previouslyHeld != null) {
            for (WindowedValue<InputT> held : previouslyHeld) {
                // Set the key context for this element before re-processing it.
                setKeyContextElement1(new StreamRecord<>(held));
                Iterables.addAll(stillNotReady, this.pushbackDoFnRunner.processElementInReadyWindows(held));
            }
        }

        // Replace the state content with what is still waiting and recompute the hold.
        pushedBack.clear();
        long earliest = Long.MAX_VALUE;
        for (WindowedValue<InputT> held : stillNotReady) {
            earliest = Math.min(earliest, held.getTimestamp().getMillis());
            pushedBack.add(held);
        }
        setPushedBackWatermark(earliest);

        checkInvokeFinishBundleByCount();
        processWatermark1(new Watermark(this.currentInputWatermark));
    }

    /** Single-input watermark entry point; handled exactly like the first input's watermark. */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        this.processWatermark1(watermark);
    }

    /**
     * Handles a watermark on the main input.
     *
     * <p>If the side-input watermark has reached {@code TIMESTAMP_MAX_VALUE}, all pushed-back
     * data is emitted first. The input watermark is then recorded. For keyed operators the timer
     * service is advanced, and the output watermark candidate is additionally limited by the
     * keyed state's watermark hold. The output watermark is emitted only when it advances.
     */
    @Override
    public void processWatermark1(Watermark watermark) throws Exception {
        checkInvokeStartBundle();

        final boolean sideInputsFinished = this.currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        if (sideInputsFinished) {
            emitAllPushedBackData();
        }

        setCurrentInputWatermark(watermark.getTimestamp());

        final long candidate;
        if (this.keyCoder != null) {
            long heldInput = Math.min(getPushbackWatermarkHold(), watermark.getTimestamp());
            this.timerService.advanceWatermark(toFlinkRuntimeWatermark(heldInput));
            // Both holds are read after the timer service was advanced.
            long stateHold = this.keyedStateInternals.watermarkHold().getMillis();
            long pushbackHold = getPushbackWatermarkHold();
            candidate = Math.min(heldInput, Math.min(stateHold, pushbackHold));
        } else {
            candidate = Math.min(getPushbackWatermarkHold(), this.currentInputWatermark);
        }

        if (candidate > this.currentOutputWatermark) {
            setCurrentOutputWatermark(candidate);
            emitWatermark(this.currentOutputWatermark);
        }
    }

    /**
     * Forwards watermark {@code j} downstream. When it has reached {@code TIMESTAMP_MAX_VALUE},
     * the current bundle is finished first.
     */
    private void emitWatermark(long j) {
        final long maxTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        if (j >= maxTimestamp) {
            invokeFinishBundle();
        }
        final Watermark outgoing = new Watermark(j);
        this.output.emitWatermark(outgoing);
    }

    /**
     * Handles a watermark on the side input. The watermark is recorded. Once it reaches
     * {@code TIMESTAMP_MAX_VALUE}, all pushed-back data is emitted and the current main-input
     * watermark is re-processed.
     */
    @Override
    public void processWatermark2(Watermark watermark) throws Exception {
        checkInvokeStartBundle();
        setCurrentSideInputWatermark(watermark.getTimestamp());

        final boolean sideInputsFinished = watermark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        if (!sideInputsFinished) {
            return;
        }
        emitAllPushedBackData();
        processWatermark1(new Watermark(this.currentInputWatermark));
    }

    /**
     * Converts a watermark to the value handed to the Flink timer service, which is one
     * millisecond less than {@code j}.
     */
    private static long toFlinkRuntimeWatermark(long j) {
        final long flinkWatermark = j - 1L;
        return flinkWatermark;
    }

    /**
     * Runs every pushed-back element through the {@code doFnRunner}, clears the pushed-back
     * state and resets the pushed-back watermark to {@code Long.MAX_VALUE}.
     */
    private void emitAllPushedBackData() throws Exception {
        BagState<WindowedValue<InputT>> pushedBack = this.nonKeyedStateInternals.state(StateNamespaces.global(), this.pushedBackTag);

        Iterable<WindowedValue<InputT>> held = pushedBack.read();
        if (held != null) {
            for (WindowedValue<InputT> element : held) {
                // Set the key context for this element before processing it.
                setKeyContextElement1(new StreamRecord<>(element));
                this.doFnRunner.processElement(element);
            }
        }

        pushedBack.clear();
        setPushedBackWatermark(Long.MAX_VALUE);
    }

    /**
     * Starts a bundle if none is in progress. Before the runner's {@code startBundle()} is
     * called, the output manager's buffered output is flushed.
     */
    private void checkInvokeStartBundle() {
        if (!this.bundleStarted) {
            this.outputManager.flushBuffer();
            this.pushbackDoFnRunner.startBundle();
            this.bundleStarted = true;
        }
    }

    /** Counts one more element and finishes the bundle once {@code maxBundleSize} is reached. */
    private void checkInvokeFinishBundleByCount() {
        this.elementCount += 1;
        final boolean bundleFull = this.elementCount >= this.maxBundleSize;
        if (bundleFull) {
            invokeFinishBundle();
        }
    }

    /**
     * Finishes the bundle when at least {@code maxBundleTimeMills} of processing time has passed
     * since the last bundle was finished. Called from the periodic timer scheduled in
     * {@code open()}.
     */
    private void checkInvokeFinishBundleByTime() {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        final long sinceLastFinish = now - this.lastFinishBundleTime;
        if (sinceLastFinish >= this.maxBundleTimeMills) {
            invokeFinishBundle();
        }
    }

    /**
     * Finishes the current bundle, if one is in progress, and resets the bundle bookkeeping
     * (started flag, element count, last-finish time). Does nothing when no bundle is open.
     */
    private void invokeFinishBundle() {
        if (!this.bundleStarted) {
            return;
        }
        this.pushbackDoFnRunner.finishBundle();
        this.bundleStarted = false;
        this.elementCount = 0L;
        this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
    }

    /**
     * Snapshots the operator state for a checkpoint.
     *
     * <p>The current bundle is finished first, with the output manager's buffer open so that the
     * output produced by finishing goes into state instead of being emitted. If a keyed state
     * backend exists, the per-key-group state and, for keyed operators, the timers are then
     * written to the raw keyed operator state stream.
     *
     * @param stateSnapshotContext provides the raw keyed operator state stream
     * @throws Exception if the raw keyed state stream cannot be opened, or if writing to it
     *     fails; the two cases carry distinct messages and the original cause
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        // Always close the buffer again, even if finishing the bundle fails; otherwise all
        // later output would keep being diverted into state.
        this.outputManager.openBuffer();
        try {
            invokeFinishBundle();
        } finally {
            this.outputManager.closeBuffer();
        }

        if (getKeyedStateBackend() == null) {
            return;
        }

        // Opening and writing are guarded separately so that a write failure is not reported
        // as a failure to open the stream.
        final KeyedStateCheckpointOutputStream rawKeyedOut;
        try {
            rawKeyedOut = stateSnapshotContext.getRawKeyedOperatorStateOutput();
        } catch (Exception e) {
            throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', e);
        }

        try {
            for (int keyGroupIdx : rawKeyedOut.getKeyGroupList()) {
                rawKeyedOut.startNewKeyGroup(keyGroupIdx);
                DataOutputViewStreamWrapper keyGroupView = new DataOutputViewStreamWrapper(rawKeyedOut);
                snapshotKeyGroupState(keyGroupIdx, keyGroupView);
                // The timer service only exists for keyed operators.
                if (this.keyCoder != null) {
                    this.timerService.snapshotTimersForKeyGroup(keyGroupView, keyGroupIdx);
                }
            }
        } catch (Exception e) {
            throw new Exception("Could not write timer service of " + getOperatorName() + " to checkpoint state stream.", e);
        } finally {
            try {
                rawKeyedOut.close();
            } catch (Exception closeFailure) {
                LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", getOperatorName(), closeFailure);
            }
        }
    }

    /**
     * Writes the non-keyed state of one key group to the given stream.
     *
     * <p>Nothing is written when no key coder is configured.
     *
     * @param i id of the key group to snapshot
     * @param dataOutputStream stream the key group's state is written to
     * @throws Exception if the underlying state internals fail to write the snapshot
     */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator
    public void snapshotKeyGroupState(int i, DataOutputStream dataOutputStream) throws Exception {
        if (this.keyCoder == null) {
            return;
        }
        FlinkKeyGroupStateInternals keyGroupInternals = (FlinkKeyGroupStateInternals) this.nonKeyedStateInternals;
        keyGroupInternals.snapshotKeyGroupState(i, dataOutputStream);
    }

    /**
     * Restores per-key-group state and timers from the raw keyed state inputs.
     *
     * <p>Does nothing when there is no keyed state backend. For every provided key-group
     * stream, the key group is verified to belong to the local key-group range, the key-group
     * state is restored, and, when a key coder is configured, the timers for that key group
     * are restored. The timer service is created and started on first use if it does not
     * exist yet.
     *
     * @param stateInitializationContext context providing the raw keyed state input streams
     * @throws Exception if reading a stream or restoring state or timers fails
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (getKeyedStateBackend() == null) {
            return;
        }
        int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
        KeyGroupRange localRange = getKeyedStateBackend().getKeyGroupRange();
        for (KeyGroupStatePartitionStreamProvider provider : stateInitializationContext.getRawKeyedStateInputs()) {
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(provider.getStream());
            int keyGroupId = provider.getKeyGroupId();
            Preconditions.checkArgument(localRange.contains(keyGroupId), "Key Group " + keyGroupId + " does not belong to the local range.");
            restoreKeyGroupState(keyGroupId, in);
            if (this.keyCoder == null) {
                // No key coder: there are no timers to restore for this key group.
                continue;
            }
            if (this.timerService == null) {
                // Create and start the timer service lazily, the first time timers are restored.
                HeapInternalTimerService<?, TimerInternals.TimerData> createdService = new HeapInternalTimerService<>(totalKeyGroups, localRange, this, getRuntimeContext().getProcessingTimeService());
                createdService.startTimerService(getKeyedStateBackend().getKeySerializer(), new CoderTypeSerializer(this.timerCoder), this);
                this.timerService = createdService;
            }
            this.timerService.restoreTimersForKeyGroup(in, keyGroupId, getUserCodeClassloader());
        }
    }

    /**
     * Restores the non-keyed state of one key group from the given stream.
     *
     * <p>Nothing is read when no key coder is configured. The key-group state internals are
     * created on demand if they do not exist yet.
     *
     * @param i id of the key group to restore
     * @param dataInputStream stream the key group's state is read from
     * @throws Exception if the underlying state internals fail to restore the snapshot
     */
    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupRestoringOperator
    public void restoreKeyGroupState(int i, DataInputStream dataInputStream) throws Exception {
        if (this.keyCoder == null) {
            return;
        }
        if (this.nonKeyedStateInternals == null) {
            this.nonKeyedStateInternals = new FlinkKeyGroupStateInternals(this.keyCoder, getKeyedStateBackend());
        }
        FlinkKeyGroupStateInternals keyGroupInternals = (FlinkKeyGroupStateInternals) this.nonKeyedStateInternals;
        keyGroupInternals.restoreKeyGroupState(i, dataInputStream, getUserCodeClassloader());
    }

    /**
     * Callback for an event-time timer; forwards the timer to {@link #fireTimer}.
     *
     * <p>Unlike {@code onProcessingTime}, this does not call {@code checkInvokeStartBundle()}
     * first. NOTE(review): presumably a bundle has already been started on the path that
     * triggers event-time timers; confirm against the watermark-processing code.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if firing the timer fails
     */
    public void onEventTime(InternalTimer<Object, TimerInternals.TimerData> internalTimer) throws Exception {
        this.fireTimer(internalTimer);
    }

    /**
     * Callback for a processing-time timer; makes sure a bundle is started and then forwards
     * the timer to {@link #fireTimer}.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if starting the bundle or firing the timer fails
     */
    public void onProcessingTime(InternalTimer<Object, TimerInternals.TimerData> internalTimer) throws Exception {
        // A bundle must be open before the timer is delivered to the runner.
        this.checkInvokeStartBundle();
        this.fireTimer(internalTimer);
    }

    /**
     * Delivers a fired timer to the pushback runner's {@code onTimer} callback.
     *
     * <p>The timer data is taken from the timer's namespace slot. Its state namespace must be a
     * {@link StateNamespaces.WindowNamespace}; this is verified <em>before</em> the namespace is
     * narrowed, so a violation surfaces as the precondition failure rather than a cast failure.
     *
     * @param internalTimer the timer that fired
     * @throws IllegalArgumentException if the timer's state namespace is not a window namespace
     */
    public void fireTimer(InternalTimer<?, TimerInternals.TimerData> internalTimer) {
        TimerInternals.TimerData timerData = internalTimer.getNamespace();
        // Hold the namespace by its general type until the precondition has been checked.
        StateNamespace namespace = timerData.getNamespace();
        Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
        StateNamespaces.WindowNamespace<?> windowNamespace = (StateNamespaces.WindowNamespace<?>) namespace;
        this.pushbackDoFnRunner.onTimer(timerData.getTimerId(), windowNamespace.getWindow(), timerData.getTimestamp(), timerData.getDomain());
    }

    /**
     * Records the given value as the current input watermark.
     *
     * @param j the new input watermark value
     */
    private void setCurrentInputWatermark(long j) {
        currentInputWatermark = j;
    }

    /**
     * Records the given value as the current side-input watermark.
     *
     * @param j the new side-input watermark value
     */
    private void setCurrentSideInputWatermark(long j) {
        currentSideInputWatermark = j;
    }

    /**
     * Records the given value as the current output watermark.
     *
     * @param j the new output watermark value
     */
    private void setCurrentOutputWatermark(long j) {
        currentOutputWatermark = j;
    }
}
