package org.apache.pulsar.broker.service.nonpersistent;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.class */
public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements NonPersistentDispatcher {
    private final NonPersistentTopic topic;
    protected final Subscription subscription;
    private CompletableFuture<Void> closeFuture;
    private final String name;
    protected final Rate msgDrop;
    private volatile int totalAvailablePermits;
    private final ServiceConfiguration serviceConfig;
    private final RedeliveryTracker redeliveryTracker;
    protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
    private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);

    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic nonPersistentTopic, Subscription subscription) {
        super(subscription);
        this.closeFuture = null;
        this.totalAvailablePermits = 0;
        this.topic = nonPersistentTopic;
        this.subscription = subscription;
        this.name = nonPersistentTopic.getName() + " / " + subscription.getName();
        this.msgDrop = new Rate();
        this.serviceConfig = nonPersistentTopic.getBrokerService().pulsar().getConfiguration();
        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        if (IS_CLOSED_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.name, consumer);
            consumer.disconnect();
        } else {
            if (isConsumersExceededOnSubscription()) {
                log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.name);
                throw new BrokerServiceException.ConsumerBusyException("Subscription reached max consumers limit");
            }
            this.consumerList.add(consumer);
            this.consumerSet.add(consumer);
        }
    }

    private boolean isConsumersExceededOnSubscription() {
        int maxConsumersPerSubscription = this.serviceConfig.getMaxConsumersPerSubscription();
        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= this.consumerList.size();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        if (this.consumerSet.removeAll(consumer) != 1) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trying to remove a non-connected consumer: {}", this.name, consumer);
            }
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.getAvailablePermits());
            return;
        }
        this.consumerList.remove(consumer);
        log.info("Removed consumer {}", consumer);
        if (this.consumerList.isEmpty()) {
            if (this.closeFuture != null) {
                log.info("[{}] All consumers removed. Subscription is disconnected", this.name);
                this.closeFuture.complete(null);
            }
            TOTAL_AVAILABLE_PERMITS_UPDATER.set(this, 0);
        }
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public boolean isConsumerConnected() {
        return !this.consumerList.isEmpty();
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CopyOnWriteArrayList<Consumer> getConsumers() {
        return this.consumerList;
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized boolean canUnsubscribe(Consumer consumer) {
        return this.consumerList.size() == 1 && this.consumerSet.contains(consumer);
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);
        return disconnectAllConsumers();
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void consumerFlow(Consumer consumer, int i) {
        if (!this.consumerSet.contains(consumer)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ignoring flow control from disconnected consumer {}", this.name, consumer);
            }
        } else {
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, i);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Trigger new read after receiving flow control message", consumer);
            }
        }
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean z) {
        this.closeFuture = new CompletableFuture<>();
        if (this.consumerList.isEmpty()) {
            this.closeFuture.complete(null);
        } else {
            this.consumerList.forEach((v0) -> {
                v0.disconnect();
            });
        }
        return this.closeFuture;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public CompletableFuture<Void> disconnectActiveConsumers(boolean z) {
        return disconnectAllConsumers(z);
    }

    @Override // org.apache.pulsar.broker.service.AbstractBaseDispatcher, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void resetCloseFuture() {
        this.closeFuture = null;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void reset() {
        resetCloseFuture();
        IS_CLOSED_UPDATER.set(this, 0);
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Shared;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public RedeliveryTracker getRedeliveryTracker() {
        return this.redeliveryTracker;
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher
    public void sendMessages(List<Entry> list) {
        Consumer nextConsumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
        if (nextConsumer == null) {
            list.forEach(entry -> {
                int numberOfMessagesInBatch = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), this.subscription.toString(), -1L);
                if (numberOfMessagesInBatch > 0) {
                    this.msgDrop.recordEvent(numberOfMessagesInBatch);
                }
                entry.release();
            });
            return;
        }
        SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
        EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(list.size());
        filterEntriesForConsumer(list, entryBatchSizes, threadLocal, null, null, false);
        nextConsumer.sendMessages(list, entryBatchSizes, null, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), getRedeliveryTracker());
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -threadLocal.getTotalMessages());
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher
    public boolean hasPermits() {
        return TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0;
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher
    public Rate getMessageDropRate() {
        return this.msgDrop;
    }

    @Override // org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers
    public boolean isConsumerAvailable(Consumer consumer) {
        return consumer != null && consumer.getAvailablePermits() > 0 && consumer.isWritable();
    }
}
