package org.apache.cassandra.schema;

import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import com.datastax.dse.byos.shade.com.google.common.base.Preconditions;
import com.datastax.dse.byos.shade.com.google.common.collect.Sets;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.PropertyConfiguration;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verbs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/cassandra/schema/PullRequestScheduler.class */
public final class PullRequestScheduler {
    private static final int MAX_PULL_REQUESTS_CONCURRENCY;
    private static final int PULL_REQUEST_WAIT_IN_SECONDS;
    private static final int MAX_RETRIES;
    private static final int INITIAL_RETRY_DELAY_IN_MILLISECONDS;
    private static final int MAX_RETRY_DELAY_IN_MILLISECONDS;
    private static final Logger logger;

    @VisibleForTesting
    final Set<PullRequestFuture> scheduledPullRequests;

    @VisibleForTesting
    final Map<PullRequestFuture, InetAddress> pendingPullRequests;

    @VisibleForTesting
    final Map<InetAddress, PullRequestFuture> inFlightPullRequests;

    @VisibleForTesting
    final ScheduledExecutorService pullRequestExecutor;
    private final PullRequestSender pullRequestSender;
    private final int maxConcurrency;
    private final int maxRetries;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/schema/PullRequestScheduler$PullRequestFuture.class */
    public static class PullRequestFuture extends CompletableFuture<Void> {
        private final AtomicInteger retriesAttempted;
        private final long baseDelayInMillis;
        private final long maxBackoffTimeInMillis;
        private final int maxRetries;

        private PullRequestFuture(int i, int i2, int i3) {
            Preconditions.checkArgument(i >= 0);
            Preconditions.checkArgument(i2 >= 0);
            Preconditions.checkArgument(i3 >= 0);
            this.maxRetries = i;
            this.baseDelayInMillis = i2;
            this.maxBackoffTimeInMillis = i3;
            this.retriesAttempted = new AtomicInteger(0);
        }

        private void increaseRetryCount() {
            this.retriesAttempted.incrementAndGet();
        }

        private int retriesAttempted() {
            return this.retriesAttempted.get();
        }

        boolean shouldRetry() {
            return !isDone() && retriesAttempted() < this.maxRetries;
        }

        long delayInMillisBeforeNextRetry() {
            long min = Math.min((1 << retriesAttempted()) * this.baseDelayInMillis, this.maxBackoffTimeInMillis);
            increaseRetryCount();
            return min;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/cassandra/schema/PullRequestScheduler$PullRequestSender.class */
    public interface PullRequestSender {
        CompletableFuture<?> sendPullRequest(InetAddress inetAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PullRequestScheduler() {
        this(PullRequestScheduler::sendPullRequest, ScheduledExecutors.nonPeriodicTasks, MAX_PULL_REQUESTS_CONCURRENCY, MAX_RETRIES);
    }

    @VisibleForTesting
    PullRequestScheduler(PullRequestSender pullRequestSender, ScheduledExecutorService scheduledExecutorService, int i, int i2) {
        this.scheduledPullRequests = Sets.newLinkedHashSet();
        this.pendingPullRequests = new LinkedHashMap();
        this.inFlightPullRequests = new LinkedHashMap();
        this.pullRequestSender = pullRequestSender;
        this.pullRequestExecutor = scheduledExecutorService;
        this.maxConcurrency = i;
        this.maxRetries = i2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void schedulePullRequest(InetAddress inetAddress, long j, TimeUnit timeUnit) {
        schedulePullRequest(inetAddress, createPullRequestFuture(), j, timeUnit);
    }

    private synchronized void schedulePullRequest(InetAddress inetAddress, PullRequestFuture pullRequestFuture, long j, TimeUnit timeUnit) {
        this.scheduledPullRequests.add(pullRequestFuture);
        this.pullRequestExecutor.schedule(() -> {
            tryToSubmitPullRequest(inetAddress, pullRequestFuture);
        }, j, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void submitPullRequest(InetAddress inetAddress) {
        tryToSubmitPullRequest(inetAddress, createPullRequestFuture());
    }

    private synchronized void tryToSubmitPullRequest(InetAddress inetAddress, PullRequestFuture pullRequestFuture) {
        this.scheduledPullRequests.remove(pullRequestFuture);
        this.pendingPullRequests.put(pullRequestFuture, inetAddress);
        maybeSendNext();
    }

    private synchronized void maybeSendNext() {
        Iterator<Map.Entry<PullRequestFuture, InetAddress>> it2 = this.pendingPullRequests.entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry<PullRequestFuture, InetAddress> next = it2.next();
            InetAddress value = next.getValue();
            if (!this.inFlightPullRequests.containsKey(value)) {
                if (this.inFlightPullRequests.size() >= this.maxConcurrency) {
                    return;
                }
                this.inFlightPullRequests.put(value, next.getKey());
                it2.remove();
                this.pullRequestSender.sendPullRequest(value).whenCompleteAsync((obj, th) -> {
                    onCompletePullRequest(value, th);
                }, (Executor) this.pullRequestExecutor);
            }
        }
    }

    private synchronized void onCompletePullRequest(InetAddress inetAddress, Throwable th) {
        PullRequestFuture remove = this.inFlightPullRequests.remove(inetAddress);
        if (th == null) {
            remove.complete(null);
        } else if (remove.shouldRetry()) {
            long delayInMillisBeforeNextRetry = remove.delayInMillisBeforeNextRetry();
            logger.warn(String.format("Error while sending a SCHEMA.PULL request to '%s', retrying in '%d' ms", inetAddress, Long.valueOf(delayInMillisBeforeNextRetry)));
            schedulePullRequest(inetAddress, remove, delayInMillisBeforeNextRetry, TimeUnit.MILLISECONDS);
        } else {
            remove.completeExceptionally(th);
        }
        maybeSendNext();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean awaitPullRequestsUntilReady(BooleanSupplier booleanSupplier) {
        for (CompletableFuture<Void> completableFuture : nonCompletedPullRequest()) {
            if (booleanSupplier.getAsBoolean()) {
                break;
            }
            try {
                Uninterruptibles.getUninterruptibly(completableFuture, PULL_REQUEST_WAIT_IN_SECONDS, TimeUnit.SECONDS);
            } catch (Exception e) {
                logger.error("Error while waiting for schema PULL request to finish.", (Throwable) e);
            }
        }
        return booleanSupplier.getAsBoolean();
    }

    private synchronized Set<CompletableFuture<Void>> nonCompletedPullRequest() {
        LinkedHashSet linkedHashSet = new LinkedHashSet(this.inFlightPullRequests.values());
        linkedHashSet.addAll(this.pendingPullRequests.keySet());
        linkedHashSet.addAll(this.scheduledPullRequests);
        return linkedHashSet;
    }

    private PullRequestFuture createPullRequestFuture() {
        return new PullRequestFuture(this.maxRetries, INITIAL_RETRY_DELAY_IN_MILLISECONDS, MAX_RETRY_DELAY_IN_MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static CompletableFuture<?> sendPullRequest(InetAddress inetAddress) {
        if (!FailureDetector.instance.isAlive(inetAddress)) {
            logger.warn("Can't send schema pull request: node {} is down.", inetAddress);
            return CompletableFuture.completedFuture(null);
        }
        UUID schemaVersion = Gossiper.instance.getSchemaVersion(inetAddress);
        if (schemaVersion == null || !schemaVersion.equals(Schema.instance.getVersion())) {
            logger.debug("Pulling schema from endpoint {}", inetAddress);
            return MessagingService.instance().sendSingleTarget(Verbs.SCHEMA.PULL.newRequest(inetAddress, (InetAddress) PullRequest.create())).whenCompleteAsync((schemaMigration, th) -> {
                if (th != null) {
                    logger.warn("Problem while pulling schema from {}", inetAddress, th);
                    return;
                }
                if (!$assertionsDisabled && schemaMigration == null) {
                    throw new AssertionError();
                }
                try {
                    if (!schemaMigration.isCompatible) {
                        logger.debug("Tried Schema migration from {}, but it has incompatible schema", inetAddress);
                        return;
                    }
                    logger.debug("Got schema migration response from {}", inetAddress);
                    Schema.instance.mergeAndAnnounceVersion(schemaMigration);
                    logger.debug("Merged schema migration response from {}", inetAddress);
                } catch (ConfigurationException e) {
                    logger.error("Configuration exception merging remote schema", (Throwable) e);
                }
            }, (Executor) StageManager.getStage(Stage.MIGRATION));
        }
        logger.debug("Cancelling pull request to {} for local schema version {}.", inetAddress, Schema.instance.getVersion());
        return CompletableFuture.completedFuture(null);
    }

    static {
        $assertionsDisabled = !PullRequestScheduler.class.desiredAssertionStatus();
        MAX_PULL_REQUESTS_CONCURRENCY = PropertyConfiguration.getInteger("dse.schema.max_pull_requests_concurrency", 3, "The maximum number of in-flight schema pull requests.");
        PULL_REQUEST_WAIT_IN_SECONDS = PropertyConfiguration.getInteger("cassandra.migration_task_wait_in_seconds", 1);
        MAX_RETRIES = PropertyConfiguration.getInteger("cassandra.max_pull_requests_retries", 3);
        INITIAL_RETRY_DELAY_IN_MILLISECONDS = PropertyConfiguration.getInteger("cassandra.initial_pull_request_retry_delay_in_ms", 1000);
        MAX_RETRY_DELAY_IN_MILLISECONDS = PropertyConfiguration.getInteger("cassandra.max_pull_request_retry_delay_in_ms", 10000);
        logger = LoggerFactory.getLogger((Class<?>) PullRequestScheduler.class);
    }
}
