package org.apache.cassandra.io.sstable.format;

import com.datastax.bdp.db.utils.leaks.detection.LeaksDetector;
import com.datastax.bdp.db.utils.leaks.detection.LeaksDetectorFactory;
import com.datastax.bdp.db.utils.leaks.detection.LeaksTracker;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.FlowableUnfilteredPartition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileAccessType;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.Rebufferer;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UnmodifiableArrayList;
import org.apache.cassandra.utils.flow.Flow;
import org.apache.cassandra.utils.flow.FlowSource;
import org.apache.cassandra.utils.flow.FlowSubscriber;
import org.apache.cassandra.utils.flow.FlowSubscription;
import org.apache.cassandra.utils.flow.FlowSubscriptionRecipient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/io/sstable/format/AsyncSSTableScanner.class */
public class AsyncSSTableScanner extends FlowSource<FlowableUnfilteredPartition> implements FlowSubscriptionRecipient {
    private static final Logger logger;
    private static final LeaksDetector<FlowSource> leaksDetector;
    private final SSTableReader sstable;
    private final List<AbstractBounds<PartitionPosition>> ranges;
    private final ColumnFilter columns;
    private final DataRange dataRange;
    private final SSTableReadsListener listener;

    @Nullable
    private volatile RandomAccessReader dfile;

    @Nullable
    private volatile Flow<FlowableUnfilteredPartition> sourceFlow;

    @Nullable
    private volatile FlowSubscription source;

    @Nullable
    private volatile LeaksTracker<FlowSource> leak;
    static final /* synthetic */ boolean $assertionsDisabled;

    private AsyncSSTableScanner(SSTableReader sSTableReader, ColumnFilter columnFilter, DataRange dataRange, List<AbstractBounds<PartitionPosition>> list, SSTableReadsListener sSTableReadsListener) {
        if (!$assertionsDisabled && sSTableReader == null) {
            throw new AssertionError();
        }
        this.sstable = sSTableReader;
        this.columns = columnFilter;
        this.dataRange = dataRange;
        this.ranges = list;
        this.listener = sSTableReadsListener;
    }

    public static AsyncSSTableScanner getScanner(SSTableReader sSTableReader) {
        return getScanner(sSTableReader, (List<AbstractBounds<PartitionPosition>>) UnmodifiableArrayList.of(SSTableScanner.fullRange(sSTableReader)));
    }

    public static AsyncSSTableScanner getScanner(SSTableReader sSTableReader, Collection<Range<Token>> collection) {
        return getScanner(sSTableReader, SSTableScanner.makeBounds(sSTableReader, collection));
    }

    public static AsyncSSTableScanner getScanner(SSTableReader sSTableReader, List<AbstractBounds<PartitionPosition>> list) {
        return new AsyncSSTableScanner(sSTableReader, ColumnFilter.all(sSTableReader.metadata()), null, list, SSTableReadsListener.NOOP_LISTENER);
    }

    public static AsyncSSTableScanner getScanner(SSTableReader sSTableReader, ColumnFilter columnFilter, DataRange dataRange, SSTableReadsListener sSTableReadsListener) {
        return new AsyncSSTableScanner(sSTableReader, columnFilter, dataRange, SSTableScanner.makeBounds(sSTableReader, dataRange), sSTableReadsListener);
    }

    private Flow<FlowableUnfilteredPartition> flow() {
        return this.ranges.size() == 1 ? this.sstable.coveredKeysFlow(this.dfile, this.ranges.get(0)).flatMap(this::partitions) : Flow.fromIterable(this.ranges).flatMap(abstractBounds -> {
            return this.sstable.coveredKeysFlow(this.dfile, abstractBounds);
        }).flatMap(this::partitions);
    }

    private Flow<FlowableUnfilteredPartition> partitions(IndexFileEntry indexFileEntry) {
        if (this.dataRange == null) {
            return this.sstable.flow(indexFileEntry, this.dfile, this.listener);
        }
        ClusteringIndexFilter clusteringIndexFilter = this.dataRange.clusteringIndexFilter(indexFileEntry.key);
        return this.sstable.flow(indexFileEntry, this.dfile, clusteringIndexFilter.getSlices(this.sstable.metadata()), this.columns, clusteringIndexFilter.isReversed(), this.listener);
    }

    private void lateInitialization() {
        this.dfile = this.sstable.openDataReader(Rebufferer.ReaderConstraint.ASYNC, FileAccessType.SEQUENTIAL);
        this.sourceFlow = flow();
        this.leak = leaksDetector.trackForDebug(this);
        if (logger.isTraceEnabled()) {
            logger.trace("Scanning {} with {}", this.dfile.getPath(), this.dfile);
        }
    }

    @Override // org.apache.cassandra.utils.flow.FlowSubscriptionRecipient
    public void onSubscribe(FlowSubscription flowSubscription) {
        this.source = flowSubscription;
    }

    @Override // org.apache.cassandra.utils.flow.FlowSource, org.apache.cassandra.utils.flow.Flow
    public void requestFirst(FlowSubscriber<FlowableUnfilteredPartition> flowSubscriber, FlowSubscriptionRecipient flowSubscriptionRecipient) {
        subscribe(flowSubscriber, flowSubscriptionRecipient);
        try {
            lateInitialization();
            this.listener.onScanningStarted(this.sstable);
            this.sourceFlow.requestFirst(flowSubscriber, this);
        } catch (Throwable th) {
            flowSubscriber.onError(th);
        }
    }

    @Override // org.apache.cassandra.utils.flow.FlowSubscription
    public void requestNext() {
        this.source.requestNext();
    }

    @Override // org.apache.cassandra.utils.flow.FlowSubscription, java.lang.AutoCloseable
    public void close() throws Exception {
        Throwable closeNonNull = Throwables.closeNonNull((Throwable) null, this.dfile, this.source);
        if (this.leak != null) {
            this.leak.close(this);
        }
        Throwables.maybeFail(closeNonNull);
    }

    @Override // org.apache.cassandra.utils.flow.FlowSource, org.apache.cassandra.utils.flow.Flow
    public String toString() {
        return Flow.formatTrace(getClass().getSimpleName(), this.source);
    }

    static {
        $assertionsDisabled = !AsyncSSTableScanner.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(AsyncSSTableScanner.class);
        leaksDetector = LeaksDetectorFactory.create("AsyncSSTableScanner", FlowSource.class);
    }
}
