package org.apache.pulsar.broker.service.nonpersistent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.shade.com.carrotsearch.hppc.ObjectSet;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMode;

/* loaded from: input_file:org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.class */
public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
    private final StickyKeyConsumerSelector selector;
    private final KeySharedMode keySharedMode;
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() { // from class: org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal
        public Map<Consumer, List<Entry>> initialValue() throws Exception {
            return new HashMap();
        }
    };

    public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic nonPersistentTopic, Subscription subscription, KeySharedMeta keySharedMeta) {
        super(nonPersistentTopic, subscription);
        this.keySharedMode = keySharedMeta.getKeySharedMode();
        switch (this.keySharedMode) {
            case STICKY:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                return;
            case AUTO_SPLIT:
            default:
                ServiceConfiguration configuration = nonPersistentTopic.getBrokerService().getPulsar().getConfiguration();
                if (configuration.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(configuration.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                    return;
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                    return;
                }
        }
    }

    @VisibleForTesting
    NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic nonPersistentTopic, Subscription subscription, StickyKeyConsumerSelector stickyKeyConsumerSelector) {
        super(nonPersistentTopic, subscription);
        if (stickyKeyConsumerSelector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
            this.keySharedMode = KeySharedMode.STICKY;
        } else if ((stickyKeyConsumerSelector instanceof ConsistentHashingStickyKeyConsumerSelector) || (stickyKeyConsumerSelector instanceof HashRangeAutoSplitStickyKeyConsumerSelector)) {
            this.keySharedMode = KeySharedMode.AUTO_SPLIT;
        } else {
            this.keySharedMode = null;
        }
        this.selector = stickyKeyConsumerSelector;
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        try {
            this.selector.addConsumer(consumer);
        } catch (BrokerServiceException e) {
            this.consumerSet.removeAll((ObjectSet<Consumer>) consumer);
            this.consumerList.remove(consumer);
            throw e;
        }
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CommandSubscribe.SubType getType() {
        return CommandSubscribe.SubType.Key_Shared;
    }

    @Override // org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcher
    public void sendMessages(List<Entry> list) {
        if (list.isEmpty()) {
            return;
        }
        if (this.consumerSet.isEmpty()) {
            list.forEach((v0) -> {
                v0.release();
            });
            return;
        }
        Map<Consumer, List<Entry>> map = localGroupedEntries.get();
        map.clear();
        for (Entry entry : list) {
            map.computeIfAbsent(this.selector.select(peekStickyKey(entry.getDataBuffer())), consumer -> {
                return new ArrayList();
            }).add(entry);
        }
        for (Map.Entry<Consumer, List<Entry>> entry2 : map.entrySet()) {
            Consumer key = entry2.getKey();
            List<Entry> value = entry2.getValue();
            SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
            EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(value.size());
            filterEntriesForConsumer(value, entryBatchSizes, threadLocal, null, null, false);
            key.sendMessages(value, entryBatchSizes, null, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), getRedeliveryTracker());
            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -threadLocal.getTotalMessages());
        }
    }

    public KeySharedMode getKeySharedMode() {
        return this.keySharedMode;
    }
}
