package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.class */
public class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {

    @VisibleForTesting
    static final int MAX_READER_REUSE_COUNT = 20;
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>> sourceEvaluators = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.class */
    public static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements TransformEvaluator<Object> {
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> transform;
        private final EvaluationContext evaluationContext;
        private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue;
        private final UnboundedSource<OutputT, CheckpointMarkT> source;
        private final UnboundedReadDeduplicator deduplicator;
        private int outputBundles = 0;
        private UnboundedSource.UnboundedReader<OutputT> currentReader = null;
        private CheckpointMarkT checkpointMark = null;

        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, EvaluationContext evaluationContext, UnboundedSource<OutputT, CheckpointMarkT> unboundedSource, UnboundedReadDeduplicator unboundedReadDeduplicator, ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> concurrentLinkedQueue) {
            this.transform = appliedPTransform;
            this.evaluationContext = evaluationContext;
            this.evaluatorQueue = concurrentLinkedQueue;
            this.source = unboundedSource;
            this.deduplicator = unboundedReadDeduplicator;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<Object> windowedValue) {
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult finishBundle() throws IOException {
            DirectRunner.UncommittedBundle<?> createRootBundle = this.evaluationContext.createRootBundle(this.transform.getOutput());
            try {
                boolean startReader = startReader();
                Instant watermark = this.currentReader.getWatermark();
                if (startReader) {
                    int i = 0;
                    do {
                        if (this.deduplicator.shouldOutput(this.currentReader.getCurrentRecordId())) {
                            createRootBundle.add(WindowedValue.timestampedValueInGlobalWindow(this.currentReader.getCurrent(), this.currentReader.getCurrentTimestamp()));
                        }
                        i++;
                        if (i >= 10) {
                            break;
                        }
                    } while (this.currentReader.advance());
                    watermark = this.currentReader.getWatermark();
                    finishRead();
                }
                StepTransformResult build = StepTransformResult.withHold(this.transform, watermark).addOutput(createRootBundle, new DirectRunner.UncommittedBundle[0]).build();
                this.evaluatorQueue.offer(this);
                return build;
            } catch (IOException e) {
                closeReader();
                throw e;
            }
        }

        private boolean startReader() throws IOException {
            if (this.currentReader != null) {
                return this.currentReader.advance();
            }
            if (this.checkpointMark != null) {
                this.checkpointMark.finalizeCheckpoint();
            }
            this.currentReader = this.source.createReader(this.evaluationContext.getPipelineOptions(), this.checkpointMark);
            this.checkpointMark = null;
            return this.currentReader.start();
        }

        private void finishRead() throws IOException {
            CheckpointMarkT checkpointmarkt = this.checkpointMark;
            final CheckpointMarkT checkpointmarkt2 = (CheckpointMarkT) this.currentReader.getCheckpointMark();
            this.checkpointMark = checkpointmarkt2;
            if (checkpointmarkt != null) {
                checkpointmarkt.finalizeCheckpoint();
            }
            if (!this.currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                this.evaluationContext.scheduleAfterOutputWouldBeProduced((PValue) this.transform.getOutput(), GlobalWindow.INSTANCE, this.transform.getOutput().getWindowingStrategy(), new Runnable() { // from class: org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            checkpointmarkt2.finalizeCheckpoint();
                        } catch (IOException e) {
                            throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                        }
                    }
                });
            }
            if (this.outputBundles < 20) {
                this.outputBundles++;
            } else {
                closeReader();
                this.outputBundles = 0;
            }
        }

        private void closeReader() throws IOException {
            if (this.currentReader != null) {
                this.currentReader.close();
                this.currentReader = null;
            }
        }
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable DirectRunner.CommittedBundle<?> committedBundle, EvaluationContext evaluationContext) {
        return (TransformEvaluator<InputT>) getTransformEvaluator(appliedPTransform, evaluationContext);
    }

    private <OutputT> TransformEvaluator<?> getTransformEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, EvaluationContext evaluationContext) {
        return (TransformEvaluator) getTransformEvaluatorQueue(appliedPTransform, evaluationContext).poll();
    }

    private <OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, EvaluationContext evaluationContext) {
        ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>> concurrentLinkedQueue = this.sourceEvaluators.get(appliedPTransform);
        if (concurrentLinkedQueue == null) {
            concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
            if (this.sourceEvaluators.putIfAbsent(appliedPTransform, concurrentLinkedQueue) == null) {
                UnboundedSource source = appliedPTransform.getTransform().getSource();
                concurrentLinkedQueue.offer(new UnboundedReadEvaluator(appliedPTransform, evaluationContext, source, source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create(), concurrentLinkedQueue));
            } else {
                concurrentLinkedQueue = this.sourceEvaluators.get(appliedPTransform);
            }
        }
        return concurrentLinkedQueue;
    }
}
