package org.apache.pulsar.broker.service.persistent;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.carrotsearch.hppc.ObjectObjectHashMap;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationStats;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic.class */
public class PersistentTopic extends AbstractTopic implements Topic, AsyncCallbacks.AddEntryCallback {
    /** Managed ledger this topic operates on. */
    protected final ManagedLedger ledger;
    /** Subscriptions of this topic; the cursor-open callback looks entries up by subscription name. */
    private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
    /** Replicators of this topic; the replicator cursor-delete callback removes entries by remote cluster name. */
    private final ConcurrentOpenHashMap<String, Replicator> replicators;
    // NOTE(review): presumably the name of the cursor used by message deduplication — confirm against its users.
    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
    // NOTE(review): presumably the managed-ledger property key under which the topic epoch is stored — confirm.
    private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
    // NOTE(review): usage is outside this view; meaning of the 1.5 factor needs confirming.
    private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5d;
    // NOTE(review): name suggests a retry delay in seconds after a failed policy update — confirm.
    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
    // NOTE(review): public mutable flag; writers/readers are outside this view.
    public boolean msgChunkPublished;
    /** Optional dispatch rate limiter; closed (if present) when the topic is deleted or closed. */
    private Optional<DispatchRateLimiter> dispatchRateLimiter;
    // NOTE(review): presumably guards updates of dispatchRateLimiter — confirm.
    private final Object dispatchRateLimiterLock;
    /** Optional subscribe rate limiter; closed (if present) when the topic is deleted or closed. */
    private Optional<SubscribeRateLimiter> subscribeRateLimiter;
    // NOTE(review): threshold expressed in entries according to the name; usage is outside this view.
    private final long backloggedCursorThresholdEntries;
    // NOTE(review): unit is milliseconds according to the name; usage is outside this view.
    public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
    protected final MessageDeduplication messageDeduplication;
    // NOTE(review): looks like a sentinel meaning "compaction has never run" — confirm where it is compared.
    private static final long COMPACTION_NEVER_RUN = -4273917950L;
    // NOTE(review): presumably the future of the in-flight/last compaction — confirm.
    private CompletableFuture<Long> currentCompaction;
    private final CompactedTopic compactedTopic;
    // NOTE(review): presumably the future of the in-flight/last offload — confirm.
    private CompletableFuture<MessageIdImpl> currentOffload;
    /** Optional replicated-subscriptions controller; closed (if present) when the topic is closed. */
    private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController;
    // NOTE(review): presumably counts in-flight write operations — confirm.
    private final AtomicLong pendingWriteOps;
    private volatile double lastUpdatedAvgPublishRateInMsg;
    private volatile double lastUpdatedAvgPublishRateInByte;
    private volatile int maxUnackedMessagesOnSubscriptionApplied;
    private volatile boolean isClosingOrDeleting;
    // NOTE(review): presumably the task cancelled by cancelFencedTopicMonitoringTask() on close — confirm.
    private ScheduledFuture<?> fencedTopicMonitoringTask;
    protected final TransactionBuffer transactionBuffer;
    private long lastDataMessagePublishedTimestamp;
    private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.1
        AnonymousClass1() {
        }

        @Override // org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal
        public TopicStatsHelper initialValue() {
            return new TopicStatsHelper();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$1 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$1.class */
    /**
     * Named form of the {@link FastThreadLocal} specialisation that hands out
     * {@link TopicStatsHelper} instances: the initial value is always a new helper.
     */
    class AnonymousClass1 extends FastThreadLocal<TopicStatsHelper> {
        AnonymousClass1() {
            super();
        }

        /**
         * @return a newly constructed helper to serve as the initial thread-local value
         */
        @Override
        public TopicStatsHelper initialValue() {
            final TopicStatsHelper freshHelper = new TopicStatsHelper();
            return freshHelper;
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$10 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$10.class */
    public class AnonymousClass10 implements AsyncCallbacks.ReadEntryCallback {
        final /* synthetic */ int val$backlogQuotaLimitInSecond;
        final /* synthetic */ CompletableFuture val$future;
        final /* synthetic */ TopicName val$topicName;

        AnonymousClass10(int i, CompletableFuture completableFuture, TopicName topicName) {
            r5 = i;
            r6 = completableFuture;
            r7 = topicName;
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
        public void readEntryComplete(Entry entry, Object obj) {
            try {
                try {
                    boolean isEntryExpired = MessageImpl.isEntryExpired(r5, Commands.getEntryTimestamp(entry.getDataBuffer()));
                    if (isEntryExpired && PersistentTopic.log.isDebugEnabled()) {
                        PersistentTopic.log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlogexceeded quota {}", ((ManagedLedgerImpl) PersistentTopic.this.ledger).getSlowestConsumer().getName(), Integer.valueOf(r5));
                    }
                    r6.complete(Boolean.valueOf(isEntryExpired));
                    entry.release();
                } catch (Exception e) {
                    PersistentTopic.log.error("[{}][{}] Error deserializing message for backlog check", r7, e);
                    r6.complete(false);
                    entry.release();
                }
            } catch (Throwable th) {
                entry.release();
                throw th;
            }
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
        public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentTopic.log.error("[{}][{}] Error reading entry for precise time based  backlog check", r7, managedLedgerException);
            r6.complete(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$11 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$11.class */
    /**
     * Terminate callback: once the managed ledger reports termination, disconnects every producer,
     * notifies every subscription, and completes the captured future with the message id of the
     * termination position. A failure is propagated to the future unchanged.
     */
    public class AnonymousClass11 implements AsyncCallbacks.TerminateCallback {
        /** Future receiving the {@link MessageIdImpl} at which the topic was terminated. */
        final CompletableFuture val$future;

        AnonymousClass11(CompletableFuture completableFuture) {
            // The decompiled constructor wrote to an undeclared register; bind the final field.
            this.val$future = completableFuture;
        }

        @Override
        public void terminateComplete(Position position, Object obj) {
            PersistentTopic.this.producers.values().forEach(producer -> {
                producer.disconnect();
            });
            PersistentTopic.this.subscriptions.forEach((name, subscription) -> {
                subscription.topicTerminated();
            });
            PositionImpl positionImpl = (PositionImpl) position;
            // Partition index is passed as -1 here.
            MessageIdImpl messageIdImpl = new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1);
            PersistentTopic.log.info("[{}] Topic terminated at {}", PersistentTopic.this.getName(), messageIdImpl);
            this.val$future.complete(messageIdImpl);
        }

        @Override
        public void terminateFailed(ManagedLedgerException managedLedgerException, Object obj) {
            this.val$future.completeExceptionally(managedLedgerException);
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$12 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$12.class */
    /**
     * Read callback that converts the entry at a known position into a message id and completes
     * the captured future with it: a {@link BatchMessageIdImpl} pointing at the last message of the
     * batch when the metadata declares a batch size, otherwise a plain {@link MessageIdImpl}.
     */
    class AnonymousClass12 implements AsyncCallbacks.ReadEntryCallback {
        /** Future receiving the resulting message id. */
        final CompletableFuture val$completableFuture;
        /** Position of the entry that was read. */
        final PositionImpl val$position;
        /** Partition index placed into the resulting message id. */
        final int val$partitionIndex;

        AnonymousClass12(CompletableFuture completableFuture, PositionImpl positionImpl, int i) {
            // The decompiled constructor wrote to undeclared registers; bind the final fields.
            this.val$completableFuture = completableFuture;
            this.val$position = positionImpl;
            this.val$partitionIndex = i;
        }

        @Override
        public void readEntryComplete(Entry entry, Object obj) {
            try {
                MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                if (parseMessageMetadata.hasNumMessagesInBatch()) {
                    // Batch index of the last message in the batch is (size - 1).
                    this.val$completableFuture.complete(new BatchMessageIdImpl(this.val$position.getLedgerId(),
                            this.val$position.getEntryId(), this.val$partitionIndex,
                            parseMessageMetadata.getNumMessagesInBatch() - 1));
                } else {
                    this.val$completableFuture.complete(new MessageIdImpl(this.val$position.getLedgerId(),
                            this.val$position.getEntryId(), this.val$partitionIndex));
                }
            } catch (RuntimeException e) {
                // Without this the future would never complete if the metadata cannot be parsed;
                // fail the future, then rethrow so the caller still sees the exception.
                this.val$completableFuture.completeExceptionally(e);
                throw e;
            } finally {
                entry.release();
            }
        }

        @Override
        public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
            this.val$completableFuture.completeExceptionally(managedLedgerException);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$13 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$13.class */
    /**
     * Offload callback: logs the outcome of an offload request and completes the captured promise
     * with the message id of the reported position, or fails it with the ledger exception.
     */
    public class AnonymousClass13 implements AsyncCallbacks.OffloadCallback {
        /** Message id the offload was requested for; used for logging only. */
        final MessageIdImpl val$messageId;
        /** Promise receiving the {@link MessageIdImpl} of the position reported on success. */
        final CompletableFuture val$promise;

        AnonymousClass13(MessageIdImpl messageIdImpl, CompletableFuture completableFuture) {
            // The decompiled constructor wrote to undeclared registers; bind the final fields.
            this.val$messageId = messageIdImpl;
            this.val$promise = completableFuture;
        }

        @Override
        public void offloadComplete(Position position, Object obj) {
            PositionImpl positionImpl = (PositionImpl) position;
            PersistentTopic.log.info("[{}] Completed successfully offload operation at messageId {}", PersistentTopic.this.topic, this.val$messageId);
            this.val$promise.complete(new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1));
        }

        @Override
        public void offloadFailed(ManagedLedgerException managedLedgerException, Object obj) {
            // Trailing throwable is logged with its stack trace.
            PersistentTopic.log.warn("[{}] Failed offload operation at messageId {}", new Object[]{PersistentTopic.this.topic, this.val$messageId, managedLedgerException});
            this.val$promise.completeExceptionally(managedLedgerException);
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$2 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$2.class */
    /**
     * Property-update callback for the topic epoch: completes the captured future with the new
     * epoch on success, or fails it with the ledger exception.
     */
    public class AnonymousClass2 implements AsyncCallbacks.UpdatePropertiesCallback {
        /** Epoch value that was being written. */
        final long val$newEpoch;
        /** Future receiving the new epoch as a {@code Long}. */
        final CompletableFuture val$future;

        AnonymousClass2(long j, CompletableFuture completableFuture) {
            // The decompiled constructor wrote to undeclared registers; bind the final fields.
            this.val$newEpoch = j;
            this.val$future = completableFuture;
        }

        @Override
        public void updatePropertiesComplete(Map<String, String> map, Object obj) {
            PersistentTopic.log.info("[{}] Updated topic epoch to {}", PersistentTopic.this.getName(), Long.valueOf(this.val$newEpoch));
            this.val$future.complete(Long.valueOf(this.val$newEpoch));
        }

        @Override
        public void updatePropertiesFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentTopic.log.warn("[{}] Failed to update topic epoch to {}: {}", new Object[]{PersistentTopic.this.getName(), Long.valueOf(this.val$newEpoch), managedLedgerException.getMessage()});
            this.val$future.completeExceptionally(managedLedgerException);
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$3 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$3.class */
    public class AnonymousClass3 implements AsyncCallbacks.OpenCursorCallback {
        final /* synthetic */ String val$subscriptionName;
        final /* synthetic */ boolean val$replicated;
        final /* synthetic */ Map val$subscriptionProperties;
        final /* synthetic */ CompletableFuture val$subscriptionFuture;
        final /* synthetic */ long val$startMessageRollbackDurationSec;

        AnonymousClass3(String str, boolean z, Map map, CompletableFuture completableFuture, long j) {
            r6 = str;
            r7 = z;
            r8 = map;
            r9 = completableFuture;
            r10 = j;
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
        public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
            if (PersistentTopic.log.isDebugEnabled()) {
                PersistentTopic.log.debug("[{}][{}] Opened cursor", PersistentTopic.this.topic, r6);
            }
            PersistentSubscription persistentSubscription = (PersistentSubscription) PersistentTopic.this.subscriptions.get(r6);
            if (persistentSubscription == null) {
                ConcurrentOpenHashMap concurrentOpenHashMap = PersistentTopic.this.subscriptions;
                String str = r6;
                String str2 = r6;
                boolean z = r7;
                Map map = r8;
                persistentSubscription = (PersistentSubscription) concurrentOpenHashMap.computeIfAbsent(str, str3 -> {
                    return PersistentTopic.this.createPersistentSubscription(str2, managedCursor, z, map);
                });
            } else if (persistentSubscription.getCursor() != null && !persistentSubscription.getCursor().isDurable()) {
                r9.completeExceptionally(new BrokerServiceException.NotAllowedException("NonDurable subscription with the same name already exists."));
                return;
            }
            if (r7 && !persistentSubscription.isReplicated()) {
                persistentSubscription.setReplicated(r7);
            }
            if (r10 > 0) {
                PersistentTopic.this.resetSubscriptionCursor(persistentSubscription, r9, r10);
            } else {
                r9.complete(persistentSubscription);
            }
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
        public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentTopic.log.warn("[{}] Failed to create subscription for {}: {}", new Object[]{PersistentTopic.this.topic, r6, managedLedgerException.getMessage()});
            PersistentTopic.this.decrementUsageCount();
            r9.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                PersistentTopic.this.close();
            }
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$4 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$4.class */
    public class AnonymousClass4 implements AsyncCallbacks.DeleteLedgerCallback {
        final /* synthetic */ String val$subscriptionName;
        final /* synthetic */ CompletableFuture val$unsubscribeFuture;

        AnonymousClass4(String str, CompletableFuture completableFuture) {
            r5 = str;
            r6 = completableFuture;
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
        public void deleteLedgerComplete(Object obj) {
            PersistentTopic.this.asyncDeleteCursor(r5, r6);
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
        public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
            if (managedLedgerException instanceof ManagedLedgerException.MetadataNotFoundException) {
                PersistentTopic.this.asyncDeleteCursor(r5, r6);
            } else {
                r6.completeExceptionally(managedLedgerException);
                PersistentTopic.log.error("[{}][{}] Error deleting subscription pending ack store", new Object[]{PersistentTopic.this.topic, r5, managedLedgerException});
            }
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$5 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$5.class */
    public class AnonymousClass5 implements AsyncCallbacks.DeleteCursorCallback {
        final /* synthetic */ String val$subscriptionName;
        final /* synthetic */ CompletableFuture val$unsubscribeFuture;

        AnonymousClass5(String str, CompletableFuture completableFuture) {
            r5 = str;
            r6 = completableFuture;
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
        public void deleteCursorComplete(Object obj) {
            if (PersistentTopic.log.isDebugEnabled()) {
                PersistentTopic.log.debug("[{}][{}] Cursor deleted successfully", PersistentTopic.this.topic, r5);
            }
            PersistentTopic.this.removeSubscription(r5);
            r6.complete(null);
            PersistentTopic.access$902(PersistentTopic.this, System.nanoTime());
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
        public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
            if (PersistentTopic.log.isDebugEnabled()) {
                PersistentTopic.log.debug("[{}][{}] Error deleting cursor for subscription", new Object[]{PersistentTopic.this.topic, r5, managedLedgerException});
            }
            if (!(managedLedgerException instanceof ManagedLedgerException.ManagedLedgerNotFoundException)) {
                r6.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            } else {
                r6.complete(null);
                PersistentTopic.access$1102(PersistentTopic.this, System.nanoTime());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$6 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$6.class */
    /**
     * Ledger-delete callback used when deleting the topic: on success removes the topic from the
     * broker cache, closes the rate limiters, unregisters the policy listener and completes the
     * delete future. A failure caused by missing metadata is treated as success; any other failure
     * unfences the topic and fails the future.
     */
    public class AnonymousClass6 implements AsyncCallbacks.DeleteLedgerCallback {
        /** Future tracking the topic deletion. */
        final CompletableFuture val$deleteFuture;

        AnonymousClass6(CompletableFuture completableFuture) {
            // The decompiled constructor wrote to an undeclared register; bind the final field.
            this.val$deleteFuture = completableFuture;
        }

        @Override
        public void deleteLedgerComplete(Object obj) {
            PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
            PersistentTopic.this.dispatchRateLimiter.ifPresent(limiter -> {
                limiter.close();
            });
            PersistentTopic.this.subscribeRateLimiter.ifPresent(limiter -> {
                limiter.close();
            });
            PersistentTopic.this.unregisterTopicPolicyListener();
            PersistentTopic.log.info("[{}] Topic deleted", PersistentTopic.this.topic);
            this.val$deleteFuture.complete(null);
        }

        @Override
        public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
            if (managedLedgerException.getCause() instanceof MetadataStoreException.NotFoundException) {
                // Nothing left to delete: run the regular success path.
                PersistentTopic.log.info("[{}] Topic is already deleted {}", PersistentTopic.this.topic, managedLedgerException.getMessage());
                deleteLedgerComplete(obj);
            } else {
                PersistentTopic.this.unfenceTopicToResume();
                PersistentTopic.log.error("[{}] Error deleting topic", PersistentTopic.this.topic, managedLedgerException);
                this.val$deleteFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$7 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$7.class */
    /**
     * Ledger-close callback used when closing the topic. On success it removes the topic from the
     * broker cache and, once that finishes, closes the replicated-subscriptions controller and rate
     * limiters, unregisters the policy listener, cancels the fenced-topic monitor and completes the
     * close future. If closing the ledger failed, the topic is still removed from the cache and the
     * close future is completed normally.
     */
    public class AnonymousClass7 implements AsyncCallbacks.CloseCallback {
        /** Future tracking the topic close. */
        final CompletableFuture val$closeFuture;

        AnonymousClass7(CompletableFuture completableFuture) {
            // The decompiled constructor wrote to an undeclared register; bind the final field.
            this.val$closeFuture = completableFuture;
        }

        @Override
        public void closeComplete(Object obj) {
            CompletableFuture<Void> removeTopicFromCache = PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
            removeTopicFromCache.thenRun(() -> {
                PersistentTopic.this.replicatedSubscriptionsController.ifPresent(controller -> {
                    controller.close();
                });
                PersistentTopic.this.dispatchRateLimiter.ifPresent(limiter -> {
                    limiter.close();
                });
                PersistentTopic.this.subscribeRateLimiter.ifPresent(limiter -> {
                    limiter.close();
                });
                PersistentTopic.this.unregisterTopicPolicyListener();
                PersistentTopic.log.info("[{}] Topic closed", PersistentTopic.this.topic);
                PersistentTopic.this.cancelFencedTopicMonitoringTask();
                this.val$closeFuture.complete(null);
            }).exceptionally(th -> {
                // Covers both a failed cache removal and an exception thrown by the cleanup above.
                this.val$closeFuture.completeExceptionally(th);
                return null;
            });
        }

        @Override
        public void closeFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentTopic.log.error("[{}] Failed to close managed ledger, proceeding anyway.", PersistentTopic.this.topic, managedLedgerException);
            // The cache-removal future is intentionally not awaited on this path.
            PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
            this.val$closeFuture.complete(null);
        }
    }

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$8 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$8.class */
    /**
     * Cursor-open callback used when starting replication to a remote cluster: on success adds the
     * replication cluster using the opened cursor and mirrors that operation's outcome onto the
     * captured future; on failure fails the future with a persistence exception.
     */
    public class AnonymousClass8 implements AsyncCallbacks.OpenCursorCallback {
        final String val$remoteCluster;
        /** Future tracking the replicator start. */
        final CompletableFuture val$future;

        AnonymousClass8(String str, CompletableFuture completableFuture) {
            // The decompiled constructor wrote to undeclared registers; bind the final fields.
            this.val$remoteCluster = str;
            this.val$future = completableFuture;
        }

        @Override
        public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
            String localCluster = PersistentTopic.this.brokerService.pulsar().getConfiguration().getClusterName();
            CompletableFuture<Void> addReplicationCluster = PersistentTopic.this.addReplicationCluster(this.val$remoteCluster, managedCursor, localCluster);
            addReplicationCluster.whenComplete((ignored, th) -> {
                if (th == null) {
                    this.val$future.complete(null);
                } else {
                    this.val$future.completeExceptionally(th);
                }
            });
        }

        @Override
        public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
            this.val$future.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentTopic$9 */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$9.class */
    public class AnonymousClass9 implements AsyncCallbacks.DeleteCursorCallback {
        final /* synthetic */ String val$remoteCluster;
        final /* synthetic */ CompletableFuture val$future;
        final /* synthetic */ String val$name;

        AnonymousClass9(String str, CompletableFuture completableFuture, String str2) {
            r5 = str;
            r6 = completableFuture;
            r7 = str2;
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
        public void deleteCursorComplete(Object obj) {
            PersistentTopic.this.replicators.remove(r5);
            r6.complete(null);
        }

        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
        public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
            PersistentTopic.log.error("[{}] Failed to delete cursor {} {}", new Object[]{PersistentTopic.this.topic, r7, managedLedgerException.getMessage(), managedLedgerException});
            r6.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$TopicStatsHelper.class */
    /**
     * Mutable holder for aggregated per-topic rate/throughput figures plus a map of remote publisher
     * stats. All numeric fields start at zero and the map starts empty.
     */
    public static class TopicStatsHelper {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgThrottlingFailure;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        public final ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();

        /** Creates a helper in the same state that {@link #reset()} produces. */
        public TopicStatsHelper() {
            reset();
        }

        /** Zeroes every numeric aggregate and empties the remote publisher map. */
        public void reset() {
            averageMsgSize = 0.0;
            aggMsgRateIn = 0.0;
            aggMsgThroughputIn = 0.0;
            aggMsgRateOut = 0.0;
            aggMsgThrottlingFailure = 0.0;
            aggMsgThroughputOut = 0.0;
            remotePublishersStats.clear();
        }
    }

    /**
     * Builds a persistent topic on top of an already-opened managed ledger.
     *
     * <p>Besides initialising state, the constructor registers the topic policy listener, turns every
     * existing cursor that is neither the deduplication cursor nor a replicator cursor into a
     * subscription (deactivating its cursor), restores the topic epoch from the ledger properties when
     * present, and selects a transaction buffer implementation.
     *
     * @param str topic name, passed to the superclass and parsed with {@code TopicName.get}
     * @param managedLedger ledger backing this topic
     * @param brokerService owning broker service; source of configuration and shared clients
     */
    public PersistentTopic(String str, ManagedLedger managedLedger, BrokerService brokerService) {
        super(str, brokerService);
        this.dispatchRateLimiter = Optional.empty();
        this.dispatchRateLimiterLock = new Object();
        this.subscribeRateLimiter = Optional.empty();
        this.currentCompaction = CompletableFuture.completedFuture(Long.valueOf(COMPACTION_NEVER_RUN));
        this.currentOffload = CompletableFuture.completedFuture((MessageIdImpl) MessageId.earliest);
        this.replicatedSubscriptionsController = Optional.empty();
        this.pendingWriteOps = new AtomicLong(0L);
        this.lastUpdatedAvgPublishRateInMsg = 0.0d;
        this.lastUpdatedAvgPublishRateInByte = 0.0d;
        this.isClosingOrDeleting = false;
        this.fencedTopicMonitoringTask = null;
        this.lastDataMessagePublishedTimestamp = 0L;
        this.ledger = managedLedger;
        this.subscriptions = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.replicators = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
        registerTopicPolicyListener();
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
        // Re-create subscriptions for existing cursors; the dedup cursor and replicator cursors are skipped.
        for (ManagedCursor managedCursor : managedLedger.getCursors()) {
            if (!managedCursor.getName().equals(DEDUPLICATION_CURSOR_NAME) && !managedCursor.getName().startsWith(this.replicatorPrefix)) {
                // Cursor names are stored encoded; the subscription map is keyed by the decoded name.
                String decode = Codec.decode(managedCursor.getName());
                this.subscriptions.put(decode, createPersistentSubscription(decode, managedCursor, PersistentSubscription.isCursorFromReplicatedSubscription(managedCursor), managedCursor.getCursorProperties()));
                this.subscriptions.get(decode).deactivateCursor();
            }
        }
        // `this` is handed to MessageDeduplication before construction finishes.
        this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, managedLedger);
        if (managedLedger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
            this.topicEpoch = Optional.of(Long.valueOf(Long.parseLong(managedLedger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME))));
        }
        checkReplicatedSubscriptionControllerState();
        TopicName topicName = TopicName.get(str);
        // Use the disabled buffer when the transaction coordinator is off or checkTopicIsEventsNames matches this topic.
        if (!brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() || EventsTopicNames.checkTopicIsEventsNames(topicName)) {
            this.transactionBuffer = new TransactionBufferDisable();
        } else {
            this.transactionBuffer = brokerService.getPulsar().getTransactionBufferProvider().newTransactionBuffer(this);
        }
        this.transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) managedLedger.getLastConfirmedEntry());
    }

    /**
     * Asynchronously finishes topic set-up: initialises the topic policy, registers a replication
     * cluster for every cursor whose name starts with the replicator prefix, and then applies the
     * namespace policies (or defaults when none exist).
     *
     * @return future that completes once all of the above is done; a failure while reading the
     *     namespace policies is logged and leaves encryption marked as not required
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> initialize() {
        List<CompletableFuture<?>> pending = new ArrayList<>();
        pending.add(initTopicPolicy());
        for (ManagedCursor cursor : this.ledger.getCursors()) {
            if (!cursor.getName().startsWith(this.replicatorPrefix)) {
                continue;
            }
            String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
            String localCluster = this.brokerService.pulsar().getConfiguration().getClusterName();
            pending.add(addReplicationCluster(remoteCluster, cursor, localCluster));
        }
        return FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) pending).thenCompose(ignored ->
                this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(TopicName.get(this.topic).getNamespaceObject()).thenAccept(optional -> {
                    if (optional.isPresent()) {
                        Policies nsPolicies = (Policies) optional.get();
                        updateTopicPolicyByNamespacePolicy(nsPolicies);
                        initializeDispatchRateLimiterIfNeeded();
                        updateSubscribeRateLimiter();
                        updatePublishDispatcher();
                        updateResourceGroupLimiter(optional);
                        this.isEncryptionRequired = nsPolicies.encryption_required;
                        this.isAllowAutoUpdateSchema = nsPolicies.is_allow_auto_update_schema;
                        this.schemaValidationEnforced = nsPolicies.schema_validation_enforced;
                    } else {
                        // No namespace policies: fall back to defaults.
                        this.isEncryptionRequired = false;
                        updatePublishDispatcher();
                        updateResourceGroupLimiter(optional);
                        initializeDispatchRateLimiterIfNeeded();
                        updateSubscribeRateLimiter();
                    }
                }).exceptionally(th -> {
                    log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", this.topic, th.getMessage());
                    this.isEncryptionRequired = false;
                    return null;
                }));
    }

    /**
     * Test-only constructor that accepts a pre-built {@link MessageDeduplication} instead of creating one.
     *
     * <p>Unlike the public constructor, this one does not register a topic policy listener, does not
     * scan the ledger's cursors, and does not read the topic epoch from the ledger properties.
     *
     * @param str topic name, passed to the superclass
     * @param brokerService owning broker service; source of configuration and shared clients
     * @param managedLedger ledger backing this topic
     * @param messageDeduplication deduplication component to use as-is
     */
    @VisibleForTesting
    PersistentTopic(String str, BrokerService brokerService, ManagedLedger managedLedger, MessageDeduplication messageDeduplication) {
        super(str, brokerService);
        this.dispatchRateLimiter = Optional.empty();
        this.dispatchRateLimiterLock = new Object();
        this.subscribeRateLimiter = Optional.empty();
        this.currentCompaction = CompletableFuture.completedFuture(Long.valueOf(COMPACTION_NEVER_RUN));
        this.currentOffload = CompletableFuture.completedFuture((MessageIdImpl) MessageId.earliest);
        this.replicatedSubscriptionsController = Optional.empty();
        this.pendingWriteOps = new AtomicLong(0L);
        this.lastUpdatedAvgPublishRateInMsg = 0.0d;
        this.lastUpdatedAvgPublishRateInByte = 0.0d;
        this.isClosingOrDeleting = false;
        this.fencedTopicMonitoringTask = null;
        this.lastDataMessagePublishedTimestamp = 0L;
        this.ledger = managedLedger;
        this.messageDeduplication = messageDeduplication;
        this.subscriptions = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.replicators = ConcurrentOpenHashMap.newBuilder().expectedItems(16).concurrencyLevel(1).build();
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
        this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
        // Only the coordinator flag decides the buffer here; there is no events-topic check in this constructor.
        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
            // `this` is handed to the provider before construction finishes.
            this.transactionBuffer = brokerService.getPulsar().getTransactionBufferProvider().newTransactionBuffer(this);
        } else {
            this.transactionBuffer = new TransactionBufferDisable();
        }
    }

    /**
     * Creates the topic-level dispatch rate limiter when none exists yet and the current topic policy
     * has dispatch rate limiting enabled. Guarded by {@code dispatchRateLimiterLock}.
     */
    private void initializeDispatchRateLimiterIfNeeded() {
        synchronized (this.dispatchRateLimiterLock) {
            if (this.dispatchRateLimiter.isPresent()) {
                return;
            }
            if (!DispatchRateLimiter.isDispatchRateEnabled(this.topicPolicies.getDispatchRate().get())) {
                return;
            }
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, DispatchRateLimiter.Type.TOPIC));
        }
    }

    /**
     * Creates the subscription object for a cursor.
     *
     * @param str subscription name; the compaction subscription name yields a {@code CompactorSubscription}
     * @param managedCursor cursor backing the subscription
     * @param z flag forwarded to the {@code PersistentSubscription} constructor (ignored for the compactor case)
     * @param map properties forwarded to the {@code PersistentSubscription} constructor (ignored for the compactor case)
     * @return the new subscription instance
     */
    public PersistentSubscription createPersistentSubscription(String str, ManagedCursor managedCursor, boolean z, Map<String, String> map) {
        Preconditions.checkNotNull(this.compactedTopic);
        if (str.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
            return new CompactorSubscription(this, this.compactedTopic, str, managedCursor);
        }
        return new PersistentSubscription(this, str, managedCursor, z, map);
    }

    /**
     * Publishes one message payload to the managed ledger.
     *
     * <p>The pending-write counter is incremented up front. The publish is rejected immediately (and the
     * counter decremented again) when the topic is fenced, the payload exceeds the maximum size, or the
     * deduplication check reports anything other than "not a duplicate". A known duplicate is
     * acknowledged without an error; an unknown status is reported as an error.
     *
     * @param byteBuf message payload
     * @param publishContext callback notified of the publish outcome
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void publishMessage(ByteBuf byteBuf, Topic.PublishContext publishContext) {
        this.pendingWriteOps.incrementAndGet();

        Exception rejection = null;
        if (this.isFenced) {
            rejection = new BrokerServiceException.TopicFencedException("fenced");
        } else if (isExceedMaximumMessageSize(byteBuf.readableBytes(), publishContext)) {
            rejection = new BrokerServiceException.NotAllowedException("Exceed maximum message size");
        }
        if (rejection != null) {
            publishContext.completed(rejection, -1L, -1L);
            decrementPendingWriteOpsAndCheck();
            return;
        }

        switch (this.messageDeduplication.isDuplicate(publishContext, byteBuf)) {
            case NotDup:
                // The add-entry callbacks take care of the pending-write counter.
                asyncAddEntry(byteBuf, publishContext);
                return;
            case Dup:
                publishContext.completed(null, -1L, -1L);
                break;
            default:
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1L, -1L);
                break;
        }
        decrementPendingWriteOpsAndCheck();
    }

    /**
     * Brings the subscribe rate limiter in line with the current subscribe rate: creates it when
     * limiting is enabled and none exists, pushes the new rate to an existing one, or closes and
     * drops it when limiting is disabled.
     */
    public void updateSubscribeRateLimiter() {
        SubscribeRate rate = getSubscribeRate();
        // NOTE(review): the monitor is the current value of a field that is reassigned inside this
        // block, so different callers may lock different objects - confirm whether that is intended.
        synchronized (this.subscribeRateLimiter) {
            if (!SubscribeRateLimiter.isSubscribeRateEnabled(rate)) {
                if (this.subscribeRateLimiter.isPresent()) {
                    this.subscribeRateLimiter.get().close();
                    this.subscribeRateLimiter = Optional.empty();
                }
                return;
            }
            if (this.subscribeRateLimiter.isPresent()) {
                this.subscribeRateLimiter.get().onSubscribeRateUpdate(rate);
                return;
            }
            this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
        }
    }

    /**
     * Hands the payload to the managed ledger, with this topic as the add-entry callback and the
     * publish context as callback argument. The message count is passed along only when broker
     * entry metadata is enabled.
     */
    private void asyncAddEntry(ByteBuf byteBuf, Topic.PublishContext publishContext) {
        if (!this.brokerService.isBrokerEntryMetadataEnabled()) {
            this.ledger.asyncAddEntry(byteBuf, this, publishContext);
            return;
        }
        int messageCount = (int) publishContext.getNumberOfMessages();
        this.ledger.asyncAddEntry(byteBuf, messageCount, this, publishContext);
    }

    /**
     * Reads a single entry directly from the managed ledger.
     *
     * @param positionImpl position of the entry to read
     * @param readEntryCallback callback receiving the entry, or the failure when the ledger is not a
     *     {@code ManagedLedgerImpl}
     * @param obj opaque context handed back to the callback
     */
    public void asyncReadEntry(PositionImpl positionImpl, AsyncCallbacks.ReadEntryCallback readEntryCallback, Object obj) {
        if (!(this.ledger instanceof ManagedLedgerImpl)) {
            readEntryCallback.readEntryFailed(new ManagedLedgerException("Unexpected managedledger implementation, doesn't support direct read entry operation."), obj);
            return;
        }
        ManagedLedgerImpl impl = (ManagedLedgerImpl) this.ledger;
        impl.asyncReadEntry(positionImpl, readEntryCallback, obj);
    }

    /**
     * Delegates to {@code ManagedLedgerImpl.getPositionAfterN} with the {@code startExcluded} bound.
     *
     * @param positionImpl starting position
     * @param j number of entries, forwarded unchanged
     * @return the position reported by the managed ledger
     * @throws ManagedLedgerException if the ledger is not a {@code ManagedLedgerImpl}
     */
    public PositionImpl getPositionAfterN(PositionImpl positionImpl, long j) throws ManagedLedgerException {
        if (!(this.ledger instanceof ManagedLedgerImpl)) {
            throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support getPositionAfterN operation.");
        }
        ManagedLedgerImpl impl = (ManagedLedgerImpl) this.ledger;
        return impl.getPositionAfterN(positionImpl, j, ManagedLedgerImpl.PositionBound.startExcluded);
    }

    /**
     * Returns the first position reported by the managed ledger.
     *
     * @throws ManagedLedgerException if the ledger is not a {@code ManagedLedgerImpl}
     */
    public PositionImpl getFirstPosition() throws ManagedLedgerException {
        if (!(this.ledger instanceof ManagedLedgerImpl)) {
            throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support getFirstPosition operation.");
        }
        ManagedLedgerImpl impl = (ManagedLedgerImpl) this.ledger;
        return impl.getFirstPosition();
    }

    /** Returns the entry count reported by the backing managed ledger. */
    public long getNumberOfEntries() {
        long entryCount = this.ledger.getNumberOfEntries();
        return entryCount;
    }

    /**
     * Decrements the pending-write counter and, when it reaches zero on a topic that is fenced but
     * not closing or deleting, un-fences the topic. The fenced/closing condition is re-checked while
     * holding this topic's monitor before anything is changed.
     */
    private void decrementPendingWriteOpsAndCheck() {
        if (this.pendingWriteOps.decrementAndGet() != 0 || !this.isFenced || this.isClosingOrDeleting) {
            return;
        }
        synchronized (this) {
            if (!this.isFenced || this.isClosingOrDeleting) {
                return;
            }
            this.messageDeduplication.resetHighestSequenceIdPushed();
            log.info("[{}] Un-fencing topic...", this.topic);
            this.ledger.readyToCreateNewLedger();
            unfence();
        }
    }

    /**
     * Add-entry success callback: records the persisted position for deduplication, refreshes the
     * last-data-message timestamp for non-marker messages, syncs the transaction buffer's max read
     * position, completes the publish context and finally decrements the pending-write counter.
     *
     * @param position position at which the entry was stored
     * @param byteBuf entry data, used to populate the publish context metadata
     * @param obj the {@code Topic.PublishContext} that was passed when the add was issued
     */
    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback
    public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
        Topic.PublishContext ctx = (Topic.PublishContext) obj;
        PositionImpl persisted = (PositionImpl) position;

        this.messageDeduplication.recordMessagePersisted(ctx, persisted);
        boolean isDataMessage = !ctx.isMarkerMessage();
        if (isDataMessage) {
            this.lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
        }

        PositionImpl lastConfirmed = (PositionImpl) this.ledger.getLastConfirmedEntry();
        this.transactionBuffer.syncMaxReadPositionForNormalPublish(lastConfirmed);

        ctx.setMetadataFromEntryData(byteBuf);
        ctx.completed(null, persisted.getLedgerId(), persisted.getEntryId());
        decrementPendingWriteOpsAndCheck();
    }

    /**
     * Add-entry failure callback.
     *
     * <p>When the managed ledger reports it has been fenced, the topic is closed and nothing else
     * happens. Otherwise the topic is fenced, all producers are disconnected, the pending-write counter
     * is decremented once the disconnects settle, and the publish context is completed with an exception
     * that reflects the failure kind (closed, terminated, or generic persistence error).
     *
     * @param managedLedgerException failure reported by the managed ledger
     * @param obj the {@code Topic.PublishContext} that was passed when the add was issued
     */
    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback
    public synchronized void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            close();
            return;
        }
        fence();

        CompletableFuture<Void> producersDisconnected;
        if (this.producers.size() > 0) {
            List<CompletableFuture<Void>> disconnects = Lists.newArrayList();
            this.producers.forEach((name, producer) -> disconnects.add(producer.disconnect()));
            producersDisconnected = FutureUtil.waitForAll(disconnects);
        } else {
            producersDisconnected = CompletableFuture.completedFuture(null);
        }
        // Runs whether or not the disconnects succeeded. No cast is needed on the lambda; the
        // decompiled cast named a type variable that is not declared anywhere.
        producersDisconnected.handle((ignored, th) -> {
            decrementPendingWriteOpsAndCheck();
            return null;
        });

        Topic.PublishContext publishContext = (Topic.PublishContext) obj;
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to persist msg in store: {}", this.topic, managedLedgerException.getMessage());
            }
            publishContext.completed(new BrokerServiceException.TopicClosedException(managedLedgerException), -1L, -1L);
            return;
        }
        log.warn("[{}] Failed to persist msg in store: {}", this.topic, managedLedgerException.getMessage());
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerTerminatedException) {
            publishContext.completed(new BrokerServiceException.TopicTerminatedException(managedLedgerException), -1L, -1L);
        } else {
            publishContext.completed(new BrokerServiceException.PersistenceException(managedLedgerException), -1L, -1L);
        }
    }

    /**
     * Adds a producer via the superclass, then notifies deduplication of the new producer name and
     * starts the replication producers.
     *
     * @return future carrying the value produced by the superclass call, completed after
     *     {@link #startReplProducers()} finishes
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic, org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> completableFuture) {
        CompletableFuture<Optional<Long>> added = super.addProducer(producer, completableFuture);
        return added.thenCompose(epoch -> {
            this.messageDeduplication.producerAdded(producer.getProducerName());
            return startReplProducers().thenApply(ignored -> epoch);
        });
    }

    /**
     * Delegates to the transaction buffer's {@code checkIfTBRecoverCompletely}.
     *
     * @param z flag forwarded unchanged to the transaction buffer
     * @return the future returned by the transaction buffer
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean z) {
        CompletableFuture<Void> recovered = getTransactionBuffer().checkIfTBRecoverCompletely(z);
        return recovered;
    }

    /**
     * Stores the next topic epoch: the given epoch plus one, or {@code 0} when no epoch is present.
     *
     * @param optional current epoch, if any
     * @return future from {@code setTopicEpoch} for the incremented value
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> optional) {
        long currentEpoch = optional.orElse(-1L);
        return setTopicEpoch(currentEpoch + 1);
    }

    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected CompletableFuture<Long> setTopicEpoch(long j) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        this.ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(j), new AsyncCallbacks.UpdatePropertiesCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.2
            final /* synthetic */ long val$newEpoch;
            final /* synthetic */ CompletableFuture val$future;

            AnonymousClass2(long j2, CompletableFuture completableFuture2) {
                r6 = j2;
                r8 = completableFuture2;
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback
            public void updatePropertiesComplete(Map<String, String> map, Object obj) {
                PersistentTopic.log.info("[{}] Updated topic epoch to {}", PersistentTopic.this.getName(), Long.valueOf(r6));
                r8.complete(Long.valueOf(r6));
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback
            public void updatePropertiesFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTopic.log.warn("[{}] Failed to update topic epoch to {}: {}", new Object[]{PersistentTopic.this.getName(), Long.valueOf(r6), managedLedgerException.getMessage()});
                r8.completeExceptionally(managedLedgerException);
            }
        }, null);
        return completableFuture2;
    }

    /** Returns {@code true} when at least one registered producer reports {@code isRemote()}. */
    private boolean hasRemoteProducers() {
        if (this.producers.isEmpty()) {
            return false;
        }
        for (Producer candidate : this.producers.values()) {
            if (candidate.isRemote()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Starts replicator producers according to the namespace policies.
     *
     * <p>With no policies available every replicator is started. With policies present, only
     * replicators whose key is listed in {@code replication_clusters} are started, and nothing is
     * started when that set is {@code null}. If the policy lookup fails, every replicator is started.
     *
     * @return future that completes after the start calls have been issued
     */
    public CompletableFuture<Void> startReplProducers() {
        return this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(TopicName.get(this.topic).getNamespaceObject()).thenAccept(optional -> {
            if (!optional.isPresent()) {
                this.replicators.forEach((cluster, replicator) -> replicator.startProducer());
                return;
            }
            if (((Policies) optional.get()).replication_clusters == null) {
                return;
            }
            TreeSet<String> configuredClusters = Sets.newTreeSet(((Policies) optional.get()).replication_clusters);
            this.replicators.forEach((cluster, replicator) -> {
                if (configuredClusters.contains(cluster)) {
                    replicator.startProducer();
                }
            });
        }).exceptionally(th -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies while starting repl-producers {}", this.topic, th.getMessage());
            }
            // Policy lookup failed: start everything.
            this.replicators.forEach((cluster, replicator) -> replicator.startProducer());
            return null;
        });
    }

    /**
     * Calls {@code disconnect()} on every replicator.
     *
     * @return future that completes when all disconnect futures have completed
     */
    public CompletableFuture<Void> stopReplProducers() {
        List<CompletableFuture<Void>> disconnects = Lists.newArrayList();
        this.replicators.forEach((cluster, replicator) -> disconnects.add(replicator.disconnect()));
        return FutureUtil.waitForAll(disconnects);
    }

    /**
     * Calls {@code disconnect(true)} on every replicator while holding this topic's monitor.
     *
     * @return future that completes when all disconnect futures have completed
     */
    private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
        List<CompletableFuture<Void>> disconnects = Lists.newArrayList();
        this.replicators.forEach((cluster, replicator) -> disconnects.add(replicator.disconnect(true)));
        return FutureUtil.waitForAll(disconnects);
    }

    /**
     * Runs the superclass removal handling, then tells deduplication that the producer name is gone.
     *
     * @param producer producer that was removed
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public void handleProducerRemoved(Producer producer) {
        super.handleProducerRemoved(producer);
        String removedName = producer.getProducerName();
        this.messageDeduplication.producerRemoved(removedName);
    }

    /**
     * Subscribes using the values carried by a {@code SubscriptionOption}. Missing subscription
     * properties are replaced by an empty map before delegating to {@code internalSubscribe}.
     *
     * @param subscriptionOption bundle of subscribe parameters
     * @return future for the created consumer
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Consumer> subscribe(SubscriptionOption subscriptionOption) {
        return internalSubscribe(
                subscriptionOption.getCnx(),
                subscriptionOption.getSubscriptionName(),
                subscriptionOption.getConsumerId(),
                subscriptionOption.getSubType(),
                subscriptionOption.getPriorityLevel(),
                subscriptionOption.getConsumerName(),
                subscriptionOption.isDurable(),
                subscriptionOption.getStartMessageId(),
                subscriptionOption.getMetadata(),
                subscriptionOption.isReadCompacted(),
                subscriptionOption.getInitialPosition(),
                subscriptionOption.getStartMessageRollbackDurationSec(),
                subscriptionOption.isReplicatedSubscriptionStateArg(),
                subscriptionOption.getKeySharedMeta(),
                subscriptionOption.getSubscriptionProperties().orElse(Collections.emptyMap()),
                subscriptionOption.getConsumerEpoch());
    }

    /**
     * Core subscribe path shared by both public {@code subscribe} overloads.
     *
     * <p>After confirming namespace ownership, the request is validated (replicated-subscription and
     * Key_Shared broker switches, enabled subscription types, subscription name, batch compatibility,
     * reserved names, subscribe rate limit). The fenced check and {@code handleConsumerAdded} run under
     * the topic's read lock. The subscription is then obtained (durable or non-durable), a
     * {@code Consumer} is created and attached, and the usage count is decremented on failure paths.
     *
     * @param transportCnx connection the consumer belongs to
     * @param str subscription name
     * @param j consumer id
     * @param subType subscription type
     * @param i priority level passed to the {@code Consumer}
     * @param str2 consumer name
     * @param z whether a durable subscription is requested
     * @param messageId start message id (used for non-durable subscriptions and passed to the consumer)
     * @param map consumer metadata
     * @param z2 read-compacted flag; only allowed with Failover or Exclusive subscriptions
     * @param initialPosition initial cursor position
     * @param j2 start-message rollback duration in seconds
     * @param z3 requested replicated-subscription state; forced to {@code false} when disabled by the broker
     * @param keySharedMeta Key_Shared metadata passed to the {@code Consumer}
     * @param map2 subscription properties
     * @param j3 consumer epoch
     * @return future for the created consumer, or a failed future describing why the request was rejected
     */
    private CompletableFuture<Consumer> internalSubscribe(TransportCnx transportCnx, String str, long j, CommandSubscribe.SubType subType, int i, String str2, boolean z, MessageId messageId, Map<String, String> map, boolean z2, CommandSubscribe.InitialPosition initialPosition, long j2, boolean z3, KeySharedMeta keySharedMeta, Map<String, String> map2, long j3) {
        if (z2 && subType != CommandSubscribe.SubType.Failover && subType != CommandSubscribe.SubType.Exclusive) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
        }
        return this.brokerService.checkTopicNsOwnership(getName()).thenCompose(r38 -> {
            boolean z4 = z3;
            if (z4 && !this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
                log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
                z4 = false;
            }
            if (subType == CommandSubscribe.SubType.Key_Shared && !this.brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
                return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Key_Shared subscription is disabled by broker."));
            }
            try {
                if (!this.topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME) && !checkSubscriptionTypesEnable(subType)) {
                    return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Topic[{" + this.topic + "}] doesn't support " + subType.name() + " sub type!"));
                }
                if (StringUtils.isBlank(str)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Empty subscription name", this.topic);
                    }
                    return FutureUtil.failedFuture(new BrokerServiceException.NamingException("Empty subscription name"));
                }
                if (this.hasBatchMessagePublished && !transportCnx.isBatchMessageCompatibleVersion()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Consumer doesn't support batch-message {}", this.topic, str);
                    }
                    return FutureUtil.failedFuture(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
                }
                if (str.startsWith(this.replicatorPrefix) || str.equals(DEDUPLICATION_CURSOR_NAME)) {
                    log.warn("[{}] Failed to create subscription for {}", this.topic, str);
                    return FutureUtil.failedFuture(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
                }
                if (transportCnx.clientAddress() != null && transportCnx.clientAddress().toString().contains(":")) {
                    // The text before the first ':' of the client address identifies the consumer for rate limiting.
                    SubscribeRateLimiter.ConsumerIdentifier consumerIdentifier = new SubscribeRateLimiter.ConsumerIdentifier(transportCnx.clientAddress().toString().split(":")[0], str2, j);
                    if (this.subscribeRateLimiter.isPresent() && (!this.subscribeRateLimiter.get().subscribeAvailable(consumerIdentifier) || !this.subscribeRateLimiter.get().tryAcquire(consumerIdentifier))) {
                        log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}", new Object[]{this.topic, str, consumerIdentifier, this.subscribeRateLimiter.get().getSubscribeRate(), Long.valueOf(this.subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumerIdentifier))});
                        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
                    }
                }
                // Only the fenced check and the consumer bookkeeping run under the read lock. The lock is
                // released exactly once in `finally`, so a failure further down can never trigger a
                // second unlock (which would throw IllegalMonitorStateException and hide the real error).
                this.lock.readLock().lock();
                try {
                    if (this.isFenced) {
                        log.warn("[{}] Attempting to subscribe to a fenced topic", this.topic);
                        return FutureUtil.failedFuture(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                    }
                    handleConsumerAdded(str, str2);
                } finally {
                    this.lock.readLock().unlock();
                }
                CompletableFuture<? extends Subscription> durableSubscription = z ? getDurableSubscription(str, initialPosition, j2, z4, map2) : getNonDurableSubscription(str, messageId, initialPosition, j2, z2, map2);
                CompletableFuture<Consumer> consumerFuture = durableSubscription.thenCompose(subscription -> {
                    Consumer consumer = new Consumer(subscription, subType, this.topic, j, i, str2, z, transportCnx, transportCnx.getAuthRole(), map, z2, initialPosition, keySharedMeta, messageId, j3);
                    return addConsumerToSubscription(subscription, consumer).thenCompose(r15 -> {
                        checkBackloggedCursors();
                        if (transportCnx.isActive()) {
                            checkReplicatedSubscriptionControllerState();
                            log.info("[{}][{}] Created new subscription for {}", new Object[]{this.topic, str, Long.valueOf(j)});
                            return CompletableFuture.completedFuture(consumer);
                        }
                        // The connection went away while the cursor was being opened: undo the consumer.
                        try {
                            consumer.close();
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", new Object[]{this.topic, str, consumer.consumerName(), Long.valueOf(currentUsageCount())});
                            }
                            decrementUsageCount();
                            return FutureUtil.failedFuture(new BrokerServiceException("Connection was closed while the opening the cursor "));
                        } catch (BrokerServiceException e) {
                            if (e instanceof BrokerServiceException.ConsumerBusyException) {
                                log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, str, Long.valueOf(j), str2});
                            } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                                log.warn("[{}][{}] {}", new Object[]{this.topic, str, e.getMessage()});
                            }
                            decrementUsageCount();
                            return FutureUtil.failedFuture(e);
                        }
                    });
                });
                // Side-effect-only handler: the caller still receives consumerFuture itself.
                consumerFuture.exceptionally(th -> {
                    decrementUsageCount();
                    if (!(th.getCause() instanceof BrokerServiceException.ConsumerBusyException)) {
                        if (th.getCause() instanceof BrokerServiceException.SubscriptionBusyException) {
                            log.warn("[{}][{}] {}", new Object[]{this.topic, str, th.getMessage()});
                            return null;
                        }
                        log.error("[{}] Failed to create subscription: {}", new Object[]{this.topic, str, th});
                        return null;
                    }
                    log.warn("[{}][{}] Consumer {} {} already connected", new Object[]{this.topic, str, Long.valueOf(j), str2});
                    Consumer consumer = null;
                    try {
                        // Close the currently active consumer if its connection is no longer active.
                        consumer = durableSubscription.isDone() ? getActiveConsumer((Subscription) durableSubscription.get()) : null;
                        if (consumer != null && !consumer.cnx().isActive()) {
                            consumer.close();
                        }
                        return null;
                    } catch (Exception e) {
                        log.error("Failed to clean up consumer on closed connection {}, {}", consumer, e.getMessage());
                        return null;
                    }
                });
                return consumerFuture;
            } catch (Exception e) {
                return FutureUtil.failedFuture(e);
            }
        });
    }

    /**
     * Positional-argument subscribe overload. Delegates to {@code internalSubscribe}, passing
     * {@code null} for the subscription properties and {@code -1} for the consumer epoch.
     *
     * @return future for the created consumer
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Consumer> subscribe(TransportCnx transportCnx, String str, long j, CommandSubscribe.SubType subType, int i, String str2, boolean z, MessageId messageId, Map<String, String> map, boolean z2, CommandSubscribe.InitialPosition initialPosition, long j2, boolean z3, KeySharedMeta keySharedMeta) {
        return internalSubscribe(
                transportCnx, str, j, subType, i, str2, z, messageId, map,
                z2, initialPosition, j2, z3, keySharedMeta,
                null,
                -1L);
    }

    private CompletableFuture<Subscription> getDurableSubscription(String str, CommandSubscribe.InitialPosition initialPosition, long j, boolean z, Map<String, String> map) {
        CompletableFuture<Subscription> completableFuture = new CompletableFuture<>();
        if (checkMaxSubscriptionsPerTopicExceed(str)) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Exceed the maximum number of subscriptions of the topic: " + this.topic));
            return completableFuture;
        }
        this.ledger.asyncOpenCursor(Codec.encode(str), initialPosition, PersistentSubscription.getBaseCursorProperties(z), map, new AsyncCallbacks.OpenCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.3
            final /* synthetic */ String val$subscriptionName;
            final /* synthetic */ boolean val$replicated;
            final /* synthetic */ Map val$subscriptionProperties;
            final /* synthetic */ CompletableFuture val$subscriptionFuture;
            final /* synthetic */ long val$startMessageRollbackDurationSec;

            AnonymousClass3(String str2, boolean z2, Map map2, CompletableFuture completableFuture2, long j2) {
                r6 = str2;
                r7 = z2;
                r8 = map2;
                r9 = completableFuture2;
                r10 = j2;
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Opened cursor", PersistentTopic.this.topic, r6);
                }
                PersistentSubscription persistentSubscription = (PersistentSubscription) PersistentTopic.this.subscriptions.get(r6);
                if (persistentSubscription == null) {
                    ConcurrentOpenHashMap concurrentOpenHashMap = PersistentTopic.this.subscriptions;
                    String str2 = r6;
                    String str22 = r6;
                    boolean z2 = r7;
                    Map map2 = r8;
                    persistentSubscription = (PersistentSubscription) concurrentOpenHashMap.computeIfAbsent(str2, str3 -> {
                        return PersistentTopic.this.createPersistentSubscription(str22, managedCursor, z2, map2);
                    });
                } else if (persistentSubscription.getCursor() != null && !persistentSubscription.getCursor().isDurable()) {
                    r9.completeExceptionally(new BrokerServiceException.NotAllowedException("NonDurable subscription with the same name already exists."));
                    return;
                }
                if (r7 && !persistentSubscription.isReplicated()) {
                    persistentSubscription.setReplicated(r7);
                }
                if (r10 > 0) {
                    PersistentTopic.this.resetSubscriptionCursor(persistentSubscription, r9, r10);
                } else {
                    r9.complete(persistentSubscription);
                }
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTopic.log.warn("[{}] Failed to create subscription for {}: {}", new Object[]{PersistentTopic.this.topic, r6, managedLedgerException.getMessage()});
                PersistentTopic.this.decrementUsageCount();
                r9.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                    PersistentTopic.this.close();
                }
            }
        }, null);
        return completableFuture2;
    }

    /**
     * Resolves, creating it if necessary, the non-durable subscription with the given name.
     *
     * <p>Lookup and creation run while holding the monitor of {@code this.ledger}. The returned future fails
     * with {@code NotAllowedException} when the per-topic subscription limit is exceeded or when a durable
     * subscription already uses the name, and with the {@code ManagedLedgerException} raised while creating
     * the cursor.
     *
     * @param str subscription name
     * @param messageId start message id; {@code MessageId.latest} is used when {@code null}
     * @param initialPosition initial position passed to the new non-durable cursor
     * @param j rollback duration in seconds; when positive, the cursor is reset through
     *        {@code resetSubscriptionCursor} before the future completes
     * @param z flag forwarded unchanged to {@code newNonDurableCursor}
     * @param map subscription properties given to a newly created subscription
     * @return future completed with the subscription
     */
    private CompletableFuture<? extends Subscription> getNonDurableSubscription(String str, MessageId messageId, CommandSubscribe.InitialPosition initialPosition, long j, boolean z, Map<String, String> map) {
        log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", this.topic, str, messageId, map);
        final CompletableFuture<Subscription> resetFuture = new CompletableFuture<>();
        if (checkMaxSubscriptionsPerTopicExceed(str)) {
            resetFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("Exceed the maximum number of subscriptions of the topic: " + this.topic));
            return resetFuture;
        }
        synchronized (this.ledger) {
            PersistentSubscription subscription = this.subscriptions.get(str);
            if (subscription != null) {
                if (subscription.getCursor() != null && subscription.getCursor().isDurable()) {
                    return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Durable subscription with the same name already exists."));
                }
            } else {
                final MessageIdImpl startId = (MessageIdImpl) (messageId != null ? messageId : MessageId.latest);
                final long startLedger = startId.getLedgerId();
                long startEntry = startId.getEntryId();
                final boolean validPosition = startLedger >= 0 && startEntry >= 0;
                if (validPosition && (startId instanceof BatchMessageIdImpl)) {
                    // A batch message id with a valid position starts one entry earlier.
                    startEntry = startId.getEntryId() - 1;
                }
                try {
                    subscription = new PersistentSubscription(this, str, this.ledger.newNonDurableCursor(new PositionImpl(startLedger, startEntry), str, initialPosition, z), false, map);
                    this.subscriptions.put(str, subscription);
                } catch (ManagedLedgerException cursorFailure) {
                    return FutureUtil.failedFuture(cursorFailure);
                }
            }
            if (j > 0) {
                resetSubscriptionCursor(subscription, resetFuture, j);
                return resetFuture;
            }
            return CompletableFuture.completedFuture(subscription);
        }
    }

    /**
     * Rewinds a subscription's cursor to {@code now - j seconds} and then completes the supplied future.
     *
     * <p>A failed reset is only logged: the future is completed with the subscription in both the success
     * and the failure case.
     *
     * @param subscription subscription whose cursor is reset
     * @param completableFuture future completed with {@code subscription} once the reset attempt finishes
     * @param j rollback duration in seconds
     */
    public void resetSubscriptionCursor(Subscription subscription, CompletableFuture<Subscription> completableFuture, long j) {
        final long now = System.currentTimeMillis();
        final long targetTimestamp = now - TimeUnit.SECONDS.toMillis(j);
        subscription.resetCursor(targetTimestamp).handle((ignored, failure) -> {
            if (failure != null) {
                log.warn("[{}] Failed to reset cursor {} position at timestamp {}, caused by {}", this.topic, subscription.getName(), Long.valueOf(j), failure.getMessage());
            }
            completableFuture.complete(subscription);
            return null;
        });
    }

    /**
     * Creates (or resolves) a durable subscription without any start-message rollback.
     *
     * @param str subscription name
     * @param initialPosition initial cursor position
     * @param z whether the subscription is replicated
     * @param map subscription properties
     * @return the future returned by {@code getDurableSubscription}
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Subscription> createSubscription(String str, CommandSubscribe.InitialPosition initialPosition, boolean z, Map<String, String> map) {
        final long noRollbackSeconds = 0L;
        return getDurableSubscription(str, initialPosition, noRollbackSeconds, z, map);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> unsubscribe(String str) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (this.brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
            getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(this.topic, Codec.encode(str))).getPersistenceNamingEncoding(), new AsyncCallbacks.DeleteLedgerCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.4
                final /* synthetic */ String val$subscriptionName;
                final /* synthetic */ CompletableFuture val$unsubscribeFuture;

                AnonymousClass4(String str2, CompletableFuture completableFuture2) {
                    r5 = str2;
                    r6 = completableFuture2;
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                public void deleteLedgerComplete(Object obj) {
                    PersistentTopic.this.asyncDeleteCursor(r5, r6);
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    if (managedLedgerException instanceof ManagedLedgerException.MetadataNotFoundException) {
                        PersistentTopic.this.asyncDeleteCursor(r5, r6);
                    } else {
                        r6.completeExceptionally(managedLedgerException);
                        PersistentTopic.log.error("[{}][{}] Error deleting subscription pending ack store", new Object[]{PersistentTopic.this.topic, r5, managedLedgerException});
                    }
                }
            }, null);
        } else {
            asyncDeleteCursor(str2, completableFuture2);
        }
        return completableFuture2;
    }

    /**
     * Asynchronously deletes the managed-ledger cursor of a subscription and reports the outcome through the
     * supplied future.
     *
     * <p>NOTE(review): the callback below still carries synthetic artifacts ({@code val$...} fields, an
     * {@code AnonymousClass5} constructor, {@code r5}/{@code r6} and {@code access$NNN} accessors). Going by
     * the {@code val$} field names and the constructor parameter order, {@code r5} appears to stand for the
     * subscription name and {@code r6} for the future -- confirm before relying on it.
     *
     * @param str subscription name; it is passed through {@code Codec.encode} to form the cursor name
     * @param completableFuture future that receives the result of the deletion
     */
    public void asyncDeleteCursor(String str, CompletableFuture<Void> completableFuture) {
        this.ledger.asyncDeleteCursor(Codec.encode(str), new AsyncCallbacks.DeleteCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.5
            final /* synthetic */ String val$subscriptionName;
            final /* synthetic */ CompletableFuture val$unsubscribeFuture;

            AnonymousClass5(String str2, CompletableFuture completableFuture2) {
                r5 = str2;
                r6 = completableFuture2;
            }

            // Success path: drop the subscription from the topic, then complete the future.
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
            public void deleteCursorComplete(Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Cursor deleted successfully", PersistentTopic.this.topic, r5);
                }
                PersistentTopic.this.removeSubscription(r5);
                r6.complete(null);
                // NOTE(review): synthetic accessor writing System.nanoTime() into a field of the topic;
                // the target field is not visible here (presumably a last-activity timestamp -- confirm).
                PersistentTopic.access$902(PersistentTopic.this, System.nanoTime());
            }

            // Failure path: a missing managed ledger is treated as success, anything else fails the future.
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
            public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Error deleting cursor for subscription", new Object[]{PersistentTopic.this.topic, r5, managedLedgerException});
                }
                if (!(managedLedgerException instanceof ManagedLedgerException.ManagedLedgerNotFoundException)) {
                    r6.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                } else {
                    r6.complete(null);
                    // NOTE(review): a different accessor number than on the success path; whether both
                    // write the same field cannot be determined from this view.
                    PersistentTopic.access$1102(PersistentTopic.this, System.nanoTime());
                }
            }
        }, null);
    }

    /**
     * Unregisters a subscription from this topic and folds its outbound counters into the
     * "removed subscriptions" totals.
     *
     * <p>Does nothing when no subscription is registered under the given name.
     *
     * @param str subscription name
     */
    public void removeSubscription(String str) {
        final PersistentSubscription removed = this.subscriptions.remove(str);
        if (removed == null) {
            return;
        }
        final SubscriptionStatsImpl lastStats = removed.getStats(false, false, false);
        this.bytesOutFromRemovedSubscriptions.add(lastStats.bytesOutCounter);
        this.msgOutFromRemovedSubscriptions.add(lastStats.msgOutCounter);
    }

    /**
     * Deletes this topic with every optional behaviour of {@code delete(boolean, boolean, boolean)}
     * switched off.
     *
     * @return future tracking the deletion
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> delete() {
        final boolean disabled = false;
        return delete(disabled, disabled, disabled);
    }

    /**
     * Three-flag convenience overload of the four-flag {@code delete}.
     *
     * <p>{@code z} and {@code z2} are forwarded as the first two flags, the third flag of the four-flag
     * overload is fixed to {@code false}, and {@code z3} is forwarded as its fourth flag.
     *
     * @return future tracking the deletion
     */
    private CompletableFuture<Void> delete(boolean z, boolean z2, boolean z3) {
        final boolean thirdFlag = false;
        return delete(z, z2, thirdFlag, z3);
    }

    /**
     * Deletes this topic through the four-flag {@code delete} with only its third flag enabled.
     *
     * @return future tracking the deletion
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> deleteForcefully() {
        final boolean off = false;
        final boolean on = true;
        return delete(off, off, on, off);
    }

    private CompletableFuture<Void> delete(boolean z, boolean z2, boolean z3, boolean z4) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isClosingOrDeleting) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                CompletableFuture<Void> failedFuture = FutureUtil.failedFuture(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                this.lock.writeLock().unlock();
                return failedFuture;
            }
            if (z && !this.subscriptions.isEmpty()) {
                CompletableFuture<Void> failedFuture2 = FutureUtil.failedFuture(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                this.lock.writeLock().unlock();
                return failedFuture2;
            }
            if (z2 && hasBacklogs()) {
                CompletableFuture<Void> failedFuture3 = FutureUtil.failedFuture(new BrokerServiceException.TopicBusyException("Topic has subscriptions did not catch up"));
                this.lock.writeLock().unlock();
                return failedFuture3;
            }
            fenceTopicToCloseOrDelete();
            CompletableFuture completableFuture2 = new CompletableFuture();
            if (z3) {
                ArrayList newArrayList = Lists.newArrayList();
                this.replicators.forEach((str, replicator) -> {
                    newArrayList.add(replicator.disconnect());
                });
                this.producers.values().forEach(producer -> {
                    newArrayList.add(producer.disconnect());
                });
                this.subscriptions.forEach((str2, persistentSubscription) -> {
                    newArrayList.add(persistentSubscription.disconnect());
                });
                FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) newArrayList).thenRun(() -> {
                    completableFuture2.complete(null);
                }).exceptionally(th -> {
                    log.error("[{}] Error closing clients", this.topic, th);
                    unfenceTopicToResume();
                    completableFuture2.completeExceptionally(th);
                    return null;
                });
            } else {
                completableFuture2.complete(null);
            }
            completableFuture2.thenAccept(r12 -> {
                if (currentUsageCount() != 0 && (!z3 || z)) {
                    unfenceTopicToResume();
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + currentUsageCount() + " connected producers/consumers"));
                } else {
                    CompletableFuture<Void> completableFuture3 = new CompletableFuture<>();
                    this.brokerService.deleteTopicAuthenticationWithRetry(this.topic, completableFuture3, 5);
                    completableFuture3.thenCompose(r4 -> {
                        return z4 ? deleteSchema() : CompletableFuture.completedFuture(null);
                    }).thenAccept((java.util.function.Consumer<? super U>) schemaVersion -> {
                        deleteTopicPolicies();
                    }).thenCompose(r3 -> {
                        return transactionBufferCleanupAndClose();
                    }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r7, th2) -> {
                        if (th2 != null) {
                            log.error("[{}] Error deleting topic", this.topic, th2);
                            unfenceTopicToResume();
                            completableFuture.completeExceptionally(th2);
                        } else {
                            ArrayList arrayList = new ArrayList();
                            this.subscriptions.forEach((str3, persistentSubscription2) -> {
                                arrayList.add(unsubscribe(str3));
                            });
                            FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) arrayList).whenComplete((r8, th2) -> {
                                if (th2 == null) {
                                    this.ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.6
                                        final /* synthetic */ CompletableFuture val$deleteFuture;

                                        AnonymousClass6(CompletableFuture completableFuture4) {
                                            r5 = completableFuture4;
                                        }

                                        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                                        public void deleteLedgerComplete(Object obj) {
                                            PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                                            PersistentTopic.this.dispatchRateLimiter.ifPresent((v0) -> {
                                                v0.close();
                                            });
                                            PersistentTopic.this.subscribeRateLimiter.ifPresent((v0) -> {
                                                v0.close();
                                            });
                                            PersistentTopic.this.unregisterTopicPolicyListener();
                                            PersistentTopic.log.info("[{}] Topic deleted", PersistentTopic.this.topic);
                                            r5.complete(null);
                                        }

                                        @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                                        public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                                            if (managedLedgerException.getCause() instanceof MetadataStoreException.NotFoundException) {
                                                PersistentTopic.log.info("[{}] Topic is already deleted {}", PersistentTopic.this.topic, managedLedgerException.getMessage());
                                                deleteLedgerComplete(obj);
                                            } else {
                                                PersistentTopic.this.unfenceTopicToResume();
                                                PersistentTopic.log.error("[{}] Error deleting topic", PersistentTopic.this.topic, managedLedgerException);
                                                r5.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                                            }
                                        }
                                    }, null);
                                    return;
                                }
                                log.error("[{}] Error deleting topic", this.topic, th2);
                                unfenceTopicToResume();
                                completableFuture4.completeExceptionally(th2);
                            });
                        }
                    });
                }
            }).exceptionally(th2 -> {
                unfenceTopicToResume();
                completableFuture4.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
            this.lock.writeLock().unlock();
            return completableFuture4;
        } catch (Throwable th3) {
            this.lock.writeLock().unlock();
            throw th3;
        }
    }

    /**
     * Closes this topic by calling {@code close(boolean)} with {@code false}.
     *
     * @return future tracking the close operation
     */
    public CompletableFuture<Void> close() {
        final boolean flag = false;
        return close(flag);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> close(boolean z) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isClosingOrDeleting && !z) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                this.lock.writeLock().unlock();
                return completableFuture;
            }
            fenceTopicToCloseOrDelete();
            this.lock.writeLock().unlock();
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.add(this.transactionBuffer.closeAsync());
            this.replicators.forEach((str, replicator) -> {
                newArrayList.add(replicator.disconnect());
            });
            this.producers.values().forEach(producer -> {
                newArrayList.add(producer.disconnect());
            });
            if (this.topicPublishRateLimiter != null) {
                this.topicPublishRateLimiter.close();
            }
            this.subscriptions.forEach((str2, persistentSubscription) -> {
                newArrayList.add(persistentSubscription.disconnect());
            });
            if (this.resourceGroupPublishLimiter != null) {
                this.resourceGroupPublishLimiter.unregisterRateLimitFunction(getName());
            }
            (z ? CompletableFuture.completedFuture(null) : FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) newArrayList)).thenRun(() -> {
                this.ledger.asyncClose(new AsyncCallbacks.CloseCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.7
                    final /* synthetic */ CompletableFuture val$closeFuture;

                    AnonymousClass7(CompletableFuture completableFuture2) {
                        r5 = completableFuture2;
                    }

                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback
                    public void closeComplete(Object obj) {
                        CompletableFuture<Void> removeTopicFromCache = PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                        CompletableFuture completableFuture2 = r5;
                        CompletableFuture<Void> thenRun = removeTopicFromCache.thenRun(() -> {
                            PersistentTopic.this.replicatedSubscriptionsController.ifPresent((v0) -> {
                                v0.close();
                            });
                            PersistentTopic.this.dispatchRateLimiter.ifPresent((v0) -> {
                                v0.close();
                            });
                            PersistentTopic.this.subscribeRateLimiter.ifPresent((v0) -> {
                                v0.close();
                            });
                            PersistentTopic.this.unregisterTopicPolicyListener();
                            PersistentTopic.log.info("[{}] Topic closed", PersistentTopic.this.topic);
                            PersistentTopic.this.cancelFencedTopicMonitoringTask();
                            completableFuture2.complete(null);
                        });
                        CompletableFuture completableFuture22 = r5;
                        thenRun.exceptionally(th -> {
                            completableFuture22.completeExceptionally(th);
                            return null;
                        });
                    }

                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback
                    public void closeFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        PersistentTopic.log.error("[{}] Failed to close managed ledger, proceeding anyway.", PersistentTopic.this.topic, managedLedgerException);
                        PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                        r5.complete(null);
                    }
                }, null);
            }).exceptionally(th -> {
                log.error("[{}] Error closing topic", this.topic, th);
                unfenceTopicToResume();
                completableFuture2.completeExceptionally(th);
                return null;
            });
            return completableFuture2;
        } catch (Throwable th2) {
            this.lock.writeLock().unlock();
            throw th2;
        }
    }

    /**
     * Runs {@code checkReplication()} and, on failure, schedules another attempt after
     * {@code POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS} seconds unless the failure's cause is a
     * {@code TopicFencedException}.
     *
     * <p>The returned future reflects only the current attempt; a scheduled retry is not linked to it.
     *
     * @return future completed with {@code null} on success, or failed with the error of this attempt
     */
    @VisibleForTesting
    CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        final CompletableFuture<Void> attemptResult = new CompletableFuture<>();
        checkReplication().thenAccept(ignored -> {
            log.info("[{}] Policies updated successfully", this.topic);
            attemptResult.complete(null);
        }).exceptionally(failure -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", this.topic, failure.getMessage(), Long.valueOf(POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS), failure);
            final boolean topicFenced = failure.getCause() instanceof BrokerServiceException.TopicFencedException;
            if (!topicFenced) {
                this.brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
            }
            attemptResult.completeExceptionally(failure);
            return null;
        });
        return attemptResult;
    }

    /**
     * Delegates to {@code messageDeduplication.checkStatus()}.
     *
     * @return the future returned by the deduplication component
     */
    public CompletableFuture<Void> checkDeduplicationStatus() {
        final CompletableFuture<Void> status = this.messageDeduplication.checkStatus();
        return status;
    }

    /**
     * Fetches the current managed-ledger configuration for this topic from the broker service and applies
     * it to the ledger.
     *
     * @return future completed with {@code null} once the configuration is applied, or failed with the error
     *         raised while obtaining or applying it
     */
    private CompletableFuture<Void> checkPersistencePolicies() {
        final TopicName name = TopicName.get(this.topic);
        final CompletableFuture<Void> applied = new CompletableFuture<>();
        this.brokerService.getManagedLedgerConfig(name).thenAccept(config -> {
            this.ledger.setConfig(config);
            applied.complete(null);
        }).exceptionally(failure -> {
            log.warn("[{}] Failed to update persistence-policies {}", this.topic, failure.getMessage());
            applied.completeExceptionally(failure);
            return null;
        });
        return applied;
    }

    /**
     * Reconciles this topic's replicators with the configured replication clusters.
     *
     * <p>Nothing is done for non-global topics or for topics in the heartbeat namespace. If the local
     * cluster is not in the configured list, the topic is deleted forcefully. Otherwise a replicator is
     * started for every configured remote cluster that lacks one, every existing replicator gets the current
     * message TTL, and replicators for clusters no longer configured are removed.
     *
     * @return future completed when all started/removed replicator operations have finished
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> checkReplication() {
        final TopicName name = TopicName.get(this.topic);
        if (!name.isGlobal() || name.getNamespaceObject().equals(this.brokerService.pulsar().getHeartbeatNamespaceV2())) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", name);
        }
        final List<String> configuredClusters = this.topicPolicies.getReplicationClusters().get();
        final int ttlSeconds = this.topicPolicies.getMessageTTLInSeconds().get().intValue();
        final String localCluster = this.brokerService.pulsar().getConfiguration().getClusterName();
        if (TopicName.get(this.topic).isGlobal() && !configuredClusters.contains(localCluster)) {
            log.info("Deleting topic [{}] because local cluster is not part of  global namespace repl list {}", this.topic, configuredClusters);
            return deleteForcefully();
        }

        final List<CompletableFuture<Void>> pending = Lists.newArrayList();
        for (String cluster : configuredClusters) {
            if (cluster.equals(localCluster) || this.replicators.containsKey(cluster)) {
                continue;
            }
            pending.add(startReplicator(cluster));
        }
        this.replicators.forEach((cluster, replicator) -> {
            ((PersistentReplicator) replicator).updateMessageTTL(ttlSeconds);
            final boolean stillWanted = cluster.equals(localCluster) || configuredClusters.contains(cluster);
            if (!stillWanted) {
                pending.add(removeReplicator(cluster));
            }
        });
        return FutureUtil.waitForAll(pending);
    }

    /**
     * Applies the configured message TTL to every subscription and replicator of this topic.
     *
     * <p>A TTL of {@code 0} skips the whole pass.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageExpiry() {
        final int ttlSeconds = this.topicPolicies.getMessageTTLInSeconds().get().intValue();
        if (ttlSeconds == 0) {
            return;
        }
        this.subscriptions.forEach((subName, subscription) -> {
            subscription.expireMessages(ttlSeconds);
        });
        this.replicators.forEach((cluster, replicator) -> {
            ((PersistentReplicator) replicator).expireMessages(ttlSeconds);
        });
    }

    /**
     * Asks the deduplication component to purge its inactive producers.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageDeduplicationInfo() {
        // Pure delegation; all purge logic lives in messageDeduplication.
        this.messageDeduplication.purgeInactiveProducers();
    }

    /**
     * Tells whether compaction is enabled for this topic.
     *
     * @return {@code true} when a compaction threshold is configured and strictly positive
     */
    public boolean isCompactionEnabled() {
        final Long threshold = this.topicPolicies.getCompactionThreshold().get();
        if (threshold == null) {
            return false;
        }
        return threshold.longValue() > 0;
    }

    /**
     * Triggers compaction when the estimated backlog exceeds the configured compaction threshold.
     *
     * <p>The threshold is read once and validated before it is unboxed, so a missing (null) threshold is
     * handled as "compaction disabled" rather than through a caught {@code NullPointerException}. The
     * validation applies the same rule as {@code isCompactionEnabled()}: non-null and strictly positive.
     *
     * <p>The size compared with the threshold is taken from:
     * <ul>
     *   <li>the compaction subscription's estimated backlog, when that subscription exists;</li>
     *   <li>the ledger's total size, when there are no subscriptions or none of them is durable;</li>
     *   <li>the ledger's estimated backlog size otherwise.</li>
     * </ul>
     *
     * <p>Any exception is logged at debug level and swallowed; this method never throws.
     */
    public void checkCompaction() {
        final TopicName topicName = TopicName.get(this.topic);
        try {
            final Long configuredThreshold = this.topicPolicies.getCompactionThreshold().get();
            final boolean enabled = configuredThreshold != null && configuredThreshold.longValue() > 0;
            if (!enabled || !this.currentCompaction.isDone()) {
                return;
            }
            final long threshold = configuredThreshold.longValue();

            final long backlogSize;
            final PersistentSubscription compactionSubscription = this.subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION);
            if (compactionSubscription != null) {
                backlogSize = compactionSubscription.estimateBacklogSize();
            } else if (this.subscriptions.isEmpty() || this.subscriptions.values().stream().noneMatch(subscription -> {
                return subscription.getCursor().isDurable();
            })) {
                backlogSize = this.ledger.getTotalSize();
            } else {
                backlogSize = this.ledger.getEstimatedBacklogSize();
            }

            if (backlogSize > threshold) {
                try {
                    triggerCompaction();
                } catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
                    log.debug("[{}] Compaction already running, so don't trigger again, even though backlog({}) is over threshold({})", topicName, Long.valueOf(backlogSize), Long.valueOf(threshold));
                }
            }
        } catch (Exception e) {
            // Best effort: keep the periodic check alive, but record what went wrong.
            log.debug("[{}] Error getting policies", this.topic, e);
        }
    }

    /**
     * Creates the compaction subscription at the earliest position when compaction is enabled and that
     * subscription is not registered yet.
     *
     * @return future completed with {@code null} once the subscription exists, or immediately when nothing
     *         needs to be created
     */
    public CompletableFuture<Void> preCreateSubscriptionForCompactionIfNeeded() {
        if (this.subscriptions.containsKey(Compactor.COMPACTION_SUBSCRIPTION) || !isCompactionEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        return createSubscription(Compactor.COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null)
                .thenCompose(created -> {
                    return CompletableFuture.completedFuture(null);
                });
    }

    CompletableFuture<Void> startReplicator(String str) {
        log.info("[{}] Starting replicator to remote: {}", this.topic, str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.ledger.asyncOpenCursor(PersistentReplicator.getReplicatorName(this.replicatorPrefix, str), new AsyncCallbacks.OpenCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.8
            final /* synthetic */ String val$remoteCluster;
            final /* synthetic */ CompletableFuture val$future;

            AnonymousClass8(String str2, CompletableFuture completableFuture2) {
                r5 = str2;
                r6 = completableFuture2;
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
                CompletableFuture<Void> addReplicationCluster = PersistentTopic.this.addReplicationCluster(r5, managedCursor, PersistentTopic.this.brokerService.pulsar().getConfiguration().getClusterName());
                CompletableFuture completableFuture2 = r6;
                addReplicationCluster.whenComplete((r4, th) -> {
                    if (th == null) {
                        completableFuture2.complete(null);
                    } else {
                        completableFuture2.completeExceptionally(th);
                    }
                });
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                r6.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }, null);
        return completableFuture2;
    }

    /**
     * Registers a {@link PersistentReplicator} for a remote cluster, unless one is already present.
     *
     * <p>The topic is first validated via {@code AbstractReplicator.validatePartitionedTopicAsync},
     * then the remote cluster data is looked up and a replication client is obtained for it.
     * If constructing the replicator fails with a {@link PulsarServerException}, the failure is
     * logged and the null placeholder left in the replicators map is removed; the returned
     * future still completes normally in that case.
     *
     * @param str           name of the remote cluster
     * @param managedCursor cursor the replicator reads from
     * @param str2          name of the local cluster
     * @return a future that completes once the replicator map has been updated
     */
    protected CompletableFuture<Void> addReplicationCluster(String str, ManagedCursor managedCursor, String str2) {
        return AbstractReplicator.validatePartitionedTopicAsync(getName(), this.brokerService)
                .thenCompose(ignored -> this.brokerService.pulsar().getPulsarResources().getClusterResources()
                        .getClusterAsync(str)
                        .thenApply(clusterData -> this.brokerService.getReplicationClient(str, clusterData)))
                .thenAccept(replicationClient -> {
                    Replicator replicator = this.replicators.computeIfAbsent(str, cluster -> {
                        try {
                            return new PersistentReplicator(this, managedCursor, str2, str, this.brokerService, (PulsarClientImpl) replicationClient);
                        } catch (PulsarServerException e) {
                            log.error("[{}] Replicator startup failed {}", this.topic, str, e);
                            return null;
                        }
                    });
                    // Clean up the null entry left behind when replicator startup failed.
                    if (replicator == null) {
                        this.replicators.removeNullValue(str);
                    }
                });
    }

    CompletableFuture<Void> removeReplicator(String str) {
        log.info("[{}] Removing replicator to {}", this.topic, str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String replicatorName = PersistentReplicator.getReplicatorName(this.replicatorPrefix, str);
        this.replicators.get(str).disconnect().thenRun(() -> {
            this.ledger.asyncDeleteCursor(replicatorName, new AsyncCallbacks.DeleteCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.9
                final /* synthetic */ String val$remoteCluster;
                final /* synthetic */ CompletableFuture val$future;
                final /* synthetic */ String val$name;

                AnonymousClass9(String str2, CompletableFuture completableFuture2, String replicatorName2) {
                    r5 = str2;
                    r6 = completableFuture2;
                    r7 = replicatorName2;
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
                public void deleteCursorComplete(Object obj) {
                    PersistentTopic.this.replicators.remove(r5);
                    r6.complete(null);
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
                public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    PersistentTopic.log.error("[{}] Failed to delete cursor {} {}", new Object[]{PersistentTopic.this.topic, r7, managedLedgerException.getMessage(), managedLedgerException});
                    r6.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                }
            }, null);
        }).exceptionally(th -> {
            log.error("[{}] Failed to close replication producer {} {}", new Object[]{this.topic, replicatorName2, th.getMessage(), th});
            completableFuture2.completeExceptionally(th);
            return null;
        });
        return completableFuture2;
    }

    /**
     * @return whether this topic's message deduplication component reports itself as enabled
     */
    public boolean isDeduplicationEnabled() {
        final boolean enabled = messageDeduplication.isEnabled();
        return enabled;
    }

    /**
     * Counts the consumers attached to this topic across all of its subscriptions.
     *
     * @return the sum of the consumer-list sizes of every subscription
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public int getNumberOfConsumers() {
        int total = 0;
        for (PersistentSubscription subscription : subscriptions.values()) {
            total += subscription.getConsumers().size();
        }
        return total;
    }

    /**
     * Delegates to the two-argument overload, passing every subscription of this topic.
     *
     * @param str the client address to match (presumably an IP/host string; defined by the overload)
     * @return the count computed by the overload for this topic's subscriptions
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public int getNumberOfSameAddressConsumers(String str) {
        final int sameAddressConsumers = getNumberOfSameAddressConsumers(str, subscriptions.values());
        return sameAddressConsumers;
    }

    /**
     * @return the live map of subscription name to subscription held by this topic (not a copy)
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
        return subscriptions;
    }

    /**
     * Looks up a subscription by name.
     *
     * @param str the subscription name
     * @return whatever the subscriptions map holds for that name
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public PersistentSubscription getSubscription(String str) {
        final PersistentSubscription subscription = subscriptions.get(str);
        return subscription;
    }

    /**
     * @return the live map of remote cluster name to replicator held by this topic (not a copy)
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
        return replicators;
    }

    /**
     * Looks up the replicator registered for a remote cluster.
     *
     * @param str the remote cluster name
     * @return whatever the replicators map holds for that cluster
     */
    public Replicator getPersistentReplicator(String str) {
        final Replicator replicator = replicators.get(str);
        return replicator;
    }

    /**
     * @return the managed ledger backing this topic
     */
    public ManagedLedger getManagedLedger() {
        return ledger;
    }

    /**
     * Refreshes this topic's rate statistics, writes them to {@code statsOutputStream}, and adds
     * the totals to the supplied namespace and bundle aggregates.
     *
     * <p>The output is one object keyed by the topic name containing, in order: a
     * {@code publishers} list, a replication object, a {@code subscriptions} object and the
     * topic-level totals. The order of the stream calls is significant and must be preserved.
     * Side effects: resets {@code publishRateLimitedTimes}, updates the rolling average publish
     * rates, and refreshes then resets the add-entry latency buckets.
     *
     * @param namespaceStats            namespace-level aggregate that is incremented
     * @param namespaceBundleStats      bundle-level aggregate that is incremented
     * @param statsOutputStream         stream receiving the serialized topic stats
     * @param clusterReplicationMetrics per-cluster replication metrics, updated when enabled
     * @param str                       first component of the replication metrics key
     *                                  (presumably the namespace name; confirm against callers)
     * @param z                         when {@code true}, per-publisher stats are written into
     *                                  the {@code publishers} list
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void updateRates(NamespaceStats namespaceStats, NamespaceBundleStats namespaceBundleStats, StatsOutputStream statsOutputStream, ClusterReplicationMetrics clusterReplicationMetrics, String str, boolean z) {
        this.publishRateLimitedTimes = 0L;
        TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
        topicStatsHelper.reset();
        this.replicators.forEach((remoteCluster, replicator) -> {
            replicator.updateRates();
        });
        namespaceStats.producerCount += this.producers.size();
        namespaceBundleStats.producerCount += this.producers.size();
        statsOutputStream.startObject(this.topic);

        // Publishers: aggregate inbound rates and remember stats of remote (replication) producers.
        statsOutputStream.startList("publishers");
        this.producers.values().forEach(producer -> {
            producer.updateRates();
            PublisherStatsImpl publisherStats = producer.getStats();
            topicStatsHelper.aggMsgRateIn += publisherStats.msgRateIn;
            topicStatsHelper.aggMsgThroughputIn += publisherStats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
            }
            if (z) {
                StreamingStats.writePublisherStats(statsOutputStream, publisherStats);
            }
        });
        statsOutputStream.endList();

        // Rolling averages: jump straight to a higher rate, otherwise average with the previous value.
        this.lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > this.lastUpdatedAvgPublishRateInMsg ? topicStatsHelper.aggMsgRateIn : (topicStatsHelper.aggMsgRateIn + this.lastUpdatedAvgPublishRateInMsg) / 2.0d;
        this.lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > this.lastUpdatedAvgPublishRateInByte ? topicStatsHelper.aggMsgThroughputIn : (topicStatsHelper.aggMsgThroughputIn + this.lastUpdatedAvgPublishRateInByte) / 2.0d;

        // Replication: one nested object per remote cluster.
        statsOutputStream.startObject(ReplicationStats.REPLICATION_SCOPE);
        namespaceStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
        this.replicators.forEach((remoteCluster, replicator) -> {
            try {
                ((PersistentReplicator) replicator).updateCursorState();
            } catch (Exception e) {
                log.warn("[{}] Failed to update cursro state ", this.topic, e);
            }
            ReplicatorStatsImpl replicatorStats = replicator.getStats();
            // Inbound figures come from the remote producer of the same cluster, if one is connected.
            PublisherStatsImpl inbound = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
            replicatorStats.msgRateIn = inbound != null ? inbound.msgRateIn : 0.0d;
            replicatorStats.msgThroughputIn = inbound != null ? inbound.msgThroughputIn : 0.0d;
            replicatorStats.inboundConnection = inbound != null ? inbound.getAddress() : null;
            replicatorStats.inboundConnectedSince = inbound != null ? inbound.getConnectedSince() : null;
            topicStatsHelper.aggMsgRateOut += replicatorStats.msgRateOut;
            topicStatsHelper.aggMsgThroughputOut += replicatorStats.msgThroughputOut;
            statsOutputStream.startObject(remoteCluster);
            statsOutputStream.writePair("connected", replicatorStats.connected);
            statsOutputStream.writePair("msgRateExpired", replicatorStats.msgRateExpired);
            statsOutputStream.writePair("msgRateIn", replicatorStats.msgRateIn);
            statsOutputStream.writePair("msgRateOut", replicatorStats.msgRateOut);
            statsOutputStream.writePair("msgThroughputIn", replicatorStats.msgThroughputIn);
            statsOutputStream.writePair("msgThroughputOut", replicatorStats.msgThroughputOut);
            statsOutputStream.writePair("replicationBacklog", replicatorStats.replicationBacklog);
            statsOutputStream.writePair("replicationDelayInSeconds", replicatorStats.replicationDelayInSeconds);
            statsOutputStream.writePair("inboundConnection", replicatorStats.inboundConnection);
            statsOutputStream.writePair("inboundConnectedSince", replicatorStats.inboundConnectedSince);
            statsOutputStream.writePair("outboundConnection", replicatorStats.outboundConnection);
            statsOutputStream.writePair("outboundConnectedSince", replicatorStats.outboundConnectedSince);
            statsOutputStream.endObject();
            namespaceStats.msgReplBacklog += replicatorStats.replicationBacklog;
            if (clusterReplicationMetrics.isMetricsEnabled()) {
                String keyName = clusterReplicationMetrics.getKeyName(str, remoteCluster);
                ReplicationMetrics replicationMetrics = clusterReplicationMetrics.get(keyName);
                boolean newlyCreated = false;
                if (replicationMetrics == null) {
                    replicationMetrics = ReplicationMetrics.get();
                    newlyCreated = true;
                }
                replicationMetrics.connected += replicatorStats.connected ? 1 : 0;
                replicationMetrics.msgRateOut += replicatorStats.msgRateOut;
                replicationMetrics.msgThroughputOut += replicatorStats.msgThroughputOut;
                replicationMetrics.msgReplBacklog += replicatorStats.replicationBacklog;
                if (newlyCreated) {
                    clusterReplicationMetrics.put(keyName, replicationMetrics);
                }
                if (replicatorStats.replicationDelayInSeconds > replicationMetrics.maxMsgReplDelayInSeconds) {
                    replicationMetrics.maxMsgReplDelayInSeconds = replicatorStats.replicationDelayInSeconds;
                }
            }
        });
        statsOutputStream.endObject();

        // Subscriptions: one nested object per subscription, each with a list of consumers.
        statsOutputStream.startObject("subscriptions");
        namespaceStats.subsCount = (int) (namespaceStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            double subMsgRateOut = 0.0d;
            double subMsgThroughputOut = 0.0d;
            double subMsgRateRedeliver = 0.0d;
            double subMsgAckRate = 0.0d;
            try {
                statsOutputStream.startObject(subscriptionName);
                statsOutputStream.startList("consumers");
                for (Consumer consumer : subscription.getConsumers()) {
                    namespaceStats.consumerCount++;
                    namespaceBundleStats.consumerCount++;
                    consumer.updateRates();
                    ConsumerStatsImpl consumerStats = consumer.getStats();
                    subMsgRateOut += consumerStats.msgRateOut;
                    subMsgAckRate += consumerStats.messageAckRate;
                    subMsgThroughputOut += consumerStats.msgThroughputOut;
                    subMsgRateRedeliver += consumerStats.msgRateRedeliver;
                    StreamingStats.writeConsumerStats(statsOutputStream, subscription.getType(), consumerStats);
                }
                statsOutputStream.endList();
                statsOutputStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(true));
                statsOutputStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                statsOutputStream.writePair("msgRateOut", subMsgRateOut);
                statsOutputStream.writePair("messageAckRate", subMsgAckRate);
                statsOutputStream.writePair("msgThroughputOut", subMsgThroughputOut);
                statsOutputStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                statsOutputStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
                statsOutputStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
                statsOutputStream.writePair("type", subscription.getTypeString());
                // Read the dispatcher once so the null check, type check and cast see the same instance.
                Dispatcher dispatcher = subscription.getDispatcher();
                if (null != dispatcher) {
                    statsOutputStream.writePair("filterProcessedMsgCount", dispatcher.getFilterProcessedMsgCount());
                    statsOutputStream.writePair("filterAcceptedMsgCount", dispatcher.getFilterAcceptedMsgCount());
                    statsOutputStream.writePair("filterRejectedMsgCount", dispatcher.getFilterRejectedMsgCount());
                    statsOutputStream.writePair("filterRescheduledMsgCount", dispatcher.getFilterRescheduledMsgCount());
                }
                if (Subscription.isIndividualAckMode(subscription.getType()) && (dispatcher instanceof PersistentDispatcherMultipleConsumers)) {
                    PersistentDispatcherMultipleConsumers multiConsumerDispatcher = (PersistentDispatcherMultipleConsumers) dispatcher;
                    statsOutputStream.writePair("blockedSubscriptionOnUnackedMsgs", multiConsumerDispatcher.isBlockedDispatcherOnUnackedMsgs());
                    statsOutputStream.writePair("unackedMessages", multiConsumerDispatcher.getTotalUnackedMessages());
                }
                statsOutputStream.endObject();
                topicStatsHelper.aggMsgRateOut += subMsgRateOut;
                topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
                namespaceStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
                if (this.brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
                    subscription.checkAndUnblockIfStuck();
                }
            } catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName, e.getMessage(), e);
            }
        });
        statsOutputStream.endObject();

        // Topic-level totals.
        topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0d ? 0.0d : topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn;
        statsOutputStream.writePair("producerCount", this.producers.size());
        statsOutputStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
        statsOutputStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
        statsOutputStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
        statsOutputStream.writePair("msgInCount", getMsgInCounter());
        statsOutputStream.writePair("bytesInCount", getBytesInCounter());
        statsOutputStream.writePair("msgOutCount", getMsgOutCounter());
        statsOutputStream.writePair("bytesOutCount", getBytesOutCounter());
        statsOutputStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
        statsOutputStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
        statsOutputStream.writePair("storageSize", this.ledger.getTotalSize());
        statsOutputStream.writePair("backlogSize", this.ledger.getEstimatedBacklogSize());
        statsOutputStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) this.ledger).getPendingAddEntriesCount());
        statsOutputStream.writePair("filteredEntriesCount", getFilteredEntriesCount());

        // Fold the topic totals into the namespace and bundle aggregates.
        namespaceStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        namespaceStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        namespaceStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        namespaceStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        namespaceStats.storageSize += this.ledger.getEstimatedBacklogSize();
        namespaceBundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        namespaceBundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        namespaceBundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        namespaceBundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        namespaceBundleStats.cacheSize += ((ManagedLedgerImpl) this.ledger).getCacheSize();
        statsOutputStream.endObject();

        // Publish the add-entry latency buckets to the namespace, then start a fresh window.
        this.addEntryLatencyStatsUsec.refresh();
        NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), namespaceStats.addLatencyBucket);
        this.addEntryLatencyStatsUsec.reset();
    }

    /**
     * @return the rolling average publish rate in messages, as last stored by {@code updateRates}
     */
    public double getLastUpdatedAvgPublishRateInMsg() {
        return lastUpdatedAvgPublishRateInMsg;
    }

    /**
     * @return the rolling average publish throughput in bytes, as last stored by {@code updateRates}
     */
    public double getLastUpdatedAvgPublishRateInByte() {
        return lastUpdatedAvgPublishRateInByte;
    }

    /**
     * Blocking variant of {@link #asyncGetStats(boolean, boolean, boolean)}.
     *
     * <p>The three flags are passed through unchanged. If the calling thread is interrupted
     * while waiting, its interrupt status is restored before returning so callers further up
     * the stack can still observe the interruption.
     *
     * @param z  forwarded to {@code asyncGetStats} as its first flag
     * @param z2 forwarded to {@code asyncGetStats} as its second flag
     * @param z3 forwarded to {@code asyncGetStats} as its third flag
     * @return the computed topic stats, or {@code null} when the computation failed or the wait
     *         was interrupted (the failure is logged)
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public TopicStatsImpl getStats(boolean z, boolean z2, boolean z3) {
        try {
            return asyncGetStats(z, z2, z3).get();
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
            log.error("[{}] Fail to get stats", this.topic, e);
            return null;
        } catch (ExecutionException e) {
            log.error("[{}] Fail to get stats", this.topic, e);
            return null;
        }
    }

    /**
     * Builds a {@link TopicStatsImpl} snapshot for this topic.
     *
     * <p>Aggregates producer, subscription and replicator stats, copies ledger storage and
     * offload figures, and fills in compaction details when a compaction record exists for the
     * topic. Local producers are listed as publishers; remote producers are used only to fill
     * the inbound side of the matching replicator stats.
     *
     * @param z  passed to each subscription's {@code getStats} as its first argument
     * @param z2 passed to each subscription's {@code getStats} as its second argument
     * @param z3 passed to each subscription's {@code getStats} as its third argument; when
     *           {@code true} and the backlog is non-empty, the earliest publish time in the
     *           backlog is also fetched asynchronously
     * @return a future completed with the snapshot, or completed exceptionally when fetching
     *         the earliest publish time fails
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean z, boolean z2, boolean z3) {
        final CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<>();
        final TopicStatsImpl snapshot = new TopicStatsImpl();
        final ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishers = new ObjectObjectHashMap<>();

        // Producers: accumulate inbound rates; keep remote producers aside, keyed by cluster.
        producers.values().forEach(producer -> {
            final PublisherStatsImpl publisherStats = producer.getStats();
            snapshot.msgRateIn += publisherStats.msgRateIn;
            snapshot.msgThroughputIn += publisherStats.msgThroughputIn;
            if (!producer.isRemote()) {
                snapshot.addPublisher(publisherStats);
            } else {
                remotePublishers.put(producer.getRemoteCluster(), publisherStats);
            }
        });

        if (snapshot.msgRateIn == 0.0d) {
            snapshot.averageMsgSize = 0.0d;
        } else {
            snapshot.averageMsgSize = snapshot.msgThroughputIn / snapshot.msgRateIn;
        }
        snapshot.msgInCounter = getMsgInCounter();
        snapshot.bytesInCounter = getBytesInCounter();
        snapshot.msgChunkPublished = msgChunkPublished;
        snapshot.waitingPublishers = getWaitingProducersCount();
        // Out-counters start from what already-removed subscriptions delivered.
        snapshot.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
        snapshot.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
        snapshot.publishRateLimitedTimes = publishRateLimitedTimes;

        // Subscriptions: add their outbound figures and expose each one by name.
        subscriptions.forEach((subscriptionName, subscription) -> {
            final SubscriptionStatsImpl subStats = subscription.getStats(Boolean.valueOf(z), z2, z3);
            snapshot.msgRateOut += subStats.msgRateOut;
            snapshot.msgThroughputOut += subStats.msgThroughputOut;
            snapshot.bytesOutCounter += subStats.bytesOutCounter;
            snapshot.msgOutCounter += subStats.msgOutCounter;
            snapshot.subscriptions.put(subscriptionName, subStats);
            snapshot.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
            snapshot.nonContiguousDeletedMessagesRangesSerializedSize += subStats.nonContiguousDeletedMessagesRangesSerializedSize;
        });

        // Replicators: pair each one with the remote producer of the same cluster, if any.
        replicators.forEach((clusterName, replicator) -> {
            final ReplicatorStatsImpl replStats = replicator.getStats();
            final PublisherStatsImpl inbound = remotePublishers.get(replicator.getRemoteCluster());
            if (inbound != null) {
                replStats.msgRateIn = inbound.msgRateIn;
                replStats.msgThroughputIn = inbound.msgThroughputIn;
                replStats.inboundConnection = inbound.getAddress();
                replStats.inboundConnectedSince = inbound.getConnectedSince();
            }
            snapshot.msgRateOut += replStats.msgRateOut;
            snapshot.msgThroughputOut += replStats.msgThroughputOut;
            snapshot.replication.put(replicator.getRemoteCluster(), replStats);
        });

        // Storage, deduplication and offload figures.
        snapshot.storageSize = ledger.getTotalSize();
        snapshot.backlogSize = ledger.getEstimatedBacklogSize();
        snapshot.deduplicationStatus = messageDeduplication.getStatus().toString();
        snapshot.topicEpoch = topicEpoch.orElse(null);
        snapshot.offloadedStorageSize = ledger.getOffloadedSize();
        snapshot.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
        snapshot.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
        snapshot.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();

        // Compaction: reset, then overwrite from the topic's compaction record when present.
        final Optional<CompactorMXBean> compactorBean = getCompactorMXBean();
        snapshot.compaction.reset();
        compactorBean
                .flatMap(bean -> bean.getCompactionRecordForTopic(topic))
                .ifPresent(record -> {
                    snapshot.compaction.lastCompactionRemovedEventCount = record.getLastCompactionRemovedEventCount();
                    snapshot.compaction.lastCompactionSucceedTimestamp = record.getLastCompactionSucceedTimestamp();
                    snapshot.compaction.lastCompactionFailedTimestamp = record.getLastCompactionFailedTimestamp();
                    snapshot.compaction.lastCompactionDurationTimeInMills = record.getLastCompactionDurationTimeInMills();
                });

        if (z3 && snapshot.backlogSize != 0) {
            // The earliest publish time needs an asynchronous ledger read before completing.
            ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliest, failure) -> {
                if (failure == null) {
                    snapshot.earliestMsgPublishTimeInBacklogs = earliest.longValue();
                    statsFuture.complete(snapshot);
                } else {
                    log.error("[{}] Failed to get earliest message publish time in backlog", topic, failure);
                    statsFuture.completeExceptionally(failure);
                }
            });
        } else {
            statsFuture.complete(snapshot);
        }
        return statsFuture;
    }

    /**
     * Fetches the compactor's stats bean from the broker without forcing compactor creation
     * (the lookup is made with {@code false}).
     *
     * @return the stats bean, or an empty optional when no compactor is available or the lookup
     *         failed with a {@link PulsarServerException} (which is logged)
     */
    private Optional<CompactorMXBean> getCompactorMXBean() {
        final Compactor available;
        try {
            available = brokerService.pulsar().getCompactor(false);
        } catch (PulsarServerException e) {
            log.warn("get compactor error", e);
            return Optional.empty();
        }
        return Optional.ofNullable(available).map(c -> c.getStats());
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean z) {
        CompletableFuture<PersistentTopicInternalStats> completableFuture = new CompletableFuture<>();
        PersistentTopicInternalStats persistentTopicInternalStats = new PersistentTopicInternalStats();
        ManagedLedgerImpl managedLedgerImpl = (ManagedLedgerImpl) this.ledger;
        persistentTopicInternalStats.entriesAddedCounter = managedLedgerImpl.getEntriesAddedCounter();
        persistentTopicInternalStats.numberOfEntries = managedLedgerImpl.getNumberOfEntries();
        persistentTopicInternalStats.totalSize = managedLedgerImpl.getTotalSize();
        persistentTopicInternalStats.currentLedgerEntries = managedLedgerImpl.getCurrentLedgerEntries();
        persistentTopicInternalStats.currentLedgerSize = managedLedgerImpl.getCurrentLedgerSize();
        persistentTopicInternalStats.lastLedgerCreatedTimestamp = DateFormatter.format(managedLedgerImpl.getLastLedgerCreatedTimestamp());
        if (managedLedgerImpl.getLastLedgerCreationFailureTimestamp() != 0) {
            persistentTopicInternalStats.lastLedgerCreationFailureTimestamp = DateFormatter.format(managedLedgerImpl.getLastLedgerCreationFailureTimestamp());
        }
        persistentTopicInternalStats.waitingCursorsCount = managedLedgerImpl.getWaitingCursorsCount();
        persistentTopicInternalStats.pendingAddEntriesCount = managedLedgerImpl.getPendingAddEntriesCount();
        persistentTopicInternalStats.lastConfirmedEntry = managedLedgerImpl.getLastConfirmedEntry().toString();
        persistentTopicInternalStats.state = managedLedgerImpl.getState().toString();
        persistentTopicInternalStats.ledgers = Lists.newArrayList();
        Set newConcurrentHashSet = Sets.newConcurrentHashSet();
        newConcurrentHashSet.add(this.brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync().whenComplete((set, th) -> {
            if (th == null) {
                managedLedgerImpl.getLedgersInfo().forEach((l, ledgerInfo) -> {
                    ManagedLedgerInternalStats.LedgerInfo ledgerInfo = new ManagedLedgerInternalStats.LedgerInfo();
                    ledgerInfo.ledgerId = ledgerInfo.getLedgerId();
                    ledgerInfo.entries = ledgerInfo.getEntries();
                    ledgerInfo.size = ledgerInfo.getSize();
                    ledgerInfo.offloaded = ledgerInfo.hasOffloadContext() && ledgerInfo.getOffloadContext().getComplete();
                    persistentTopicInternalStats.ledgers.add(ledgerInfo);
                    if (z) {
                        newConcurrentHashSet.add(managedLedgerImpl.getLedgerMetadata(ledgerInfo.getLedgerId()).handle((str, th) -> {
                            if (th != null) {
                                return null;
                            }
                            ledgerInfo.metadata = str;
                            return null;
                        }));
                        newConcurrentHashSet.add(managedLedgerImpl.getEnsemblesAsync(ledgerInfo.getLedgerId()).handle((set, th2) -> {
                            if (th2 != null) {
                                return null;
                            }
                            ledgerInfo.underReplicated = !set.containsAll((Collection) set.stream().map((v0) -> {
                                return v0.toString();
                            }).collect(Collectors.toList()));
                            return null;
                        }));
                    }
                });
            } else {
                log.error("[{}] Failed to fetch available bookies.", this.topic, th);
                completableFuture.completeExceptionally(th);
            }
        }));
        ManagedLedgerInternalStats.LedgerInfo ledgerInfo = new ManagedLedgerInternalStats.LedgerInfo();
        ledgerInfo.ledgerId = -1L;
        ledgerInfo.entries = -1L;
        ledgerInfo.size = -1L;
        Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
        if (compactedTopicContext.isPresent()) {
            CompactedTopicContext compactedTopicContext2 = compactedTopicContext.get();
            ledgerInfo.ledgerId = compactedTopicContext2.getLedger().getId();
            ledgerInfo.entries = compactedTopicContext2.getLedger().getLastAddConfirmed() + 1;
            ledgerInfo.size = compactedTopicContext2.getLedger().getLength();
        }
        persistentTopicInternalStats.compactedLedger = ledgerInfo;
        persistentTopicInternalStats.cursors = Maps.newTreeMap();
        managedLedgerImpl.getCursors().forEach(managedCursor -> {
            ManagedCursorImpl managedCursorImpl = (ManagedCursorImpl) managedCursor;
            ManagedLedgerInternalStats.CursorStats cursorStats = new ManagedLedgerInternalStats.CursorStats();
            cursorStats.markDeletePosition = managedCursorImpl.getMarkDeletedPosition().toString();
            cursorStats.readPosition = managedCursorImpl.getReadPosition().toString();
            cursorStats.waitingReadOp = managedCursorImpl.hasPendingReadRequest();
            cursorStats.pendingReadOps = managedCursorImpl.getPendingReadOpsCount();
            cursorStats.messagesConsumedCounter = managedCursorImpl.getMessagesConsumedCounter();
            cursorStats.cursorLedger = managedCursorImpl.getCursorLedger();
            cursorStats.cursorLedgerLastEntry = managedCursorImpl.getCursorLedgerLastEntry();
            cursorStats.individuallyDeletedMessages = managedCursorImpl.getIndividuallyDeletedMessages();
            cursorStats.lastLedgerSwitchTimestamp = DateFormatter.format(managedCursorImpl.getLastLedgerSwitchTimestamp());
            cursorStats.state = managedCursorImpl.getState();
            cursorStats.numberOfEntriesSinceFirstNotAckedMessage = managedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage();
            cursorStats.totalNonContiguousDeletedMessagesRange = managedCursorImpl.getTotalNonContiguousDeletedMessagesRange();
            cursorStats.properties = managedCursorImpl.getProperties();
            PersistentSubscription persistentSubscription = this.subscriptions.get(Codec.decode(managedCursor.getName()));
            if (persistentSubscription != null) {
                if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                    PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = (PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
                    cursorStats.subscriptionHavePendingRead = persistentDispatcherMultipleConsumers.havePendingRead;
                    cursorStats.subscriptionHavePendingReplayRead = persistentDispatcherMultipleConsumers.havePendingReplayRead;
                } else if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
                    cursorStats.subscriptionHavePendingRead = ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()).havePendingRead;
                }
            }
            persistentTopicInternalStats.cursors.put(managedCursorImpl.getName(), cursorStats);
        });
        try {
            String schemaName = TopicName.get(this.topic).getSchemaName();
            CompletableFuture completableFuture2 = new CompletableFuture();
            persistentTopicInternalStats.schemaLedgers = Collections.synchronizedList(new ArrayList());
            if (this.brokerService.getPulsar().getSchemaStorage() == null || !(this.brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage)) {
                completableFuture2.complete(null);
            } else {
                ((BookkeeperSchemaStorage) this.brokerService.getPulsar().getSchemaStorage()).getStoreLedgerIdsBySchemaId(schemaName).thenAccept(list -> {
                    ArrayList arrayList = new ArrayList();
                    list.forEach(l -> {
                        CompletableFuture completableFuture3 = new CompletableFuture();
                        arrayList.add(completableFuture3);
                        CompletableFuture<LedgerMetadata> completableFuture4 = null;
                        try {
                            completableFuture4 = this.brokerService.getPulsar().getBookKeeperClient().getLedgerMetadata(l.longValue());
                        } catch (NullPointerException e) {
                            if (log.isDebugEnabled()) {
                                log.debug("{{}} Failed to get ledger metadata for the schema ledger {}", new Object[]{this.topic, l, e});
                            }
                        }
                        if (completableFuture4 != null) {
                            completableFuture4.thenAccept(ledgerMetadata -> {
                                ManagedLedgerInternalStats.LedgerInfo ledgerInfo2 = new ManagedLedgerInternalStats.LedgerInfo();
                                ledgerInfo2.ledgerId = ledgerMetadata.getLedgerId();
                                ledgerInfo2.entries = ledgerMetadata.getLastEntryId() + 1;
                                ledgerInfo2.size = ledgerMetadata.getLength();
                                if (z) {
                                    ledgerInfo.metadata = ledgerMetadata.toSafeString();
                                }
                                persistentTopicInternalStats.schemaLedgers.add(ledgerInfo2);
                                completableFuture3.complete(null);
                            }).exceptionally(th2 -> {
                                completableFuture3.completeExceptionally(th2);
                                return null;
                            });
                        } else {
                            completableFuture3.complete(null);
                        }
                    });
                    FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) arrayList).thenRun(() -> {
                        completableFuture2.complete(null);
                    }).exceptionally(th2 -> {
                        completableFuture2.completeExceptionally(th2);
                        return null;
                    });
                }).exceptionally(th2 -> {
                    completableFuture2.completeExceptionally(th2);
                    return null;
                });
            }
            completableFuture2.thenRun(() -> {
                FutureUtil.waitForAll(newConcurrentHashSet).handle((r5, th3) -> {
                    completableFuture.complete(persistentTopicInternalStats);
                    return null;
                });
            }).exceptionally(th3 -> {
                completableFuture.completeExceptionally(th3);
                return null;
            });
            return completableFuture;
        } catch (Throwable th4) {
            completableFuture.completeExceptionally(th4);
            return completableFuture;
        }
    }

    /**
     * Looks up the context of the compacted view of this topic.
     *
     * <p>Failures are logged (with the causing exception) and reported to the caller as an empty
     * {@link Optional} instead of being propagated.
     *
     * @return whatever the compacted topic reports as its context, or {@link Optional#empty()} when the lookup
     *         was interrupted or failed
     */
    public Optional<CompactedTopicContext> getCompactedTopicContext() {
        try {
            return ((CompactedTopicImpl) this.compactedTopic).getCompactedTopicContext();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("[{}]Fail to get ledger information for compacted topic.", this.topic, e);
        } catch (ExecutionException e) {
            log.warn("[{}]Fail to get ledger information for compacted topic.", this.topic, e);
        }
        return Optional.empty();
    }

    /**
     * Reports the backlog size as estimated by the managed ledger behind this topic.
     *
     * @return the ledger's estimated backlog size
     */
    public long getBacklogSize() {
        final long estimatedBacklog = this.ledger.getEstimatedBacklogSize();
        return estimatedBacklog;
    }

    /**
     * Decides whether this topic counts as active for the given inactive-topic delete mode.
     *
     * <p>The mode-specific condition is checked first; when it does not already mark the topic as active, a
     * global topic is active while it has local producers and any other topic is active while its usage count
     * is non-zero.
     *
     * @param inactiveTopicDeleteMode the delete mode to evaluate against
     * @return {@code true} if the topic is considered active
     */
    public boolean isActive(InactiveTopicDeleteMode inactiveTopicDeleteMode) {
        final boolean activeForMode;
        switch (inactiveTopicDeleteMode) {
            case delete_when_no_subscriptions:
                // Any remaining subscription keeps the topic alive in this mode.
                activeForMode = !this.subscriptions.isEmpty();
                break;
            case delete_when_subscriptions_caught_up:
                // Subscriptions may exist, but only un-consumed backlog keeps the topic alive.
                activeForMode = hasBacklogs();
                break;
            default:
                activeForMode = false;
                break;
        }
        if (activeForMode) {
            return true;
        }
        if (TopicName.get(this.topic).isGlobal()) {
            return hasLocalProducers();
        }
        return currentUsageCount() != 0;
    }

    /**
     * Checks whether at least one subscription of this topic still has entries in its backlog.
     *
     * @return {@code true} as soon as a subscription with a positive backlog count is found
     */
    private boolean hasBacklogs() {
        for (PersistentSubscription subscription : this.subscriptions.values()) {
            if (subscription.getNumberOfEntriesInBacklog(false) > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes this topic when it has been inactive for at least the configured maximum inactive duration.
     *
     * <p>Nothing happens unless deletion of inactive topics is enabled. An active topic only has its
     * last-active timestamp refreshed. For a global topic the replication producers are closed first, and the
     * deletion is abandoned if remote producers are still connected or closing them fails. Deletion failures
     * are logged and never propagated to the caller.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkGC() {
        if (!isDeleteWhileInactive()) {
            return;
        }
        final InactiveTopicDeleteMode deleteMode = this.topicPolicies.getInactiveTopicPolicies().get().getInactiveTopicDeleteMode();
        final int maxInactiveSeconds = this.topicPolicies.getInactiveTopicPolicies().get().getMaxInactiveDurationSeconds();
        if (isActive(deleteMode)) {
            this.lastActive = System.nanoTime();
            return;
        }
        final boolean inactiveLongEnough = System.nanoTime() - this.lastActive >= TimeUnit.SECONDS.toNanos(maxInactiveSeconds);
        if (!inactiveLongEnough || shouldTopicBeRetained()) {
            return;
        }

        // Completed once the topic is known to be free of remote producers (immediately for non-global topics).
        final CompletableFuture<Void> replCloseFuture = new CompletableFuture<>();
        if (!TopicName.get(this.topic).isGlobal()) {
            replCloseFuture.complete(null);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", this.topic, maxInactiveSeconds);
            }
            closeReplProducersIfNoBacklog().thenRun(() -> {
                if (hasRemoteProducers()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC", this.topic);
                    }
                    replCloseFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has connected remote producers"));
                } else {
                    log.info("[{}] Global topic inactive for {} seconds, closed repl producers", this.topic, maxInactiveSeconds);
                    replCloseFuture.complete(null);
                }
            }).exceptionally(closeFailure -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", this.topic);
                }
                replCloseFuture.completeExceptionally(closeFailure.getCause());
                return null;
            });
        }

        replCloseFuture
                .thenCompose(ignore -> delete(
                        deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
                        deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,
                        true))
                .thenApply(ignore -> tryToDeletePartitionedMetadata())
                .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", this.topic))
                .exceptionally(failure -> {
                    if (failure.getCause() instanceof BrokerServiceException.TopicBusyException) {
                        // A busy topic is an expected outcome, so it is only reported at debug level.
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Did not delete busy topic: {}", this.topic, failure.getCause().getMessage());
                        }
                    } else {
                        log.warn("[{}] Inactive topic deletion failed", this.topic, failure);
                    }
                    return null;
                });
    }

    /**
     * Deletes the partitioned-topic metadata this topic belongs to, provided no partition of it exists any more.
     *
     * <p>The call is a no-op when the topic is a partition and deleting partitioned metadata while inactive is
     * disabled, or when no partitioned-topic metadata exists for the name.
     *
     * @return a future that completes when the metadata has been deleted or nothing had to be done; it completes
     *         exceptionally with an {@link UnsupportedOperationException} cause when another partition still
     *         exists
     */
    private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
        if (TopicName.get(this.topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
            return CompletableFuture.completedFuture(null);
        }
        TopicName partitionedTopicName = TopicName.get(TopicName.get(this.topic).getPartitionedTopicName());
        NamespaceResources.PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
        return partitionedTopicResources.partitionedTopicExistsAsync(partitionedTopicName).thenCompose(metadataExists -> {
            if (!metadataExists) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return getBrokerService().fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose(partitionedTopicMetadata -> {
                // One existence check per partition; typed so the results can be read back without casts.
                List<CompletableFuture<Boolean>> partitionExistsFutures = new ArrayList<>(partitionedTopicMetadata.partitions);
                for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
                    partitionExistsFutures.add(this.brokerService.getPulsar().getPulsarResources().getTopicResources().persistentTopicExists(partitionedTopicName.getPartition(i)));
                }
                return FutureUtil.waitForAll(partitionExistsFutures).thenCompose(ignore -> {
                    // All futures are complete at this point, so join() does not block.
                    boolean anotherPartitionExists = partitionExistsFutures.stream().anyMatch(CompletableFuture::join);
                    if (!anotherPartitionExists) {
                        return partitionedTopicResources.deletePartitionedTopicAsync(partitionedTopicName);
                    }
                    log.error("[{}] Delete topic metadata failed because another partition exist.", partitionedTopicName);
                    throw new UnsupportedOperationException(String.format("Another partition exists for [%s].", partitionedTopicName));
                });
            });
        });
    }

    /**
     * Deletes subscriptions that have had no connected consumer for longer than the subscription expiration
     * time.
     *
     * <p>The expiration time comes from the namespace policies when set there and from the broker
     * configuration otherwise; a non-positive value disables the check. Replicated subscriptions and
     * subscriptions with a connected consumer are never deleted. Failures while reading the policies are only
     * logged at debug level.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkInactiveSubscriptions() {
        try {
            Policies policies = this.brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(TopicName.get(this.topic).getNamespaceObject()).orElseThrow(() -> {
                return new MetadataStoreException.NotFoundException();
            });
            int defaultExpirationMinutes = this.brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes();
            // The namespace-level setting is optional; fall back to the broker-wide default when it is absent.
            Integer namespaceExpirationMinutes = policies.subscription_expiration_time_minutes;
            long expirationMillis = TimeUnit.MINUTES.toMillis(namespaceExpirationMinutes == null ? defaultExpirationMinutes : namespaceExpirationMinutes.intValue());
            if (expirationMillis > 0) {
                this.subscriptions.forEach((subscriptionName, subscription) -> {
                    boolean consumerConnected = subscription.dispatcher != null && subscription.dispatcher.isConsumerConnected();
                    if (consumerConnected || subscription.isReplicated()) {
                        return;
                    }
                    if (System.currentTimeMillis() - subscription.cursor.getLastActive() > expirationMillis) {
                        subscription.delete().thenAccept(ignore -> {
                            log.info("[{}][{}] The subscription was deleted due to expiration", this.topic, subscriptionName);
                        });
                    }
                });
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies", this.topic, e);
            }
        }
    }

    /**
     * Marks each subscription cursor as active or inactive.
     *
     * <p>A cursor is active only when its subscription has at least one consumer and the cursor's number of
     * entries is below {@code backloggedCursorThresholdEntries}; otherwise it is marked inactive.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkBackloggedCursors() {
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            boolean withinThreshold = !subscription.getConsumers().isEmpty()
                    && subscription.getCursor().getNumberOfEntries() < this.backloggedCursorThresholdEntries;
            if (withinThreshold) {
                subscription.getCursor().setActive();
                return;
            }
            subscription.getCursor().setInactive();
        });
    }

    /**
     * Delegates to the managed ledger's {@code checkInactiveLedgerAndRollOver()}.
     */
    public void checkInactiveLedgers() {
        ledger.checkInactiveLedgerAndRollOver();
    }

    /**
     * Asks the managed ledger to re-evaluate which cursors should cache entries.
     *
     * <p>Any exception raised by the ledger is logged as a warning and not propagated.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkCursorsToCacheEntries() {
        try {
            ledger.checkCursorsToCacheEntries();
        } catch (Exception failure) {
            log.warn("Failed to check cursors to cache entries", failure);
        }
    }

    /**
     * Triggers a snapshot on this topic's message-deduplication component.
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void checkDeduplicationSnapshot() {
        messageDeduplication.takeSnapshot();
    }

    /**
     * Tells whether the retention policy still requires this topic to be kept.
     *
     * @return {@code true} when the configured retention time is negative, or when less than the retention
     *         time has elapsed since the topic was last active
     */
    private boolean shouldTopicBeRetained() {
        final long retentionNanos = TimeUnit.MINUTES.toNanos(this.topicPolicies.getRetentionPolicies().get().getRetentionTimeInMinutes());
        if (retentionNanos < 0) {
            return true;
        }
        final long idleNanos = System.nanoTime() - this.lastActive;
        return idleNanos < retentionNanos;
    }

    /**
     * Makes sure the topic-level dispatch rate limiter is initialized if needed and, when one is present,
     * tells it to update its dispatch rate.
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    public void updateDispatchRateLimiter() {
        initializeDispatchRateLimiterIfNeeded();
        this.dispatchRateLimiter.ifPresent(limiter -> limiter.updateDispatchRate());
    }

    /**
     * Applies updated namespace policies to this topic.
     *
     * <p>Policies flagged as deleted are ignored. Otherwise the topic-level settings and rate limiters are
     * refreshed, every producer is re-checked for permissions and encryption, and afterwards the subscription
     * dispatch limiters, replicator limiters, message expiry, replication, deduplication, persistence policies
     * and the compaction subscription are re-evaluated.
     *
     * @param policies the new namespace policies
     * @return a future that completes when all follow-up checks have finished; on failure the error is logged
     *         and the future completes exceptionally with the error wrapped as a completion exception
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> onPoliciesUpdate(Policies policies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", this.topic, this.isEncryptionRequired, policies.encryption_required);
        }
        if (policies.deleted) {
            log.debug("Ignore the update because it has been deleted : {}", policies);
            return CompletableFuture.completedFuture(null);
        }
        updateTopicPolicyByNamespacePolicy(policies);
        this.isEncryptionRequired = policies.encryption_required;
        this.isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
        this.schemaValidationEnforced = policies.schema_validation_enforced;
        updateDispatchRateLimiter();
        updateSubscribeRateLimiter();
        updatePublishDispatcher();
        updateResourceGroupLimiter(Optional.of(policies));

        // Re-validate every connected producer against the new policies.
        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(this.producers.size());
        this.producers.values().forEach(producer ->
                producerCheckFutures.add(producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));

        return FutureUtil.waitForAll(producerCheckFutures).thenCompose(ignore -> {
            return updateSubscriptionsDispatcherRateLimiter().thenCompose(ignored -> {
                this.replicators.forEach((cluster, replicator) -> replicator.updateRateLimiter());
                checkMessageExpiry();
                CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
                CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
                CompletableFuture<Void> persistencePoliciesFuture = checkPersistencePolicies();
                if (this.subscribeRateLimiter.isPresent()) {
                    this.subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
                }
                return CompletableFuture.allOf(replicationFuture, dedupFuture, persistencePoliciesFuture, preCreateSubscriptionForCompactionIfNeeded());
            });
        }).exceptionally(th -> {
            log.error("[{}] update namespace polices : {} error", getName(), policies, th);
            throw FutureUtil.wrapToCompletionException(th);
        });
    }

    /**
     * Resolves the backlog quota that applies to this topic for the given quota type.
     *
     * @param backlogQuotaType the kind of quota to look up
     * @return the value held in the topic policies' backlog-quota map for that type
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
        return this.topicPolicies
                .getBackLogQuotaMap()
                .get(backlogQuotaType)
                .get();
    }

    /**
     * Checks whether a new producer must be rejected because a backlog quota is exceeded.
     *
     * <p>Only quotas whose retention policy is {@code producer_request_hold} or {@code producer_exception}
     * can cause a rejection. The size check applies to {@code destination_storage} quotas and the time check
     * to {@code message_age} quotas.
     *
     * @param str the producer name, used for logging only
     * @param backlogQuotaType the quota type to evaluate
     * @return a completed future when the producer may proceed, or a future failed with
     *         {@code TopicBacklogQuotaExceededException} when the quota is exceeded
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> checkBacklogQuotaExceeded(String str, BacklogQuota.BacklogQuotaType backlogQuotaType) {
        final BacklogQuota quota = getBacklogQuota(backlogQuotaType);
        if (quota == null) {
            return CompletableFuture.completedFuture(null);
        }
        final BacklogQuota.RetentionPolicy retentionPolicy = quota.getPolicy();
        final boolean rejectsProducers = retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
                || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception;
        if (!rejectsProducers) {
            return CompletableFuture.completedFuture(null);
        }
        if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
            log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", getName(), str);
            return FutureUtil.failedFuture(new BrokerServiceException.TopicBacklogQuotaExceededException(retentionPolicy));
        }
        if (backlogQuotaType != BacklogQuota.BacklogQuotaType.message_age) {
            return CompletableFuture.completedFuture(null);
        }
        return checkTimeBacklogExceeded().thenCompose(timeExceeded -> {
            if (timeExceeded.booleanValue()) {
                log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", getName(), str);
                return FutureUtil.failedFuture(new BrokerServiceException.TopicBacklogQuotaExceededException(retentionPolicy));
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Compares the current backlog size with the {@code destination_storage} quota limit.
     *
     * @return {@code false} when the limit is negative; otherwise {@code true} when the backlog size has
     *         reached or passed the limit
     */
    public boolean isSizeBacklogExceeded() {
        final long quotaLimitBytes = getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
        if (quotaLimitBytes >= 0) {
            final long currentBacklog = getBacklogSize();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Storage size = [{}], backlog quota limit [{}]", getName(), currentBacklog, quotaLimitBytes);
            }
            return currentBacklog >= quotaLimitBytes;
        }
        return false;
    }

    public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
        TopicName topicName = TopicName.get(getName());
        int limitTime = getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
        if (limitTime <= 0 || ((ManagedCursorContainer) this.ledger.getCursors()).getSlowestReaderPosition() == null) {
            return CompletableFuture.completedFuture(false);
        }
        if (this.brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
            CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
            ((ManagedLedgerImpl) this.ledger).asyncReadEntry(((ManagedLedgerImpl) this.ledger).getNextValidPosition(((ManagedCursorContainer) this.ledger.getCursors()).getSlowestReaderPosition()), new AsyncCallbacks.ReadEntryCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.10
                final /* synthetic */ int val$backlogQuotaLimitInSecond;
                final /* synthetic */ CompletableFuture val$future;
                final /* synthetic */ TopicName val$topicName;

                AnonymousClass10(int limitTime2, CompletableFuture completableFuture2, TopicName topicName2) {
                    r5 = limitTime2;
                    r6 = completableFuture2;
                    r7 = topicName2;
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
                public void readEntryComplete(Entry entry, Object obj) {
                    try {
                        try {
                            boolean isEntryExpired = MessageImpl.isEntryExpired(r5, Commands.getEntryTimestamp(entry.getDataBuffer()));
                            if (isEntryExpired && PersistentTopic.log.isDebugEnabled()) {
                                PersistentTopic.log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlogexceeded quota {}", ((ManagedLedgerImpl) PersistentTopic.this.ledger).getSlowestConsumer().getName(), Integer.valueOf(r5));
                            }
                            r6.complete(Boolean.valueOf(isEntryExpired));
                            entry.release();
                        } catch (Exception e) {
                            PersistentTopic.log.error("[{}][{}] Error deserializing message for backlog check", r7, e);
                            r6.complete(false);
                            entry.release();
                        }
                    } catch (Throwable th) {
                        entry.release();
                        throw th;
                    }
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
                public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    PersistentTopic.log.error("[{}][{}] Error reading entry for precise time based  backlog check", r7, managedLedgerException);
                    r6.complete(false);
                }
            }, null);
            return completableFuture2;
        }
        try {
            return slowestReaderTimeBasedBacklogQuotaCheck(((ManagedCursorContainer) this.ledger.getCursors()).getSlowestReaderPosition());
        } catch (Exception e) {
            log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName2, e);
            return CompletableFuture.completedFuture(false);
        }
    }

    /**
     * Estimates from ledger metadata whether the {@code message_age} backlog quota is exceeded at the given
     * position.
     *
     * <p>The quota counts as exceeded when the position lies strictly before the last entry of a ledger that
     * is not the newest ledger and whose timestamp is older than the quota limit. When the position is exactly
     * the last entry of such a ledger, the check is repeated for the next valid position.
     *
     * @param positionImpl the position to evaluate, normally the slowest reader position
     * @return a completed future holding {@code true} when the quota is exceeded
     * @throws ExecutionException if fetching the ledger info fails
     * @throws InterruptedException if the thread is interrupted while waiting for the ledger info
     */
    private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl positionImpl) throws ExecutionException, InterruptedException {
        int limitTime = getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
        // Convert in long arithmetic: limitTime * 1000 as an int overflows for limits above ~24.8 days and
        // would make every sufficiently old ledger look like it exceeded the quota.
        long limitMillis = TimeUnit.SECONDS.toMillis(limitTime);
        Long ledgerId = Long.valueOf(positionImpl.getLedgerId());
        if (((ManagedLedgerImpl) this.ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
            // The reader is on the newest ledger, so the check cannot report an exceeded quota.
            return CompletableFuture.completedFuture(false);
        }
        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = this.ledger.getLedgerInfo(ledgerId.longValue()).get();
        if (ledgerInfo == null || !ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() <= 0) {
            return CompletableFuture.completedFuture(false);
        }
        long ledgerAgeMillis = ((ManagedLedgerImpl) this.ledger).getClock().millis() - ledgerInfo.getTimestamp();
        if (ledgerAgeMillis <= limitMillis) {
            return CompletableFuture.completedFuture(false);
        }
        int comparedToLastEntry = positionImpl.compareTo(new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1));
        if (comparedToLastEntry > 0) {
            return CompletableFuture.completedFuture(false);
        }
        if (comparedToLastEntry == 0) {
            // Already at the last entry of this ledger: evaluate the next valid position instead.
            return slowestReaderTimeBasedBacklogQuotaCheck(((ManagedLedgerImpl) this.ledger).getNextValidPosition(positionImpl));
        }
        if (log.isDebugEnabled()) {
            log.debug("Time based backlog quota exceeded, quota {}, age of ledger slowest cursor currently on {}", limitMillis, ledgerAgeMillis);
        }
        return CompletableFuture.completedFuture(true);
    }

    /**
     * Tells whether this topic currently has any replicator.
     *
     * @return {@code true} when the replicator map is not empty
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isReplicated() {
        final boolean noReplicators = this.replicators.isEmpty();
        return !noReplicators;
    }

    public CompletableFuture<MessageId> terminate() {
        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
        this.ledger.asyncTerminate(new AsyncCallbacks.TerminateCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.11
            final /* synthetic */ CompletableFuture val$future;

            AnonymousClass11(CompletableFuture completableFuture2) {
                r5 = completableFuture2;
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback
            public void terminateComplete(Position position, Object obj) {
                PersistentTopic.this.producers.values().forEach((v0) -> {
                    v0.disconnect();
                });
                PersistentTopic.this.subscriptions.forEach((str, persistentSubscription) -> {
                    persistentSubscription.topicTerminated();
                });
                PositionImpl positionImpl = (PositionImpl) position;
                MessageIdImpl messageIdImpl = new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1);
                PersistentTopic.log.info("[{}] Topic terminated at {}", PersistentTopic.this.getName(), messageIdImpl);
                r5.complete(messageIdImpl);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback
            public void terminateFailed(ManagedLedgerException managedLedgerException, Object obj) {
                r5.completeExceptionally(managedLedgerException);
            }
        }, null);
        return completableFuture2;
    }

    /**
     * Checks whether the first entry readable through the given cursor is older than 1.5 times the given TTL.
     *
     * <p>Errors while reading or parsing the entry are logged as a warning and reported as "not expired".
     *
     * @param managedCursor the cursor whose first entry (individually deleted entries included) is inspected
     * @param i the message TTL, which is multiplied by 1.5 before the comparison
     * @return {@code true} when an entry was found and it is expired against the scaled TTL
     */
    public boolean isOldestMessageExpired(ManagedCursor managedCursor, int i) {
        Entry entry = null;
        boolean expired = false;
        try {
            entry = managedCursor.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Include);
            if (entry != null) {
                expired = MessageImpl.isEntryExpired((int) (i * 1.5d), Commands.getEntryTimestamp(entry.getDataBuffer()));
            }
        } catch (Exception e) {
            log.warn("[{}] Error while getting the oldest message", this.topic, e);
        } finally {
            // Single release point: the entry is released exactly once on every path.
            if (entry != null) {
                entry.release();
            }
        }
        return expired;
    }

    /**
     * Clears the backlog of every subscription and replicator cursor of this topic.
     *
     * @return a future that completes once all individual clear operations have completed
     */
    public CompletableFuture<Void> clearBacklog() {
        log.info("[{}] Clearing backlog on all cursors in the topic.", this.topic);
        List<String> cursorNames = getSubscriptions().keys();
        cursorNames.addAll(getReplicators().keys());
        List<CompletableFuture<Void>> clearFutures = new ArrayList<>();
        for (String cursorName : cursorNames) {
            clearFutures.add(clearBacklog(cursorName));
        }
        return FutureUtil.waitForAll(clearFutures);
    }

    /**
     * Clears the backlog of the cursor with the given name.
     *
     * <p>The name is first looked up among the subscriptions and then among the persistent replicators.
     *
     * @param str the cursor (subscription or replicator) name
     * @return the future of the clear operation, or a future failed with a {@code BrokerServiceException}
     *         when no cursor with that name exists
     */
    public CompletableFuture<Void> clearBacklog(String str) {
        log.info("[{}] Clearing backlog for cursor {} in the topic.", this.topic, str);
        final PersistentSubscription matchingSubscription = getSubscription(str);
        if (matchingSubscription != null) {
            return matchingSubscription.clearBacklog();
        }
        final PersistentReplicator matchingReplicator = (PersistentReplicator) getPersistentReplicator(str);
        if (matchingReplicator == null) {
            return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
        }
        return matchingReplicator.clearBacklog();
    }

    /**
     * Exposes the topic-level dispatch rate limiter.
     *
     * @return the {@code dispatchRateLimiter} field as stored
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
        final Optional<DispatchRateLimiter> topicLimiter = this.dispatchRateLimiter;
        return topicLimiter;
    }

    /**
     * Exposes the broker-level dispatch rate limiter held by the broker service.
     *
     * @return the broker service's limiter wrapped in an {@link Optional}, empty when it is {@code null}
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
        final DispatchRateLimiter brokerLimiter = this.brokerService.getBrokerDispatchRateLimiter();
        return Optional.ofNullable(brokerLimiter);
    }

    /**
     * Exposes the subscribe rate limiter of this topic.
     *
     * @return the {@code subscribeRateLimiter} field as stored
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
        final Optional<SubscribeRateLimiter> limiter = this.subscribeRateLimiter;
        return limiter;
    }

    /**
     * Looks up the last published sequence id recorded by message deduplication for a producer.
     *
     * @param str the producer name passed through to the deduplication component
     * @return the sequence id reported by the deduplication component
     */
    public long getLastPublishedSequenceId(String str) {
        final long lastSequenceId = this.messageDeduplication.getLastPublishedSequenceId(str);
        return lastSequenceId;
    }

    /**
     * Reports the last confirmed entry position of the managed ledger.
     *
     * @return the ledger's last confirmed entry
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public Position getLastPosition() {
        final Position lastConfirmed = this.ledger.getLastConfirmedEntry();
        return lastConfirmed;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<MessageId> getLastMessageId() {
        CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
        PositionImpl positionImpl = (PositionImpl) this.ledger.getLastConfirmedEntry();
        String name = getName();
        int partitionIndex = TopicName.getPartitionIndex(name);
        if (log.isDebugEnabled()) {
            log.debug("getLastMessageId {}, partitionIndex{}, position {}", new Object[]{name, Integer.valueOf(partitionIndex), positionImpl});
        }
        if (positionImpl.getEntryId() == -1) {
            completableFuture.complete(new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), partitionIndex));
            return completableFuture;
        }
        ManagedLedgerImpl managedLedgerImpl = (ManagedLedgerImpl) this.ledger;
        if (managedLedgerImpl.ledgerExists(positionImpl.getLedgerId())) {
            managedLedgerImpl.asyncReadEntry(positionImpl, new AsyncCallbacks.ReadEntryCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.12
                final /* synthetic */ CompletableFuture val$completableFuture;
                final /* synthetic */ PositionImpl val$position;
                final /* synthetic */ int val$partitionIndex;

                AnonymousClass12(CompletableFuture completableFuture2, PositionImpl positionImpl2, int partitionIndex2) {
                    r5 = completableFuture2;
                    r6 = positionImpl2;
                    r7 = partitionIndex2;
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
                public void readEntryComplete(Entry entry, Object obj) {
                    try {
                        MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                        if (parseMessageMetadata.hasNumMessagesInBatch()) {
                            r5.complete(new BatchMessageIdImpl(r6.getLedgerId(), r6.getEntryId(), r7, parseMessageMetadata.getNumMessagesInBatch() - 1));
                        } else {
                            r5.complete(new MessageIdImpl(r6.getLedgerId(), r6.getEntryId(), r7));
                        }
                    } finally {
                        entry.release();
                    }
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback
                public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    r5.completeExceptionally(managedLedgerException);
                }
            }, null);
            return completableFuture2;
        }
        completableFuture2.complete(MessageId.earliest);
        return completableFuture2;
    }

    /**
     * Starts a compaction of this topic unless one is still running.
     *
     * @throws PulsarServerException declared by the signature; presumably raised while obtaining the compactor
     * @throws BrokerServiceException.AlreadyRunningException if the current compaction has not finished yet
     */
    public synchronized void triggerCompaction() throws PulsarServerException, BrokerServiceException.AlreadyRunningException {
        if (this.currentCompaction.isDone()) {
            this.currentCompaction = this.brokerService.pulsar().getCompactor().compact(this.topic);
            return;
        }
        throw new BrokerServiceException.AlreadyRunningException("Compaction already in progress");
    }

    /**
     * Reports the state of the current compaction.
     *
     * @return {@code RUNNING} while the compaction future is not done; {@code NOT_RUN} when it completed with
     *         {@code COMPACTION_NEVER_RUN}; {@code SUCCESS} for any other result; an error status carrying
     *         the exception message when the future was cancelled or failed
     */
    public synchronized LongRunningProcessStatus compactionStatus() {
        final CompletableFuture<Long> compaction = this.currentCompaction;
        if (compaction.isDone()) {
            try {
                if (compaction.join().longValue() == COMPACTION_NEVER_RUN) {
                    return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                }
                return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
            } catch (CancellationException | CompletionException failure) {
                return LongRunningProcessStatus.forError(failure.getMessage());
            }
        }
        return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
    }

    public synchronized void triggerOffload(MessageIdImpl messageIdImpl) throws BrokerServiceException.AlreadyRunningException {
        if (!this.currentOffload.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Offload already in progress");
        }
        CompletableFuture<MessageIdImpl> completableFuture = new CompletableFuture<>();
        this.currentOffload = completableFuture;
        log.info("[{}] Starting offload operation at messageId {}", this.topic, messageIdImpl);
        getManagedLedger().asyncOffloadPrefix(PositionImpl.get(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId()), new AsyncCallbacks.OffloadCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.13
            final /* synthetic */ MessageIdImpl val$messageId;
            final /* synthetic */ CompletableFuture val$promise;

            AnonymousClass13(MessageIdImpl messageIdImpl2, CompletableFuture completableFuture2) {
                r5 = messageIdImpl2;
                r6 = completableFuture2;
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback
            public void offloadComplete(Position position, Object obj) {
                PositionImpl positionImpl = (PositionImpl) position;
                PersistentTopic.log.info("[{}] Completed successfully offload operation at messageId {}", PersistentTopic.this.topic, r5);
                r6.complete(new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1));
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback
            public void offloadFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTopic.log.warn("[{}] Failed offload operation at messageId {}", new Object[]{PersistentTopic.this.topic, r5, managedLedgerException});
                r6.completeExceptionally(managedLedgerException);
            }
        }, null);
    }

    /**
     * Reports the state of the offload tracked by {@code currentOffload}.
     *
     * @return {@code RUNNING} while the tracked future is not done; {@code NOT_RUN} when its result is
     *         the {@code MessageId.earliest} instance; a success status carrying the result otherwise;
     *         an error status carrying the exception message when the future was cancelled or failed
     */
    public synchronized OffloadProcessStatus offloadStatus() {
        if (!this.currentOffload.isDone()) {
            return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            // Identity comparison on purpose: the original compares against the earliest instance.
            if (this.currentOffload.join() == MessageId.earliest) {
                return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
            } else {
                return OffloadProcessStatus.forSuccess(this.currentOffload.join());
            }
        } catch (CancellationException | CompletionException failure) {
            log.warn("Failed to offload", failure.getCause());
            return OffloadProcessStatus.forError(failure.getMessage());
        }
    }

    /**
     * Adds the schema when the topic is idle, otherwise checks it for consumer compatibility.
     *
     * <p>The schema is added only when all of these hold: {@code hasSchema()} yields false, there are
     * no producers, the consumer counts of all subscriptions sum to zero, and the ledger's total size
     * is zero. In every other case the call is delegated to
     * {@code checkSchemaCompatibleForConsumer}.
     *
     * @param schemaData the schema to add or check
     * @return a future completed once the schema was added or the compatibility check finished
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schemaData) {
        return hasSchema().thenCompose(schemaExists -> {
            // Conditions are tested in the original short-circuit order.
            if (schemaExists.booleanValue() || !this.producers.isEmpty()) {
                return checkSchemaCompatibleForConsumer(schemaData);
            }
            int connectedConsumers = this.subscriptions.values().stream()
                    .mapToInt(subscription -> subscription.getConsumers().size())
                    .sum();
            if (connectedConsumers != 0 || this.ledger.getTotalSize() != 0) {
                return checkSchemaCompatibleForConsumer(schemaData);
            }
            return addSchema(schemaData).thenCompose(addedVersion -> CompletableFuture.completedFuture(null));
        });
    }

    /**
     * Scans all subscriptions for a replicated one and passes the result to
     * {@link #checkReplicatedSubscriptionControllerState(boolean)}.
     *
     * <p>Logs an info message when no subscription reports {@code isReplicated()}.
     */
    public synchronized void checkReplicatedSubscriptionControllerState() {
        final AtomicBoolean sawReplicated = new AtomicBoolean(false);
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            if (subscription.isReplicated()) {
                sawReplicated.set(true);
            }
        });
        final boolean hasReplicatedSubscription = sawReplicated.get();
        if (!hasReplicatedSubscription) {
            log.info("[{}] There are no replicated subscriptions on the topic", this.topic);
        }
        checkReplicatedSubscriptionControllerState(hasReplicatedSubscription);
    }

    /**
     * Creates or closes the replicated subscriptions controller to match the requested state.
     *
     * <p>A controller is created when {@code z} is true, none is present, and the broker
     * configuration enables replicated subscriptions. The controller is closed and cleared when one
     * is present while {@code z} is false, or when the configuration disables the feature. In every
     * other case nothing changes.
     *
     * @param z whether the controller should be enabled
     */
    private synchronized void checkReplicatedSubscriptionControllerState(boolean z) {
        final boolean controllerPresent = this.replicatedSubscriptionsController.isPresent();
        final boolean featureEnabled =
                this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

        if (z && !controllerPresent && featureEnabled) {
            log.info("[{}] Enabling replicated subscriptions controller", this.topic);
            this.replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(
                    this, this.brokerService.pulsar().getConfiguration().getClusterName()));
            return;
        }

        if ((controllerPresent && !z) || !featureEnabled) {
            log.info("[{}] Disabled replicated subscriptions controller", this.topic);
            this.replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
            this.replicatedSubscriptionsController = Optional.empty();
        }
    }

    /**
     * Forwards a replicated-subscription marker to the replicated subscriptions controller.
     *
     * <p>When no controller is present, one is requested through
     * {@code checkReplicatedSubscriptionControllerState(true)}. That call does not create a controller
     * when the broker configuration disables replicated subscriptions; in that case the marker is
     * dropped rather than failing with {@code NoSuchElementException} on an empty {@code Optional}.
     *
     * @param position position passed through to the controller
     * @param i marker type passed through to the controller
     * @param byteBuf marker payload passed through to the controller
     */
    public void receivedReplicatedSubscriptionMarker(Position position, int i, ByteBuf byteBuf) {
        ReplicatedSubscriptionsController controller = this.replicatedSubscriptionsController.orElse(null);
        if (controller == null) {
            // Force the controller to start, then re-read the field: it stays empty when the
            // feature is disabled in the broker configuration.
            checkReplicatedSubscriptionControllerState(true);
            controller = this.replicatedSubscriptionsController.orElse(null);
        }
        if (controller == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ignoring replicated subscription marker: controller is not available", this.topic);
            }
            return;
        }
        controller.receivedReplicatedSubscriptionMarker(position, i, byteBuf);
    }

    /**
     * @return the current value of the {@code replicatedSubscriptionsController} field; empty when no
     *         controller is set
     */
    public Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
        return replicatedSubscriptionsController;
    }

    /**
     * @return the value of the {@code compactedTopic} field
     */
    public CompactedTopic getCompactedTopic() {
        return compactedTopic;
    }

    /**
     * @return always {@code false} for this class
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isSystemTopic() {
        return false;
    }

    /**
     * @return always {@code true} for this class
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isPersistent() {
        return true;
    }

    /**
     * Cancels the fenced-topic monitoring task if one is set and not yet done.
     *
     * <p>The task is cancelled with {@code mayInterruptIfRunning = false}.
     */
    public synchronized void cancelFencedTopicMonitoringTask() {
        final ScheduledFuture<?> monitor = this.fencedTopicMonitoringTask;
        if (monitor != null && !monitor.isDone()) {
            monitor.cancel(false);
        }
    }

    /**
     * Marks the topic as fenced and, when configured, schedules a forced close.
     *
     * <p>A {@code closeFencedTopicForcefully} task is scheduled only when no monitoring task is
     * pending (unset or already done) and the configured topic fencing timeout is greater than zero
     * seconds.
     */
    private synchronized void fence() {
        this.isFenced = true;
        final ScheduledFuture<?> monitor = this.fencedTopicMonitoringTask;
        if (monitor != null && !monitor.isDone()) {
            // A monitoring task is still pending; do not schedule another one.
            return;
        }
        final int timeoutSeconds = this.brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
        if (timeoutSeconds > 0) {
            this.fencedTopicMonitoringTask = this.brokerService.executor()
                    .schedule(this::closeFencedTopicForcefully, timeoutSeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Clears the fenced flag and cancels any pending fenced-topic monitoring task.
     */
    private synchronized void unfence() {
        isFenced = false;
        cancelFencedTopicMonitoringTask();
    }

    /**
     * Closes the topic if it is still fenced when this method runs.
     *
     * <p>Does nothing when the topic is no longer fenced. When the topic is already closing or being
     * deleted, only a warning is logged; otherwise an error is logged and {@code close()} is called.
     */
    private void closeFencedTopicForcefully() {
        if (!this.isFenced) {
            return;
        }
        final int timeoutSeconds = this.brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
        if (this.isClosingOrDeleting) {
            log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})",
                    this.topic, timeoutSeconds, this.pendingWriteOps.get());
            return;
        }
        log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})",
                this.topic, timeoutSeconds, this.pendingWriteOps.get());
        close();
    }

    /**
     * Sets {@code isClosingOrDeleting} and then {@code isFenced} to {@code true}, in that order.
     */
    private void fenceTopicToCloseOrDelete() {
        isClosingOrDeleting = true;
        isFenced = true;
    }

    /**
     * Sets {@code isFenced} and then {@code isClosingOrDeleting} to {@code false}, in that order.
     */
    public void unfenceTopicToResume() {
        isFenced = false;
        isClosingOrDeleting = false;
    }

    /**
     * Publishes a message into the transaction buffer on behalf of a transaction.
     *
     * <p>The pending-write counter is incremented on entry and released through
     * {@code decrementPendingWriteOpsAndCheck()} on every path handled here; the managed-ledger
     * failure path hands off to {@code addFailed}. The publish context is completed with:
     * <ul>
     *   <li>a {@code TopicFencedException} when the topic is fenced;</li>
     *   <li>a {@code NotAllowedException} when the payload exceeds the maximum message size, or when
     *       appending to the transaction buffer failed with that exception type;</li>
     *   <li>the persisted ledger/entry id when the append succeeds;</li>
     *   <li>{@code -1/-1} and no error when deduplication reports a duplicate;</li>
     *   <li>a {@code MessageDupUnknownException} for any other deduplication status.</li>
     * </ul>
     *
     * @param txnID transaction the message belongs to
     * @param byteBuf message payload
     * @param publishContext context notified with the outcome
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public void publishTxnMessage(TxnID txnID, ByteBuf byteBuf, Topic.PublishContext publishContext) {
        this.pendingWriteOps.incrementAndGet();

        if (this.isFenced) {
            publishContext.completed(new BrokerServiceException.TopicFencedException("fenced"), -1L, -1L);
            decrementPendingWriteOpsAndCheck();
            return;
        }

        if (isExceedMaximumMessageSize(byteBuf.readableBytes(), publishContext)) {
            publishContext.completed(new BrokerServiceException.NotAllowedException("Exceed maximum message size"), -1L, -1L);
            decrementPendingWriteOpsAndCheck();
            return;
        }

        switch (this.messageDeduplication.isDuplicate(publishContext, byteBuf)) {
            case NotDup:
                this.transactionBuffer.appendBufferToTxn(txnID, publishContext.getSequenceId(), byteBuf)
                        .thenAccept(appended -> {
                            this.messageDeduplication.recordMessagePersisted(publishContext, (PositionImpl) appended);
                            publishContext.completed(null, ((PositionImpl) appended).getLedgerId(),
                                    ((PositionImpl) appended).getEntryId());
                            decrementPendingWriteOpsAndCheck();
                        })
                        .exceptionally(failure -> {
                            final Throwable cause = failure.getCause();
                            if (cause instanceof BrokerServiceException.NotAllowedException) {
                                publishContext.completed((BrokerServiceException.NotAllowedException) cause, -1L, -1L);
                                decrementPendingWriteOpsAndCheck();
                                return null;
                            }
                            // Anything that is not already a ManagedLedgerException gets wrapped in one.
                            final ManagedLedgerException ledgerFailure = cause instanceof ManagedLedgerException
                                    ? (ManagedLedgerException) cause
                                    : new ManagedLedgerException(cause);
                            addFailed(ledgerFailure, publishContext);
                            return null;
                        });
                break;
            case Dup:
                publishContext.completed(null, -1L, -1L);
                decrementPendingWriteOpsAndCheck();
                break;
            default:
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1L, -1L);
                decrementPendingWriteOpsAndCheck();
                break;
        }
    }

    /**
     * Ends a transaction in the transaction buffer.
     *
     * @param txnID transaction to end
     * @param i transaction action: {@code 0} commits, {@code 1} aborts
     * @param j value forwarded as the second argument of {@code commitTxn} / {@code abortTxn}
     * @return the future returned by the transaction buffer, or a future failed with
     *         {@code NotAllowedException} for any other action value
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> endTxn(TxnID txnID, int i, long j) {
        switch (i) {
            case 0:
                return this.transactionBuffer.commitTxn(txnID, j);
            case 1:
                return this.transactionBuffer.abortTxn(txnID, j);
            default:
                return FutureUtil.failedFuture(
                        new BrokerServiceException.NotAllowedException("Unsupported txnAction " + i));
        }
    }

    /**
     * Delegates to the managed ledger's {@code asyncTruncate()}.
     *
     * @return the future returned by the managed ledger
     */
    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> truncate() {
        return ledger.asyncTruncate();
    }

    /**
     * @return the unboxed value read from {@code topicPolicies.getDelayedDeliveryTickTimeMillis()}
     */
    public long getDelayedDeliveryTickTimeMillis() {
        // NOTE(review): unboxing throws NullPointerException if get() yields null - confirm a default exists.
        return topicPolicies.getDelayedDeliveryTickTimeMillis().get().longValue();
    }

    /**
     * @return the unboxed value read from {@code topicPolicies.getDelayedDeliveryEnabled()}
     */
    public boolean isDelayedDeliveryEnabled() {
        // NOTE(review): unboxing throws NullPointerException if get() yields null - confirm a default exists.
        return topicPolicies.getDelayedDeliveryEnabled().get().booleanValue();
    }

    /**
     * @return the unboxed value read from {@code topicPolicies.getMaxUnackedMessagesOnSubscription()}
     */
    public int getMaxUnackedMessagesOnSubscription() {
        // NOTE(review): unboxing throws NullPointerException if get() yields null - confirm a default exists.
        return topicPolicies.getMaxUnackedMessagesOnSubscription().get().intValue();
    }

    /**
     * Applies updated topic-level policies to this topic.
     *
     * <p>A {@code null} argument is ignored (after the optional debug log). Otherwise the topic
     * policy and the dispatch rate limiter are updated synchronously. Once the subscription
     * dispatcher rate limiters have been updated, the remaining steps run in this order: publish
     * dispatcher, subscribe rate limiter, replicator rate limiters, message expiry, replication (only
     * when replication clusters are set), deduplication status, compaction subscription, and
     * persistence policies. Any failure in that chain is logged and not propagated.
     *
     * @param topicPolicies the new policies, or {@code null} when there is nothing to apply
     */
    @Override // org.apache.pulsar.broker.service.TopicPolicyListener
    public void onUpdate(TopicPolicies topicPolicies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] update topic policy: {}", this.topic, topicPolicies);
        }
        if (topicPolicies == null) {
            return;
        }

        updateTopicPolicy(topicPolicies);
        updateDispatchRateLimiter();
        updateSubscriptionsDispatcherRateLimiter()
                .thenRun(() -> {
                    updatePublishDispatcher();
                    initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(topicPolicies));
                    this.subscribeRateLimiter.ifPresent(
                            limiter -> limiter.onSubscribeRateUpdate(getSubscribeRate()));
                    this.replicators.forEach((cluster, replicator) -> replicator.updateRateLimiter());
                    checkMessageExpiry();
                    if (topicPolicies.getReplicationClusters() != null) {
                        checkReplicationAndRetryOnFailure();
                    }
                    checkDeduplicationStatus();
                    preCreateSubscriptionForCompactionIfNeeded();
                    checkPersistencePolicies();
                })
                .exceptionally(failure -> {
                    // Log the underlying cause rather than the CompletionException wrapper.
                    Throwable cause = failure;
                    if (failure instanceof CompletionException) {
                        cause = failure.getCause();
                    }
                    log.error("[{}] update topic policy error: {}", this.topic, cause.getMessage(), cause);
                    return null;
                });
    }

    /**
     * Re-checks consumer permissions and refreshes the dispatcher rate limiter of every subscription.
     *
     * <p>For each subscription, {@code checkPermissionsAsync()} is started for every consumer; once
     * all of those complete, the subscription's dispatcher (if any) has its rate limiter updated.
     *
     * @return a future that completes when every subscription has been processed
     */
    private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
        final List<CompletableFuture<?>> perSubscription = new ArrayList<>((int) this.subscriptions.size());
        this.subscriptions.forEach((subscriptionName, subscription) -> {
            final List<CompletableFuture<?>> permissionChecks =
                    new ArrayList<>(subscription.getConsumers().size());
            subscription.getConsumers()
                    .forEach(consumer -> permissionChecks.add(consumer.checkPermissionsAsync()));
            perSubscription.add(FutureUtil.waitForAll(permissionChecks).thenRun(() -> {
                final Dispatcher dispatcher = subscription.getDispatcher();
                if (dispatcher != null) {
                    dispatcher.updateRateLimiter();
                }
            }));
        });
        return FutureUtil.waitForAll(perSubscription);
    }

    /**
     * Creates or removes the topic's subscribe rate limiter according to the given policies.
     *
     * <p>Nothing happens when {@code optional} is empty. A limiter is created when none is present
     * and the policies carry a subscribe rate whose {@code subscribeThrottlingRatePerConsumer} is
     * positive. Otherwise, when the subscribe rate is not set or its per-consumer rate is not
     * positive, any existing limiter is closed and the field is reset to empty.
     *
     * @param optional topic policies to evaluate
     */
    private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> optional) {
        if (!optional.isPresent()) {
            return;
        }
        final TopicPolicies policies = optional.get();
        // NOTE(review): the monitor is the Optional currently held by the field, which this block
        // reassigns - confirm this is the intended lock object.
        synchronized (this.subscribeRateLimiter) {
            final boolean shouldCreate = !this.subscribeRateLimiter.isPresent()
                    && policies.getSubscribeRate() != null
                    && policies.getSubscribeRate().subscribeThrottlingRatePerConsumer > 0;
            if (shouldCreate) {
                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
            } else if (!policies.isSubscribeRateSet()
                    || policies.getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
                this.subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
                this.subscribeRateLimiter = Optional.empty();
            }
        }
    }

    /**
     * Loads the existing topic policies, if any, and applies them through {@code onUpdate}.
     *
     * <p>The lookup runs on the broker's topic ordered executor, and only when both the system topic
     * and topic-level policies are enabled in the broker configuration.
     *
     * @return a future that completes after {@code onUpdate} has run, or an already-completed future
     *         when either feature is disabled
     */
    protected CompletableFuture<Void> initTopicPolicy() {
        if (!this.brokerService.pulsar().getConfig().isSystemTopicEnabled()
                || !this.brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(null).thenRunAsync(
                () -> onUpdate(this.brokerService.getPulsar().getTopicPoliciesService()
                        .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(this.topic))),
                (Executor) this.brokerService.getTopicOrderedExecutor());
    }

    /**
     * @return the value of the {@code messageDeduplication} field
     */
    @VisibleForTesting
    public MessageDeduplication getMessageDeduplication() {
        return messageDeduplication;
    }

    /**
     * Tells whether creating the named subscription would exceed the per-topic subscription limit.
     *
     * <p>A non-empty name that resolves to an existing subscription never exceeds the limit. A
     * {@code null} or non-positive limit also yields {@code false}.
     *
     * @param str subscription name; may be empty
     * @return {@code true} when a positive limit is configured and the current number of
     *         subscriptions is greater than or equal to it
     */
    private boolean checkMaxSubscriptionsPerTopicExceed(String str) {
        if (StringUtils.isNotEmpty(str) && getSubscription(str) != null) {
            return false;
        }
        final Integer limit = this.topicPolicies.getMaxSubscriptionsPerTopic().get();
        if (limit == null || limit.intValue() <= 0) {
            return false;
        }
        return this.subscriptions != null && this.subscriptions.size() >= ((long) limit.intValue());
    }

    /**
     * Checks whether the given subscription type is among the enabled types of the topic policies.
     *
     * @param subType subscription type to look up
     * @return {@code true} when the enabled set is non-null and contains {@code subType}
     */
    public boolean checkSubscriptionTypesEnable(CommandSubscribe.SubType subType) {
        final EnumSet<CommandSubscribe.SubType> enabledTypes = this.topicPolicies.getSubscriptionTypesEnabled().get();
        if (enabledTypes == null) {
            return false;
        }
        return enabledTypes.contains(subType);
    }

    /**
     * @return the result of {@code getStats()} on the transaction buffer
     */
    public TransactionBufferStats getTransactionBufferStats() {
        return transactionBuffer.getStats();
    }

    /**
     * Returns the pending-ack statistics of the named subscription.
     *
     * @param str subscription name
     * @return the statistics reported by the subscription
     */
    public TransactionPendingAckStats getTransactionPendingAckStats(String str) {
        // NOTE(review): no null check - a name without a subscription fails with NullPointerException.
        final PersistentSubscription subscription = this.subscriptions.get(str);
        return subscription.getTransactionPendingAckStats();
    }

    /**
     * @return the result of {@code getMaxReadPosition()} on the transaction buffer
     */
    public PositionImpl getMaxReadPosition() {
        return transactionBuffer.getMaxReadPosition();
    }

    /**
     * @param txnID transaction to look up
     * @return the result of {@code isTxnAborted(txnID)} on the transaction buffer
     */
    public boolean isTxnAborted(TxnID txnID) {
        return transactionBuffer.isTxnAborted(txnID);
    }

    /**
     * @param txnID transaction to look up
     * @return the result of {@code getTransactionInBufferStats(txnID)} on the transaction buffer
     */
    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        return transactionBuffer.getTransactionInBufferStats(txnID);
    }

    /**
     * @return the result of {@code isTerminated()} on the managed ledger
     */
    @Override // org.apache.pulsar.broker.service.AbstractTopic
    protected boolean isTerminated() {
        return ledger.isTerminated();
    }

    /**
     * Returns the pending-ack statistics of one transaction on the named subscription.
     *
     * @param txnID transaction to look up
     * @param str subscription name
     * @return the statistics reported by the subscription for {@code txnID}
     */
    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String str) {
        // NOTE(review): no null check - a name without a subscription fails with NullPointerException.
        final PersistentSubscription subscription = this.subscriptions.get(str);
        return subscription.getTransactionInPendingAckStats(txnID);
    }

    /**
     * Looks up the pending-ack managed ledger of the named subscription.
     *
     * @param str subscription name
     * @return the subscription's pending-ack managed ledger future, or a future failed with
     *         {@code SubscriptionNotFoundException} when no subscription has that name
     */
    public CompletableFuture<ManagedLedger> getPendingAckManagedLedger(String str) {
        final PersistentSubscription subscription = this.subscriptions.get(str);
        if (subscription != null) {
            return subscription.getPendingAckManageLedger();
        }
        return FutureUtil.failedFuture(
                new BrokerServiceException.SubscriptionNotFoundException(this.topic + " not found subscription : " + str));
    }

    /**
     * Clears the transaction buffer snapshot and then closes the transaction buffer.
     *
     * @return a future that completes when {@code closeAsync()} has completed
     */
    private CompletableFuture<Void> transactionBufferCleanupAndClose() {
        return transactionBuffer.clearSnapshot()
                .thenCompose(ignored -> transactionBuffer.closeAsync());
    }

    /**
     * @return the value of the {@code lastDataMessagePublishedTimestamp} field
     */
    public long getLastDataMessagePublishedTimestamp() {
        return lastDataMessagePublishedTimestamp;
    }

    /**
     * @return the value of the {@code transactionBuffer} field
     */
    public TransactionBuffer getTransactionBuffer() {
        return transactionBuffer;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.pulsar.broker.service.persistent.PersistentTopic.access$902(org.apache.pulsar.broker.service.persistent.PersistentTopic, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    // NOTE(review): compiler-generated (synthetic) accessor that the decompiler failed to decode; see
    // the JADX error dump directly above. The recovered pseudo-code in the body suggests it stores the
    // long argument into the instance's lastActive field, but the returned value was not recovered -
    // confirm against the original bytecode. As emitted, every call throws UnsupportedOperationException.
    static /* synthetic */ long access$902(org.apache.pulsar.broker.service.persistent.PersistentTopic r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.lastActive = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.broker.service.persistent.PersistentTopic.access$902(org.apache.pulsar.broker.service.persistent.PersistentTopic, long):long");
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.pulsar.broker.service.persistent.PersistentTopic.access$1102(org.apache.pulsar.broker.service.persistent.PersistentTopic, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    // NOTE(review): compiler-generated (synthetic) accessor that the decompiler failed to decode; see
    // the JADX error dump directly above. The recovered pseudo-code names the lastActive field, the
    // same field shown for access$902, so the field name may have been mis-resolved - confirm against
    // the original bytecode. As emitted, every call throws UnsupportedOperationException.
    static /* synthetic */ long access$1102(org.apache.pulsar.broker.service.persistent.PersistentTopic r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.lastActive = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.broker.service.persistent.PersistentTopic.access$1102(org.apache.pulsar.broker.service.persistent.PersistentTopic, long):long");
    }

    // NOTE(review): empty static initializer as emitted by the decompiler; it executes no statements.
    // Any static field initialization the original class performed is not visible here.
    static {
    }
}
