package org.apache.beam.runners.core;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.core.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.java.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;

/* loaded from: input_file:org/apache/beam/runners/core/SimpleOldDoFnRunner.class */
class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
    private final OldDoFn<InputT, OutputT> fn;
    private final DoFnContext<InputT, OutputT> context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SimpleOldDoFnRunner$DoFnContext.class */
    public static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context {
        private static final int MAX_SIDE_OUTPUTS = 1000;
        final PipelineOptions options;
        final OldDoFn<InputT, OutputT> fn;
        final SideInputReader sideInputReader;
        final DoFnRunners.OutputManager outputManager;
        final TupleTag<OutputT> mainOutputTag;
        final ExecutionContext.StepContext stepContext;
        final AggregatorFactory aggregatorFactory;
        final WindowFn<?, ?> windowFn;
        private Set<TupleTag<?>> outputTags;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public DoFnContext(PipelineOptions pipelineOptions, OldDoFn<InputT, OutputT> oldDoFn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, AggregatorFactory aggregatorFactory, WindowFn<?, ?> windowFn) {
            super();
            oldDoFn.getClass();
            this.options = pipelineOptions;
            this.fn = oldDoFn;
            this.sideInputReader = sideInputReader;
            this.outputManager = outputManager;
            this.mainOutputTag = tupleTag;
            this.outputTags = Sets.newHashSet();
            this.outputTags.add(tupleTag);
            Iterator<TupleTag<?>> it = list.iterator();
            while (it.hasNext()) {
                this.outputTags.add(it.next());
            }
            this.stepContext = stepContext;
            this.aggregatorFactory = aggregatorFactory;
            this.windowFn = windowFn;
            super.setupDelegateAggregators();
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public PipelineOptions getPipelineOptions() {
            return this.options;
        }

        <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(T t, Instant instant, Collection<W> collection, PaneInfo paneInfo) {
            if (instant == null) {
                instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            if (collection == null) {
                try {
                    WindowFn<?, ?> windowFn = this.windowFn;
                    windowFn.getClass();
                    collection = windowFn.assignWindows(new WindowFn<Object, W>.AssignContext(windowFn, instant) { // from class: org.apache.beam.runners.core.SimpleOldDoFnRunner.DoFnContext.1
                        final /* synthetic */ Instant val$inputTimestamp;

                        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                        {
                            super(windowFn);
                            this.val$inputTimestamp = instant;
                            windowFn.getClass();
                        }

                        public Object element() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input element when none was available");
                        }

                        public Instant timestamp() {
                            if (this.val$inputTimestamp == null) {
                                throw new UnsupportedOperationException("WindowFn attempted to access input timestamp when none was available");
                            }
                            return this.val$inputTimestamp;
                        }

                        /* JADX WARN: Incorrect return type in method signature: ()TW; */
                        public BoundedWindow window() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input windows when none were available");
                        }
                    });
                } catch (Exception e) {
                    throw UserCodeException.wrap(e);
                }
            }
            return WindowedValue.of(t, instant, collection, paneInfo);
        }

        public <T> T sideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
            if (this.sideInputReader.contains(pCollectionView)) {
                return (T) this.sideInputReader.get(pCollectionView, boundedWindow);
            }
            throw new IllegalArgumentException("calling sideInput() with unknown view");
        }

        /* JADX WARN: Multi-variable type inference failed */
        void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            outputWindowedValue(makeWindowedValue(outputt, instant, collection, paneInfo));
        }

        void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
            this.outputManager.output(this.mainOutputTag, windowedValue);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(windowedValue);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Multi-variable type inference failed */
        public <T> void outputWindowedValue(TupleTag<T> tupleTag, T t, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            outputWindowedValue(tupleTag, makeWindowedValue(t, instant, collection, paneInfo));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public <T> void outputWindowedValue(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (!this.outputTags.contains(tupleTag)) {
                if (this.outputTags.size() >= MAX_SIDE_OUTPUTS) {
                    throw new IllegalArgumentException("the number of outputs has exceeded a limit of 1000");
                }
                this.outputTags.add(tupleTag);
            }
            this.outputManager.output(tupleTag, windowedValue);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(tupleTag, windowedValue);
            }
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public void output(OutputT outputt) {
            outputWindowedValue(outputt, null, null, PaneInfo.NO_FIRING);
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public void outputWithTimestamp(OutputT outputt, Instant instant) {
            outputWindowedValue(outputt, instant, null, PaneInfo.NO_FIRING);
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <T> void output(TupleTag<T> tupleTag, T t) {
            Preconditions.checkNotNull(tupleTag, "TupleTag passed to output cannot be null");
            outputWindowedValue(tupleTag, t, null, null, PaneInfo.NO_FIRING);
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <T> void outputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
            Preconditions.checkNotNull(tupleTag, "TupleTag passed to outputWithTimestamp cannot be null");
            outputWindowedValue(tupleTag, t, instant, null, PaneInfo.NO_FIRING);
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String str, Combine.CombineFn<AggInputT, ?, AggOutputT> combineFn) {
            Preconditions.checkNotNull(combineFn, "Combiner passed to createAggregatorInternal cannot be null");
            return this.aggregatorFactory.createAggregatorForDoFn(this.fn.getClass(), this.stepContext, str, combineFn);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SimpleOldDoFnRunner$DoFnProcessContext.class */
    public static class DoFnProcessContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.ProcessContext {
        final OldDoFn<InputT, OutputT> fn;
        final DoFnContext<InputT, OutputT> context;
        final WindowedValue<InputT> windowedValue;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public DoFnProcessContext(OldDoFn<InputT, OutputT> oldDoFn, DoFnContext<InputT, OutputT> doFnContext, WindowedValue<InputT> windowedValue) {
            super();
            oldDoFn.getClass();
            this.fn = oldDoFn;
            this.context = doFnContext;
            this.windowedValue = windowedValue;
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public InputT element() {
            return (InputT) this.windowedValue.getValue();
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public <T> T sideInput(PCollectionView<T> pCollectionView) {
            GlobalWindow globalWindow;
            Preconditions.checkNotNull(pCollectionView, "View passed to sideInput cannot be null");
            Iterator<? extends BoundedWindow> it = windows().iterator();
            if (it.hasNext()) {
                globalWindow = (BoundedWindow) it.next();
                if (it.hasNext()) {
                    throw new IllegalStateException("sideInput called when main input element is in multiple windows");
                }
            } else {
                if (!(this.context.windowFn instanceof GlobalWindows)) {
                    throw new IllegalStateException("sideInput called when main input element is not in any windows");
                }
                globalWindow = GlobalWindow.INSTANCE;
            }
            return (T) this.context.sideInput(pCollectionView, pCollectionView.getWindowMappingFn().getSideInputWindow(globalWindow));
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public BoundedWindow window() {
            if (this.fn instanceof OldDoFn.RequiresWindowAccess) {
                return (BoundedWindow) Iterables.getOnlyElement(windows());
            }
            throw new UnsupportedOperationException("window() is only available in the context of a OldDoFn marked asRequiresWindowAccess.");
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public PaneInfo pane() {
            return this.windowedValue.getPane();
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public void output(OutputT outputt) {
            this.context.outputWindowedValue(this.windowedValue.withValue(outputt));
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public void outputWithTimestamp(OutputT outputt, Instant instant) {
            checkTimestamp(instant);
            this.context.outputWindowedValue(outputt, instant, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.context.outputWindowedValue(outputt, instant, collection, paneInfo);
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <T> void output(TupleTag<T> tupleTag, T t) {
            Preconditions.checkNotNull(tupleTag, "Tag passed to output cannot be null");
            this.context.outputWindowedValue(tupleTag, this.windowedValue.withValue(t));
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <T> void outputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
            Preconditions.checkNotNull(tupleTag, "Tag passed to outputWithTimestamp cannot be null");
            checkTimestamp(instant);
            this.context.outputWindowedValue(tupleTag, t, instant, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public Instant timestamp() {
            return this.windowedValue.getTimestamp();
        }

        public Collection<? extends BoundedWindow> windows() {
            return this.windowedValue.getWindows();
        }

        private void checkTimestamp(Instant instant) {
            if (instant.isBefore(this.windowedValue.getTimestamp().minus(this.fn.getAllowedTimestampSkew()))) {
                throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s). See the OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", instant, this.windowedValue.getTimestamp(), PeriodFormat.getDefault().print(this.fn.getAllowedTimestampSkew().toPeriod())));
            }
        }

        @Override // org.apache.beam.runners.core.OldDoFn.ProcessContext
        public WindowingInternals<InputT, OutputT> windowingInternals() {
            return new WindowingInternals<InputT, OutputT>() { // from class: org.apache.beam.runners.core.SimpleOldDoFnRunner.DoFnProcessContext.1
                @Override // org.apache.beam.runners.core.WindowingInternals
                public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                    DoFnProcessContext.this.context.outputWindowedValue(outputt, instant, collection, paneInfo);
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                    DoFnProcessContext.this.context.outputWindowedValue(tupleTag, additionaloutputt, instant, collection, paneInfo);
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public Collection<? extends BoundedWindow> windows() {
                    return DoFnProcessContext.this.windowedValue.getWindows();
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public PaneInfo pane() {
                    return DoFnProcessContext.this.windowedValue.getPane();
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public TimerInternals timerInternals() {
                    return DoFnProcessContext.this.context.stepContext.timerInternals();
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public StateInternals<?> stateInternals() {
                    return DoFnProcessContext.this.context.stepContext.stateInternals();
                }

                @Override // org.apache.beam.runners.core.WindowingInternals
                public <T> T sideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
                    return (T) DoFnProcessContext.this.context.sideInput(pCollectionView, boundedWindow);
                }
            };
        }

        @Override // org.apache.beam.runners.core.OldDoFn.Context
        public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(String str, Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
            return (Aggregator<AggregatorInputT, AggregatorOutputT>) this.context.createAggregatorInternal(str, combineFn);
        }
    }

    public SimpleOldDoFnRunner(PipelineOptions pipelineOptions, OldDoFn<InputT, OutputT> oldDoFn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
        this.fn = oldDoFn;
        this.context = new DoFnContext<>(pipelineOptions, oldDoFn, sideInputReader, outputManager, tupleTag, list, stepContext, aggregatorFactory, windowingStrategy == null ? null : windowingStrategy.getWindowFn());
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void startBundle() {
        try {
            this.fn.startBundle(this.context);
        } catch (Throwable th) {
            throw wrapUserCodeException(th);
        }
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void processElement(WindowedValue<InputT> windowedValue) {
        if (windowedValue.getWindows().size() <= 1 || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(this.fn.getClass()) && this.context.sideInputReader.isEmpty())) {
            invokeProcessElement(windowedValue);
            return;
        }
        Iterator it = windowedValue.explodeWindows().iterator();
        while (it.hasNext()) {
            invokeProcessElement((WindowedValue) it.next());
        }
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
        throw new UnsupportedOperationException(String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName()));
    }

    private void invokeProcessElement(WindowedValue<InputT> windowedValue) {
        try {
            this.fn.processElement(createProcessContext(windowedValue));
        } catch (Exception e) {
            throw wrapUserCodeException(e);
        }
    }

    @Override // org.apache.beam.runners.core.DoFnRunner
    public void finishBundle() {
        try {
            this.fn.finishBundle(this.context);
        } catch (Throwable th) {
            throw wrapUserCodeException(th);
        }
    }

    private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> windowedValue) {
        return new DoFnProcessContext(this.fn, this.context, windowedValue);
    }

    private RuntimeException wrapUserCodeException(Throwable th) {
        throw UserCodeException.wrapIf(!isSystemDoFn(), th);
    }

    private boolean isSystemDoFn() {
        return this.fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
    }
}
