package com.datastax.bdp.gcore.netmsg;

import com.datastax.bdp.dht.Router;
import com.datastax.bdp.dht.RoutingPlan;
import com.datastax.bdp.dht.SetCoverFinder;
import com.datastax.bdp.dht.SetCoverFinderConfig;
import com.datastax.bdp.dht.endpoint.Endpoint;
import com.datastax.bdp.dht.endpoint.SeededComparator;
import com.datastax.bdp.gcore.context.Context;
import com.datastax.bdp.graph.config.ConfigurationDefinitions;
import com.datastax.bdp.graph.impl.DseGraphCounter;
import com.datastax.bdp.graph.impl.data.DDLQueryBuilder;
import com.datastax.bdp.node.transport.ClientContext;
import com.datastax.bdp.node.transport.Message;
import com.datastax.bdp.node.transport.MessageType;
import com.datastax.bdp.node.transport.internode.InternodeClient;
import com.datastax.bdp.snitch.EndpointStateTracker;
import com.datastax.dse.byos.shade.com.google.common.collect.ArrayListMultimap;
import com.datastax.dse.byos.shade.com.google.common.collect.ImmutableMap;
import com.datastax.dse.byos.shade.com.google.common.collect.Multimap;
import com.datastax.dse.byos.shade.javax.inject.Inject;
import io.netty.channel.Channel;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/bdp/gcore/netmsg/BasicMessengerImpl.class */
public class BasicMessengerImpl implements BasicMessenger {
    private static final Logger log = LoggerFactory.getLogger(BasicMessengerImpl.class);
    private final InternodeClient messagingClient;
    private final Router router;
    private final int comparatorSeed;
    private final AtomicReference<SeededComparator<Endpoint>> endpointComparatorRef;
    private final ScheduledExecutorService scheduler;
    private final Duration requestTimeout;
    private final ConcurrentMap<Endpoint, EndpointPerformanceLog> perfLogs;
    private final IPartitioner partitioner;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/bdp/gcore/netmsg/BasicMessengerImpl$SmartClientContext.class */
    public static class SmartClientContext<A> extends ClientContext<A> {
        private AnnotatedFuture<?, A> future;

        public SmartClientContext(AnnotatedFuture<?, A> annotatedFuture) {
            this.future = annotatedFuture;
        }

        public void onResponse(A a) {
            this.future.complete(a);
        }

        public void onError(Channel channel, Throwable th) {
            this.future.completeExceptionally(th);
        }
    }

    @Inject
    public BasicMessengerImpl(InternodeClient internodeClient, DseGraphCounter dseGraphCounter, Context context) {
        this(internodeClient, new ScheduledThreadPoolExecutor(4), (Duration) context.get(ConfigurationDefinitions.GRAPH_MSG_TIMEOUT, new String[0]), DatabaseDescriptor.getPartitioner(), makeAndRegisterRouter(dseGraphCounter));
    }

    private static Router makeAndRegisterRouter(DseGraphCounter dseGraphCounter) {
        Callable callable = () -> {
            Set<String> keyspaceNames = dseGraphCounter.getKeyspaceNames();
            HashSet hashSet = new HashSet(keyspaceNames.size() * 2);
            for (String str : keyspaceNames) {
                hashSet.add(str);
                hashSet.add(DDLQueryBuilder.Keyspace.Pvt.keyspace(str));
            }
            return hashSet;
        };
        EndpointStateTracker endpointStateTracker = EndpointStateTracker.instance;
        endpointStateTracker.getClass();
        IEndpointStateChangeSubscriber router = new Router(callable, endpointStateTracker::getIsGraphServer);
        Gossiper.instance.register(router);
        router.update();
        return router;
    }

    public BasicMessengerImpl(InternodeClient internodeClient, ScheduledExecutorService scheduledExecutorService, Duration duration, IPartitioner iPartitioner, Router router) {
        this.perfLogs = new ConcurrentHashMap();
        this.comparatorSeed = 0;
        this.endpointComparatorRef = new AtomicReference<>(new SeededComparator(new TiedEndpointComparator(), this.comparatorSeed));
        this.messagingClient = internodeClient;
        this.router = router;
        this.scheduler = scheduledExecutorService;
        this.requestTimeout = duration;
        this.partitioner = iPartitioner;
        this.scheduler.scheduleAtFixedRate(this::updateComparator, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.datastax.bdp.gcore.netmsg.BasicMessenger
    public <Q, A, R, X> List<AnnotatedFuture<X, A>> sendTargeted(String str, ConversationType<Q, A, R> conversationType, Iterable<X> iterable, Function<X, Token> function, Function<Collection<X>, Q> function2) {
        Multimap<Endpoint, X> routeMessages = routeMessages(str, iterable, function);
        ArrayList arrayList = new ArrayList(routeMessages.size());
        for (Endpoint endpoint : routeMessages.keys()) {
            ArrayList arrayList2 = new ArrayList(routeMessages.get(endpoint));
            Q apply = function2.apply(arrayList2);
            AnnotatedFuture<X, A> annotatedFuture = new AnnotatedFuture<>(arrayList2);
            sendToOneEndpoint(annotatedFuture, endpoint, conversationType, apply);
            arrayList.add(annotatedFuture);
        }
        return arrayList;
    }

    @Override // com.datastax.bdp.gcore.netmsg.BasicMessenger
    public <Q, A, R> List<AnnotatedFuture<Range<Token>, A>> sendBroadcast(String str, ConversationType<Q, A, R> conversationType, Function<Collection<Range<Token>>, Q> function) {
        AcceptAllFilter acceptAllFilter = new AcceptAllFilter();
        SeededComparator<Endpoint> seededComparator = this.endpointComparatorRef.get();
        RoutingPlan routingPlan = null;
        try {
            routingPlan = this.router.route(str, seededComparator, acceptAllFilter, coverFinderConfigForMessenger());
        } catch (IllegalArgumentException e) {
            this.router.refreshEndpoints();
        }
        if (null == routingPlan) {
            routingPlan = this.router.route(str, seededComparator, acceptAllFilter, coverFinderConfigForMessenger());
        }
        Set<Endpoint> endpoints = routingPlan.getEndpoints();
        ArrayList arrayList = new ArrayList(endpoints.size());
        for (Endpoint endpoint : endpoints) {
            Collection<Range<Token>> providedTokenRanges = endpoint.getProvidedTokenRanges();
            Q apply = function.apply(providedTokenRanges);
            AnnotatedFuture annotatedFuture = new AnnotatedFuture(providedTokenRanges);
            sendToOneEndpoint(annotatedFuture, endpoint, conversationType, apply);
            arrayList.add(annotatedFuture);
        }
        return arrayList;
    }

    private SetCoverFinderConfig coverFinderConfigForMessenger() {
        return new SetCoverFinderConfig(SetCoverFinder.Kind.DYNAMIC, 0);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }

    private <X> Multimap<Endpoint, X> routeMessages(String str, Iterable<X> iterable, Function<X, Token> function) {
        ArrayListMultimap create = ArrayListMultimap.create();
        for (X x : iterable) {
            create.put(function.apply(x), x);
        }
        TokenListFilter tokenListFilter = new TokenListFilter(create.keySet());
        RoutingPlan route = this.router.route(str, this.endpointComparatorRef.get(), tokenListFilter, coverFinderConfigForMessenger());
        ArrayListMultimap create2 = ArrayListMultimap.create();
        for (K k : create.keySet()) {
            for (Endpoint endpoint : route.getEndpoints()) {
                Iterator it2 = endpoint.getProvidedTokenRanges().iterator();
                while (it2.hasNext()) {
                    if (((Range) it2.next()).contains((Range) k)) {
                        create2.putAll(endpoint, create.get((ArrayListMultimap) k));
                    }
                }
            }
        }
        return create2;
    }

    private <Q, A, R, X> void sendToOneEndpoint(AnnotatedFuture<X, A> annotatedFuture, Endpoint endpoint, ConversationType<Q, A, R> conversationType, Q q) {
        MessageType queryType = conversationType.getQueryType();
        Instant now = Instant.now();
        SmartClientContext smartClientContext = new SmartClientContext(annotatedFuture);
        log.debug("Sending {} (typecode {}) query with id#{} to {}", new Object[]{conversationType, queryType, Long.valueOf(((ClientContext) smartClientContext).id), endpoint.getAddress().getHostAddress()});
        this.messagingClient.sendAsync(endpoint.getAddress(), smartClientContext, new Message(((ClientContext) smartClientContext).id, queryType, q));
        annotatedFuture.whenComplete((BiConsumer<? super A, ? super Throwable>) new ResponseTimeLogger(now, this.perfLogs.computeIfAbsent(endpoint, EndpointPerformanceLog::new)));
        ScheduledFuture<?> schedule = this.scheduler.schedule(new FutureTimeoutEnforcer(annotatedFuture, "no reply from %s after at least %s", endpoint, this.requestTimeout), this.requestTimeout.toMillis(), TimeUnit.MILLISECONDS);
        annotatedFuture.thenRun(() -> {
            schedule.cancel(false);
        });
    }

    private void updateComparator() {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        this.perfLogs.forEach((endpoint, endpointPerformanceLog) -> {
            builder.put(endpoint, endpointPerformanceLog.copy());
        });
        this.endpointComparatorRef.set(new SeededComparator<>(new ResponseTimeEndpointComparator(builder.build()), this.comparatorSeed));
    }
}
