package com.datastax.bdp.db.nodesync;

import com.datastax.bdp.db.nodesync.NodeSyncTracing;
import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import com.datastax.dse.byos.shade.com.google.common.collect.Sets;
import com.datastax.dse.byos.shade.com.google.common.primitives.Ints;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.cassandra.concurrent.TPC;
import org.apache.cassandra.concurrent.TPCTaskType;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.NodeSyncConfig;
import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.NodeSyncReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadContext;
import org.apache.cassandra.db.ReadReconciliationObserver;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.FlowablePartition;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.exceptions.UnknownKeyspaceException;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ReadRepairDecision;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.flow.Flow;
import org.apache.cassandra.utils.flow.Threads;
import org.apache.cassandra.utils.units.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/datastax/bdp/db/nodesync/Validator.class */
public class Validator {
    private static final Logger logger;
    private final ValidationLifecycle lifecycle;
    private final RateLimiter limiter;
    private final PageSize pageSize;
    protected final Observer observer;
    protected final int replicationFactor;
    private final Set<InetAddress> segmentReplicas;
    private CompletableFuture<Void> flowFuture;

    @Nullable
    private volatile Set<InetAddress> missingNodes;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ValidationMetrics metrics = new ValidationMetrics();
    private final Future completionFuture = new Future();
    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
    private volatile ValidationOutcome validationOutcome = ValidationOutcome.FULL_IN_SYNC;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/bdp/db/nodesync/Validator$Future.class */
    public class Future extends CompletableFuture<ValidationInfo> {
        private Future() {
        }

        Validator validator() {
            return Validator.this;
        }

        @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return Validator.this.cancel("cancelled through validation future");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void actuallyCancel() {
            super.cancel(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:com/datastax/bdp/db/nodesync/Validator$Observer.class */
    public class Observer implements ReadReconciliationObserver {
        private volatile boolean isComplete;
        private volatile boolean hasMismatch;
        private volatile long limiterWaitTimeMicrosForPage;
        private volatile long dataValidatedBytesForPage;
        private volatile long responseReceivedTimeNanos;
        private volatile long timeIdleBeforeProcessingPage;
        private volatile ValidationMetrics pageMetrics;
        static final /* synthetic */ boolean $assertionsDisabled;

        protected Observer() {
        }

        @VisibleForTesting
        protected void onNewPage() {
            this.hasMismatch = false;
            this.limiterWaitTimeMicrosForPage = 0L;
            this.dataValidatedBytesForPage = 0L;
            this.timeIdleBeforeProcessingPage = 0L;
            if (Validator.this.tracing().isEnabled()) {
                this.pageMetrics = new ValidationMetrics();
            }
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void queried(Collection<InetAddress> collection) {
            Validator.this.tracing().trace("Querying {} replica: {}", Integer.valueOf(collection.size()), collection);
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void responsesReceived(Collection<InetAddress> collection) {
            this.responseReceivedTimeNanos = System.nanoTime();
            this.isComplete = collection.size() == Validator.this.replicationFactor;
            if (this.isComplete) {
                Validator.this.tracing().trace("All replica responded ({})", collection);
                return;
            }
            if (Validator.this.missingNodes == null) {
                Validator.this.missingNodes = Sets.newConcurrentHashSet();
            }
            Sets.SetView difference = Sets.difference(Validator.this.segmentReplicas, setOf(collection));
            Validator.this.missingNodes.addAll(difference);
            if (Validator.this.tracing().isEnabled()) {
                if (difference.isEmpty()) {
                    Validator.this.tracing().trace("Partial responses: {} responded ({}) but RF={}", Integer.valueOf(collection.size()), collection, Integer.valueOf(Validator.this.replicationFactor));
                } else {
                    Validator.this.tracing().trace("Partial responses: {} responded ({}) but {} did not ({})", Integer.valueOf(collection.size()), collection, Integer.valueOf(difference.size()), difference);
                }
            }
        }

        private Set<InetAddress> setOf(Collection<InetAddress> collection) {
            return collection instanceof Set ? (Set) collection : new HashSet(collection);
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onDigestMatch() {
            this.hasMismatch = false;
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onDigestMismatch() {
            this.hasMismatch = true;
            Validator.this.tracing().trace("Digest mismatch, issuing full data request");
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onPartition(DecoratedKey decoratedKey) {
            onData(decoratedKey.getKey().remaining(), true);
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onPartitionDeletion(DeletionTime deletionTime, boolean z) {
            onRangeTombstoneMarker(null, z);
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onRow(Row row, boolean z) {
            Validator.this.metrics.incrementRowsRead(z);
            if (this.pageMetrics != null) {
                this.pageMetrics.incrementRowsRead(z);
            }
            onData(row.dataSize(), z);
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onRangeTombstoneMarker(RangeTombstoneMarker rangeTombstoneMarker, boolean z) {
            Validator.this.metrics.incrementRangeTombstoneMarkersRead(z);
            if (this.pageMetrics != null) {
                this.pageMetrics.incrementRangeTombstoneMarkersRead(z);
            }
            onData(dataSize(rangeTombstoneMarker), z);
        }

        @VisibleForTesting
        protected void onData(int i, boolean z) {
            if (!$assertionsDisabled && TPC.isTPCThread()) {
                throw new AssertionError();
            }
            if (this.timeIdleBeforeProcessingPage == 0) {
                this.timeIdleBeforeProcessingPage = System.nanoTime() - this.responseReceivedTimeNanos;
            }
            Validator.this.metrics.addDataValidated(i, z);
            if (this.pageMetrics != null) {
                this.pageMetrics.addDataValidated(i, z);
            }
            this.dataValidatedBytesForPage += i;
            this.limiterWaitTimeMicrosForPage = (long) (this.limiterWaitTimeMicrosForPage + ((i > 0 ? Validator.this.limiter.acquire(i) : 0.0d) * TimeUnit.SECONDS.toMicros(1L)));
        }

        private int dataSize(RangeTombstoneMarker rangeTombstoneMarker) {
            return 12 + (rangeTombstoneMarker == null ? 0 : rangeTombstoneMarker.clustering().dataSize());
        }

        @Override // org.apache.cassandra.db.ReadReconciliationObserver
        public void onRepair(InetAddress inetAddress, PartitionUpdate partitionUpdate) {
            Validator.this.metrics.addRepair(partitionUpdate);
            if (this.pageMetrics != null) {
                this.pageMetrics.addRepair(partitionUpdate);
            }
        }

        static {
            $assertionsDisabled = !Validator.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/bdp/db/nodesync/Validator$PageProcessingListener.class */
    public interface PageProcessingListener {
        void onNewPage();

        void onPageComplete(long j, long j2, long j3);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/bdp/db/nodesync/Validator$State.class */
    public enum State {
        CREATED,
        RUNNING,
        FINISHED,
        CANCELLED;

        boolean isDone() {
            return this == FINISHED || this == CANCELLED;
        }
    }

    protected Validator(ValidationLifecycle validationLifecycle) {
        this.lifecycle = validationLifecycle;
        NodeSyncConfig config = validationLifecycle.service().config();
        this.limiter = config.rateLimiter;
        this.pageSize = new PageSize(Ints.checkedCast(config.getPageSize(SizeUnit.BYTES)), PageSize.PageUnit.BYTES);
        this.observer = new Observer();
        AbstractReplicationStrategy replicationStrategy = Keyspace.open(table().keyspace).getReplicationStrategy();
        this.replicationFactor = replicationStrategy.getReplicationFactor();
        this.segmentReplicas = new HashSet(replicationStrategy.getNaturalEndpoints(range().left));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Validator create(ValidationLifecycle validationLifecycle) {
        try {
            return new Validator(validationLifecycle);
        } catch (UnknownKeyspaceException e) {
            logger.debug("Ignoring NodeSync validation on keyspace {} as it has just been dropped", e.keyspaceName);
            validationLifecycle.cancel(String.format("Keyspace %s was dropped concurrently", e.keyspaceName));
            return null;
        } catch (Exception e2) {
            validationLifecycle.cancel(String.format("Unexpected error \"%s\" on validator creation", e2.getMessage()));
            throw e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future completionFuture() {
        return this.completionFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Segment segment() {
        return this.lifecycle.segment();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ValidationMetrics metrics() {
        return this.metrics;
    }

    private TableMetadata table() {
        return segment().table;
    }

    private Range<Token> range() {
        return segment().range;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public NodeSyncTracing.SegmentTracing tracing() {
        return this.lifecycle.tracing();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future executeOn(ValidationExecutor validationExecutor) {
        if (!this.state.compareAndSet(State.CREATED, State.RUNNING)) {
            if (this.state.get() == State.CANCELLED) {
                return this.completionFuture;
            }
            throw new IllegalStateException("Cannot call executeOn multiple times");
        }
        try {
            NodeSyncReadCommand nodeSyncReadCommand = new NodeSyncReadCommand(segment(), NodeSyncHelpers.time().currentTimeSeconds(), validationExecutor.asScheduler());
            QueryPager createPager = createPager(nodeSyncReadCommand);
            ReadContext build = ReadContext.builder(nodeSyncReadCommand, ConsistencyLevel.TWO).useDigests().blockForAllTargets().observer(this.observer).readRepairDecision(ReadRepairDecision.GLOBAL).readRepairTimeoutInMs(2 * DatabaseDescriptor.getWriteRpcTimeout()).build(System.nanoTime());
            logger.trace("Starting execution of validation on {}", segment());
            this.flowFuture = fetchAll(validationExecutor, createPager, build).lift(Threads.requestOn(validationExecutor.asScheduler(), TPCTaskType.NODESYNC_VALIDATION)).flatProcess((v0) -> {
                return v0.content();
            }).processToFuture().handleAsync((r6, th) -> {
                if (th == null) {
                    markFinished();
                    return null;
                }
                handleError(th, validationExecutor);
                return null;
            }, (Executor) validationExecutor.asExecutor());
        } catch (Throwable th2) {
            handleError(th2, validationExecutor);
        }
        return this.completionFuture;
    }

    @VisibleForTesting
    protected QueryPager createPager(ReadCommand readCommand) {
        return readCommand.getPager(null, ProtocolVersion.CURRENT);
    }

    private Flow<FlowablePartition> fetchAll(ValidationExecutor validationExecutor, QueryPager queryPager, ReadContext readContext) {
        return fetchPage(validationExecutor, queryPager, readContext).concatWith(() -> {
            if (isDone(queryPager)) {
                return null;
            }
            return fetchPage(validationExecutor, queryPager, readContext.forNewQuery(System.nanoTime()));
        });
    }

    private Flow<FlowablePartition> fetchPage(ValidationExecutor validationExecutor, QueryPager queryPager, ReadContext readContext) {
        if (!$assertionsDisabled && queryPager.isExhausted()) {
            throw new AssertionError();
        }
        this.lifecycle.onNewPage(this.pageSize);
        this.observer.onNewPage();
        validationExecutor.onNewPage();
        return Flow.concat(queryPager.fetchPage(this.pageSize, readContext), Flow.defer(() -> {
            onPageComplete(validationExecutor);
            return Flow.empty();
        }));
    }

    private boolean isDone(QueryPager queryPager) {
        return queryPager.isExhausted() || this.state.get() == State.CANCELLED;
    }

    private void onPageComplete(ValidationExecutor validationExecutor) {
        ValidationOutcome completed = ValidationOutcome.completed(!this.observer.isComplete, this.observer.hasMismatch);
        recordPage(completed, validationExecutor);
        this.lifecycle.onCompletedPage(completed, this.observer.pageMetrics);
    }

    private void handleError(Throwable th, PageProcessingListener pageProcessingListener) {
        Throwable unwrapped = Throwables.unwrapped(th);
        if (unwrapped instanceof CancellationException) {
            return;
        }
        if (unwrapped instanceof InvalidatedNodeSyncStateException) {
            cancel("Validation invalidated by either a topology change or a depth decrease. The segment will be retried.");
            return;
        }
        if (isException(unwrapped, UnknownKeyspaceException.class, RequestFailureReason.UNKNOWN_KEYSPACE)) {
            cancel(String.format("Keyspace %s was dropped while validating", table().keyspace));
            return;
        }
        if (isException(unwrapped, UnknownTableException.class, RequestFailureReason.UNKNOWN_TABLE)) {
            cancel(String.format("Table %s was dropped while validating", table()));
            return;
        }
        if (isException(unwrapped, UnknownColumnException.class, RequestFailureReason.UNKNOWN_COLUMN)) {
            cancel(String.format("A column in table %s was added or dropped while validating: the segment will be retried.", table()));
            return;
        }
        if ((unwrapped instanceof UnavailableException) || (unwrapped instanceof RequestTimeoutException)) {
            tracing().trace("Unable to complete page (and validation): {}", unwrapped.getMessage());
            recordPage(ValidationOutcome.UNCOMPLETED, pageProcessingListener);
            markFinished();
        } else {
            tracing().trace("Unexpected error: {}", unwrapped.getMessage());
            logger.error("Unexpected error during synchronization of table {} on range {}", new Object[]{table(), range(), unwrapped});
            recordPage(ValidationOutcome.FAILED, pageProcessingListener);
            markFinished();
        }
    }

    private static boolean isException(Throwable th, Class<?> cls, RequestFailureReason requestFailureReason) {
        if (cls.isInstance(th)) {
            return true;
        }
        if (th instanceof RequestFailureException) {
            return ((RequestFailureException) th).failureReasonByEndpoint.values().stream().anyMatch(requestFailureReason2 -> {
                return requestFailureReason2 == requestFailureReason;
            });
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean cancel(String str) {
        State andUpdate = this.state.getAndUpdate(state -> {
            return state.isDone() ? state : State.CANCELLED;
        });
        if (andUpdate.isDone()) {
            return andUpdate == State.CANCELLED;
        }
        doCancel(str);
        return true;
    }

    private void doCancel(String str) {
        try {
            this.lifecycle.cancel(str);
        } finally {
            if (this.flowFuture != null) {
                this.flowFuture.cancel(true);
            }
            this.completionFuture.actuallyCancel();
        }
    }

    private void markFinished() {
        if (this.lifecycle.isInvalid()) {
            cancel("Validation invalidated by either a topology change or a depth decrease. The segment will be retried.");
            return;
        }
        if (this.state.getAndUpdate(state -> {
            return state.isDone() ? state : State.FINISHED;
        }).isDone()) {
            return;
        }
        try {
            this.lifecycle.service().updateMetrics(table(), this.metrics);
            ValidationInfo validationInfo = new ValidationInfo(this.lifecycle.startTime(), this.validationOutcome, this.missingNodes);
            this.lifecycle.onCompletion(validationInfo, this.metrics);
            this.completionFuture.complete(validationInfo);
        } catch (Throwable th) {
            doCancel("Failed to mark validation finished due to " + th.getMessage());
        }
    }

    private void recordPage(ValidationOutcome validationOutcome, PageProcessingListener pageProcessingListener) {
        this.metrics.addPageOutcome(validationOutcome);
        this.validationOutcome = this.validationOutcome.composeWith(validationOutcome);
        pageProcessingListener.onPageComplete(this.observer.dataValidatedBytesForPage, this.observer.limiterWaitTimeMicrosForPage, this.observer.timeIdleBeforeProcessingPage);
    }

    static {
        $assertionsDisabled = !Validator.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(Validator.class);
    }
}
