package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.class */
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
    /** Broker handle used to reach the configuration, client, executor and namespace service. */
    private final PulsarService pulsarService;
    /** Contains only {@link #clusterName}; set as the replication target of events that do not carry global policies. */
    private final HashSet<String> localCluster;
    /** Name of the local cluster, read from the broker configuration in the constructor. */
    private final String clusterName;
    /** Created on first use by {@code createSystemTopicFactoryIfNeeded()}; volatile because it is read outside the lock. */
    private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

    /** Local (non-global) topic policies keyed by partitioned topic name. */
    @VisibleForTesting
    final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>();
    /** Global topic policies keyed by partitioned topic name. */
    final Map<TopicName, TopicPolicies> globalPoliciesCache = new ConcurrentHashMap<>();
    /** Number of owned bundles per namespace; the namespace reader is released when the count drops to zero. */
    private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap<>();
    /** Pending or completed system-topic reader per namespace. */
    private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches = new ConcurrentHashMap<>();

    /**
     * Cache initialisation state per namespace: {@code false} while the system topic is being replayed,
     * {@code true} once its end was reached, absent when no reader is registered.
     */
    @VisibleForTesting
    final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();

    /** Policy listeners registered per topic. */
    @VisibleForTesting
    final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService$2.class */
    /**
     * Decompiler-emitted lookup table that backs the {@code switch} over {@link ActionType} in
     * {@code refreshTopicPoliciesCache}. It maps each constant's ordinal to a dense case label:
     * INSERT = 1, UPDATE = 2, DELETE = 3, NONE = 4. Slots that are never assigned stay 0, which
     * matches no case label and therefore selects the switch's {@code default} branch.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // Sized from the enum as it exists when this class is initialised.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$events$ActionType = new int[ActionType.values().length];

        static {
            // Each constant is registered in its own try block so that one constant missing at
            // run time (NoSuchFieldError) does not prevent the remaining ones from being mapped.
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the slot keeps its default value 0.
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the slot keeps its default value 0.
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the slot keeps its default value 0.
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.NONE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Intentionally ignored: the slot keeps its default value 0.
            }
        }
    }

    public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
        this.pulsarService = pulsarService;
        this.clusterName = pulsarService.getConfiguration().getClusterName();
        this.localCluster = Sets.newHashSet(new String[]{this.clusterName});
    }

    /**
     * Publishes a {@link ActionType#DELETE} event without a policies payload for the topic.
     *
     * @param topicName topic whose policies are to be deleted
     * @return the future returned by {@code sendTopicPolicyEvent}
     */
    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
        return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
    }

    /**
     * Publishes an {@link ActionType#UPDATE} event carrying the given policies for the topic.
     *
     * @param topicName     topic whose policies are updated
     * @param topicPolicies policies to publish
     * @return the future returned by {@code sendTopicPolicyEvent}
     */
    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies topicPolicies) {
        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, topicPolicies);
    }

    /**
     * Writes a topic-policy event to the namespace's system topic through a short-lived writer.
     *
     * <p>{@link ActionType#DELETE} events are sent with {@code deleteAsync}; every other action is
     * sent with {@code writeAsync}. The writer is closed once the send attempt has finished,
     * whether it succeeded or not.
     *
     * @param topicName     topic the event refers to
     * @param actionType    action recorded in the event
     * @param topicPolicies policies payload, {@code null} for deletions
     * @return a future that completes with {@code null} once a message id was obtained, and fails
     *         when the namespace is a heartbeat namespace, when the factory or writer cannot be
     *         created, when the send fails, or when the returned message id is {@code null}
     */
    private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies topicPolicies) {
        if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            createSystemTopicFactoryIfNeeded();
            // Each nested callback uses its own parameter names: a lambda parameter may not
            // redeclare a variable of an enclosing lambda.
            this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()).newWriterAsync().whenComplete((writer, writerError) -> {
                if (writerError != null) {
                    result.completeExceptionally(writerError);
                    return;
                }
                PulsarEvent event = getPulsarEvent(topicName, actionType, topicPolicies);
                (ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event)).whenComplete((messageId, sendError) -> {
                    if (sendError != null) {
                        result.completeExceptionally(sendError);
                    } else if (messageId != null) {
                        result.complete(null);
                    } else {
                        result.completeExceptionally(new RuntimeException("Got message id is null."));
                    }
                    // The writer is single-use: release it regardless of the send outcome.
                    writer.closeAsync().whenComplete((ignored, closeError) -> {
                        if (closeError != null) {
                            log.error("[{}] Close writer error.", topicName, closeError);
                        } else if (log.isDebugEnabled()) {
                            log.debug("[{}] Close writer success.", topicName);
                        }
                    });
                });
            });
        } catch (PulsarServerException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Builds the {@link PulsarEvent} describing a policy change for a topic.
     *
     * <p>Unless the policies are flagged as global, the event's replication target is set to the
     * local cluster set. The event always refers to the partitioned topic name.
     *
     * @param topicName     topic the event is about
     * @param actionType    action stored in the event
     * @param topicPolicies policies payload, may be {@code null}
     * @return the assembled event
     */
    private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies topicPolicies) {
        PulsarEvent.PulsarEventBuilder eventBuilder = PulsarEvent.builder();
        final boolean globalPolicies = topicPolicies != null && topicPolicies.isGlobalPolicies();
        if (!globalPolicies) {
            eventBuilder.replicateTo(this.localCluster);
        }
        eventBuilder.actionType(actionType).eventType(EventType.TOPIC_POLICY);

        final String localTopicName = TopicName.get(topicName.getPartitionedTopicName()).getLocalName();
        TopicPoliciesEvent payload = TopicPoliciesEvent.builder()
                .domain(topicName.getDomain().toString())
                .tenant(topicName.getTenant())
                .namespace(topicName.getNamespaceObject().getLocalName())
                .topic(localTopicName)
                .policies(topicPolicies)
                .build();
        return eventBuilder.topicPoliciesEvent(payload).build();
    }

    /**
     * Forwards a system-topic message to the listeners registered for the affected topic.
     *
     * <p>A message without a value is reported as {@code onUpdate(null)} for the topic derived
     * from the message key. A {@link EventType#TOPIC_POLICY} event is reported with the policies
     * it carries. Any other event type is ignored.
     *
     * @param message message read from the system topic
     */
    private void notifyListener(Message<PulsarEvent> message) {
        final PulsarEvent event = message.getValue();
        final TopicName topicName;
        final TopicPolicies policies;
        if (event == null) {
            topicName = TopicName.get(TopicName.get(message.getKey()).getPartitionedTopicName());
            policies = null;
        } else if (EventType.TOPIC_POLICY.equals(event.getEventType())) {
            TopicPoliciesEvent policiesEvent = event.getTopicPoliciesEvent();
            topicName = TopicName.get(policiesEvent.getDomain(), policiesEvent.getTenant(), policiesEvent.getNamespace(), policiesEvent.getTopic());
            policies = policiesEvent.getPolicies();
        } else {
            return;
        }
        // Look the list up exactly once: unregisterListener may drop the map entry between a
        // null check and a second get(), which would otherwise surface as a NullPointerException.
        final List<TopicPolicyListener<TopicPolicies>> topicListeners = this.listeners.get(topicName);
        if (topicListeners == null) {
            return;
        }
        for (TopicPolicyListener<TopicPolicies> listener : topicListeners) {
            listener.onUpdate(policies);
        }
    }

    /**
     * Returns the cached non-global policies of a topic; delegates to
     * {@link #getTopicPolicies(TopicName, boolean)} with {@code false}.
     *
     * @param topicName topic to look up
     * @return the value returned by the two-argument overload
     * @throws BrokerServiceException.TopicPoliciesCacheNotInitException propagated from the two-argument overload
     */
    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public TopicPolicies getTopicPolicies(TopicName topicName) throws BrokerServiceException.TopicPoliciesCacheNotInitException {
        return getTopicPolicies(topicName, false);
    }

    /**
     * Returns the cached policies of a topic, triggering cache initialisation for the topic's
     * namespace when none has been registered yet.
     *
     * @param topicName topic to look up; the lookup key is its partitioned topic name
     * @param z         {@code true} to read the global policies cache, {@code false} for the local one
     * @return the cached policies, or {@code null} when the selected cache has no entry
     * @throws BrokerServiceException.TopicPoliciesCacheNotInitException when the namespace is
     *         registered but its cache is not marked as initialised
     */
    @Override
    public TopicPolicies getTopicPolicies(TopicName topicName, boolean z) throws BrokerServiceException.TopicPoliciesCacheNotInitException {
        final NamespaceName namespace = topicName.getNamespaceObject();
        if (!this.policyCacheInitMap.containsKey(namespace)) {
            // The future is discarded on purpose: callers poll the init map instead.
            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
        }
        if (this.policyCacheInitMap.containsKey(namespace) && !this.policyCacheInitMap.get(namespace).booleanValue()) {
            throw new BrokerServiceException.TopicPoliciesCacheNotInitException();
        }
        if (z) {
            return this.globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
        }
        return this.policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
    }

    /**
     * Reads the local policies cache without triggering initialisation or checking its state.
     *
     * @param topicName topic to look up; the lookup key is its partitioned topic name
     * @return the cached policies, or {@code null} when there is no entry
     */
    @Override
    public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
        final String partitionedName = topicName.getPartitionedTopicName();
        final TopicName cacheKey = TopicName.get(partitionedName);
        return this.policiesCache.get(cacheKey);
    }

    /**
     * Reads the policies of a topic directly from the namespace's system topic with a dedicated
     * reader, bypassing the in-memory caches.
     *
     * @param topicName topic whose policies are requested
     * @return a future holding the policies found by the scan ({@code null} when none were found
     *         or when the system topic factory could not be created); it fails when the reader
     *         cannot be created or the scan fails
     */
    @Override
    public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
        CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
        try {
            createSystemTopicFactoryIfNeeded();
            this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync()
                    .thenAccept(reader -> fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, null, result))
                    .exceptionally(error -> {
                        // Without this stage a failed reader creation would leave the caller
                        // waiting forever on a future that nobody completes.
                        result.completeExceptionally(FutureUtil.unwrapCompletionException(error));
                        return null;
                    });
        } catch (PulsarServerException e) {
            // Existing contract: an unavailable factory is reported as "no policies".
            result.complete(null);
        }
        return result;
    }

    /**
     * Records that this broker owns one more bundle of the bundle's namespace and, when no reader
     * is registered for that namespace yet, starts loading its policies cache.
     *
     * @param namespaceBundle bundle that was loaded
     * @return a future that is already complete for heartbeat namespaces and for namespaces that
     *         already have a reader; otherwise the future handed to {@code prepareInitPoliciesCache}
     */
    @Override
    public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        NamespaceName namespace = namespaceBundle.getNamespaceObject();
        if (NamespaceService.checkHeartbeatNamespace(namespace) != null || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) {
            result.complete(null);
            return result;
        }
        synchronized (this) {
            if (this.readerCaches.get(namespace) != null) {
                // A reader can be registered by getTopicPolicies() before any bundle was counted,
                // so the counter may be missing here; create it instead of dereferencing null.
                this.ownedBundlesCountPerNamespace.computeIfAbsent(namespace, ns -> new AtomicInteger()).incrementAndGet();
                result.complete(null);
            } else {
                this.ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
                prepareInitPoliciesCache(namespace, result);
            }
        }
        return result;
    }

    /**
     * Registers the namespace as "initialising" and, if this call performed the registration,
     * creates the system-topic reader, replays the topic into the cache and then keeps tailing it.
     *
     * <p>When the namespace was already registered nothing happens and the supplied future is
     * left untouched.
     *
     * @param namespaceName     namespace whose policies are to be loaded
     * @param completableFuture completed by the initial replay, or failed when the reader stage fails
     */
    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespaceName, CompletableFuture<Void> completableFuture) {
        final boolean alreadyRegistered = this.policyCacheInitMap.putIfAbsent(namespaceName, false) != null;
        if (alreadyRegistered) {
            return;
        }
        final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = createSystemTopicClientWithRetry(namespaceName);
        this.readerCaches.put(namespaceName, readerFuture);

        final CompletableFuture<Void> initStage = readerFuture.thenAccept(reader -> {
            initPolicesCache(reader, completableFuture);
            // Tailing starts only after the initial replay has completed successfully.
            completableFuture.thenRun(() -> readMorePolicies(reader));
        });
        initStage.exceptionally(failure -> {
            log.error("[{}] Failed to create reader on __change_events topic", namespaceName, failure);
            cleanCacheAndCloseReader(namespaceName, false);
            completableFuture.completeExceptionally(failure);
            return null;
        });
    }

    /**
     * Creates a reader on the namespace's topic-policies system topic, retrying asynchronously
     * on the broker executor with a {@link Backoff}.
     *
     * @param namespaceName namespace whose system topic is to be read
     * @return a future for the reader; it fails with the {@link PulsarServerException} when the
     *         system topic factory cannot be created
     */
    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(NamespaceName namespaceName) {
        final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = new CompletableFuture<>();
        try {
            createSystemTopicFactoryIfNeeded();
            final TopicPoliciesSystemTopicClient client = this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(namespaceName);
            final Backoff retryBackoff = new Backoff(1L, TimeUnit.SECONDS, 3L, TimeUnit.SECONDS, 10L, TimeUnit.SECONDS);
            Objects.requireNonNull(client);
            RetryUtil.retryAsynchronously(client::newReaderAsync, retryBackoff, this.pulsarService.getExecutor(), readerFuture);
        } catch (PulsarServerException e) {
            readerFuture.completeExceptionally(e);
        }
        return readerFuture;
    }

    /**
     * Records that this broker no longer owns a bundle and releases the namespace's cache and
     * reader once no counted bundle remains (or no counter exists).
     *
     * @param namespaceBundle bundle that was unloaded
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        final NamespaceName namespace = namespaceBundle.getNamespaceObject();
        final boolean heartbeatNamespace = NamespaceService.checkHeartbeatNamespace(namespace) != null
                || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null;
        if (!heartbeatNamespace) {
            final AtomicInteger ownedBundles = this.ownedBundlesCountPerNamespace.get(namespace);
            final boolean nothingLeft = ownedBundles == null || ownedBundles.decrementAndGet() <= 0;
            if (nothingLeft) {
                cleanCacheAndCloseReader(namespace, true);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Subscribes to bundle ownership changes so that policy caches follow the set of owned
     * bundles. The listener accepts every bundle.
     */
    @Override
    public void start() {
        final NamespaceService namespaceService = this.pulsarService.getNamespaceService();
        final NamespaceBundleOwnershipListener ownershipListener = new NamespaceBundleOwnershipListener() {
            @Override
            public void onLoad(NamespaceBundle bundle) {
                SystemTopicBasedTopicPoliciesService.this.addOwnedNamespaceBundleAsync(bundle);
            }

            @Override
            public void unLoad(NamespaceBundle bundle) {
                SystemTopicBasedTopicPoliciesService.this.removeOwnedNamespaceBundleAsync(bundle);
            }

            @Override
            public boolean test(NamespaceBundle bundle) {
                // No filtering: every bundle is of interest.
                return true;
            }
        };
        namespaceService.addNamespaceBundleOwnershipListener(ownershipListener);
    }

    /**
     * Replays the system topic into the policies caches, one event per asynchronous step, until
     * the reader reports that no more events are available.
     *
     * <p>At the end of the topic the namespace is marked as initialised, the listeners of every
     * topic present in the local policies cache are notified with the cached policies, and the
     * future is completed. Any failure fails the future and releases the namespace's cache and reader.
     *
     * @param reader            reader positioned on the namespace's system topic
     * @param completableFuture completed when the replay has finished, failed on error
     */
    private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> completableFuture) {
        // Nested callbacks use distinct parameter names: a lambda parameter may not redeclare a
        // variable of an enclosing lambda.
        reader.hasMoreEventsAsync().whenComplete((hasMore, checkError) -> {
            if (checkError != null) {
                log.error("[{}] Failed to check the move events for the system topic", reader.getSystemTopic().getTopicName(), checkError);
                completableFuture.completeExceptionally(checkError);
                cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                return;
            }
            if (hasMore.booleanValue()) {
                reader.readNextAsync().whenComplete((message, readError) -> {
                    if (readError != null) {
                        log.error("[{}] Failed to read event from the system topic.", reader.getSystemTopic().getTopicName(), readError);
                        completableFuture.completeExceptionally(readError);
                        cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                    } else {
                        refreshTopicPoliciesCache(message);
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject());
                        }
                        // Continue with the next event.
                        initPolicesCache(reader, completableFuture);
                    }
                });
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
            }
            // Only flips the flag if the namespace is still registered.
            this.policyCacheInitMap.computeIfPresent(reader.getSystemTopic().getTopicName().getNamespaceObject(), (namespace, initialised) -> true);
            this.policiesCache.forEach((topicName, topicPolicies) -> {
                // Single lookup: the entry may be removed concurrently by unregisterListener.
                List<TopicPolicyListener<TopicPolicies>> topicListeners = this.listeners.get(topicName);
                if (topicListeners != null) {
                    for (TopicPolicyListener<TopicPolicies> listener : topicListeners) {
                        listener.onUpdate(topicPolicies);
                    }
                }
            });
            completableFuture.complete(null);
        });
    }

    /**
     * Drops everything held for a namespace: the cached reader (closed asynchronously), the
     * cached local and global policies of the namespace's topics, optionally the owned-bundle
     * counter, and finally the initialisation flag.
     *
     * @param namespaceName namespace to clean up
     * @param z             {@code true} to also remove the owned-bundle counter of the namespace
     */
    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespaceName, boolean z) {
        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = this.readerCaches.remove(namespaceName);
        this.policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespaceName));
        // Global policies are filled by the same reader; evict them as well so that no stale
        // entry survives once the namespace is no longer tracked.
        this.globalPoliciesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespaceName));
        if (z) {
            this.ownedBundlesCountPerNamespace.remove(namespaceName);
        }
        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
            // A future that is still pending is closed as soon as its reader becomes available.
            readerFuture.thenCompose(reader -> reader.closeAsync()).exceptionally(closeError -> {
                log.warn("[{}] Close change_event reader fail.", namespaceName, closeError);
                return null;
            });
        }
        // Removed last: from here on getTopicPolicies() triggers a fresh initialisation.
        this.policyCacheInitMap.remove(namespaceName);
    }

    /**
     * Tails the system topic: applies each new event to the caches, notifies listeners and then
     * schedules the next read.
     *
     * <p>A read failure caused by {@link PulsarClientException.AlreadyClosedException} stops the
     * loop and releases the namespace's cache and reader; any other failure is logged and the
     * read is attempted again.
     *
     * @param reader reader on the namespace's system topic
     */
    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
        reader.readNextAsync().thenAccept(event -> {
            refreshTopicPoliciesCache(event);
            notifyListener(event);
        }).whenComplete((ignored, failure) -> {
            if (failure == null) {
                readMorePolicies(reader);
                return;
            }
            final boolean readerClosed = FutureUtil.unwrapCompletionException(failure) instanceof PulsarClientException.AlreadyClosedException;
            if (!readerClosed) {
                log.warn("Read more topic polices exception, read again.", failure);
                readMorePolicies(reader);
                return;
            }
            log.error("Read more topic policies exception, close the read now!", failure);
            cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
        });
    }

    /**
     * Applies one system-topic message to the in-memory policies caches.
     *
     * <ul>
     *   <li>A message without a value removes the topic (derived from the message key) from the
     *       global cache when {@code hasReplicateTo} reports a replication target other than
     *       just the local cluster, otherwise from the local cache.</li>
     *   <li>{@code INSERT} stores the policies only if the topic has no entry yet.</li>
     *   <li>{@code UPDATE} overwrites the entry.</li>
     *   <li>{@code DELETE} removes the local entry and writes a delete event for the topic back
     *       to the system topic.</li>
     *   <li>{@code NONE} does nothing; any other action is logged.</li>
     * </ul>
     * Events whose type is not {@link EventType#TOPIC_POLICY} are ignored.
     *
     * @param message message read from the system topic
     */
    private void refreshTopicPoliciesCache(Message<PulsarEvent> message) {
        final PulsarEvent event = message.getValue();
        if (event == null) {
            TopicName deletedTopic = TopicName.get(TopicName.get(message.getKey()).getPartitionedTopicName());
            if (hasReplicateTo(message)) {
                this.globalPoliciesCache.remove(deletedTopic);
            } else {
                this.policiesCache.remove(deletedTopic);
            }
            return;
        }
        if (!EventType.TOPIC_POLICY.equals(event.getEventType())) {
            return;
        }
        final TopicPoliciesEvent policiesEvent = event.getTopicPoliciesEvent();
        final TopicName topicName = TopicName.get(policiesEvent.getDomain(), policiesEvent.getTenant(), policiesEvent.getNamespace(), policiesEvent.getTopic());
        // Switch on the enum itself rather than on an ordinal-indexed lookup table.
        switch (event.getActionType()) {
            case INSERT: {
                TopicPolicies previous = policiesEvent.getPolicies().isGlobalPolicies()
                        ? this.globalPoliciesCache.putIfAbsent(topicName, policiesEvent.getPolicies())
                        : this.policiesCache.putIfAbsent(topicName, policiesEvent.getPolicies());
                if (previous != null) {
                    log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
                }
                break;
            }
            case UPDATE:
                if (policiesEvent.getPolicies().isGlobalPolicies()) {
                    this.globalPoliciesCache.put(topicName, policiesEvent.getPolicies());
                } else {
                    this.policiesCache.put(topicName, policiesEvent.getPolicies());
                }
                break;
            case DELETE:
                this.policiesCache.remove(topicName);
                try {
                    createSystemTopicFactoryIfNeeded();
                    this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()).newWriterAsync().thenAccept(writer -> {
                        writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null)).whenComplete((messageId, deleteError) -> {
                            if (deleteError != null) {
                                log.warn("[{}] Failed to write delete event for topic policies", topicName, deleteError);
                            }
                            // The writer is single-use: close it whatever the outcome was.
                            writer.closeAsync().whenComplete((ignored, closeError) -> {
                                if (closeError != null) {
                                    log.error("close writer failed ", closeError);
                                }
                            });
                        });
                    });
                } catch (PulsarServerException e) {
                    // Keep the cause in the log so the failure can be diagnosed.
                    log.error("Failed to create system topic factory", e);
                }
                break;
            case NONE:
                break;
            default:
                log.warn("Unknown event action type: {}", event.getActionType());
                break;
        }
    }

    /**
     * Tells whether a message carries a replication target other than exactly the local cluster.
     *
     * <p>{@link TopicMessageImpl} wrappers are unwrapped recursively. Messages of any other
     * implementation, and messages without a replication target, yield {@code false}.
     *
     * @param message message to inspect
     * @return {@code true} when a replication target is set and it is not the single local cluster
     */
    private boolean hasReplicateTo(Message<?> message) {
        if (message instanceof MessageImpl) {
            final MessageImpl<?> impl = (MessageImpl<?>) message;
            if (!impl.hasReplicateTo()) {
                return false;
            }
            final boolean localClusterOnly = impl.getReplicateTo().size() == 1
                    && impl.getReplicateTo().contains(this.clusterName);
            return !localClusterOnly;
        }
        if (!(message instanceof TopicMessageImpl)) {
            return false;
        }
        return hasReplicateTo(((TopicMessageImpl) message).getMessage());
    }

    /**
     * Creates the system topic factory on first use; later calls return immediately.
     *
     * @throws PulsarServerException when the factory cannot be created; the error is logged and rethrown
     */
    private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
        if (this.namespaceEventsSystemTopicFactory != null) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created the factory meanwhile.
            if (this.namespaceEventsSystemTopicFactory != null) {
                return;
            }
            try {
                this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(this.pulsarService.getClient());
            } catch (PulsarServerException failure) {
                log.error("Create namespace event system topic factory error.", failure);
                throw failure;
            }
        }
    }

    /**
     * Scans the system topic to its end, remembering the policies of the most recent
     * {@link EventType#TOPIC_POLICY} event that refers to {@code topicName}, then completes the
     * future with them and closes the reader.
     *
     * <p>Events without a value, events of another type and events for other topics are skipped
     * and the scan continues. If checking for or reading an event fails, the future is failed and
     * the reader is closed.
     *
     * @param reader            dedicated reader on the namespace's system topic; closed by this method
     * @param topicName         topic whose policies are wanted
     * @param topicPolicies     policies found so far, {@code null} when none
     * @param completableFuture receives the final policies or the failure
     */
    private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> reader, TopicName topicName, TopicPolicies topicPolicies, CompletableFuture<TopicPolicies> completableFuture) {
        reader.hasMoreEventsAsync().whenComplete((hasMore, checkError) -> {
            if (checkError != null) {
                // Stop here: hasMore is null on failure and must not be dereferenced.
                completableFuture.completeExceptionally(checkError);
                closeReaderAfterFetch(reader, topicName);
                return;
            }
            if (!hasMore.booleanValue()) {
                completableFuture.complete(topicPolicies);
                closeReaderAfterFetch(reader, topicName);
                return;
            }
            reader.readNextAsync().whenComplete((message, readError) -> {
                if (readError != null) {
                    // Stop here: message is null on failure and must not be dereferenced.
                    completableFuture.completeExceptionally(readError);
                    closeReaderAfterFetch(reader, topicName);
                    return;
                }
                TopicPolicies latest = topicPolicies;
                PulsarEvent event = message.getValue();
                if (event != null && EventType.TOPIC_POLICY.equals(event.getEventType())) {
                    TopicPoliciesEvent policiesEvent = event.getTopicPoliciesEvent();
                    TopicName eventTopic = TopicName.get(policiesEvent.getDomain(), policiesEvent.getTenant(), policiesEvent.getNamespace(), policiesEvent.getTopic());
                    if (topicName.equals(eventTopic)) {
                        latest = policiesEvent.getPolicies();
                    }
                }
                // Always continue the scan; returning early on an unrelated event would leave
                // the future incomplete and the reader open.
                fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, latest, completableFuture);
            });
        });
    }

    /** Closes the dedicated bypass-cache reader and logs a failure to do so. */
    private void closeReaderAfterFetch(SystemTopicClient.Reader<PulsarEvent> reader, TopicName topicName) {
        reader.closeAsync().whenComplete((ignored, closeError) -> {
            if (closeError != null) {
                log.error("[{}] Close reader error.", topicName, closeError);
            }
        });
    }

    /** Returns the number of entries currently held in the local policies cache. */
    @VisibleForTesting
    long getPoliciesCacheSize() {
        return this.policiesCache.size();
    }

    /** Returns the number of namespaces that currently have a reader future registered. */
    @VisibleForTesting
    long getReaderCacheCount() {
        return this.readerCaches.size();
    }

    /** Returns {@code true} when a reader future is registered for the given namespace. */
    @VisibleForTesting
    boolean checkReaderIsCached(NamespaceName namespaceName) {
        return this.readerCaches.get(namespaceName) != null;
    }

    /**
     * Returns the initialisation flag stored for the namespace, or {@code null} when the
     * namespace has no entry in the init map.
     */
    @VisibleForTesting
    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
        return this.policyCacheInitMap.get(namespaceName);
    }

    /**
     * Adds a policy listener for a topic, creating the topic's listener list when needed.
     *
     * @param topicName           topic to observe
     * @param topicPolicyListener listener to add
     */
    @Override
    public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> topicPolicyListener) {
        this.listeners.compute(topicName, (key, existing) -> {
            final List<TopicPolicyListener<TopicPolicies>> target;
            if (existing != null) {
                target = existing;
            } else {
                target = Lists.newCopyOnWriteArrayList();
            }
            target.add(topicPolicyListener);
            return target;
        });
    }

    /**
     * Removes a policy listener from a topic and drops the topic's map entry once its listener
     * list has become empty.
     *
     * @param topicName           topic the listener was registered for
     * @param topicPolicyListener listener to remove
     */
    @Override
    public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> topicPolicyListener) {
        this.listeners.compute(topicName, (key, registered) -> {
            if (registered == null) {
                return null;
            }
            registered.remove(topicPolicyListener);
            // Returning null removes the mapping.
            return registered.isEmpty() ? null : registered;
        });
    }

    /** Exposes the live local policies cache map itself, not a copy. */
    @VisibleForTesting
    protected Map<TopicName, TopicPolicies> getPoliciesCache() {
        return this.policiesCache;
    }

    /** Exposes the live per-topic listener map itself, not a copy. */
    @VisibleForTesting
    protected Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> getListeners() {
        return this.listeners;
    }
}
