package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Lists;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.class */
public class PersistentStreamingDispatcherSingleActiveConsumer extends PersistentDispatcherSingleActiveConsumer implements StreamingDispatcher {
    private static final Logger log = LoggerFactory.getLogger(PersistentStreamingDispatcherSingleActiveConsumer.class);
    private final StreamingEntryReader streamingEntryReader;

    public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor managedCursor, PulsarApi.CommandSubscribe.SubType subType, int i, PersistentTopic persistentTopic, Subscription subscription) {
        super(managedCursor, subType, i, persistentTopic, subscription);
        this.streamingEntryReader = new StreamingEntryReader(this.cursor, this, this.topic);
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public void canReadMoreEntries(boolean z) {
        this.havePendingRead = false;
        this.topic.getBrokerService().executor().schedule(() -> {
            this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.topicName, SafeRun.safeRun(() -> {
                synchronized (this) {
                    Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
                    if (consumer == null || this.havePendingRead) {
                        log.info("[{}-{}] Skipping read as we still havePendingRead {}", this.name, consumer);
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}-{}] Scheduling read ", this.name, consumer);
                        }
                        readMoreEntries(consumer);
                    }
                }
            }));
        }, z ? this.readFailureBackoff.next() : 0L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer, org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.streamingEntryReader.cancelReadRequests()) {
            this.havePendingRead = false;
        }
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public synchronized void notifyConsumersEndOfTopic() {
        if (this.cursor.getNumberOfEntriesInBacklog(false) == 0) {
            this.consumers.forEach((v0) -> {
                v0.reachedEndOfTopic();
            });
        }
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public String getName() {
        return this.name;
    }

    @Override // org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher
    public void readEntryComplete(Entry entry, PendingReadEntryRequest pendingReadEntryRequest) {
        this.topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(this.name, SafeRun.safeRun(() -> {
            internalReadEntryComplete(entry, pendingReadEntryRequest);
        }));
    }

    public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntryRequest pendingReadEntryRequest) {
        Consumer select;
        if (pendingReadEntryRequest.isLast()) {
            this.readFailureBackoff.reduceToHalf();
            this.havePendingRead = false;
        }
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int min = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", new Object[]{this.name, ((Consumer) pendingReadEntryRequest.ctx).consumerName(), Integer.valueOf(this.readBatchSize), Integer.valueOf(min)});
            }
            this.readBatchSize = min;
        }
        Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
        if (this.isKeyHashRangeFiltered && ((select = this.stickyKeyConsumerSelector.select(peekStickyKey(entry.getDataBuffer()))) == null || consumer != select)) {
            entry.release();
            return;
        }
        Consumer consumer2 = (Consumer) pendingReadEntryRequest.ctx;
        pendingReadEntryRequest.recycle();
        if (consumer != null && consumer2 == consumer) {
            EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(1);
            SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
            EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get(1);
            filterEntriesForConsumer(Lists.newArrayList(new Entry[]{entry}), entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor, false);
            this.cursor.seek(this.cursor.getManagedLedger().getNextValidPosition(entry.getPosition()));
            dispatchEntriesToConsumer(consumer, Lists.newArrayList(new Entry[]{entry}), entryBatchSizes, entryBatchIndexesAcks, threadLocal);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Rewind because no available consumer found to dispatch message to.", this.name);
        }
        entry.release();
        this.streamingEntryReader.cancelReadRequests();
        this.havePendingRead = false;
        if (consumer != null) {
            notifyActiveConsumerChanged(consumer);
            readMoreEntries(consumer);
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer, org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer
    protected void readMoreEntries(Consumer consumer) {
        if (null == consumer) {
            return;
        }
        if (this.havePendingRead || consumer.getAvailablePermits() <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Consumer buffer is full, pause reading", this.name, consumer);
                return;
            }
            return;
        }
        int calculateNumOfMessageToRead = calculateNumOfMessageToRead(consumer);
        if (-1 == calculateNumOfMessageToRead) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Schedule read of {} messages", new Object[]{this.name, consumer, Integer.valueOf(calculateNumOfMessageToRead)});
        }
        this.havePendingRead = true;
        if (consumer.readCompacted()) {
            this.topic.getCompactedTopic().asyncReadEntriesOrWait(this.cursor, calculateNumOfMessageToRead, this, consumer);
        } else {
            this.streamingEntryReader.asyncReadEntries(calculateNumOfMessageToRead, this.serviceConfig.getDispatcherMaxReadSizeBytes(), consumer);
        }
    }
}
