package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.SharedConsumerAssignor;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.class */
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
    // Topic this dispatcher serves; assigned once in the constructor.
    protected final PersistentTopic topic;
    // Managed-ledger cursor through which the reads, replays and rewinds in this class are issued.
    protected final ManagedCursor cursor;
    // Snapshot of cursor.getLastIndividualDeletedRange() taken in the constructor.
    protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
    // Created by disconnectAllConsumers(), completed once the consumer list is empty, cleared by resetCloseFuture().
    private CompletableFuture<Void> closeFuture;
    // Positions queued for redelivery; entries are added/removed around replay reads.
    protected final MessageRedeliveryController redeliveryMessages;
    // Handed to Consumer.sendMessages(); cleared when the last consumer is removed.
    protected final RedeliveryTracker redeliveryTracker;
    // Empty until a tracker is installed (not shown in this part of the file); swapped back to empty in close().
    private Optional<DelayedDeliveryTracker> delayedDeliveryTracker;
    // True while a ReadType.Normal read is outstanding on the cursor.
    protected volatile boolean havePendingRead;
    // True while a ReadType.Replay read is outstanding on the cursor.
    protected volatile boolean havePendingReplayRead;
    // First position of the current replay set, or null when readMoreEntries() found nothing to replay.
    protected volatile PositionImpl minReplayedPosition;
    // Set when the first consumer attaches while a read is still pending; honoured in readEntriesComplete().
    protected boolean shouldRewindBeforeReadingOrReplaying;
    // "<topic name> / <decoded cursor name>", used as the log prefix.
    protected final String name;
    // While true, readMoreEntries() returns immediately.
    private boolean sendInProgress;
    // Dispatcher-wide permit count; backing field for TOTAL_AVAILABLE_PERMITS_UPDATER.
    protected volatile int totalAvailablePermits;
    // Current read batch size; doubled after each completed read up to dispatcherMaxReadBatchSize.
    protected volatile int readBatchSize;
    // Back-off for failed reads; halved on every successful read completion.
    protected final Backoff readFailureBackoff;
    // Backing field for TOTAL_UNACKED_MESSAGES_UPDATER.
    protected volatile int totalUnackedMessages;
    // Backing field for BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER; a value of 1 stops normal reads in readMoreEntries().
    private volatile int blockedDispatcherOnUnackedMsgs;
    // Subscription-level rate limiter, empty by default; consulted in calculateToRead() and closed in close().
    protected Optional<DispatchRateLimiter> dispatchRateLimiter;
    // Ensures reScheduleRead() keeps at most one delayed read queued at a time.
    private AtomicBoolean isRescheduleReadInProgress;
    // Executor chosen from the topic-ordered executor; used when dispatching in the subscription thread is enabled.
    protected final ExecutorService dispatchMessagesThread;
    // Assigns entries to consumers on the chunked-message path.
    private final SharedConsumerAssignor assignor;
    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
    private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
    protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers$ReadType.class */
    /**
     * Context tag passed to the cursor with every read so that
     * {@code readEntriesComplete} can tell which kind of read just finished.
     * <ul>
     *   <li>{@code Normal} - used for the {@code asyncReadEntriesOrWait} /
     *       {@code asyncReadEntriesWithSkipOrWait} calls.</li>
     *   <li>{@code Replay} - used for the {@code asyncReplayEntries} calls.</li>
     * </ul>
     */
    public enum ReadType {
        Normal,
        Replay
    }

    /**
     * Convenience constructor: delegates to the four-argument constructor with
     * the boolean flag set to {@code true}. That flag is forwarded unchanged to
     * the {@code MessageRedeliveryController} constructor.
     * NOTE(review): the exact meaning of the flag is defined by
     * MessageRedeliveryController, which is not visible here - confirm.
     *
     * @param persistentTopic topic the dispatcher belongs to
     * @param managedCursor   cursor backing the subscription
     * @param subscription    owning subscription
     */
    public PersistentDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription) {
        this(persistentTopic, managedCursor, subscription, true);
    }

    public PersistentDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, boolean z) {
        super(subscription, persistentTopic.getBrokerService().pulsar().getConfiguration());
        this.closeFuture = null;
        this.delayedDeliveryTracker = Optional.empty();
        this.havePendingRead = false;
        this.havePendingReplayRead = false;
        this.minReplayedPosition = null;
        this.shouldRewindBeforeReadingOrReplaying = false;
        this.sendInProgress = false;
        this.totalAvailablePermits = 0;
        this.totalUnackedMessages = 0;
        this.blockedDispatcherOnUnackedMsgs = 0;
        this.dispatchRateLimiter = Optional.empty();
        this.isRescheduleReadInProgress = new AtomicBoolean(false);
        this.cursor = managedCursor;
        this.lastIndividualDeletedRangeFromCursorRecovery = managedCursor.getLastIndividualDeletedRange();
        this.name = persistentTopic.getName() + " / " + Codec.decode(managedCursor.getName());
        this.topic = persistentTopic;
        this.dispatchMessagesThread = persistentTopic.getBrokerService().getTopicOrderedExecutor().chooseThread();
        this.redeliveryMessages = new MessageRedeliveryController(z);
        this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        initializeDispatchRateLimiterIfNeeded();
        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, (v1) -> {
            addMessageToReplay(v1);
        });
        this.readFailureBackoff = new Backoff(persistentTopic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers a consumer with this dispatcher.
     *
     * <p>If the dispatcher is already closed the consumer is disconnected and
     * a completed future is returned. When this is the first consumer, the
     * cursor is rewound (immediately, or deferred until the pending read
     * completes) and the replay queue is cleared. The consumer list is kept
     * sorted by priority level.
     *
     * @param consumer consumer to register
     * @return a completed future on success, or a future failed with
     *         {@code ConsumerBusyException} when the subscription is full
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
            consumer.disconnect();
            return CompletableFuture.completedFuture(null);
        }

        if (consumerList.isEmpty()) {
            boolean readOutstanding = havePendingRead || havePendingReplayRead;
            if (readOutstanding) {
                // Cannot rewind under an in-flight read; the read callback will do it.
                shouldRewindBeforeReadingOrReplaying = true;
            } else {
                cursor.rewind();
                shouldRewindBeforeReadingOrReplaying = false;
            }
            redeliveryMessages.clear();
            // Only the in-memory tracker is cleared; other tracker types are left untouched.
            delayedDeliveryTracker.ifPresent(tracker -> {
                if (tracker instanceof InMemoryDelayedDeliveryTracker) {
                    tracker.clear();
                }
            });
        }

        if (isConsumersExceededOnSubscription()) {
            log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}", name, consumer);
            return FutureUtil.failedFuture(new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit"));
        }

        // Duplicate registration is only reported, not rejected.
        if (consumerSet.contains(consumer)) {
            log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer);
        }

        consumerList.add(consumer);
        // Re-sort only when the newcomer outranks the element that used to be last.
        if (consumerList.size() > 1
                && consumer.getPriorityLevel() < consumerList.get(consumerList.size() - 2).getPriorityLevel()) {
            consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
        }
        consumerSet.add(consumer);
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates to the two-argument overload using this dispatcher's topic and
     * the current number of registered consumers.
     *
     * @return whatever the two-argument overload reports for the current consumer count
     */
    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher
    public boolean isConsumersExceededOnSubscription() {
        int registeredConsumers = consumerList.size();
        return isConsumersExceededOnSubscription(topic, registeredConsumers);
    }

    /**
     * Unregisters a consumer.
     *
     * <p>The consumer's unacked count is subtracted from the dispatcher total
     * first. If other consumers remain, the removed consumer's pending acks
     * are queued for replay, its permits are subtracted and a new read is
     * triggered. If it was the last consumer, dispatcher state is cleared.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException declared by the {@code Dispatcher} contract
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        addUnAckedMessages(-consumer.getUnackedMessages());

        boolean wasRegistered = consumerSet.removeAll(consumer) == 1;
        if (wasRegistered) {
            consumerList.remove(consumer);
            log.info("Removed consumer {} with pending {} acks", consumer, consumer.getPendingAcks().size());
            if (consumerList.isEmpty()) {
                clearComponentsAfterRemovedAllConsumers();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer are left, reading more entries", name);
                }
                // Hand the departing consumer's in-flight messages to the remaining consumers.
                consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
                    addMessageToReplay(ledgerId, entryId, stickyKeyHash);
                });
                totalAvailablePermits -= consumer.getAvailablePermits();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. New dispatcher permit count is {}", name, consumer.getAvailablePermits(), totalAvailablePermits);
                }
                readMoreEntries();
            }
        } else {
            // The set did not know this consumer; still purge it from the list.
            log.error("[{}] Trying to remove a non-connected consumer: {}", name, consumer);
            consumerList.removeIf(consumer::equals);
            if (consumerList.isEmpty()) {
                clearComponentsAfterRemovedAllConsumers();
            }
        }
    }

    /**
     * Resets dispatcher state once no consumer is left: cancels the pending
     * read, empties the replay queue and redelivery tracker, completes a
     * pending close future (if any) and zeroes the permit count.
     */
    private synchronized void clearComponentsAfterRemovedAllConsumers() {
        cancelPendingRead();
        redeliveryMessages.clear();
        redeliveryTracker.clear();

        CompletableFuture<Void> pendingClose = closeFuture;
        if (pendingClose != null) {
            log.info("[{}] All consumers removed. Subscription is disconnected", name);
            pendingClose.complete(null);
        }
        totalAvailablePermits = 0;
    }

    /**
     * Accepts a flow-control grant from a consumer. The actual processing is
     * handed to the broker-service executor instead of running on the calling thread.
     *
     * @param consumer consumer that sent the flow command
     * @param i        number of additional permits granted
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void consumerFlow(Consumer consumer, int i) {
        topic.getBrokerService().executor().execute(() -> internalConsumerFlow(consumer, i));
    }

    /**
     * Applies a flow-control grant: adds the permits to the dispatcher total
     * and triggers a read. Grants from consumers that are no longer registered
     * are ignored.
     *
     * @param consumer consumer that sent the flow command
     * @param i        number of additional permits granted
     */
    private synchronized void internalConsumerFlow(Consumer consumer, int i) {
        if (!consumerSet.contains(consumer)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ignoring flow control from disconnected consumer {}", name, consumer);
            }
            return;
        }

        totalAvailablePermits += i;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {} after adding {} permits", name, consumer, totalAvailablePermits, i);
        }
        readMoreEntries();
    }

    /**
     * Schedules {@link #readMoreEntries()} on the broker-service executor.
     */
    public void readMoreEntriesAsync() {
        topic.getBrokerService().executor().execute(() -> readMoreEntries());
    }

    /**
     * Issues the next read against the cursor, if one is currently allowed.
     *
     * <p>Nothing is read while a send is in progress, while the delayed
     * delivery tracker asks to pause, while no consumer has permits, or when
     * {@link #calculateToRead(int)} returns {@code -1} (rate limit reached or a
     * replay read is still pending). Otherwise, in order of preference:
     * <ol>
     *   <li>replay positions from the redelivery queue that survive
     *       {@code filterOutEntriesWillBeDiscarded};</li>
     *   <li>do nothing if the dispatcher is blocked on unacked messages;</li>
     *   <li>issue a normal read, unless one is already pending or no consumer
     *       needs a normal read.</li>
     * </ol>
     */
    public synchronized void readMoreEntries() {
        if (isSendInProgress() || shouldPauseDeliveryForDelayTracker()) {
            return;
        }

        int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
        int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
        if (currentTotalAvailablePermits <= 0 || firstAvailableConsumerPermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", name);
            }
            return;
        }

        Pair<Integer, Long> toRead = calculateToRead(currentTotalAvailablePermits);
        int messagesToRead = toRead.getLeft();
        long bytesToRead = toRead.getRight();
        if (messagesToRead == -1 || bytesToRead == -1) {
            // calculateToRead vetoed the read.
            return;
        }

        NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
        NavigableSet<PositionImpl> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);

        if (!messagesToReplayFiltered.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayFiltered.size(), consumerList.size());
            }
            havePendingReplayRead = true;
            minReplayedPosition = messagesToReplayNow.first();
            Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
                    ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
                    : asyncReplayEntries(messagesToReplayFiltered);
            // Positions returned by the cursor are dropped from the replay queue.
            deletedMessages.forEach(position -> {
                redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId());
            });
            // Every requested position was returned: clear the replay-pending flag here and schedule another read.
            if (messagesToReplayFiltered.size() - deletedMessages.size() == 0) {
                havePendingReplayRead = false;
                readMoreEntriesAsync();
            }
            return;
        }

        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
            }
            return;
        }

        if (havePendingRead || !hasConsumersNeededNormalRead()) {
            if (log.isDebugEnabled()) {
                if (messagesToReplayNow.isEmpty()) {
                    log.debug("[{}] Cannot schedule next read until previous one is done", name);
                } else {
                    log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were filtered out due to the mechanism of Key_Shared mode, and the left consumers have no permits now", topic.getName(), getSubscriptionName());
                }
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead, consumerList.size());
        }
        havePendingRead = true;

        // Record the smallest position still awaiting replay; the position is re-added to the queue afterwards.
        NavigableSet<PositionImpl> firstToReplay = getMessagesToReplayNow(1);
        if (firstToReplay.isEmpty()) {
            minReplayedPosition = null;
        } else {
            minReplayedPosition = firstToReplay.first();
            redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
        }

        if (!this.delayedDeliveryTracker.isPresent()) {
            cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition());
            return;
        }

        // With a bucket tracker, skip positions the tracker already holds.
        // The predicate must be parameterized: a raw Predicate would type the
        // lambda argument as Object and the PositionImpl accessors would not resolve.
        Predicate<PositionImpl> skipCondition = null;
        DelayedDeliveryTracker tracker = this.delayedDeliveryTracker.get();
        if (tracker instanceof BucketDelayedDeliveryTracker) {
            BucketDelayedDeliveryTracker bucketTracker = (BucketDelayedDeliveryTracker) tracker;
            skipCondition = position -> bucketTracker.containsMessage(position.getLedgerId(), position.getEntryId());
        }
        cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition(), skipCondition);
    }

    /**
     * Schedules a single {@link #readMoreEntries()} call 1000 ms from now.
     * Calls made while one is already queued are no-ops.
     */
    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher
    protected void reScheduleRead() {
        if (!isRescheduleReadInProgress.compareAndSet(false, true)) {
            // A delayed read is already queued.
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, 1000);
        }
        topic.getBrokerService().executor().schedule(() -> {
            isRescheduleReadInProgress.set(false);
            readMoreEntries();
        }, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes how many entries and bytes the next read may request.
     *
     * <p>Starts from {@code min(i, readBatchSize)} and the configured maximum
     * read size, then narrows the budget:
     * <ul>
     *   <li>with precise flow control, the permit count is divided by the
     *       sampled consumer's average messages per entry;</li>
     *   <li>a single entry is requested when {@code isConsumerWritable()} is false;</li>
     *   <li>broker, topic and subscription rate limiters are applied in that
     *       order when throttling is enabled for non-backlog consumers or the
     *       cursor is inactive.</li>
     * </ul>
     *
     * @param i permits currently available for dispatch
     * @return pair of (entries, bytes), each at least 1; or {@code (-1, -1)}
     *         when a rate limit is reached or a replay read is still pending
     */
    protected Pair<Integer, Long> calculateToRead(int i) {
        int messagesToRead = Math.min(i, readBatchSize);
        long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();

        Consumer sampled = getRandomConsumer();
        if (sampled != null && sampled.isPreciseDispatcherFlowControl()) {
            int avgMessagesPerEntry = Math.max(1, sampled.getAvgMessagesPerEntry());
            messagesToRead = Math.min((int) Math.ceil((i * 1.0d) / avgMessagesPerEntry), readBatchSize);
        }

        if (!isConsumerWritable()) {
            messagesToRead = 1;
        }

        boolean throttlingApplies = serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive();
        if (throttlingApplies) {
            // Broker-level limiter.
            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
                DispatchRateLimiter brokerLimiter = topic.getBrokerDispatchRateLimiter().get();
                if (reachDispatchRateLimit(brokerLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name, brokerLimiter.getDispatchRateOnMsg(), brokerLimiter.getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = updateMessagesToRead(brokerLimiter, messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }

            // Topic-level limiter.
            if (topic.getDispatchRateLimiter().isPresent()) {
                DispatchRateLimiter topicLimiter = topic.getDispatchRateLimiter().get();
                if (reachDispatchRateLimit(topicLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name, topicLimiter.getDispatchRateOnMsg(), topicLimiter.getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = updateMessagesToRead(topicLimiter, messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }

            // Subscription-level limiter.
            if (dispatchRateLimiter.isPresent()) {
                if (reachDispatchRateLimit(dispatchRateLimiter.get())) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", name, dispatchRateLimiter.get().getDispatchRateOnMsg(), dispatchRateLimiter.get().getDispatchRateOnByte(), 1000);
                    }
                    return Pair.of(-1, -1L);
                }
                Pair<Integer, Long> adjusted = updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
                messagesToRead = adjusted.getLeft();
                bytesToRead = adjusted.getRight();
            }
        }

        if (havePendingReplayRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
            }
            return Pair.of(-1, -1L);
        }

        // Never request fewer than one entry or one byte.
        int entries = Math.max(messagesToRead, 1);
        long bytes = Math.max(bytesToRead, 1L);
        return Pair.of(entries, bytes);
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as
     * callback and {@code ReadType.Replay} as context.
     *
     * @param set positions to replay
     * @return the set returned by the cursor; {@code readMoreEntries} removes
     *         these positions from the replay queue
     */
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        Set<? extends Position> returnedByCursor = cursor.asyncReplayEntries(set, this, ReadType.Replay);
        return returnedByCursor;
    }

    /**
     * Same as {@link #asyncReplayEntries(Set)} but passes {@code true} as the
     * cursor's additional boolean argument.
     * NOTE(review): by the method name that flag requests in-order replay - confirm against ManagedCursor.
     *
     * @param set positions to replay
     * @return the set returned by the cursor
     */
    protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> set) {
        Set<? extends Position> returnedByCursor = cursor.asyncReplayEntries(set, this, ReadType.Replay, true);
        return returnedByCursor;
    }

    /**
     * @return {@code true} when at least one consumer is in the consumer list
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public boolean isConsumerConnected() {
        boolean noConsumers = consumerList.isEmpty();
        return !noConsumers;
    }

    /**
     * @return the live consumer list itself (not a copy)
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return consumerList;
    }

    /**
     * A consumer may unsubscribe only when it is the sole registered consumer.
     *
     * @param consumer consumer requesting the unsubscribe
     * @return {@code true} if exactly one consumer is connected and it is {@code consumer}
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        if (consumerList.size() != 1) {
            return false;
        }
        return consumerSet.contains(consumer);
    }

    /**
     * Marks the dispatcher closed, closes the delayed-delivery tracker and the
     * subscription rate limiter, then disconnects every consumer.
     *
     * @return the future returned by {@code disconnectAllConsumers()}
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);

        // Detach the tracker under the lock, close it outside the lock.
        Optional<DelayedDeliveryTracker> detachedTracker;
        synchronized (this) {
            detachedTracker = delayedDeliveryTracker;
            delayedDeliveryTracker = Optional.empty();
        }
        detachedTracker.ifPresent(DelayedDeliveryTracker::close);
        dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

        return disconnectAllConsumers();
    }

    /**
     * Starts disconnecting every consumer and installs a fresh close future.
     *
     * <p>With no consumers the future is completed immediately. Otherwise each
     * consumer is asked to disconnect and the pending read is cancelled; the
     * future is completed later, when the last consumer has been removed.
     *
     * @param z forwarded to {@code Consumer.disconnect(boolean)}
     * @return the close future
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean z) {
        closeFuture = new CompletableFuture<>();
        if (!consumerList.isEmpty()) {
            for (Consumer connected : consumerList) {
                connected.disconnect(z);
            }
            cancelPendingRead();
        } else {
            closeFuture.complete(null);
        }
        return closeFuture;
    }

    /**
     * Cancels the outstanding normal read, if any. The pending flag is cleared
     * only when the cursor reports that the cancellation succeeded.
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers
    protected void cancelPendingRead() {
        if (!havePendingRead) {
            return;
        }
        if (cursor.cancelPendingReadRequest()) {
            havePendingRead = false;
        }
    }

    /**
     * For this dispatcher, disconnecting the active consumers is the same as
     * disconnecting all consumers: the call is forwarded unchanged.
     *
     * @param z forwarded to {@code disconnectAllConsumers(boolean)}
     * @return the future returned by {@code disconnectAllConsumers(boolean)}
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> disconnectActiveConsumers(boolean z) {
        return disconnectAllConsumers(z);
    }

    /**
     * Drops the current close future, so that removing the last consumer no
     * longer completes it.
     */
    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void resetCloseFuture() {
        closeFuture = null;
    }

    /**
     * Makes a previously closed dispatcher usable again: discards the close
     * future and sets the closed flag back to 0 (the value {@code addConsumer}
     * treats as "not closed").
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void reset() {
        resetCloseFuture();
        IS_CLOSED_UPDATER.set(this, 0);
    }

    /**
     * @return always {@code SubType.Shared}; this dispatcher reports itself as a Shared subscription
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CommandSubscribe.SubType getType() {
        return CommandSubscribe.SubType.Shared;
    }

    /**
     * Cursor callback invoked when a read (normal or replay) has completed.
     *
     * <p>Clears the matching pending-read flag, grows the read batch size,
     * relaxes the failure back-off and then either discards the entries (when
     * a rewind was requested while the read was in flight) or dispatches them
     * to consumers, inline or on the subscription's dispatch thread depending
     * on configuration.
     *
     * @param list entries returned by the cursor
     * @param obj  the {@link ReadType} that was passed as read context
     */
    public final synchronized void readEntriesComplete(List<Entry> list, Object obj) {
        final ReadType readType = (ReadType) obj;
        if (readType == ReadType.Normal) {
            havePendingRead = false;
        } else {
            havePendingReplayRead = false;
        }

        int maxReadBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
        if (readBatchSize < maxReadBatchSize) {
            int grownBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, grownBatchSize);
            }
            readBatchSize = grownBatchSize;
        }

        readFailureBackoff.reduceToHalf();

        // A rewind was requested while this read was in flight: drop what was read and start over.
        if (shouldRewindBeforeReadingOrReplaying && readType == ReadType.Normal) {
            for (Entry stale : list) {
                stale.release();
            }
            cursor.rewind();
            shouldRewindBeforeReadingOrReplaying = false;
            readMoreEntries();
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Distributing {} messages to {} consumers", name, list.size(), consumerList.size());
        }

        final long pendingBytes = list.stream().mapToLong(Entry::getLength).sum();
        updatePendingBytesToDispatch(pendingBytes);

        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
            // Flag the send before handing off so no new read starts in the meantime.
            acquireSendInProgress();
            dispatchMessagesThread.execute(() -> {
                boolean readMore = sendMessagesToConsumers(readType, list, false);
                updatePendingBytesToDispatch(-pendingBytes);
                if (readMore) {
                    readMoreEntries();
                }
            });
            return;
        }

        boolean readMore = sendMessagesToConsumers(readType, list, true);
        updatePendingBytesToDispatch(-pendingBytes);
        if (readMore) {
            readMoreEntriesAsync();
        }
    }

    /**
     * Raises the send-in-progress flag; {@code readMoreEntries()} is a no-op while it is set.
     */
    protected synchronized void acquireSendInProgress() {
        sendInProgress = true;
    }

    /**
     * Lowers the send-in-progress flag so that reads may be scheduled again.
     */
    protected synchronized void releaseSendInProgress() {
        sendInProgress = false;
    }

    /**
     * @return {@code true} while a batch is being sent to consumers
     */
    protected synchronized boolean isSendInProgress() {
        return sendInProgress;
    }

    /**
     * Dispatches a batch of entries while holding the send-in-progress flag.
     * The flag is always released afterwards, whether dispatch returns
     * normally or throws.
     *
     * @param readType kind of read that produced the entries
     * @param list     entries to dispatch
     * @param z        {@code true} if this method must raise the flag itself;
     *                 {@code false} if the caller already did
     * @return the result of {@code trySendMessagesToConsumers}
     */
    protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> list, boolean z) {
        if (z) {
            acquireSendInProgress();
        }
        try {
            return trySendMessagesToConsumers(readType, list);
        } finally {
            releaseSendInProgress();
        }
    }

    /**
     * Distributes the given entries across consumers that have permits.
     *
     * <p>Entries whose metadata carries a UUID take the chunked-message path
     * ({@code sendChunkedMessagesToConsumers}). Otherwise consecutive slices of
     * the list are handed to the consumers returned by {@code getNextConsumer()}.
     * Entries left over once no consumer has permits are queued for replay and
     * released.
     *
     * @param readType kind of read that produced the entries
     * @param list     entries to dispatch
     * @return {@code false} when no consumer could be found (remaining entries
     *         are released and the cursor is rewound); {@code true} otherwise.
     *         The caller schedules another read only on {@code true}.
     */
    protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> list) {
        if (needTrimAckedMessages()) {
            this.cursor.trimDeletedEntries(list);
        }
        // 'size' counts the entries that still have to be dispatched.
        int size = list.size();
        if (size == 0) {
            return true;
        }
        // Peek the metadata of every entry once: 'i' accumulates the total message
        // count (batches included) and 'z' records whether any entry carries a UUID.
        MessageMetadata[] messageMetadataArr = new MessageMetadata[list.size()];
        int i = 0;
        boolean z = false;
        for (int i2 = 0; i2 < messageMetadataArr.length; i2++) {
            MessageMetadata peekAndCopyMessageMetadata = Commands.peekAndCopyMessageMetadata(list.get(i2).getDataBuffer(), this.subscription.toString(), -1L);
            if (peekAndCopyMessageMetadata != null) {
                i += peekAndCopyMessageMetadata.getNumMessagesInBatch();
                if (!z && peekAndCopyMessageMetadata.hasUuid()) {
                    z = true;
                }
            }
            messageMetadataArr[i2] = peekAndCopyMessageMetadata;
        }
        if (z) {
            return sendChunkedMessagesToConsumers(readType, list, messageMetadataArr);
        }
        // i3: index of the next entry to dispatch; j: messages sent; j2: bytes sent;
        // j3: sum of the values returned by filterEntriesForConsumer.
        int i3 = 0;
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        // Average number of messages per entry (at least 1), used to turn a
        // message budget into an entry count.
        int max = i > 0 ? Math.max(i / list.size(), 1) : 1;
        while (size > 0) {
            // Stop as soon as no consumer has permits left.
            int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
            if (!(Math.max(this.totalAvailablePermits, firstAvailableConsumerPermits) > 0 && firstAvailableConsumerPermits > 0)) {
                break;
            }
            Consumer nextConsumer = getNextConsumer();
            if (nextConsumer == null) {
                // No consumer to send to: release what has not been dispatched and rewind the cursor.
                log.info("[{}] rewind because no available consumer found from total {}", this.name, Integer.valueOf(this.consumerList.size()));
                list.subList(i3, list.size()).forEach((v0) -> {
                    v0.release();
                });
                this.cursor.rewind();
                return false;
            }
            // A non-writable consumer is treated as having a single permit.
            int availablePermits = nextConsumer.isWritable() ? nextConsumer.getAvailablePermits() : 1;
            if (nextConsumer.getMaxUnackedMessages() > 0) {
                // Cap by the remaining unacked-message headroom, never below zero.
                availablePermits = Math.min(availablePermits, Math.max(nextConsumer.getMaxUnackedMessages() - nextConsumer.getUnackedMessages(), 0));
            }
            if (log.isDebugEnabled() && !nextConsumer.isWritable()) {
                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; availablePermits are {}", new Object[]{this.topic.getName(), this.name, nextConsumer, Integer.valueOf(nextConsumer.getAvailablePermits())});
            }
            // Entries for this consumer: the message budget (remaining messages, permits and
            // round-robin batch size, whichever is smallest) divided by the average batch size;
            // always at least one entry.
            int max2 = Math.max(Math.min(Math.min(i, availablePermits), this.serviceConfig.getDispatcherMaxRoundRobinBatchSize()) / max, 1);
            List<Entry> subList = list.subList(i3, Math.min(i3 + max2, list.size()));
            if (readType == ReadType.Replay) {
                // Drop the positions from the replay queue before handing the entries to the consumer.
                // NOTE(review): presumably because sendMessages takes ownership of the entries - confirm.
                subList.forEach(entry -> {
                    this.redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
                });
            }
            SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
            EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(subList.size());
            EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get(subList.size());
            j3 += filterEntriesForConsumer(messageMetadataArr, i3, subList, entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor, readType == ReadType.Replay, nextConsumer);
            nextConsumer.sendMessages(subList, entryBatchSizes, entryBatchIndexesAcks, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), this.redeliveryTracker);
            int totalMessages = threadLocal.getTotalMessages();
            i -= totalMessages;
            i3 += max2;
            size -= max2;
            // Already-acked batch indexes do not consume permits.
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(totalMessages - entryBatchIndexesAcks.getTotalAckedIndexCount()));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in PersistentDispatcherMultipleConsumers", new Object[]{this.name, Integer.valueOf(totalMessages), Integer.valueOf(entryBatchIndexesAcks.getTotalAckedIndexCount())});
            }
            j += threadLocal.getTotalMessages();
            j2 += threadLocal.getTotalBytes();
        }
        // Reports the totals accumulated in the loop (j3, messages j, bytes j2).
        acquirePermitsForDeliveredMessages(this.topic, this.cursor, j3, j, j2);
        if (size <= 0) {
            return true;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", this.name, Integer.valueOf(list.size() - i3));
        }
        // Everything that could not be dispatched is queued for replay and released here.
        list.subList(i3, list.size()).forEach(entry2 -> {
            addMessageToReplay(entry2.getLedgerId(), entry2.getEntryId(), getStickyKeyHash(entry2));
            entry2.release();
        });
        return true;
    }

    /**
     * Dispatches the given entries to the consumers selected by {@code assignor}.
     *
     * <p>Every entry is paired with the metadata at the same index and the pairs are handed to
     * {@code assignor.assign(...)}. For each consumer in the result, at most
     * {@code getAvailablePermits()} entries are sent; the remaining entries are queued for replay
     * (which also releases them) and their slots in the per-consumer list are set to {@code null}.
     * When the last outstanding send completes, {@link #readMoreEntries()} is triggered from the
     * send listener.
     *
     * @param readType           the kind of read that produced the entries; for {@code Replay} the sent
     *                           positions are removed from {@code redeliveryMessages}
     * @param list               the entries to dispatch; read by index for every element of
     *                           {@code messageMetadataArr}
     * @param messageMetadataArr the metadata of each entry, at the same index as in {@code list}
     * @return {@code true} if the counter of consumers with an outstanding send is already zero when
     *         this method returns
     */
    private boolean sendChunkedMessagesToConsumers(ReadType readType, List<Entry> list, MessageMetadata[] messageMetadataArr) {
        // Typed list instead of a raw ArrayList so the call to assign(...) is checked by the compiler.
        final List<EntryAndMetadata> entriesWithMetadata = new ArrayList<>(messageMetadataArr.length);
        for (int idx = 0; idx < messageMetadataArr.length; idx++) {
            entriesWithMetadata.add(EntryAndMetadata.create(list.get(idx), messageMetadataArr[idx]));
        }
        final Map<Consumer, List<EntryAndMetadata>> assignments =
                this.assignor.assign(entriesWithMetadata, this.consumerList.size());
        final boolean isReplay = readType == ReadType.Replay;
        long totalMessagesSent = 0;
        long totalBytesSent = 0;
        long totalEntries = 0;
        // Counts the consumers whose send has not completed yet.
        final AtomicInteger pendingConsumers = new AtomicInteger(assignments.size());
        for (Map.Entry<Consumer, List<EntryAndMetadata>> assignment : assignments.entrySet()) {
            final Consumer consumer = assignment.getKey();
            final List<EntryAndMetadata> assigned = assignment.getValue();
            final int messagesForConsumer = Math.min(consumer.getAvailablePermits(), assigned.size());
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}",
                        this.name, consumer.consumerName(), messagesForConsumer, readType);
            }
            // Entries beyond the consumer's permits go back to the replay set; null marks them as not sent.
            for (int idx = messagesForConsumer; idx < assigned.size(); idx++) {
                addMessageToReplay(assigned.get(idx));
                assigned.set(idx, null);
            }
            if (messagesForConsumer == 0) {
                // Nothing is sent to this consumer, so no listener will ever decrement the counter for it.
                pendingConsumers.decrementAndGet();
                continue;
            }
            if (isReplay) {
                assigned.stream().limit(messagesForConsumer).forEach(sent ->
                        this.redeliveryMessages.remove(sent.getLedgerId(), sent.getEntryId()));
            }
            final SendMessageInfo sendInfo = SendMessageInfo.getThreadLocal();
            final EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForConsumer);
            final EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForConsumer);
            totalEntries += filterEntriesForConsumer(assigned, batchSizes, sendInfo, batchIndexesAcks,
                    this.cursor, isReplay, consumer);
            consumer.sendMessages(assigned, batchSizes, batchIndexesAcks, sendInfo.getTotalMessages(),
                    sendInfo.getTotalBytes(), sendInfo.getTotalChunkedMessages(), getRedeliveryTracker())
                    .addListener(future -> {
                        if (future.isDone() && pendingConsumers.decrementAndGet() == 0) {
                            readMoreEntries();
                        }
                    });
            TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
                    -(sendInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
            totalMessagesSent += sendInfo.getTotalMessages();
            totalBytesSent += sendInfo.getTotalBytes();
        }
        acquirePermitsForDeliveredMessages(this.topic, this.cursor, totalEntries, totalMessagesSent, totalBytesSent);
        return pendingConsumers.get() == 0;
    }

    /**
     * Handles a failed read and schedules a retry.
     *
     * <p>The retry delay is taken from {@code readFailureBackoff}, except when the cause is a
     * {@code TransactionNotSealedException} or {@code OffloadReadHandleClosedException}, in which case
     * it is forced to 1 ms. The pending-read flag matching the read type is cleared, the read batch
     * size is reset to the configured minimum, and the retry is scheduled on the broker executor.
     *
     * @param managedLedgerException the failure reported for the read
     * @param obj                    the read context; cast to {@code ReadType}
     */
    public synchronized void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        final ReadType readType = (ReadType) obj;
        long retryDelayMillis = this.readFailureBackoff.next();

        if (managedLedgerException instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0) {
                checkAndApplyReachedEndOfTopicOrTopicMigration(this.consumerList);
            }
        } else if (managedLedgerException.getCause() instanceof TransactionException.TransactionNotSealedException
                || managedLedgerException.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
            retryDelayMillis = 1;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds",
                        this.name, managedLedgerException.getMessage(), readType, retryDelayMillis / 1000.0d);
            }
        } else if (managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException) {
            // Too-many-requests failures are only reported at debug level.
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds",
                        this.name, this.cursor.getReadPosition(), managedLedgerException.getMessage(), readType,
                        retryDelayMillis / 1000.0d);
            }
        } else {
            log.error("[{}] Error reading entries at {} : {}, Read Type {} - Retrying to read in {} seconds",
                    this.name, this.cursor.getReadPosition(), managedLedgerException.getMessage(), readType,
                    retryDelayMillis / 1000.0d);
        }

        if (this.shouldRewindBeforeReadingOrReplaying) {
            this.shouldRewindBeforeReadingOrReplaying = false;
            this.cursor.rewind();
        }

        if (readType == ReadType.Normal) {
            this.havePendingRead = false;
        } else {
            this.havePendingReplayRead = false;
            if (managedLedgerException instanceof ManagedLedgerException.InvalidReplayPositionException) {
                // Drop every replay position up to the mark-delete position.
                final PositionImpl markDelete = this.cursor.getMarkDeletedPosition();
                this.redeliveryMessages.removeAllUpTo(markDelete.getLedgerId(), markDelete.getEntryId());
            }
        }

        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();

        this.topic.getBrokerService().executor().schedule(() -> {
            synchronized (this) {
                // The retry is skipped only for a normal read while another normal read is already pending.
                if (this.havePendingRead && readType != ReadType.Replay) {
                    log.info("[{}] Skipping read retry: havePendingRead {}", this.name, this.havePendingRead,
                            managedLedgerException);
                } else {
                    log.info("[{}] Retrying read operation", this.name);
                    readMoreEntries();
                }
            }
        }, retryDelayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether the individually-deleted range recorded at cursor recovery still reaches past the
     * cursor's current read position.
     *
     * @return {@code false} if no range is recorded; otherwise {@code true} when the range's upper
     *         endpoint compares greater than the read position
     */
    private boolean needTrimAckedMessages() {
        if (this.lastIndividualDeletedRangeFromCursorRecovery == null) {
            return false;
        }
        return this.lastIndividualDeletedRangeFromCursorRecovery.upperEndpoint()
                .compareTo(this.cursor.getReadPosition()) > 0;
    }

    /**
     * Checks whether {@link #getFirstAvailableConsumerPermits()} reports a positive permit count.
     *
     * @return {@code true} if at least one consumer is able to receive messages
     */
    protected boolean isAtleastOneConsumerAvailable() {
        final int permits = getFirstAvailableConsumerPermits();
        return permits > 0;
    }

    /**
     * Returns the available permits of the first consumer in {@code consumerList} that is non-null,
     * not blocked and has a positive permit count.
     *
     * @return that consumer's permits, or {@code 0} if the list is empty, the dispatcher's closed flag
     *         is set, or no consumer qualifies
     */
    protected int getFirstAvailableConsumerPermits() {
        if (this.consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == 1) {
            return 0;
        }
        for (Consumer candidate : this.consumerList) {
            if (candidate == null || candidate.isBlocked()) {
                continue;
            }
            final int permits = candidate.getAvailablePermits();
            if (permits > 0) {
                return permits;
            }
        }
        return 0;
    }

    /**
     * Checks whether any consumer in {@code consumerList} reports itself as writable.
     *
     * @return {@code true} on the first writable consumer; {@code false} (with a debug log) if none is
     */
    private boolean isConsumerWritable() {
        for (Consumer consumer : this.consumerList) {
            if (consumer.isWritable()) {
                return true;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer is not writable", this.topic.getName(), this.name);
        }
        return false;
    }

    /**
     * Checks whether the given consumer can currently receive messages.
     *
     * @param consumer the consumer to check; may be {@code null}
     * @return {@code true} only if the consumer is non-null, not blocked and has positive permits
     */
    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers
    public boolean isConsumerAvailable(Consumer consumer) {
        return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
    }

    /**
     * Queues every pending ack of the given consumer for replay and triggers a new read.
     *
     * <p>For each position that is actually added to the replay set (i.e. it is still after the
     * mark-delete position), the redelivery count is incremented.
     *
     * @param consumer the consumer whose pending acks are redelivered
     * @param j        not used by this implementation
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long j) {
        consumer.getPendingAcks().forEach((ledgerId, entryId, ignored, stickyKeyHash) -> {
            if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
                this.redeliveryTracker.incrementAndGetRedeliveryCount(PositionImpl.get(ledgerId, entryId));
            }
        });
        if (log.isDebugEnabled()) {
            // Prefix is topic-dispatcher, as in the other "{}-{}" logs of this class; the replay set
            // gets its own placeholder instead of being printed as the consumer.
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}, messages to replay: {}",
                    this.topic.getName(), this.name, consumer, this.redeliveryMessages);
        }
        readMoreEntries();
    }

    /**
     * Queues the given positions for replay and triggers a new read.
     *
     * <p>For each position that is actually added to the replay set (i.e. it is still after the
     * mark-delete position), the redelivery count is incremented.
     *
     * @param consumer the consumer requesting the redelivery; only used for logging here
     * @param list     the positions to redeliver
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        for (PositionImpl position : list) {
            if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
                this.redeliveryTracker.incrementAndGetRedeliveryCount(position);
            }
        }
        if (log.isDebugEnabled()) {
            // Prefix is topic-dispatcher, as in the other "{}-{}" logs of this class; the positions
            // get their own placeholder instead of being printed as the consumer.
            log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}, positions: {}",
                    this.topic.getName(), this.name, consumer, list);
        }
        readMoreEntries();
    }

    /**
     * Adds {@code i} to the unacked-message counter and updates the dispatcher's blocked flag.
     *
     * <p>The flag is set when the counter reaches the subscription limit, and cleared when no limit
     * is configured, when broker-level dispatching is blocked and the counter is below half of
     * {@code maxUnackedMsgsPerDispatcher}, or when the counter drops below half of the subscription
     * limit. The change is finally forwarded to the broker service.
     *
     * @param i the delta to apply to the unacked-message counter
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void addUnAckedMessages(int i) {
        final int maxUnacked = this.topic.getMaxUnackedMessagesOnSubscription();
        // No subscription-level limit: lift an existing block and resume reading.
        if (maxUnacked <= 0
                && this.blockedDispatcherOnUnackedMsgs == 1
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", this.name);
            readMoreEntriesAsync();
        }

        final int unacked = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, i);
        if (unacked >= maxUnacked
                && maxUnacked > 0
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 0, 1)) {
            log.debug("[{}] Dispatcher is blocked due to unackMessages {} reached to max {}",
                    this.name, unacked, maxUnacked);
        } else if (this.topic.getBrokerService().isBrokerDispatchingBlocked()
                && this.blockedDispatcherOnUnackedMsgs == 1) {
            // Broker-level block: release this dispatcher once it is below half of the per-dispatcher limit.
            if (this.totalUnackedMessages < this.topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2) {
                if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
                    this.topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
                }
            }
        } else if (this.blockedDispatcherOnUnackedMsgs == 1
                && unacked < maxUnacked / 2
                && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 1, 0)) {
            log.debug("[{}] Dispatcher is unblocked", this.name);
            readMoreEntriesAsync();
        }

        this.topic.getBrokerService().addUnAckedMessages(this, i);
    }

    /**
     * @return {@code true} if the blocked-on-unacked-messages flag currently holds {@code 1}
     */
    public boolean isBlockedDispatcherOnUnackedMsgs() {
        return blockedDispatcherOnUnackedMsgs == 1;
    }

    /**
     * Sets the blocked-on-unacked-messages flag to {@code 1} (blocked).
     */
    public void blockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 1;
    }

    /**
     * Resets the blocked-on-unacked-messages flag to {@code 0} (not blocked).
     */
    public void unBlockDispatcherOnUnackedMsgs() {
        blockedDispatcherOnUnackedMsgs = 0;
    }

    /**
     * @return the current value of the unacked-message counter
     */
    public int getTotalUnackedMessages() {
        return totalUnackedMessages;
    }

    /**
     * @return the name of this dispatcher, as used in its log messages
     */
    public String getName() {
        return name;
    }

    /**
     * @return the redelivery tracker used by this dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public RedeliveryTracker getRedeliveryTracker() {
        return redeliveryTracker;
    }

    /**
     * @return the dispatch rate limiter of this dispatcher, empty if none has been initialized
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return dispatchRateLimiter;
    }

    /**
     * Creates the dispatch rate limiter if it is needed and missing; otherwise asks an existing
     * limiter to update its dispatch rate.
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void updateRateLimiter() {
        // A limiter that was just created needs no further update.
        if (!initializeDispatchRateLimiterIfNeeded()) {
            this.dispatchRateLimiter.ifPresent(limiter -> limiter.updateDispatchRate());
        }
    }

    /**
     * Creates a subscription-level dispatch rate limiter when none exists yet and the topic's
     * dispatch rate for this subscription is enabled.
     *
     * @return {@code true} if a limiter was created by this call, {@code false} otherwise
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public boolean initializeDispatchRateLimiterIfNeeded() {
        final boolean limiterNeeded = !this.dispatchRateLimiter.isPresent()
                && DispatchRateLimiter.isDispatchRateEnabled(
                        this.topic.getSubscriptionDispatchRate(getSubscriptionName()));
        if (limiterNeeded) {
            this.dispatchRateLimiter = Optional.of(
                    new DispatchRateLimiter(this.topic, getSubscriptionName(), DispatchRateLimiter.Type.SUBSCRIPTION));
            return true;
        }
        return false;
    }

    /**
     * Registers a message with the delayed-delivery tracker, creating the tracker on first use.
     *
     * <p>The tracker is only created when the message carries a deliver-at time. Once a tracker
     * exists, every message is passed to it, using {@code -1} as the deliver-at time for messages
     * that have none.
     *
     * @param j               ledger id of the message
     * @param j2              entry id of the message
     * @param messageMetadata metadata of the message
     * @return {@code false} if delayed delivery is disabled on the topic or no tracker exists and the
     *         message has no deliver-at time; otherwise the result of the tracker's {@code addMessage}
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public boolean trackDelayedDelivery(long j, long j2, MessageMetadata messageMetadata) {
        if (!this.topic.isDelayedDeliveryEnabled()) {
            return false;
        }
        synchronized (this) {
            if (this.delayedDeliveryTracker.isEmpty()) {
                if (!messageMetadata.hasDeliverAtTime()) {
                    return false;
                }
                this.delayedDeliveryTracker = Optional.of(
                        this.topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
            }
            final DelayedDeliveryTracker tracker = this.delayedDeliveryTracker.get();
            tracker.resetTickTime(this.topic.getDelayedDeliveryTickTimeMillis());
            final long deliverAt = messageMetadata.hasDeliverAtTime() ? messageMetadata.getDeliverAtTime() : -1L;
            return tracker.addMessage(j, j2, deliverAt);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Collects the positions that should be replayed now.
     *
     * <p>If a delayed-delivery tracker exists and has messages available, its scheduled messages
     * (at most {@code i}) are first moved into {@code redeliveryMessages}.
     *
     * @param i the maximum number of positions to request
     * @return the positions reported by {@code redeliveryMessages}, or an empty set if it is empty
     */
    public synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int i) {
        if (this.delayedDeliveryTracker.isPresent()) {
            final DelayedDeliveryTracker tracker = this.delayedDeliveryTracker.get();
            if (tracker.hasMessageAvailable()) {
                tracker.resetTickTime(this.topic.getDelayedDeliveryTickTimeMillis());
                for (PositionImpl position : tracker.getScheduledMessages(i)) {
                    this.redeliveryMessages.add(position.getLedgerId(), position.getEntryId());
                }
            }
        }
        if (this.redeliveryMessages.isEmpty()) {
            return Collections.emptyNavigableSet();
        }
        return this.redeliveryMessages.getMessagesToReplayNow(i);
    }

    /**
     * Returns the given set unchanged; this implementation filters nothing.
     * NOTE(review): being protected, this is presumably a hook for subclasses; confirm.
     *
     * @param navigableSet the candidate positions
     * @return the same {@code navigableSet} instance
     */
    protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(
            NavigableSet<PositionImpl> navigableSet) {
        return navigableSet;
    }

    /**
     * Always answers {@code true} in this implementation.
     * NOTE(review): being protected, this is presumably a hook for subclasses; confirm.
     *
     * @return {@code true}
     */
    protected boolean hasConsumersNeededNormalRead() {
        final boolean normalReadNeeded = true;
        return normalReadNeeded;
    }

    /**
     * Asks the delayed-delivery tracker, if one exists, whether all deliveries should be paused.
     *
     * @return the tracker's answer, or {@code false} when no tracker exists
     */
    protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
        if (!this.delayedDeliveryTracker.isPresent()) {
            return false;
        }
        return this.delayedDeliveryTracker.get().shouldPauseAllDeliveries();
    }

    /**
     * @return the number of delayed messages reported by the delayed-delivery tracker, or {@code 0}
     *         when no tracker exists
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public long getNumberOfDelayedMessages() {
        return this.delayedDeliveryTracker
                .map(DelayedDeliveryTracker::getNumberOfDelayedMessages)
                .orElse(0L);
    }

    /**
     * Clears the delayed messages of this dispatcher.
     *
     * <p>With an existing tracker, the tracker's {@code clear()} is used. Without one, residual
     * snapshots are cleaned through the factory when it is a
     * {@code BucketDelayedDeliveryTrackerFactory}.
     *
     * @return the future of the clean-up, or an already completed future when delayed delivery is
     *         disabled on the topic or there is nothing to clean
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> clearDelayedMessages() {
        if (!this.topic.isDelayedDeliveryEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.delayedDeliveryTracker.isPresent()) {
            return this.delayedDeliveryTracker.get().clear();
        }
        final DelayedDeliveryTrackerFactory factory =
                this.topic.getBrokerService().getDelayedDeliveryTrackerFactory();
        if (factory instanceof BucketDelayedDeliveryTrackerFactory) {
            return ((BucketDelayedDeliveryTrackerFactory) factory).cleanResidualSnapshots(this.cursor);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Discards the individually-deleted range remembered from cursor recovery, if any.
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void cursorIsReset() {
        if (this.lastIndividualDeletedRangeFromCursorRecovery == null) {
            return;
        }
        this.lastIndividualDeletedRangeFromCursorRecovery = null;
    }

    /**
     * Queues the position of the given entry for replay and then releases the entry.
     *
     * @param entry the entry to replay later; it is released by this call
     */
    private void addMessageToReplay(Entry entry) {
        final long ledgerId = entry.getLedgerId();
        final long entryId = entry.getEntryId();
        addMessageToReplay(ledgerId, entryId);
        entry.release();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a position to the replay set if it is still after the mark-delete position.
     *
     * @param j  ledger id of the message
     * @param j2 entry id of the message
     * @param j3 third value stored with the position (callers in this class pass a sticky-key hash)
     * @return {@code true} if the position was added, {@code false} if it was skipped
     */
    public boolean addMessageToReplay(long j, long j2, long j3) {
        final boolean stillUnacked = checkIfMessageIsUnacked(j, j2);
        if (stillUnacked) {
            this.redeliveryMessages.add(j, j2, j3);
        }
        return stillUnacked;
    }

    /**
     * Adds a position to the replay set if it is still after the mark-delete position.
     *
     * @param j  ledger id of the message
     * @param j2 entry id of the message
     * @return {@code true} if the position was added, {@code false} if it was skipped
     */
    protected boolean addMessageToReplay(long j, long j2) {
        final boolean stillUnacked = checkIfMessageIsUnacked(j, j2);
        if (stillUnacked) {
            this.redeliveryMessages.add(j, j2);
        }
        return stillUnacked;
    }

    /**
     * Checks whether the given position lies strictly after the cursor's mark-delete position.
     *
     * @param j  ledger id of the message
     * @param j2 entry id of the message
     * @return {@code true} if the cursor has no mark-delete position or the position is after it
     */
    private boolean checkIfMessageIsUnacked(long j, long j2) {
        final Position markDelete = this.cursor.getMarkDeletedPosition();
        if (markDelete == null) {
            return true;
        }
        final long markDeleteLedgerId = markDelete.getLedgerId();
        if (j != markDeleteLedgerId) {
            // Different ledgers: the ledger id alone decides the order.
            return j > markDeleteLedgerId;
        }
        return j2 > markDelete.getEntryId();
    }

    /**
     * Detects a stuck dispatcher and unblocks it by issuing a read.
     *
     * <p>The dispatcher is treated as stuck when the cursor's read position has not changed, permits
     * are available, no normal or replay read is pending, and the cursor still has backlog.
     *
     * @return {@code true} if a read was issued to unblock the dispatcher, {@code false} otherwise
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public boolean checkAndUnblockIfStuck() {
        if (this.cursor.checkAndUpdateReadPositionChanged()) {
            return false;
        }
        final boolean stuck = this.totalAvailablePermits > 0
                && !this.havePendingReplayRead
                && !this.havePendingRead
                && this.cursor.getNumberOfEntriesInBacklog(false) > 0;
        if (!stuck) {
            return false;
        }
        log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", this.topic.getName(), this.name);
        readMoreEntries();
        return true;
    }

    /**
     * @return the topic this dispatcher belongs to
     */
    public PersistentTopic getTopic() {
        return topic;
    }

    /**
     * @return the buffer memory usage reported by the delayed-delivery tracker, or {@code 0} when no
     *         tracker exists
     */
    public long getDelayedTrackerMemoryUsage() {
        return this.delayedDeliveryTracker
                .map(DelayedDeliveryTracker::getBufferMemoryUsage)
                .orElse(0L);
    }

    /**
     * Returns the topic metric map generated by the delayed-delivery tracker when that tracker is a
     * {@code BucketDelayedDeliveryTracker}.
     *
     * @return the tracker's metric map, or an empty map when there is no tracker or it is of another
     *         type
     */
    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
        if (this.delayedDeliveryTracker.isPresent()) {
            final DelayedDeliveryTracker tracker = this.delayedDeliveryTracker.get();
            if (tracker instanceof BucketDelayedDeliveryTracker) {
                return ((BucketDelayedDeliveryTracker) tracker).genTopicMetricMap();
            }
        }
        return Collections.emptyMap();
    }

    /**
     * @return the managed cursor this dispatcher reads from
     */
    public ManagedCursor getCursor() {
        return cursor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes the sticky-key hash of an entry from the key peeked out of its data buffer.
     *
     * @param entry the entry whose data buffer is inspected
     * @return the hash produced by {@code StickyKeyConsumerSelector.makeStickyKeyHash}
     */
    public int getStickyKeyHash(Entry entry) {
        return StickyKeyConsumerSelector.makeStickyKeyHash(
                peekStickyKey(entry.getDataBuffer()));
    }
}
