package org.apache.beam.sdk.io;

import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/CountingSource.class */
public class CountingSource {

    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$BoundedCountingReader.class */
    private static class BoundedCountingReader extends OffsetBasedSource.OffsetBasedReader<Long> {
        private long current;
        private final Counter elementsRead;

        public BoundedCountingReader(OffsetBasedSource<Long> offsetBasedSource) {
            super(offsetBasedSource);
            this.elementsRead = SourceMetrics.elementsRead();
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected long getCurrentOffset() throws NoSuchElementException {
            return this.current;
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader, org.apache.beam.sdk.io.BoundedSource.BoundedReader
        public synchronized long getSplitPointsRemaining() {
            return Math.max(0L, getCurrentSource().getEndOffset() - this.current);
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader, org.apache.beam.sdk.io.BoundedSource.BoundedReader, org.apache.beam.sdk.io.Source.Reader
        public synchronized BoundedCountingSource getCurrentSource() {
            return (BoundedCountingSource) super.getCurrentSource();
        }

        @Override // org.apache.beam.sdk.io.Source.Reader
        public Long getCurrent() throws NoSuchElementException {
            return Long.valueOf(this.current);
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected boolean startImpl() throws IOException {
            this.current = getCurrentSource().getStartOffset();
            return true;
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected boolean advanceImpl() throws IOException {
            this.elementsRead.inc();
            this.current++;
            return true;
        }

        @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$BoundedCountingSource.class */
    public static class BoundedCountingSource extends OffsetBasedSource<Long> {
        public BoundedCountingSource(long j, long j2) {
            super(j, j2, 1L);
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource
        public long getBytesPerOffset() {
            return 8L;
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource
        public long getMaxEndOffset(PipelineOptions pipelineOptions) throws Exception {
            return getEndOffset();
        }

        @Override // org.apache.beam.sdk.io.OffsetBasedSource
        public OffsetBasedSource<Long> createSourceForSubrange(long j, long j2) {
            return new BoundedCountingSource(j, j2);
        }

        @Override // org.apache.beam.sdk.io.BoundedSource
        public BoundedSource.BoundedReader<Long> createReader(PipelineOptions pipelineOptions) throws IOException {
            return new BoundedCountingReader(this);
        }

        @Override // org.apache.beam.sdk.io.Source
        public Coder<Long> getDefaultOutputCoder() {
            return VarLongCoder.of();
        }
    }

    @DefaultCoder(AvroCoder.class)
    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$CounterMark.class */
    public static class CounterMark implements UnboundedSource.CheckpointMark {
        private final long lastEmitted;
        private final Instant startTime;

        public CounterMark(long j, Instant instant) {
            this.lastEmitted = j;
            this.startTime = instant;
        }

        public long getLastEmitted() {
            return this.lastEmitted;
        }

        public Instant getStartTime() {
            return this.startTime;
        }

        private CounterMark() {
            this.lastEmitted = 0L;
            this.startTime = Instant.now();
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.CheckpointMark
        public void finalizeCheckpoint() throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$NowTimestampFn.class */
    static class NowTimestampFn implements SerializableFunction<Long, Instant> {
        NowTimestampFn() {
        }

        @Override // org.apache.beam.sdk.transforms.SerializableFunction
        public Instant apply(Long l) {
            return Instant.now();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$UnboundedCountingReader.class */
    public static class UnboundedCountingReader extends UnboundedSource.UnboundedReader<Long> {
        private UnboundedCountingSource source;
        private long current;
        private Instant currentTimestamp;
        private Instant firstStarted;
        private final Counter elementsRead = SourceMetrics.elementsRead();

        public UnboundedCountingReader(UnboundedCountingSource unboundedCountingSource, CounterMark counterMark) {
            this.source = unboundedCountingSource;
            if (counterMark == null) {
                this.current = unboundedCountingSource.start - unboundedCountingSource.stride;
            } else {
                this.current = counterMark.getLastEmitted();
                this.firstStarted = counterMark.getStartTime();
            }
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
        public boolean start() throws IOException {
            if (this.firstStarted == null) {
                this.firstStarted = Instant.now();
            }
            return advance();
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
        public boolean advance() throws IOException {
            if (OffsetRangeTracker.OFFSET_INFINITY - this.source.stride < this.current) {
                return false;
            }
            long j = this.current + this.source.stride;
            if (expectedValue() < j) {
                return false;
            }
            this.elementsRead.inc();
            this.current = j;
            this.currentTimestamp = (Instant) this.source.timestampFn.apply(Long.valueOf(this.current));
            return true;
        }

        private long expectedValue() {
            if (this.source.period.getMillis() == 0) {
                return OffsetRangeTracker.OFFSET_INFINITY;
            }
            return (long) (this.source.elementsPerPeriod * ((Instant.now().getMillis() - this.firstStarted.getMillis()) / this.source.period.getMillis()));
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
        public Instant getWatermark() {
            return (Instant) this.source.timestampFn.apply(Long.valueOf(this.current));
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
        public CounterMark getCheckpointMark() {
            return new CounterMark(this.current, this.firstStarted);
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
        public UnboundedSource<Long, CounterMark> getCurrentSource() {
            return this.source;
        }

        @Override // org.apache.beam.sdk.io.Source.Reader
        public Long getCurrent() throws NoSuchElementException {
            return Long.valueOf(this.current);
        }

        @Override // org.apache.beam.sdk.io.Source.Reader
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.currentTimestamp;
        }

        @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
        public long getSplitBacklogBytes() {
            return Math.max(0L, (8 * (expectedValue() - this.current)) / this.source.stride);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/CountingSource$UnboundedCountingSource.class */
    public static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
        private final long start;
        private final long stride;
        private final long elementsPerPeriod;
        private final Duration period;
        private final SerializableFunction<Long, Instant> timestampFn;

        private UnboundedCountingSource(long j, long j2, long j3, Duration duration, SerializableFunction<Long, Instant> serializableFunction) {
            this.start = j;
            this.stride = j2;
            Preconditions.checkArgument(j3 > 0, "Must produce at least one element per period, got %s", j3);
            this.elementsPerPeriod = j3;
            Preconditions.checkArgument(duration.getMillis() >= 0, "Must have a non-negative period length, got %s", duration);
            this.period = duration;
            this.timestampFn = serializableFunction;
        }

        public UnboundedCountingSource withRate(long j, Duration duration) {
            return new UnboundedCountingSource(this.start, this.stride, j, duration, this.timestampFn);
        }

        public UnboundedCountingSource withTimestampFn(SerializableFunction<Long, Instant> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction);
            return new UnboundedCountingSource(this.start, this.stride, this.elementsPerPeriod, this.period, serializableFunction);
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public List<? extends UnboundedSource<Long, CounterMark>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            long j = this.stride * i;
            ImmutableList.Builder builder = ImmutableList.builder();
            for (int i2 = 0; i2 < i; i2++) {
                builder.add((ImmutableList.Builder) new UnboundedCountingSource(this.start + (i2 * this.stride), j, this.elementsPerPeriod, this.period, this.timestampFn));
            }
            return builder.build();
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public UnboundedSource.UnboundedReader<Long> createReader(PipelineOptions pipelineOptions, CounterMark counterMark) {
            return new UnboundedCountingReader(this, counterMark);
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public Coder<CounterMark> getCheckpointMarkCoder() {
            return AvroCoder.of(CounterMark.class);
        }

        @Override // org.apache.beam.sdk.io.Source
        public void validate() {
        }

        @Override // org.apache.beam.sdk.io.Source
        public Coder<Long> getDefaultOutputCoder() {
            return VarLongCoder.of();
        }
    }

    @Deprecated
    public static BoundedSource<Long> upTo(long j) {
        Preconditions.checkArgument(j >= 0, "numElements (%s) must be greater than or equal to 0", j);
        return new BoundedCountingSource(0L, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BoundedSource<Long> createSourceForSubrange(long j, long j2) {
        Preconditions.checkArgument(j2 >= j, "endIndex (%s) must be greater than or equal to startIndex (%s)", j2, j);
        return new BoundedCountingSource(j, j2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static UnboundedCountingSource createUnboundedFrom(long j) {
        return new UnboundedCountingSource(j, 1L, 1L, Duration.ZERO, new NowTimestampFn());
    }

    @Deprecated
    public static UnboundedSource<Long, CounterMark> unbounded() {
        return unboundedWithTimestampFn(new NowTimestampFn());
    }

    @Deprecated
    public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(SerializableFunction<Long, Instant> serializableFunction) {
        return new UnboundedCountingSource(0L, 1L, 1L, Duration.ZERO, serializableFunction);
    }

    private CountingSource() {
    }
}
