package org.apache.beam.sdk.io;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.class */
public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
    /** The unbounded source this transform reads from. */
    private final UnboundedSource<T, ?> source;
    /** Upper limit on the number of records to read, as passed to the constructor. */
    private final long maxNumRecords;

    /** Upper limit on the read duration; {@code null} when no time limit was supplied. */
    @Nullable
    private final Duration maxReadTime;
    /** Bounded adapter around {@link #source}, built once in the constructor from the three values above. */
    private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
    /** Backoff policy (10 ms initial delay, 10 s maximum delay) used by the adapter's reader while polling for records. */
    private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(10)).withMaxBackoff(Duration.standardSeconds(10));

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.class */
    public static abstract class UnboundedToBoundedSourceAdapter<T> extends BoundedSource<ValueWithRecordId<T>> {

        /**
         * Builder for {@link UnboundedToBoundedSourceAdapter} instances.
         *
         * <p>Decompiled from
         * {@code org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Builder.class}.
         * The concrete implementation is the generated {@code AutoValue_...Builder} class that the
         * enclosing transform's constructor instantiates.
         */
        static abstract class Builder<T> {
            /** Sets the unbounded source the adapter wraps. */
            abstract Builder<T> setSource(UnboundedSource<T, ?> unboundedSource);

            /** Sets the maximum number of records the adapter may read. */
            abstract Builder<T> setMaxNumRecords(long j);

            /** Sets the maximum read duration; {@code null} means no time limit. */
            abstract Builder<T> setMaxReadTime(@Nullable Duration duration);

            /** Creates the adapter from the values set so far. */
            abstract UnboundedToBoundedSourceAdapter<T> build();
        }

        /**
         * Bounded reader that delegates to an {@link UnboundedSource.UnboundedReader} and stops once
         * the adapter's record limit or read-time limit has been reached.
         *
         * <p>Each element is exposed as a {@link ValueWithRecordId} pairing the underlying value with
         * the record id reported by the wrapped reader.
         */
        private class Reader extends BoundedSource.BoundedReader<ValueWithRecordId<T>> {
            /** Number of read attempts counted so far; bumped before every start/advance attempt. */
            private long recordsRead;

            /** Instant after which no further reads are attempted; {@code null} when there is no time limit. */
            @Nullable
            private final Instant endTime;

            /** The wrapped unbounded reader that actually produces the records. */
            private final UnboundedSource.UnboundedReader<T> reader;

            /**
             * Wraps the given unbounded reader. The deadline, if any, is measured from the moment
             * this constructor runs.
             *
             * @param unboundedReader the reader to delegate to
             */
            private Reader(UnboundedSource.UnboundedReader<T> unboundedReader) {
                Duration readBudget = UnboundedToBoundedSourceAdapter.this.getMaxReadTime();
                this.recordsRead = 0L;
                this.endTime = (readBudget == null) ? null : Instant.now().plus(readBudget);
                this.reader = unboundedReader;
            }

            /**
             * Starts the wrapped reader, unless the configured limits already forbid reading
             * (non-positive record limit, or a read time of exactly zero milliseconds).
             *
             * @return {@code true} if a first record is available
             * @throws IOException if the wrapped reader fails
             */
            @Override // org.apache.beam.sdk.io.Source.Reader
            public boolean start() throws IOException {
                Duration readBudget = UnboundedToBoundedSourceAdapter.this.getMaxReadTime();
                boolean noRecordsAllowed = UnboundedToBoundedSourceAdapter.this.getMaxNumRecords() <= 0;
                boolean noTimeAllowed = readBudget != null && readBudget.getMillis() == 0;
                if (noRecordsAllowed || noTimeAllowed) {
                    return false;
                }
                this.recordsRead++;
                // If the wrapped reader has nothing yet, keep polling with backoff.
                return this.reader.start() || advanceWithBackoff();
            }

            /**
             * Moves to the next record, or finalizes the checkpoint and stops once the record limit
             * has been reached.
             *
             * @return {@code true} if another record is available
             * @throws IOException if the wrapped reader fails
             */
            @Override // org.apache.beam.sdk.io.Source.Reader
            public boolean advance() throws IOException {
                if (this.recordsRead < UnboundedToBoundedSourceAdapter.this.getMaxNumRecords()) {
                    this.recordsRead++;
                    return advanceWithBackoff();
                }
                finalizeCheckpoint();
                return false;
            }

            /**
             * Polls the wrapped reader until it yields a record, sleeping between attempts according
             * to {@code BACKOFF_FACTORY}. Gives up, finalizing the checkpoint, when the deadline has
             * passed or the backoff signals that no more retries should be made.
             *
             * @return {@code true} if a record became available, {@code false} if reading stopped
             * @throws IOException if the wrapped reader or the backoff fails
             */
            private boolean advanceWithBackoff() throws IOException {
                BackOff backoff = BoundedReadFromUnboundedSource.BACKOFF_FACTORY.backoff();
                for (long sleepMillis = backoff.nextBackOffMillis();
                        sleepMillis != BackOff.STOP;
                        sleepMillis = backoff.nextBackOffMillis()) {
                    boolean deadlinePassed = this.endTime != null && Instant.now().isAfter(this.endTime);
                    if (deadlinePassed) {
                        finalizeCheckpoint();
                        return false;
                    }
                    if (this.reader.advance()) {
                        return true;
                    }
                    Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
                }
                // Backoff exhausted without finding a record.
                finalizeCheckpoint();
                return false;
            }

            /** Finalizes the wrapped reader's current checkpoint mark. */
            private void finalizeCheckpoint() throws IOException {
                this.reader.getCheckpointMark().finalizeCheckpoint();
            }

            /** Returns the current value of the wrapped reader together with its record id. */
            @Override // org.apache.beam.sdk.io.Source.Reader
            public ValueWithRecordId<T> getCurrent() throws NoSuchElementException {
                T value = this.reader.getCurrent();
                byte[] recordId = this.reader.getCurrentRecordId();
                return new ValueWithRecordId<>(value, recordId);
            }

            /** Returns the timestamp the wrapped reader reports for the current record. */
            @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return this.reader.getCurrentTimestamp();
            }

            /** Closes the wrapped reader. */
            @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
            public void close() throws IOException {
                this.reader.close();
            }

            /** Returns the enclosing adapter, which is the bounded source this reader reads from. */
            @Override // org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
            public BoundedSource<ValueWithRecordId<T>> getCurrentSource() {
                return UnboundedToBoundedSourceAdapter.this;
            }
        }

        /**
         * Returns the wrapped unbounded source.
         *
         * <p>JADX note: access modifier changed from package-private.
         */
        @Nullable
        public abstract UnboundedSource<T, ?> getSource();

        /**
         * Returns the maximum number of records this adapter may read.
         *
         * <p>JADX note: access modifier changed from package-private.
         */
        public abstract long getMaxNumRecords();

        /**
         * Returns the maximum read duration, or {@code null} when no time limit is set.
         *
         * <p>JADX note: access modifier changed from package-private.
         */
        @Nullable
        public abstract Duration getMaxReadTime();

        /** Returns a builder used by {@code split} to derive per-split copies of this adapter. */
        abstract Builder<T> toBuilder();

        /**
         * Distributes {@code j} records over {@code i} splits as evenly as possible.
         *
         * <p>Every split receives {@code j / i} records and the first {@code j % i} splits receive
         * one extra, so the quotas add up to {@code j}. A non-positive remainder (negative
         * {@code j}) adds nothing.
         *
         * @param j total number of records to distribute
         * @param i number of splits; {@code 0} yields an empty array instead of failing with a
         *     division by zero
         * @return an array of length {@code i} holding the per-split record quota
         * @throws NegativeArraySizeException if {@code i} is negative
         */
        private static long[] splitNumRecords(long j, int i) {
            long[] quotas = new long[i];
            if (i == 0) {
                // Nothing to distribute to; avoids the ArithmeticException from j / i and j % i.
                return quotas;
            }
            // Both values are loop-invariant, so compute them once.
            long base = j / i;
            long remainder = j % i;
            for (int idx = 0; idx < i; idx++) {
                quotas[idx] = (idx < remainder) ? base + 1 : base;
            }
            return quotas;
        }

        /**
         * Chooses how many splits to request from the wrapped source: one per 10000 records
         * (plus one), capped at 100.
         *
         * @param j the total record limit
         * @return the desired number of initial splits
         */
        private static int numInitialSplits(long j) {
            long perTenThousand = (j / 10000) + 1;
            long capped = Math.min(100L, perTenThousand);
            return (int) capped;
        }

        /**
         * Splits the wrapped unbounded source and wraps each resulting split in its own adapter.
         *
         * <p>The number of splits requested comes from {@code numInitialSplits}, and the record
         * limit is divided among the splits actually returned via {@code splitNumRecords}. Every
         * split keeps this adapter's maximum read time. The {@code j} argument is not used.
         *
         * @param j desired bundle size in bytes (ignored)
         * @param pipelineOptions options forwarded to the wrapped source's {@code split}
         * @return one bounded adapter per split of the wrapped source
         * @throws Exception if the wrapped source fails to split
         */
        @Override // org.apache.beam.sdk.io.BoundedSource
        public List<? extends BoundedSource<ValueWithRecordId<T>>> split(long j, PipelineOptions pipelineOptions) throws Exception {
            List<UnboundedToBoundedSourceAdapter<T>> adapters = new ArrayList<>();
            int requestedSplits = numInitialSplits(getMaxNumRecords());
            List<? extends UnboundedSource<T, ?>> shards = getSource().split(requestedSplits, pipelineOptions);
            int shardCount = shards.size();
            long[] quotas = splitNumRecords(getMaxNumRecords(), shardCount);
            for (int idx = 0; idx < shardCount; idx++) {
                UnboundedToBoundedSourceAdapter<T> adapter =
                        toBuilder()
                                .setSource(shards.get(idx))
                                .setMaxNumRecords(quotas[idx])
                                .setMaxReadTime(getMaxReadTime())
                                .build();
                adapters.add(adapter);
            }
            return adapters;
        }

        /**
         * Always reports a size of zero; no estimate is derived from the wrapped source.
         *
         * @param pipelineOptions unused
         * @return {@code 0}
         */
        @Override // org.apache.beam.sdk.io.BoundedSource
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
            return 0L;
        }

        /**
         * Returns a coder for {@link ValueWithRecordId} elements, built around the wrapped
         * source's own output coder.
         */
        @Override // org.apache.beam.sdk.io.Source
        public Coder<ValueWithRecordId<T>> getOutputCoder() {
            Coder<T> valueCoder = getSource().getOutputCoder();
            return ValueWithRecordId.ValueWithRecordIdCoder.of(valueCoder);
        }

        /** Validates this adapter by delegating to the wrapped source's {@code validate()}. */
        @Override // org.apache.beam.sdk.io.Source
        public void validate() {
            UnboundedSource<T, ?> wrapped = getSource();
            wrapped.validate();
        }

        /**
         * Creates a bounded reader over a fresh reader of the wrapped source, started without a
         * checkpoint mark ({@code null} is passed).
         *
         * @param pipelineOptions options forwarded to the wrapped source
         * @return a reader that enforces this adapter's limits
         * @throws IOException if the wrapped source cannot create its reader
         */
        @Override // org.apache.beam.sdk.io.BoundedSource
        public BoundedSource.BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions pipelineOptions) throws IOException {
            UnboundedSource.UnboundedReader<T> unboundedReader = getSource().createReader(pipelineOptions, null);
            return new Reader(unboundedReader);
        }

        /** Delegates display data to the wrapped source. */
        @Override // org.apache.beam.sdk.io.Source, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            UnboundedSource<T, ?> wrapped = getSource();
            builder.delegate(wrapped);
        }
    }

    /**
     * Returns a new transform reading from the same source with the same maximum read time, but
     * limited to {@code j} records. This instance is left unchanged.
     *
     * @param j the new maximum number of records
     * @return a new {@code BoundedReadFromUnboundedSource} with the updated record limit
     */
    public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long j) {
        return new BoundedReadFromUnboundedSource<T>(source, j, maxReadTime);
    }

    /**
     * Returns a new transform reading from the same source with the same record limit, but
     * limited to the given read duration. This instance is left unchanged.
     *
     * @param duration the new maximum read time
     * @return a new {@code BoundedReadFromUnboundedSource} with the updated time limit
     */
    public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration duration) {
        return new BoundedReadFromUnboundedSource<T>(source, maxNumRecords, duration);
    }

    /**
     * Creates the transform and eagerly builds the bounded adapter around {@code unboundedSource}.
     *
     * <p>JADX note: access modifier changed from package-private.
     *
     * @param unboundedSource the unbounded source to read from
     * @param j the maximum number of records to read
     * @param duration the maximum read time, or {@code null} for no time limit
     */
    public BoundedReadFromUnboundedSource(UnboundedSource<T, ?> unboundedSource, long j, @Nullable Duration duration) {
        this.source = unboundedSource;
        this.maxNumRecords = j;
        this.maxReadTime = duration;
        this.adaptedSource =
                new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter.Builder<T>()
                        .setSource(unboundedSource)
                        .setMaxNumRecords(j)
                        .setMaxReadTime(duration)
                        .build();
    }

    /**
     * Returns the bounded adapter that was built around the unbounded source in the constructor.
     * Its elements are {@link ValueWithRecordId} wrappers rather than bare values.
     */
    @Experimental
    public BoundedSource<ValueWithRecordId<T>> getAdaptedSource() {
        return this.adaptedSource;
    }

    /**
     * Expands this transform into: a bounded read of the adapted source, an optional
     * de-duplication step keyed on record id, and a final step that strips the record ids.
     *
     * <p>NOTE(review): the raw {@code PCollection} types and casts below are decompiler output;
     * the generic element types were lost and are left as-is here.
     *
     * @param pBegin the pipeline start the read is applied to
     * @return the collection of values read, with the wrapped source's output coder set on it
     */
    @Override // org.apache.beam.sdk.transforms.PTransform
    public PCollection<T> expand(PBegin pBegin) {
        // Read ValueWithRecordId elements through the bounded adapter.
        PCollection pCollection = (PCollection) Pipeline.applyTransform(pBegin, Read.from(getAdaptedSource()));
        // Only de-duplicate when the source asks for it; the record id is the representative value.
        if (this.source.requiresDeduping()) {
            pCollection = (PCollection) pCollection.apply(Distinct.withRepresentativeValueFn((v0) -> {
                return v0.getId();
            }).withRepresentativeType(TypeDescriptor.of(byte[].class)));
        }
        // Drop the record ids and restore the source's own element coder.
        return ((PCollection) pCollection.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()))).setCoder(this.source.getOutputCoder());
    }

    /**
     * Returns the kind label for this transform, in the form {@code Read(<source name>)}, where
     * the source name comes from {@code NameUtils.approximateSimpleName}.
     */
    @Override // org.apache.beam.sdk.transforms.PTransform
    public String getKindString() {
        String sourceName = NameUtils.approximateSimpleName(this.source);
        return String.format("Read(%s)", sourceName);
    }

    /**
     * Registers display data for this transform: the source class, the record limit (added via
     * {@code addIfNotDefault} against {@code OffsetRangeTracker.OFFSET_INFINITY}), the maximum
     * read time (added via {@code addIfNotNull}), and the source's own display data under the
     * {@code "source"} namespace.
     *
     * @param builder the display data builder to populate
     */
    @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        Long configuredLimit = Long.valueOf(this.maxNumRecords);
        Long unlimited = Long.valueOf(OffsetRangeTracker.OFFSET_INFINITY);
        builder
                .add(DisplayData.item("source", this.source.getClass()).withLabel("Read Source"))
                .addIfNotDefault(DisplayData.item("maxRecords", configuredLimit).withLabel("Maximum Read Records"), unlimited)
                .addIfNotNull(DisplayData.item("maxReadTime", this.maxReadTime).withLabel("Maximum Read Time"))
                .include("source", this.source);
    }

    /**
     * Synthetic (compiler-generated) hook that recreates the serializable lambda used in
     * {@code expand} from its {@link SerializedLambda} description.
     *
     * <p>It recognises only the {@code getId} method reference on {@code ValueWithRecordId},
     * exposed as a {@code SerializableFunction}; any other description is rejected.
     *
     * <p>NOTE(review): this is decompiler output and not faithful Java — {@code boolean z = -1}
     * and the {@code switch} over a boolean presumably stand in for an int selector. Left
     * byte-identical.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name (hash, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case 98245393:
                if (implMethodName.equals("getId")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: check the full lambda signature before recreating it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/ValueWithRecordId") && serializedLambda.getImplMethodSignature().equals("()[B")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
