package org.apache.pulsar.broker.delayed.bucket;

import com.google.common.base.Preconditions;
import com.google.protobuf.UnsafeByteOperations;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/broker/delayed/bucket/MutableBucket.class */
public class MutableBucket extends Bucket implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MutableBucket.class);
    private final TripleLongPriorityQueue priorityQueue;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MutableBucket(String str, ManagedCursor managedCursor, FutureUtil.Sequencer<Void> sequencer, BucketSnapshotStorage bucketSnapshotStorage) {
        super(str, managedCursor, sequencer, bucketSnapshotStorage, -1L, -1L);
        this.priorityQueue = new TripleLongPriorityQueue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(long j, int i, TripleLongPriorityQueue tripleLongPriorityQueue) {
        return createImmutableBucketAndAsyncPersistent(j, i, tripleLongPriorityQueue, TripleLongPriorityDelayedIndexQueue.wrap(this.priorityQueue), this.startLedgerId, this.endLedgerId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(long j, int i, TripleLongPriorityQueue tripleLongPriorityQueue, DelayedIndexQueue delayedIndexQueue, long j2, long j3) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", new Object[]{this.dispatcherName, Long.valueOf(j2), Long.valueOf(j3)});
        }
        if (delayedIndexQueue.isEmpty()) {
            return null;
        }
        long j4 = 0;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        SnapshotSegment snapshotSegment = new SnapshotSegment();
        SnapshotSegmentMetadata.Builder newBuilder = SnapshotSegmentMetadata.newBuilder();
        ArrayList arrayList3 = new ArrayList();
        long j5 = 0;
        long j6 = 0;
        while (!delayedIndexQueue.isEmpty()) {
            long peekTimestamp = delayedIndexQueue.peekTimestamp();
            if (j5 == 0) {
                j6 = peekTimestamp;
                arrayList3.add(Long.valueOf(j6));
                j5 = (peekTimestamp + j) - 1;
            }
            DelayedIndex addIndexe = snapshotSegment.addIndexe();
            delayedIndexQueue.popToObject(addIndexe);
            long ledgerId = addIndexe.getLedgerId();
            long entryId = addIndexe.getEntryId();
            removeIndexBit(ledgerId, entryId);
            Preconditions.checkArgument(ledgerId >= j2 && ledgerId <= j3);
            if (arrayList2.size() == 0) {
                tripleLongPriorityQueue.add(peekTimestamp, ledgerId, entryId);
            }
            ((RoaringBitmap) hashMap2.computeIfAbsent(Long.valueOf(ledgerId), l -> {
                return new RoaringBitmap();
            })).add(entryId, entryId + 1);
            j4++;
            if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > j5 || (i != -1 && snapshotSegment.getIndexesCount() >= i)) {
                newBuilder.setMaxScheduleTimestamp(peekTimestamp);
                newBuilder.setMinScheduleTimestamp(j6);
                j5 = 0;
                Iterator it = hashMap2.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    Long l2 = (Long) entry.getKey();
                    RoaringBitmap roaringBitmap = (RoaringBitmap) entry.getValue();
                    roaringBitmap.runOptimize();
                    ByteBuffer allocate = ByteBuffer.allocate(roaringBitmap.serializedSizeInBytes());
                    roaringBitmap.serialize(allocate);
                    allocate.flip();
                    newBuilder.putDelayedIndexBitMap(l2.longValue(), UnsafeByteOperations.unsafeWrap(allocate));
                    hashMap.compute(l2, (l3, roaringBitmap2) -> {
                        if (roaringBitmap2 == null) {
                            return roaringBitmap;
                        }
                        roaringBitmap2.or(roaringBitmap);
                        return roaringBitmap2;
                    });
                    it.remove();
                }
                arrayList2.add(newBuilder.m144build());
                newBuilder.m145clear();
                arrayList.add(snapshotSegment);
                snapshotSegment = new SnapshotSegment();
            }
        }
        hashMap.values().forEach((v0) -> {
            v0.runOptimize();
        });
        this.delayedIndexBitMap.values().forEach((v0) -> {
            v0.runOptimize();
        });
        SnapshotMetadata m96build = SnapshotMetadata.newBuilder().addAllMetadataList(arrayList2).m96build();
        int size = arrayList2.size();
        ImmutableBucket immutableBucket = new ImmutableBucket(this.dispatcherName, this.cursor, this.sequencer, this.bucketSnapshotStorage, j2, j3);
        immutableBucket.setCurrentSegmentEntryId(1);
        immutableBucket.setNumberBucketDelayedMessages(j4);
        immutableBucket.setLastSegmentEntryId(size);
        immutableBucket.setFirstScheduleTimestamps(arrayList3);
        immutableBucket.setDelayedIndexBitMap(hashMap);
        immutableBucket.setSnapshotSegments(arrayList.subList(1, arrayList.size()));
        Preconditions.checkArgument(!arrayList.isEmpty());
        SnapshotSegment snapshotSegment2 = arrayList.get(0);
        Pair<ImmutableBucket, DelayedIndex> of = Pair.of(immutableBucket, snapshotSegment2.getIndexeAt(snapshotSegment2.getIndexesCount() - 1));
        immutableBucket.setSnapshotCreateFuture(asyncSaveBucketSnapshot(immutableBucket, m96build, arrayList));
        return of;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void moveScheduledMessageToSharedQueue(long j, TripleLongPriorityQueue tripleLongPriorityQueue) {
        while (!this.priorityQueue.isEmpty()) {
            long peekN1 = this.priorityQueue.peekN1();
            if (peekN1 > j) {
                return;
            }
            tripleLongPriorityQueue.add(peekN1, this.priorityQueue.peekN2(), this.priorityQueue.peekN3());
            this.priorityQueue.pop();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resetLastMutableBucketRange() {
        this.startLedgerId = -1L;
        this.endLedgerId = -1L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clear() {
        resetLastMutableBucketRange();
        this.delayedIndexBitMap.clear();
        this.priorityQueue.clear();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.priorityQueue.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getBufferMemoryUsage() {
        return this.priorityQueue.bytesCapacity();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isEmpty() {
        return this.priorityQueue.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long nextDeliveryTime() {
        return this.priorityQueue.peekN1();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long size() {
        return this.priorityQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addMessage(long j, long j2, long j3) {
        this.priorityQueue.add(j3, j, j2);
        if (this.startLedgerId == -1) {
            this.startLedgerId = j;
        }
        this.endLedgerId = j;
        putIndexBit(j, j2);
    }
}
