package org.apache.cassandra.dht;

import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import com.datastax.dse.byos.shade.com.google.common.collect.ArrayListMultimap;
import com.datastax.dse.byos.shade.com.google.common.collect.HashMultimap;
import com.datastax.dse.byos.shade.com.google.common.collect.Multimap;
import com.datastax.dse.byos.shade.com.google.common.collect.Sets;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/dht/RangeStreamer.class */
public class RangeStreamer {
    private static final Logger logger;
    private final Collection<Token> tokens;
    private final TokenMetadata metadata;
    private final InetAddress address;
    private final String description;
    private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
    private final ISourceFilter sourceFilter;
    private final StreamPlan streamPlan;
    private final boolean useStrictConsistency;
    private final IEndpointSnitch snitch;
    private final StreamStateStore stateStore;
    private StreamResultFuture streamFuture;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/cassandra/dht/RangeStreamer$ISourceFilter.class */
    public interface ISourceFilter {
        boolean shouldInclude(InetAddress inetAddress);
    }

    public RangeStreamer(TokenMetadata tokenMetadata, Collection<Token> collection, InetAddress inetAddress, StreamOperation streamOperation, boolean z, IEndpointSnitch iEndpointSnitch, StreamStateStore streamStateStore, boolean z2, ISourceFilter iSourceFilter) {
        this.metadata = tokenMetadata;
        this.tokens = collection;
        this.address = inetAddress;
        this.description = streamOperation.getDescription();
        this.streamPlan = new StreamPlan(streamOperation, true);
        this.useStrictConsistency = z;
        this.snitch = iEndpointSnitch;
        this.stateStore = streamStateStore;
        this.streamPlan.listeners(this.stateStore, new StreamEventHandler[0]);
        this.sourceFilter = iSourceFilter;
    }

    public void addRanges(String str, Collection<Range<Token>> collection) {
        if (Keyspace.open(str).getReplicationStrategy() instanceof LocalStrategy) {
            logger.info("Not adding ranges for Local Strategy keyspace={}", str);
            return;
        }
        boolean useStrictSourcesForRanges = useStrictSourcesForRanges(str);
        Multimap<Range<Token>, InetAddress> allRangesWithStrictSourcesFor = useStrictSourcesForRanges ? getAllRangesWithStrictSourcesFor(str, collection) : getAllRangesWithSourcesFor(str, collection);
        Logger logger2 = logger;
        Object[] objArr = new Object[3];
        objArr[0] = str;
        objArr[1] = useStrictSourcesForRanges ? " with strict sources" : "";
        objArr[2] = allRangesWithStrictSourcesFor.keySet();
        logger2.info("Adding keyspace '{}'{} for ranges {}", objArr);
        for (Map.Entry<Range<Token>, InetAddress> entry : allRangesWithStrictSourcesFor.entries()) {
            logger.info("{}: range {} exists on {} for keyspace {}", new Object[]{this.description, entry.getKey(), entry.getValue(), str});
        }
        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry2 : getRangeFetchMap(allRangesWithStrictSourcesFor, this.sourceFilter, str, this.useStrictConsistency).asMap().entrySet()) {
            if (logger.isTraceEnabled()) {
                Iterator<Range<Token>> it = entry2.getValue().iterator();
                while (it.hasNext()) {
                    logger.trace(String.format("%s: range %s from source %s for keyspace %s", this.description, it.next(), entry2.getKey(), str));
                }
            }
            this.toFetch.put(str, entry2);
        }
    }

    private boolean useStrictSourcesForRanges(String str) {
        return (!this.useStrictConsistency || this.tokens == null || this.metadata.getAllEndpoints().size() == Keyspace.open(str).getReplicationStrategy().getReplicationFactor()) ? false : true;
    }

    private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String str, Collection<Range<Token>> collection) {
        Multimap<Range<Token>, InetAddress> rangeAddresses = Keyspace.open(str).getReplicationStrategy().getRangeAddresses(this.metadata.cloneOnlyTokenMap());
        ArrayListMultimap create = ArrayListMultimap.create();
        for (Range<Token> range : collection) {
            Iterator<Range<Token>> it = rangeAddresses.keySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Range<Token> next = it.next();
                if (next.contains(range)) {
                    create.putAll(range, this.snitch.getSortedListByProximity(this.address, rangeAddresses.get(next)));
                    break;
                }
            }
            if (!create.keySet().contains(range)) {
                throw new IllegalStateException("No sources found for " + range);
            }
        }
        return create;
    }

    private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String str, Collection<Range<Token>> collection) {
        if (!$assertionsDisabled && this.tokens == null) {
            throw new AssertionError();
        }
        AbstractReplicationStrategy replicationStrategy = Keyspace.open(str).getReplicationStrategy();
        TokenMetadata cloneOnlyTokenMap = this.metadata.cloneOnlyTokenMap();
        Multimap<Range<Token>, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(cloneOnlyTokenMap);
        cloneOnlyTokenMap.updateNormalTokens(this.tokens, this.address);
        Multimap<Range<Token>, InetAddress> rangeAddresses2 = replicationStrategy.getRangeAddresses(cloneOnlyTokenMap);
        ArrayListMultimap create = ArrayListMultimap.create();
        for (Range<Token> range : collection) {
            for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : rangeAddresses.asMap().entrySet()) {
                if (entry.getKey().contains(range)) {
                    HashSet newHashSet = Sets.newHashSet(entry.getValue());
                    HashSet newHashSet2 = Sets.newHashSet(rangeAddresses2.get(range));
                    if (!newHashSet2.containsAll(newHashSet)) {
                        newHashSet.removeAll(newHashSet2);
                        if (!$assertionsDisabled && newHashSet.size() != 1) {
                            throw new AssertionError("Expected 1 endpoint but found " + newHashSet.size());
                        }
                    }
                    create.put(range, newHashSet.iterator().next());
                }
            }
            Collection<V> collection2 = create.get((ArrayListMultimap) range);
            if (collection2 == 0 || collection2.isEmpty()) {
                throw new IllegalStateException("No sources found for " + range);
            }
            if (collection2.size() > 1) {
                throw new IllegalStateException("Multiple endpoints found for " + range);
            }
            InetAddress inetAddress = (InetAddress) collection2.iterator().next();
            EndpointState endpointStateForEndpoint = Gossiper.instance.getEndpointStateForEndpoint(inetAddress);
            if (Gossiper.instance.isEnabled() && (endpointStateForEndpoint == null || !endpointStateForEndpoint.isAlive())) {
                throw new RuntimeException("A node required to move the data consistently is down (" + inetAddress + "). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
            }
        }
        return create;
    }

    private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> multimap, ISourceFilter iSourceFilter, String str, boolean z) {
        HashMultimap create = HashMultimap.create();
        for (Range<Token> range : multimap.keySet()) {
            boolean z2 = false;
            Iterator<InetAddress> it = multimap.get(range).iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                InetAddress next = it.next();
                if (iSourceFilter.shouldInclude(next)) {
                    if (!next.equals(FBUtilities.getBroadcastAddress())) {
                        logger.info("Including {} for streaming range {} in keyspace {}", new Object[]{next, range, str});
                        create.put(next, range);
                        z2 = true;
                        break;
                    }
                    z2 = true;
                }
            }
            if (!z2) {
                handleSourceNotFound(str, z, range);
            }
        }
        return create;
    }

    static void handleSourceNotFound(String str, boolean z, Range<Token> range) {
        AbstractReplicationStrategy replicationStrategy = Keyspace.isInitialized() ? Keyspace.open(str).getReplicationStrategy() : null;
        if (replicationStrategy == null || replicationStrategy.getReplicationFactor() != 1) {
            throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + str);
        }
        if (z) {
            throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + str + " with RF=1. Ensure this keyspace contains replicas in the source datacenter.");
        }
        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. Keyspace might be missing data.", range, str);
    }

    public static Multimap<InetAddress, Range<Token>> getWorkMapForMove(Multimap<Range<Token>, InetAddress> multimap, String str, IFailureDetector iFailureDetector, boolean z) {
        return getRangeFetchMap(multimap, SourceFilters.failureDetectorFilter(iFailureDetector), str, z);
    }

    @VisibleForTesting
    Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch() {
        return this.toFetch;
    }

    public StreamResultFuture fetchAsync() {
        for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : this.toFetch.entries()) {
            String key = entry.getKey();
            InetAddress key2 = entry.getValue().getKey();
            InetAddress preferredIP = SystemKeyspace.getPreferredIP(key2);
            Collection<Range<Token>> value = entry.getValue().getValue();
            Set<Range<Token>> availableRanges = this.stateStore.getAvailableRanges(key, StorageService.instance.getTokenMetadata().partitioner);
            if (value.removeAll(availableRanges)) {
                logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("{}ing from {} ranges {}", new Object[]{this.description, key2, StringUtils.join(value, ", ")});
            }
            this.streamPlan.requestRanges(key2, preferredIP, key, value);
        }
        this.streamFuture = this.streamPlan.execute();
        return this.streamFuture;
    }

    public void abort(String str) {
        if (this.streamFuture == null) {
            throw new IllegalStateException("Range streaming has not been started");
        }
        this.streamFuture.abort(str);
    }

    static {
        $assertionsDisabled = !RangeStreamer.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(RangeStreamer.class);
    }
}
