package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.MoreExecutors;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.fusesource.jansi.AnsiRenderer;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSource.class */
public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FileBasedSource.class);
    private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
    static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
    static final int THREAD_POOL_SIZE = 128;
    private final String fileOrPatternSpec;
    private final Mode mode;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSource$FileBasedReader.class */
    public static abstract class FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {
        private ReadableByteChannel channel;

        public FileBasedReader(FileBasedSource<T> fileBasedSource) {
            super(fileBasedSource);
            this.channel = null;
            Preconditions.checkArgument(fileBasedSource.getMode() != Mode.FILEPATTERN, "FileBasedReader does not support reading file patterns");
        }

        @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader, com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
        public synchronized FileBasedSource<T> getCurrentSource() {
            return (FileBasedSource) super.getCurrentSource();
        }

        @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected final boolean startImpl() throws IOException {
            FileBasedSource<T> currentSource = getCurrentSource();
            this.channel = IOChannelUtils.getFactory(currentSource.getFileOrPatternSpec()).open(currentSource.getFileOrPatternSpec());
            if (this.channel instanceof SeekableByteChannel) {
                ((SeekableByteChannel) this.channel).position(currentSource.getStartOffset());
            } else {
                Preconditions.checkArgument(((FileBasedSource) currentSource).mode != Mode.SINGLE_FILE_OR_SUBRANGE, "Subrange-based sources must only be defined for file types that support seekable  read channels");
                Preconditions.checkArgument(currentSource.getStartOffset() == 0, "Start offset " + currentSource.getStartOffset() + " is not zero but channel for reading the file is not seekable.");
            }
            startReading(this.channel);
            return advanceImpl();
        }

        @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader
        protected final boolean advanceImpl() throws IOException {
            return readNextRecord();
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.channel != null) {
                this.channel.close();
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public abstract void startReading(ReadableByteChannel readableByteChannel) throws IOException;

        /* JADX INFO: Access modifiers changed from: protected */
        public abstract boolean readNextRecord() throws IOException;
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSource$FilePatternReader.class */
    private class FilePatternReader extends BoundedSource.BoundedReader<T> {
        private final FileBasedSource<T> source;
        private final List<FileBasedReader<T>> fileReaders;
        final ListIterator<FileBasedReader<T>> fileReadersIterator;
        FileBasedReader<T> currentReader = null;

        public FilePatternReader(FileBasedSource<T> fileBasedSource, List<FileBasedReader<T>> list) {
            this.source = fileBasedSource;
            this.fileReaders = list;
            this.fileReadersIterator = list.listIterator();
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source.Reader
        public boolean start() throws IOException {
            return startNextNonemptyReader();
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source.Reader
        public boolean advance() throws IOException {
            Preconditions.checkState(this.currentReader != null, "Call start() before advance()");
            if (this.currentReader.advance()) {
                return true;
            }
            return startNextNonemptyReader();
        }

        private boolean startNextNonemptyReader() throws IOException {
            while (this.fileReadersIterator.hasNext()) {
                this.currentReader = this.fileReadersIterator.next();
                if (this.currentReader.start()) {
                    return true;
                }
                this.currentReader.close();
            }
            return false;
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source.Reader
        public T getCurrent() throws NoSuchElementException {
            return this.currentReader.getCurrent();
        }

        @Override // com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.currentReader.getCurrentTimestamp();
        }

        @Override // com.google.cloud.dataflow.sdk.io.Source.Reader, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.currentReader != null) {
                this.currentReader.close();
            }
            while (this.fileReadersIterator.hasNext()) {
                this.fileReadersIterator.next().close();
            }
        }

        @Override // com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader, com.google.cloud.dataflow.sdk.io.Source.Reader
        public FileBasedSource<T> getCurrentSource() {
            return this.source;
        }

        @Override // com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader
        public FileBasedSource<T> splitAtFraction(double d) {
            FileBasedSource.LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
            return null;
        }

        @Override // com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader
        public Double getFractionConsumed() {
            int previousIndex;
            int size;
            if (this.currentReader == null) {
                return Double.valueOf(CMAESOptimizer.DEFAULT_STOPFITNESS);
            }
            if (!this.fileReaders.isEmpty() && (previousIndex = this.fileReadersIterator.previousIndex()) != (size = this.fileReaders.size())) {
                double d = (1.0d * previousIndex) / size;
                double d2 = (1.0d * (previousIndex + 1)) / size;
                Double fractionConsumed = this.currentReader.getFractionConsumed();
                return fractionConsumed == null ? Double.valueOf(d) : Double.valueOf(d + (fractionConsumed.doubleValue() * (d2 - d)));
            }
            return Double.valueOf(1.0d);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSource$Mode.class */
    public enum Mode {
        FILEPATTERN,
        SINGLE_FILE_OR_SUBRANGE
    }

    public FileBasedSource(String str, long j) {
        super(0L, Long.MAX_VALUE, j);
        this.mode = Mode.FILEPATTERN;
        this.fileOrPatternSpec = str;
    }

    public FileBasedSource(String str, long j, long j2, long j3) {
        super(j2, j3, j);
        this.mode = Mode.SINGLE_FILE_OR_SUBRANGE;
        this.fileOrPatternSpec = str;
    }

    public final String getFileOrPatternSpec() {
        return this.fileOrPatternSpec;
    }

    public final Mode getMode() {
        return this.mode;
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource
    public final FileBasedSource<T> createSourceForSubrange(long j, long j2) {
        Preconditions.checkArgument(this.mode != Mode.FILEPATTERN, "Cannot split a file pattern based source based on positions");
        Preconditions.checkArgument(j >= getStartOffset(), "Start offset value " + j + " of the subrange cannot be smaller than the start offset value " + getStartOffset() + " of the parent source");
        Preconditions.checkArgument(j2 <= getEndOffset(), "End offset value " + j2 + " of the subrange cannot be larger than the end offset value " + getEndOffset() + " of the parent source");
        FileBasedSource<T> createForSubrangeOfFile = createForSubrangeOfFile(this.fileOrPatternSpec, j, j2);
        if (j > 0 || j2 != Long.MAX_VALUE) {
            Preconditions.checkArgument(createForSubrangeOfFile.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE, "Source created for the range [" + j + AnsiRenderer.CODE_LIST_SEPARATOR + j2 + ") must be a subrange source");
        }
        return createForSubrangeOfFile;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract FileBasedSource<T> createForSubrangeOfFile(String str, long j, long j2);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract FileBasedReader<T> createSingleFileReader(PipelineOptions pipelineOptions);

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource, com.google.cloud.dataflow.sdk.io.BoundedSource
    public final long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
        long estimatedSizeOfFilesBySampling;
        IOChannelFactory factory = IOChannelUtils.getFactory(this.fileOrPatternSpec);
        if (this.mode != Mode.FILEPATTERN) {
            return Math.min(getEndOffset(), getMaxEndOffset(pipelineOptions)) - getStartOffset();
        }
        long currentTimeMillis = System.currentTimeMillis();
        Collection<String> match = factory.match(this.fileOrPatternSpec);
        if (match.size() <= 100) {
            estimatedSizeOfFilesBySampling = getExactTotalSizeOfFiles(match, factory);
            LOG.debug("Size estimation of all files of pattern " + this.fileOrPatternSpec + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        } else {
            estimatedSizeOfFilesBySampling = getEstimatedSizeOfFilesBySampling(match, factory);
            LOG.debug("Size estimation of pattern " + this.fileOrPatternSpec + " by sampling took " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        }
        return estimatedSizeOfFilesBySampling;
    }

    private static long getExactTotalSizeOfFiles(Collection<String> collection, IOChannelFactory iOChannelFactory) throws Exception {
        ArrayList arrayList = new ArrayList();
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(128));
        long j = 0;
        try {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                arrayList.add(createFutureForSizeEstimation(it.next(), iOChannelFactory, listeningDecorator));
            }
            Iterator it2 = ((List) Futures.allAsList(arrayList).get()).iterator();
            while (it2.hasNext()) {
                j += ((Long) it2.next()).longValue();
            }
            return j;
        } finally {
            listeningDecorator.shutdown();
        }
    }

    private static ListenableFuture<Long> createFutureForSizeEstimation(final String str, final IOChannelFactory iOChannelFactory, ListeningExecutorService listeningExecutorService) {
        return listeningExecutorService.submit((Callable) new Callable<Long>() { // from class: com.google.cloud.dataflow.sdk.io.FileBasedSource.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Long call() throws Exception {
                return Long.valueOf(IOChannelFactory.this.getSizeBytes(str));
            }
        });
    }

    private static long getEstimatedSizeOfFilesBySampling(Collection<String> collection, IOChannelFactory iOChannelFactory) throws Exception {
        int max = Math.max(100, (int) (FRACTION_OF_FILES_TO_STAT * collection.size()));
        ArrayList arrayList = new ArrayList(collection);
        Collections.shuffle(arrayList);
        return (collection.size() * getExactTotalSizeOfFiles(arrayList.subList(0, max), iOChannelFactory)) / r0.size();
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource, com.google.cloud.dataflow.sdk.io.Source, com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern"));
    }

    private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(final String str, final long j, final PipelineOptions pipelineOptions, ListeningExecutorService listeningExecutorService) {
        return listeningExecutorService.submit((Callable) new Callable<List<? extends FileBasedSource<T>>>() { // from class: com.google.cloud.dataflow.sdk.io.FileBasedSource.2
            @Override // java.util.concurrent.Callable
            public List<? extends FileBasedSource<T>> call() throws Exception {
                return FileBasedSource.this.createForSubrangeOfFile(str, 0L, Long.MAX_VALUE).splitIntoBundles(j, pipelineOptions);
            }
        });
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource, com.google.cloud.dataflow.sdk.io.BoundedSource
    public final List<? extends FileBasedSource<T>> splitIntoBundles(long j, PipelineOptions pipelineOptions) throws Exception {
        if (this.mode != Mode.FILEPATTERN) {
            if (!isSplittable()) {
                LOG.debug("The source for file " + this.fileOrPatternSpec + " is not split into sub-range based sources since the file is not seekable");
                return ImmutableList.of(this);
            }
            ArrayList arrayList = new ArrayList();
            Iterator<? extends OffsetBasedSource<T>> it = super.splitIntoBundles(j, pipelineOptions).iterator();
            while (it.hasNext()) {
                arrayList.add((FileBasedSource) it.next());
            }
            return arrayList;
        }
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList2 = new ArrayList();
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(128));
        try {
            Iterator<String> it2 = expandFilePattern(this.fileOrPatternSpec).iterator();
            while (it2.hasNext()) {
                arrayList2.add(createFutureForFileSplit(it2.next(), j, pipelineOptions, listeningDecorator));
            }
            ImmutableList copyOf = ImmutableList.copyOf(Iterables.concat((Iterable) Futures.allAsList(arrayList2).get()));
            LOG.debug("Splitting the source based on file pattern " + this.fileOrPatternSpec + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            listeningDecorator.shutdown();
            return copyOf;
        } catch (Throwable th) {
            listeningDecorator.shutdown();
            throw th;
        }
    }

    protected boolean isSplittable() throws Exception {
        return IOChannelUtils.getFactory(this.fileOrPatternSpec).isReadSeekEfficient(this.fileOrPatternSpec);
    }

    @Override // com.google.cloud.dataflow.sdk.io.BoundedSource
    public final BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
        long j;
        validate();
        if (this.mode != Mode.FILEPATTERN) {
            return createSingleFileReader(pipelineOptions);
        }
        long currentTimeMillis = System.currentTimeMillis();
        Collection<String> expandFilePattern = expandFilePattern(this.fileOrPatternSpec);
        ArrayList arrayList = new ArrayList();
        for (String str : expandFilePattern) {
            try {
                j = IOChannelUtils.getFactory(str).getSizeBytes(str);
            } catch (IOException e) {
                LOG.warn("Failed to get size of " + str, (Throwable) e);
                j = Long.MAX_VALUE;
            }
            arrayList.add(createForSubrangeOfFile(str, 0L, j).createSingleFileReader(pipelineOptions));
        }
        LOG.debug("Creating a reader for file pattern " + this.fileOrPatternSpec + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        return arrayList.size() == 1 ? (BoundedSource.BoundedReader) arrayList.get(0) : new FilePatternReader(this, arrayList);
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource
    public String toString() {
        switch (this.mode) {
            case FILEPATTERN:
                return this.fileOrPatternSpec;
            case SINGLE_FILE_OR_SUBRANGE:
                return this.fileOrPatternSpec + " range " + super.toString();
            default:
                throw new IllegalStateException("Unexpected mode: " + this.mode);
        }
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource, com.google.cloud.dataflow.sdk.io.Source
    public void validate() {
        super.validate();
        switch (this.mode) {
            case FILEPATTERN:
                Preconditions.checkArgument(getStartOffset() == 0, "FileBasedSource is based on a file pattern or a full single file but the starting offset proposed " + getStartOffset() + " is not zero");
                Preconditions.checkArgument(getEndOffset() == Long.MAX_VALUE, "FileBasedSource is based on a file pattern or a full single file but the ending offset proposed " + getEndOffset() + " is not Long.MAX_VALUE");
                return;
            case SINGLE_FILE_OR_SUBRANGE:
                return;
            default:
                throw new IllegalStateException("Unknown mode: " + this.mode);
        }
    }

    @Override // com.google.cloud.dataflow.sdk.io.OffsetBasedSource
    public final long getMaxEndOffset(PipelineOptions pipelineOptions) throws Exception {
        if (this.mode == Mode.FILEPATTERN) {
            throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
        }
        return getEndOffset() == Long.MAX_VALUE ? IOChannelUtils.getFactory(this.fileOrPatternSpec).getSizeBytes(this.fileOrPatternSpec) : getEndOffset();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static final Collection<String> expandFilePattern(String str) throws IOException {
        Collection<String> match = IOChannelUtils.getFactory(str).match(str);
        LOG.info("Matched {} files for pattern {}", Integer.valueOf(match.size()), str);
        return match;
    }
}
