package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Time;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ConsumerSubscriptionCleaner.class */
public class ConsumerSubscriptionCleaner {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) ConsumerSubscriptionCleaner.class);
    private final long nonExistingTopicCleanupDelayMS;
    private final TopicExistenceChecker topicExistenceChecker;
    private final Supplier<Set<PubSubTopicPartition>> currentAssignmentSupplier;
    private final IntConsumer recordNumberOfTopicsToUnsub;
    private final Consumer<Set<PubSubTopicPartition>> batchUnsubscribeFunction;
    private final Time time;
    private final int sanitizeTopicSubscriptionAfterPollTimes;
    private final Object2LongMap<String> nonExistingTopicDiscoverTimestampMap = new Object2LongOpenHashMap();
    private int pollTimesSinceLastSanitization = 0;

    public ConsumerSubscriptionCleaner(long j, int i, TopicExistenceChecker topicExistenceChecker, Supplier<Set<PubSubTopicPartition>> supplier, IntConsumer intConsumer, Consumer<Set<PubSubTopicPartition>> consumer, Time time) {
        this.nonExistingTopicCleanupDelayMS = j;
        this.sanitizeTopicSubscriptionAfterPollTimes = i;
        this.topicExistenceChecker = topicExistenceChecker;
        this.currentAssignmentSupplier = supplier;
        this.recordNumberOfTopicsToUnsub = intConsumer;
        this.batchUnsubscribeFunction = consumer;
        this.time = time;
        this.nonExistingTopicDiscoverTimestampMap.defaultReturnValue(-1L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<PubSubTopicPartition> getTopicPartitionsToUnsubscribe(Set<PubSubTopicPartition> set) {
        set.clear();
        int i = this.pollTimesSinceLastSanitization + 1;
        this.pollTimesSinceLastSanitization = i;
        if (i < this.sanitizeTopicSubscriptionAfterPollTimes) {
            return set;
        }
        this.pollTimesSinceLastSanitization = 0;
        Set<PubSubTopicPartition> set2 = this.currentAssignmentSupplier.get();
        if (set2.isEmpty()) {
            return set;
        }
        HashSet<String> hashSet = new HashSet();
        long milliseconds = this.time.getMilliseconds();
        Iterator<PubSubTopicPartition> it2 = set2.iterator();
        while (it2.hasNext()) {
            String name = it2.next().getPubSubTopic().getName();
            if (this.topicExistenceChecker.checkTopicExists(name)) {
                long removeLong = this.nonExistingTopicDiscoverTimestampMap.removeLong(name);
                if (removeLong != this.nonExistingTopicDiscoverTimestampMap.defaultReturnValue()) {
                    LOGGER.info("The non-existing topic detected previously: {} show up after {} ms and it will be removed from nonExistingTopicDiscoverTimestampMap", name, Long.valueOf(milliseconds - removeLong));
                }
            } else {
                hashSet.add(name);
            }
        }
        HashSet hashSet2 = new HashSet(hashSet);
        if (!hashSet.isEmpty()) {
            LOGGER.error("Detected the following non-existing topics: {}", hashSet);
            for (String str : hashSet) {
                long j = this.nonExistingTopicDiscoverTimestampMap.getLong(str);
                if (j == this.nonExistingTopicDiscoverTimestampMap.defaultReturnValue()) {
                    this.nonExistingTopicDiscoverTimestampMap.put((Object2LongMap<String>) str, milliseconds);
                    j = milliseconds;
                }
                long j2 = milliseconds - j;
                if (j2 >= this.nonExistingTopicCleanupDelayMS) {
                    LOGGER.error("The non-existing topic hasn't showed up after {} ms, so we will fail the attached ingestion task", Long.valueOf(j2));
                    this.nonExistingTopicDiscoverTimestampMap.removeLong(str);
                } else {
                    hashSet2.remove(str);
                }
            }
        }
        this.recordNumberOfTopicsToUnsub.accept(hashSet2.size());
        HashSet hashSet3 = new HashSet();
        for (PubSubTopicPartition pubSubTopicPartition : set2) {
            if (hashSet2.contains(pubSubTopicPartition.getPubSubTopic().getName())) {
                set.add(pubSubTopicPartition);
            } else {
                hashSet3.add(pubSubTopicPartition);
            }
        }
        if (hashSet3.size() != set2.size()) {
            this.batchUnsubscribeFunction.accept(set);
        }
        return set;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unsubscribe(Set<PubSubTopicPartition> set) {
        if (set.isEmpty()) {
            return;
        }
        Set<PubSubTopicPartition> set2 = this.currentAssignmentSupplier.get();
        HashSet hashSet = new HashSet(set2);
        hashSet.removeAll(set);
        if (hashSet.size() == set2.size()) {
            return;
        }
        this.batchUnsubscribeFunction.accept(set);
    }
}
