package org.apache.pulsar.broker.service.persistent;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.class */
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
    private final boolean allowOutOfOrderDelivery;
    private final StickyKeyConsumerSelector selector;
    private boolean isDispatcherStuckOnReplays;
    private final KeySharedMode keySharedMode;
    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
    private final Set<Consumer> stuckConsumers;
    private final Set<Consumer> nextStuckConsumers;
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: initialValue, reason: merged with bridge method [inline-methods] */
        public Map<Consumer, List<Entry>> m154initialValue() throws Exception {
            return new HashMap();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode = new int[KeySharedMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.AUTO_SPLIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[KeySharedMode.STICKY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, ServiceConfiguration serviceConfiguration, KeySharedMeta keySharedMeta) {
        super(persistentTopic, managedCursor, subscription, keySharedMeta.isAllowOutOfOrderDelivery());
        this.isDispatcherStuckOnReplays = false;
        this.allowOutOfOrderDelivery = keySharedMeta.isAllowOutOfOrderDelivery();
        this.recentlyJoinedConsumers = this.allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
        this.stuckConsumers = new HashSet();
        this.nextStuckConsumers = new HashSet();
        this.keySharedMode = keySharedMeta.getKeySharedMode();
        switch (AnonymousClass2.$SwitchMap$org$apache$pulsar$common$api$proto$KeySharedMode[this.keySharedMode.ordinal()]) {
            case 1:
                if (serviceConfiguration.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(serviceConfiguration.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                    return;
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                    return;
                }
            case 2:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                return;
            default:
                throw new IllegalArgumentException("Invalid key-shared mode: " + keySharedMeta.getKeySharedMode());
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        try {
            this.selector.addConsumer(consumer);
            PositionImpl positionImpl = (PositionImpl) this.cursor.getReadPosition();
            consumer.setReadPositionWhenJoining(positionImpl);
            if (this.allowOutOfOrderDelivery || this.recentlyJoinedConsumers == null || this.consumerList.size() <= 1 || this.cursor.getNumberOfEntriesSinceFirstNotAckedMessage() <= 1) {
                return;
            }
            this.recentlyJoinedConsumers.put(consumer, positionImpl);
        } catch (BrokerServiceException e) {
            this.consumerSet.removeAll(consumer);
            this.consumerList.remove(consumer);
            throw e;
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        this.selector.removeConsumer(consumer);
        super.removeConsumer(consumer);
        if (this.recentlyJoinedConsumers != null) {
            this.recentlyJoinedConsumers.remove(consumer);
            if (this.consumerList.size() == 1) {
                this.recentlyJoinedConsumers.clear();
            }
            if (removeConsumersFromRecentJoinedConsumers() || !this.redeliveryMessages.isEmpty()) {
                readMoreEntries();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> list) {
        long j = 0;
        long j2 = 0;
        if (list.size() == 0) {
            readMoreEntries();
            return;
        }
        if (this.consumerSet.isEmpty()) {
            list.forEach((v0) -> {
                v0.release();
            });
            this.cursor.rewind();
            return;
        }
        this.nextStuckConsumers.clear();
        Map map = (Map) localGroupedEntries.get();
        map.clear();
        HashMap hashMap = new HashMap();
        for (Entry entry : list) {
            int stickyKeyHash = getStickyKeyHash(entry);
            Consumer select = this.selector.select(stickyKeyHash);
            if (select != null) {
                ((List) map.computeIfAbsent(select, consumer -> {
                    return new ArrayList();
                })).add(entry);
                ((Set) hashMap.computeIfAbsent(select, consumer2 -> {
                    return new HashSet();
                })).add(Integer.valueOf(stickyKeyHash));
            } else {
                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
            }
        }
        AtomicInteger atomicInteger = new AtomicInteger(map.size());
        int size = map.size();
        if (size == 0) {
            size = -1;
        }
        for (Map.Entry entry2 : map.entrySet()) {
            Consumer consumer3 = (Consumer) entry2.getKey();
            List<Entry> list2 = (List) entry2.getValue();
            int size2 = list2.size();
            int restrictedMaxEntriesForConsumer = getRestrictedMaxEntriesForConsumer(consumer3, list2, Math.min(size2, consumer3 == null ? 0 : Math.max(consumer3.getAvailablePermits(), 0)), readType, (Set) hashMap.get(consumer3));
            if (log.isDebugEnabled()) {
                Logger logger = log;
                Object[] objArr = new Object[4];
                objArr[0] = this.name;
                objArr[1] = consumer3 == null ? "null" : consumer3.consumerName();
                objArr[2] = Integer.valueOf(restrictedMaxEntriesForConsumer);
                objArr[3] = readType;
                logger.debug("[{}] select consumer {} with messages num {}, read type is {}", objArr);
            }
            if (restrictedMaxEntriesForConsumer < size2) {
                for (int i = restrictedMaxEntriesForConsumer; i < size2; i++) {
                    Entry entry3 = list2.get(i);
                    addMessageToReplay(entry3.getLedgerId(), entry3.getEntryId(), getStickyKeyHash(entry3));
                    entry3.release();
                    list2.set(i, null);
                }
            }
            if (restrictedMaxEntriesForConsumer > 0) {
                if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                    for (int i2 = 0; i2 < restrictedMaxEntriesForConsumer; i2++) {
                        Entry entry4 = list2.get(i2);
                        this.redeliveryMessages.remove(entry4.getLedgerId(), entry4.getEntryId());
                    }
                }
                SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
                EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(restrictedMaxEntriesForConsumer);
                EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get(restrictedMaxEntriesForConsumer);
                filterEntriesForConsumer(list2, entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor, readType == PersistentDispatcherMultipleConsumers.ReadType.Replay);
                consumer3.sendMessages(list2, entryBatchSizes, entryBatchIndexesAcks, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> {
                    if (future.isDone() && atomicInteger.decrementAndGet() == 0) {
                        readMoreEntries();
                    }
                });
                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(threadLocal.getTotalMessages() - entryBatchIndexesAcks.getTotalAckedIndexCount()));
                j += threadLocal.getTotalMessages();
                j2 += threadLocal.getTotalBytes();
            } else {
                size = atomicInteger.decrementAndGet();
            }
        }
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(j, j2);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                this.dispatchRateLimiter.get().tryDispatchPermit(j, j2);
            }
        }
        this.stuckConsumers.clear();
        if (j == 0 && (this.recentlyJoinedConsumers == null || this.recentlyJoinedConsumers.isEmpty())) {
            if (!this.nextStuckConsumers.isEmpty()) {
                this.isDispatcherStuckOnReplays = true;
                this.stuckConsumers.addAll(this.nextStuckConsumers);
            }
            readMoreEntries();
        } else {
            if (size == 0) {
                this.topic.getBrokerService().executor().schedule(() -> {
                    synchronized (this) {
                        readMoreEntries();
                    }
                }, 100L, TimeUnit.MILLISECONDS);
            }
        }
    }

    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> list, int i, PersistentDispatcherMultipleConsumers.ReadType readType, Set<Integer> set) {
        PositionImpl next;
        if (i == 0) {
            this.nextStuckConsumers.add(consumer);
            return 0;
        }
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Normal && set != null && this.redeliveryMessages.containsStickyKeyHashes(set)) {
            return 0;
        }
        if (this.recentlyJoinedConsumers == null) {
            return i;
        }
        removeConsumersFromRecentJoinedConsumers();
        PositionImpl positionImpl = this.recentlyJoinedConsumers.get(consumer);
        if (positionImpl == null) {
            if (!this.stuckConsumers.contains(consumer)) {
                return i;
            }
            if (!log.isDebugEnabled()) {
                return 0;
            }
            log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", this.name, consumer);
            return 0;
        }
        if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay && (next = this.recentlyJoinedConsumers.values().iterator().next()) != null && next.compareTo(positionImpl) < 0) {
            positionImpl = next;
        }
        for (int i2 = 0; i2 < i; i2++) {
            if (list.get(i2).getPosition().compareTo(positionImpl) >= 0) {
                return i2;
            }
        }
        return i;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void markDeletePositionMoveForward() {
        if (this.recentlyJoinedConsumers == null || this.recentlyJoinedConsumers.isEmpty() || !removeConsumersFromRecentJoinedConsumers()) {
            return;
        }
        readMoreEntries();
    }

    private boolean removeConsumersFromRecentJoinedConsumers() {
        Iterator<Map.Entry<Consumer, PositionImpl>> it = this.recentlyJoinedConsumers.entrySet().iterator();
        boolean z = false;
        PositionImpl markDeletedPosition = this.cursor.getMarkDeletedPosition();
        if (markDeletedPosition != null) {
            PositionImpl nextValidPosition = this.cursor.getManagedLedger().getNextValidPosition(markDeletedPosition);
            while (it.hasNext() && it.next().getValue().compareTo(nextValidPosition) <= 0) {
                it.remove();
                z = true;
            }
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public synchronized Set<PositionImpl> getMessagesToReplayNow(int i) {
        if (!this.isDispatcherStuckOnReplays) {
            return super.getMessagesToReplayNow(i);
        }
        this.isDispatcherStuckOnReplays = false;
        return Collections.emptySet();
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public CommandSubscribe.SubType getType() {
        return CommandSubscribe.SubType.Key_Shared;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        return this.cursor.asyncReplayEntries(set, this, PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }

    public KeySharedMode getKeySharedMode() {
        return this.keySharedMode;
    }

    public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
        return this.recentlyJoinedConsumers;
    }

    public Map<String, List<String>> getConsumerKeyHashRanges() {
        return this.selector.getConsumerKeyHashRanges();
    }
}
