package org.apache.cassandra.service;

import com.datastax.dse.byos.shade.com.google.common.base.Throwables;
import com.datastax.dse.byos.shade.com.google.common.collect.ImmutableSet;
import com.datastax.dse.byos.shade.com.google.common.collect.Iterables;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadContext;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.rows.FlowablePartition;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.FailureResponse;
import org.apache.cassandra.net.MessageCallback;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Response;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.flow.DeferredFlow;
import org.apache.cassandra.utils.flow.Flow;
import org.apache.cassandra.utils.time.ApolloTime;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/service/ReadCallback.class */
/**
 * Message callback for a read sent to a list of endpoints.
 *
 * <p>The callback counts successful responses and failures, feeds each response to its
 * {@link ResponseResolver}, and publishes the outcome through a {@link DeferredFlow}: the
 * resolved data once {@link #blockFor()} counted responses (including data) have arrived, a
 * {@link ReadTimeoutException} on timeout, or a {@link ReadFailureException} once too many
 * endpoints have failed for {@code blockFor} to still be reachable.
 *
 * @param <T> the type of the items produced by the result flow
 */
public class ReadCallback<T> implements MessageCallback<ReadResponse> {
    protected static final Logger logger = LoggerFactory.getLogger(ReadCallback.class);

    final ResponseResolver<T> resolver;
    final List<InetAddress> endpoints;
    private final int blockfor;
    private final AtomicInteger received = new AtomicInteger(0);
    private final AtomicInteger failures = new AtomicInteger(0);
    private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>();
    private final DeferredFlow<T> result;
    private final Supplier<Consumer<Flow<T>>> notification;
    private volatile Consumer<Flow<T>> notificationAction;

    /**
     * Task run on the read-repair stage: compares the responses held by the resolver and, when
     * they turn out to disagree, dispatches the command to the remaining endpoints.
     */
    public class AsyncRepairRunner implements Runnable {
        private final TraceState traceState;

        private AsyncRepairRunner(TraceState traceState) {
            this.traceState = traceState;
        }

        /**
         * Blocks until the resolver has compared its responses. A digest mismatch triggers
         * {@link #retryOnDigestMismatch}; any other failure is rethrown via
         * {@code Throwables.propagate}.
         */
        @Override
        public void run() {
            try {
                ReadCallback.this.resolver.compareResponses().blockingAwait();
            } catch (Throwable failure) {
                // A RuntimeException carrying a cause is treated as a wrapper: inspect the cause.
                Throwable actual = failure;
                if (failure instanceof RuntimeException && failure.getCause() != null) {
                    actual = failure.getCause();
                }
                if (!(actual instanceof DigestMismatchException)) {
                    throw Throwables.propagate(actual);
                }
                retryOnDigestMismatch((DigestMismatchException) actual);
            }
        }

        /**
         * Traces/logs the mismatch, marks the background-repair metric, then creates an
         * {@link AsyncRepairCallback} seeded with the data response already held by the
         * {@link DigestResolver} and sends the command to every other endpoint.
         */
        private void retryOnDigestMismatch(DigestMismatchException digestMismatchException) {
            assert ReadCallback.this.resolver instanceof DigestResolver;

            if (this.traceState != null) {
                this.traceState.trace("Digest mismatch: {}", digestMismatchException.toString());
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Digest mismatch: {}", digestMismatchException.toString());
            }
            ReadRepairMetrics.repairedBackground.mark();

            Response<ReadResponse> dataResponse = ((DigestResolver) ReadCallback.this.resolver).dataResponse;
            assert dataResponse != null;

            int endpointCount = ReadCallback.this.endpoints.size();
            DataResolver repairResolver = new DataResolver(command(), readContext(), endpointCount);
            AsyncRepairCallback repairCallback = new AsyncRepairCallback(repairResolver, endpointCount);
            // The data response we already have does not need to be requested again.
            repairCallback.onResponse(dataResponse);

            List<InetAddress> remaining = subtractTarget(ReadCallback.this.endpoints, dataResponse.from());
            MessagingService.instance().send(command().dispatcherTo(remaining), repairCallback);
        }
    }

    /**
     * Creates the callback and its deferred result. The result's deadline is the query start
     * time plus the command timeout; on expiry it is sourced from {@link #generateFlowOnTimeout()}.
     * Also notifies the context's read observer, if any, of the queried endpoints.
     */
    private ReadCallback(ResponseResolver<T> responseResolver, List<InetAddress> list) {
        this.resolver = responseResolver;
        this.endpoints = list;
        this.blockfor = responseResolver.ctx.blockFor(list);
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(command().getTimeout());
        // Resolved lazily so that an action registered later through onResult() is picked up.
        this.notification = () -> this.notificationAction;
        this.result = DeferredFlow.create(queryStartNanos() + timeoutNanos,
                                          responseResolver.command.getSchedulerSupplier(),
                                          this::generateFlowOnTimeout,
                                          this.notification);
        if (logger.isTraceEnabled()) {
            logger.trace("Blockfor is {}; setting up requests to {}", this.blockfor, StringUtils.join(this.endpoints, ","));
        }
        if (readContext().readObserver != null) {
            readContext().readObserver.queried(list);
        }
    }

    /**
     * Creates a callback that uses the given resolver for the given endpoints.
     *
     * @param responseResolver the resolver that will receive the responses
     * @param list the endpoints being queried
     * @return a new callback
     */
    public static <T> ReadCallback<T> forResolver(ResponseResolver<T> responseResolver, List<InetAddress> list) {
        return new ReadCallback<>(responseResolver, list);
    }

    /**
     * Creates the callback for an initial read: backed by a {@link DigestResolver} when the
     * context has {@code withDigests} set, by a {@link DataResolver} otherwise.
     *
     * @param readCommand the command being executed
     * @param list the endpoints being queried
     * @param readContext the context of the read
     * @return a new callback
     */
    public static ReadCallback<FlowablePartition> forInitialRead(ReadCommand readCommand, List<InetAddress> list, ReadContext readContext) {
        ResponseResolver<FlowablePartition> initialResolver = readContext.withDigests
                ? new DigestResolver(readCommand, readContext, list.size())
                : new DataResolver(readCommand, readContext, list.size());
        return forResolver(initialResolver, list);
    }

    /**
     * Builds the callback used to repair after a digest mismatch. The new callback uses a
     * {@link DataResolver} and is immediately fed the data response this callback's
     * {@link DigestResolver} already holds.
     *
     * @param list the endpoints the repair read targets
     * @return the new callback paired with {@code list} minus the endpoint whose data response
     *         was reused
     */
    public Pair<ReadCallback<FlowablePartition>, Collection<InetAddress>> forDigestMismatchRepair(List<InetAddress> list) {
        assert this.resolver instanceof DigestResolver;
        DigestResolver digestResolver = (DigestResolver) this.resolver;
        assert digestResolver.isDataPresent();

        Response<ReadResponse> dataResponse = digestResolver.dataResponse;
        ReadCallback<FlowablePartition> repairCallback =
                forResolver(new DataResolver(command(), readContext(), list.size()), list);
        repairCallback.onResponse(dataResponse);
        return Pair.create(repairCallback, subtractTarget(list, dataResponse.from()));
    }

    /**
     * Returns a new list holding every element of {@code list} that is not equal to
     * {@code inetAddress}, in the original order.
     */
    public List<InetAddress> subtractTarget(List<InetAddress> list, InetAddress inetAddress) {
        assert !list.isEmpty() : "We shouldn't have got a mismatch with no targets";
        // NOTE(review): with assertions disabled, an empty list makes this capacity -1, which
        // ArrayList rejects with IllegalArgumentException. Behavior deliberately left as-is.
        List<InetAddress> remaining = new ArrayList<>(list.size() - 1);
        for (InetAddress candidate : list) {
            if (!candidate.equals(inetAddress)) {
                remaining.add(candidate);
            }
        }
        return remaining;
    }

    /** Returns the read command held by the resolver. */
    public ReadCommand command() {
        return this.resolver.command;
    }

    /** Returns the read context held by the resolver. */
    public ReadContext readContext() {
        return this.resolver.ctx;
    }

    private ConsistencyLevel consistency() {
        return this.resolver.consistency();
    }

    private long queryStartNanos() {
        return readContext().queryStartNanos;
    }

    /** Returns the deferred flow through which the outcome of the read is delivered. */
    public Flow<T> result() {
        return this.result;
    }

    /** Returns whether the deferred result has already been given a source. */
    public boolean hasResult() {
        return this.result.hasSource();
    }

    /**
     * Registers the action that the notification supplier handed to the deferred result will
     * return from now on.
     */
    public void onResult(Consumer<Flow<T>> consumer) {
        this.notificationAction = consumer;
    }

    /** Returns the number of counted responses this callback waits for. */
    public int blockFor() {
        return this.blockfor;
    }

    /**
     * Builds the flow used when enough responses are available: the resolver's raw data when
     * {@code blockfor} is 1, its resolved result otherwise. Errors raised by the flow are
     * reported to {@link #onError(Throwable)}; errors thrown while building it are returned
     * as an error flow.
     *
     * @param i the number of counted responses received so far
     */
    private Flow<T> generateFlowOnSuccess(int i) {
        if (readContext().readObserver != null) {
            // If every endpoint answered, report the endpoint list itself; otherwise report
            // the senders of the messages the resolver holds.
            readContext().readObserver.responsesReceived(
                    i == this.endpoints.size()
                            ? this.endpoints
                            : ImmutableSet.copyOf(Iterables.transform(this.resolver.getMessages(), message -> message.from())));
        }
        try {
            Flow<T> data = this.blockfor == 1 ? this.resolver.getData() : this.resolver.resolve();
            return data.doOnError(this::onError);
        } catch (Throwable error) {
            if (logger.isTraceEnabled()) {
                logger.trace("Got error: {}/{}", error.getClass().getName(), error.getMessage());
            }
            return Flow.error(error);
        }
    }

    /**
     * Builds the flow used when the read times out: a {@link ReadTimeoutException} error unless
     * the context's required number of responses has been received and data is present, in
     * which case the regular success flow is produced.
     */
    private Flow<T> generateFlowOnTimeout() {
        int responses = this.received.get();
        if (responses < readContext().requiredResponses() || !this.resolver.isDataPresent()) {
            return Flow.error(new ReadTimeoutException(consistency(), responses, this.blockfor, this.resolver.isDataPresent()));
        }
        return generateFlowOnSuccess(responses);
    }

    /**
     * Hands the response to the resolver and counts it if it comes from an endpoint we wait
     * for. Once {@code blockfor} counted responses including data are in, the result is sourced;
     * if more endpoints were queried than required and all of them have now been counted, an
     * {@link AsyncRepairRunner} is submitted to the read-repair stage.
     */
    @Override
    public void onResponse(Response<ReadResponse> response) {
        if (logger.isTraceEnabled()) {
            logger.trace("Received response: {}", response);
        }
        this.resolver.preprocess(response);

        int responses = waitingFor(response.from()) ? this.received.incrementAndGet() : this.received.get();
        if (responses < this.blockfor || !this.resolver.isDataPresent()) {
            return;
        }

        if (this.result.onSource(generateFlowOnSuccess(responses)) && logger.isTraceEnabled()) {
            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(ApolloTime.approximateNanoTime() - queryStartNanos()));
        }

        boolean queriedExtraEndpoints = this.blockfor < this.endpoints.size();
        if (!queriedExtraEndpoints || responses != this.endpoints.size()) {
            return;
        }

        TraceState traceState = Tracing.instance.get();
        if (traceState != null) {
            traceState.trace("Initiating read-repair");
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Initiating read-repair");
        }
        StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
    }

    /** Traces/logs the timed-out host and offers the timeout flow as the result's source. */
    @Override
    public void onTimeout(InetAddress inetAddress) {
        if (Tracing.isTracing()) {
            Tracing.trace("Request to host {} timed out", inetAddress.toString());
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Request to host {} timed out", inetAddress.toString());
        }
        this.result.onSource(generateFlowOnTimeout());
    }

    /**
     * Returns whether a message from the given endpoint is counted: always for a consistency
     * level that is not datacenter-local, otherwise only when the snitch places the endpoint in
     * the local datacenter.
     */
    private boolean waitingFor(InetAddress inetAddress) {
        return !consistency().isDatacenterLocal() || DatabaseDescriptor.getEndpointSnitch().isInLocalDatacenter(inetAddress);
    }

    /**
     * Delegates to the consistency level to check that enough of the endpoints are alive.
     *
     * @throws UnavailableException as thrown by the consistency level's check
     */
    public void assureSufficientLiveNodes() throws UnavailableException {
        consistency().assureSufficientLiveNodes(readContext().keyspace(), this.endpoints);
    }

    /**
     * Records the failure reason for the sender and counts the failure if it comes from an
     * endpoint we wait for. When {@code blockfor} plus the failure count exceeds the number of
     * endpoints and the result has no source yet, the result is failed with a
     * {@link ReadFailureException}.
     */
    @Override
    public void onFailure(FailureResponse<ReadResponse> failureResponse) {
        int failureCount = waitingFor(failureResponse.from()) ? this.failures.incrementAndGet() : this.failures.get();
        this.failureReasonByEndpoint.put(failureResponse.from(), failureResponse.reason());

        if (Tracing.isTracing()) {
            Tracing.trace("Received failure response: {}. Reason is as follow {}", failureResponse, this.failureReasonByEndpoint);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Received failure response: {}. Reason is as follow {}", failureResponse, this.failureReasonByEndpoint);
        }

        if (this.blockfor + failureCount <= this.endpoints.size() || this.result.hasSource()) {
            return;
        }
        this.result.onSource(Flow.error(new ReadFailureException(consistency(),
                                                                  this.received.get(),
                                                                  this.blockfor,
                                                                  this.resolver.isDataPresent(),
                                                                  this.failureReasonByEndpoint)));
    }

    /**
     * Reports an error raised by the result flow. Timeouts and read failures are traced and
     * debug-logged with the response counts; digest mismatches and unavailable errors are
     * ignored here; anything else is logged as an unexpected error.
     */
    private void onError(Throwable th) {
        int responses = this.received.get();
        boolean failed = th instanceof ReadFailureException;

        if (!failed && !(th instanceof ReadTimeoutException)) {
            if (th instanceof DigestMismatchException || th instanceof UnavailableException) {
                return;
            }
            logger.error("Unexpected error handling read responses for {}. Have received {} of {} responses.",
                         this.resolver.command, responses, this.blockfor, th);
            return;
        }

        String outcome = failed ? "Failed" : "Timed out";
        if (Tracing.isTracing()) {
            Tracing.trace("{}; received {} of {} responses {} from {}",
                          new Object[] {outcome, responses, this.blockfor, describeReceived(responses), this.endpoints});
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}; received {} of {} responses {} from {}",
                         new Object[] {outcome, responses, this.blockfor, describeReceived(responses), this.endpoints});
        }
    }

    /** Returns the suffix saying whether the received responses include data; empty if none were received. */
    private String describeReceived(int responses) {
        if (responses <= 0) {
            return "";
        }
        return this.resolver.isDataPresent() ? " (including data)" : " (only digests)";
    }
}
