package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.Errors;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/managed-ledger-2.10.2.4.jar:org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.class */
public class ManagedLedgerOfflineBacklog {
    /** Upper bound, in seconds, for metadata waits (equal to the 60-second bound applied when {@code accurate} is false). */
    private static final int META_READ_TIMEOUT_SECONDS = 60;
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerOfflineBacklog.class);

    /** Digest type passed to BookKeeper whenever this class opens a ledger. */
    private final BookKeeper.DigestType digestType;
    /** Password passed to BookKeeper whenever this class opens a ledger. */
    private final byte[] password;
    /** Broker name handed to every {@link PersistentOfflineTopicStats} this instance creates. */
    private final String brokerName;
    /**
     * When {@code true}, waits on metadata are unbounded, per-ledger details are recorded and cursors
     * whose ledger reports no confirmed entry are re-read through {@link BookKeeperAdmin}.
     */
    private final boolean accurate;

    /**
     * Creates an offline backlog calculator.
     *
     * @param digestType API-level digest type; converted with {@code BookKeeper.DigestType.fromApiDigestType}
     * @param bArr       ledger password (stored as passed, not copied)
     * @param str        broker name reported in the generated stats
     * @param z          whether accurate (unbounded-wait, detailed) mode is enabled
     */
    public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] bArr, String str, boolean z) {
        this.brokerName = str;
        this.accurate = z;
        this.password = bArr;
        this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType);
    }

    /**
     * Counts the entries that fall inside {@code range}, using the per-ledger entry counts in
     * {@code navigableMap}.
     *
     * <p>When both endpoints lie in the same ledger the count is the entry-id distance, adjusted for
     * closed bounds. Otherwise it is the sum of: the entries up to the upper endpoint in its ledger,
     * the remainder of the lower endpoint's ledger (skipped when that ledger is absent from the map),
     * and the full entry count of every ledger strictly between the two.
     *
     * <p>Decompiler note: this method was originally declared {@code private}.
     *
     * @param range        position range to measure; both endpoints must be bounded
     * @param navigableMap ledger id to ledger info, ordered by ledger id
     * @return number of entries covered by {@code range}
     */
    public long getNumberOfEntries(Range<PositionImpl> range, NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> navigableMap) {
        final PositionImpl from = range.lowerEndpoint();
        final long fromIncluded = range.lowerBoundType() == BoundType.CLOSED ? 1L : 0L;
        final PositionImpl to = range.upperEndpoint();
        final long toIncluded = range.upperBoundType() == BoundType.CLOSED ? 1L : 0L;

        final long fromLedgerId = from.getLedgerId();
        final long toLedgerId = to.getLedgerId();

        // Both endpoints in one ledger: plain distance between entry ids.
        if (fromLedgerId == toLedgerId) {
            return to.getEntryId() - from.getEntryId() - 1 + fromIncluded + toIncluded;
        }

        // Entries 0..upper in the last ledger of the range.
        long total = to.getEntryId() + toIncluded;

        // Tail of the first ledger, when its metadata is known.
        final MLDataFormats.ManagedLedgerInfo.LedgerInfo firstLedger = navigableMap.get(fromLedgerId);
        if (firstLedger != null) {
            total += firstLedger.getEntries() - (from.getEntryId() + 1) + fromIncluded;
        }

        // Every ledger strictly between the two endpoints counts in full.
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo between
                : navigableMap.subMap(fromLedgerId, false, toLedgerId, false).values()) {
            total += between.getEntries();
        }
        return total;
    }

    /**
     * Convenience overload of {@link #estimateUnloadedTopicBacklog} that takes the topic as a string.
     *
     * @param managedLedgerFactoryImpl factory providing the metadata store and BookKeeper client
     * @param str                      topic name without the {@code persistent://} prefix, which is prepended here
     * @return stats computed for the topic
     * @throws Exception propagated from {@link #estimateUnloadedTopicBacklog}
     */
    public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl managedLedgerFactoryImpl, String str) throws Exception {
        final TopicName topic = TopicName.get("persistent://" + str);
        return estimateUnloadedTopicBacklog(managedLedgerFactoryImpl, topic);
    }

    /**
     * Builds offline statistics for {@code topicName}.
     *
     * <p>Steps: read the ledger metadata into an ordered map, sum entry counts and sizes over all
     * ledgers (recording per-ledger details only in accurate mode), compute per-cursor backlogs, then
     * stamp the generation time.
     *
     * @param managedLedgerFactoryImpl factory providing the metadata store and BookKeeper client
     * @param topicName                topic to inspect
     * @return populated stats object, created with the topic's persistence naming encoding and this
     *         instance's broker name
     * @throws Exception if reading ledger metadata or computing cursor backlogs fails or is interrupted
     */
    public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl managedLedgerFactoryImpl, TopicName topicName) throws Exception {
        final String managedLedgerName = topicName.getPersistenceNamingEncoding();
        // Parameterized so the loop below and both helper calls are type-checked.
        final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
        final PersistentOfflineTopicStats stats = new PersistentOfflineTopicStats(managedLedgerName, this.brokerName);

        readLedgerMeta(managedLedgerFactoryImpl, topicName, ledgers);

        long totalEntries = 0;
        long totalSize = 0;
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledger : ledgers.values()) {
            totalEntries += ledger.getEntries();
            totalSize += ledger.getSize();
            if (this.accurate) {
                stats.addLedgerDetails(ledger.getEntries(), ledger.getTimestamp(), ledger.getSize(), ledger.getLedgerId());
            }
        }
        stats.totalMessages = totalEntries;
        stats.storageSize = totalSize;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Total number of entries - {} and size - {}", managedLedgerName, totalEntries, totalSize);
        }

        calculateCursorBacklogs(managedLedgerFactoryImpl, topicName, ledgers, stats);
        stats.statGeneratedAt.setTime(System.currentTimeMillis());
        return stats;
    }

    /**
     * Fills {@code navigableMap} with the ledger list stored for the topic's managed ledger, then
     * refreshes the entry of the ledger with the highest id.
     *
     * <p>That ledger is opened without recovery. On success its map entry is replaced by one built
     * from the handle ({@code lastAddConfirmed + 1} entries, handle length, current time). If
     * BookKeeper reports that the ledger does not exist, the entry is removed from the map. Any other
     * open failure is logged and the metadata-store entry is kept.
     *
     * <p>The calling thread blocks until the callbacks signal completion: without a limit in accurate
     * mode, for at most {@code META_READ_TIMEOUT_SECONDS} otherwise (a timeout is not reported).
     *
     * @param managedLedgerFactoryImpl factory providing the metadata store and BookKeeper client
     * @param topicName                topic whose managed ledger is read
     * @param navigableMap             output map, ledger id to ledger info
     * @throws Exception if the wait is interrupted or the initial metadata call throws
     */
    private void readLedgerMeta(ManagedLedgerFactoryImpl managedLedgerFactoryImpl, TopicName topicName, final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> navigableMap) throws Exception {
        final String managedLedgerName = topicName.getPersistenceNamingEncoding();
        final MetaStore store = managedLedgerFactoryImpl.getMetaStore();
        final BookKeeper bk = managedLedgerFactoryImpl.getBookKeeper();
        final CountDownLatch metaReadDone = new CountDownLatch(1);

        store.getManagedLedgerInfo(managedLedgerName, false, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
            @Override
            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
                mlInfo.getLedgerInfoList().forEach(info -> navigableMap.put(info.getLedgerId(), info));
                if (navigableMap.isEmpty()) {
                    log.warn("[{}] Ledger list empty", managedLedgerName);
                    metaReadDone.countDown();
                    return;
                }

                final long lastLedgerId = navigableMap.lastKey();
                final AsyncCallback.OpenCallback onLastLedgerOpened = (rc, handle, ctx) -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Opened ledger {}: {}", managedLedgerName, lastLedgerId, BKException.getMessage(rc));
                    }
                    if (rc == BKException.Code.OK) {
                        // Replace the stored info with figures taken from the open handle.
                        final MLDataFormats.ManagedLedgerInfo.LedgerInfo refreshed =
                                MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder()
                                        .setLedgerId(lastLedgerId)
                                        .setEntries(handle.getLastAddConfirmed() + 1)
                                        .setSize(handle.getLength())
                                        .setTimestamp(System.currentTimeMillis())
                                        .build();
                        navigableMap.put(lastLedgerId, refreshed);
                    } else if (Errors.isNoSuchLedgerExistsException(rc)) {
                        log.warn("[{}] Ledger not found: {}", managedLedgerName, navigableMap.lastKey());
                        navigableMap.remove(navigableMap.lastKey());
                    } else {
                        log.error("[{}] Failed to open ledger {}: {}", managedLedgerName, lastLedgerId, BKException.getMessage(rc));
                    }
                    // Every outcome releases the waiting caller.
                    metaReadDone.countDown();
                };

                if (log.isDebugEnabled()) {
                    log.debug("[{}] Opening ledger {}", managedLedgerName, lastLedgerId);
                }
                try {
                    bk.asyncOpenLedgerNoRecovery(lastLedgerId, ManagedLedgerOfflineBacklog.this.digestType,
                            ManagedLedgerOfflineBacklog.this.password, onLastLedgerOpened, null);
                } catch (Exception e) {
                    log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, lastLedgerId, e);
                    metaReadDone.countDown();
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException metaStoreException) {
                log.warn("[{}] Unable to obtain managed ledger metadata - {}", managedLedgerName, metaStoreException);
                metaReadDone.countDown();
            }
        });

        if (!this.accurate) {
            metaReadDone.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } else {
            metaReadDone.await();
        }
    }

    /**
     * Computes the backlog of every cursor of the topic and records it in
     * {@code persistentOfflineTopicStats}.
     *
     * <p>For each cursor name returned by the metadata store, the cursor info is fetched:
     * <ul>
     *   <li>If it carries no cursor ledger id ({@code -1}), the mark-delete position stored in the
     *       cursor info is used directly.</li>
     *   <li>Otherwise the cursor ledger is opened without recovery and the entry at its last add
     *       confirmed id is parsed as the mark-delete position. If the ledger reports no confirmed
     *       entry, the cursor is remembered and, in accurate mode only, re-read through
     *       {@link #tryGetMDPosition} once all asynchronous work has finished.</li>
     * </ul>
     * The backlog of a cursor is the number of entries in {@code (markDelete, lastLedgerPosition]}.
     * Cursors whose position cannot be read are recorded with a backlog of {@code -1}, except when
     * the cursor ledger cannot be opened or the cursor info cannot be fetched, in which case only a
     * warning is logged.
     *
     * <p>The calling thread blocks until the callbacks signal completion: without a limit in accurate
     * mode, for at most {@code META_READ_TIMEOUT_SECONDS} otherwise.
     *
     * @param managedLedgerFactoryImpl    factory providing the metadata store and BookKeeper client
     * @param topicName                   topic whose cursors are inspected
     * @param navigableMap                ledger id to ledger info; nothing is done when empty
     * @param persistentOfflineTopicStats stats object that receives the backlog figures
     * @throws Exception if the wait is interrupted or the initial metadata call throws
     */
    private void calculateCursorBacklogs(ManagedLedgerFactoryImpl managedLedgerFactoryImpl, TopicName topicName, final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> navigableMap, final PersistentOfflineTopicStats persistentOfflineTopicStats) throws Exception {
        if (navigableMap.isEmpty()) {
            return;
        }
        final String managedLedgerName = topicName.getPersistenceNamingEncoding();
        final MetaStore metaStore = managedLedgerFactoryImpl.getMetaStore();
        final BookKeeper bookKeeper = managedLedgerFactoryImpl.getBookKeeper();
        final CountDownLatch statsDone = new CountDownLatch(1);
        // Cursor name -> cursor ledger id, for cursors whose ledger reported no confirmed entry.
        final ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
                ConcurrentOpenHashMap.<String, Long>newBuilder().build();

        final MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLedger = navigableMap.lastEntry().getValue();
        final PositionImpl lastLedgerPosition = new PositionImpl(lastLedger.getLedgerId(), lastLedger.getEntries() - 1);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Last ledger position {}", managedLedgerName, lastLedgerPosition);
        }

        metaStore.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
            @Override
            public void operationComplete(List<String> cursors, Stat stat) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
                }
                if (cursors.isEmpty()) {
                    statsDone.countDown();
                    return;
                }

                final CountDownLatch allCursorsDone = new CountDownLatch(cursors.size());
                for (final String cursorName : cursors) {
                    try {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Loading cursor {}", managedLedgerName, cursorName);
                        }
                        metaStore.asyncGetCursorInfo(managedLedgerName, cursorName, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
                            @Override
                            public void operationComplete(MLDataFormats.ManagedCursorInfo cursorInfo, Stat cursorStat) {
                                final long cursorLedgerId = cursorInfo.getCursorsLedgerId();
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName, cursorName, cursorLedgerId);
                                }

                                if (cursorLedgerId == -1) {
                                    // No cursor ledger: the mark-delete position is stored inline in the cursor info.
                                    try {
                                        final PositionImpl markDelete = new PositionImpl(
                                                cursorInfo.getMarkDeleteLedgerId(), cursorInfo.getMarkDeleteEntryId());
                                        recordCursorBacklog(managedLedgerName, cursorName, cursorLedgerId, markDelete,
                                                lastLedgerPosition, navigableMap, persistentOfflineTopicStats);
                                    } finally {
                                        // Release the latch even if the computation throws, so waiters never hang.
                                        allCursorsDone.countDown();
                                    }
                                    return;
                                }

                                final AsyncCallback.OpenCallback cursorLedgerOpenCb = (rc, lh, ctx) -> {
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", managedLedgerName, cursorLedgerId, cursorName, rc);
                                    }
                                    if (rc != BKException.Code.OK) {
                                        // The handle is not dereferenced here: it may be null when the open failed,
                                        // so the requested ledger id is used for reporting instead.
                                        log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", managedLedgerName, cursorLedgerId, cursorName, BKException.getMessage(rc));
                                        allCursorsDone.countDown();
                                        return;
                                    }

                                    final long lac = lh.getLastAddConfirmed();
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac, cursorLedgerId);
                                    }
                                    if (lac == LedgerHandle.INVALID_ENTRY_ID) {
                                        // Nothing confirmed yet; defer to the BookKeeperAdmin retry after all callbacks finish.
                                        ledgerRetryMap.put(cursorName, cursorLedgerId);
                                        log.info("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac, cursorLedgerId);
                                        allCursorsDone.countDown();
                                        return;
                                    }

                                    lh.asyncReadEntries(lac, lac, new AsyncCallback.ReadCallback() {
                                        @Override
                                        public void readComplete(int readRc, LedgerHandle readLh, Enumeration<LedgerEntry> entries, Object readCtx) {
                                            try {
                                                if (log.isDebugEnabled()) {
                                                    log.debug("readComplete rc={} entryId={}", readRc, lac);
                                                }
                                                if (readRc != BKException.Code.OK) {
                                                    log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", managedLedgerName, cursorLedgerId, cursorName, BKException.getMessage(readRc));
                                                    persistentOfflineTopicStats.addCursorDetails(cursorName, -1L, cursorLedgerId);
                                                    return;
                                                }
                                                try {
                                                    final PositionImpl markDelete = new PositionImpl(
                                                            MLDataFormats.PositionInfo.parseFrom(entries.nextElement().getEntry()));
                                                    if (log.isDebugEnabled()) {
                                                        log.debug("[{}] Cursor {} MD {} read last ledger position {}", managedLedgerName, cursorName, markDelete, lastLedgerPosition);
                                                    }
                                                    recordCursorBacklog(managedLedgerName, cursorName, cursorLedgerId, markDelete,
                                                            lastLedgerPosition, navigableMap, persistentOfflineTopicStats);
                                                } catch (InvalidProtocolBufferException e) {
                                                    log.warn("[{}] Error reading position from metadata ledger {} for cursor {}: {}", managedLedgerName, cursorLedgerId, cursorName, e);
                                                    persistentOfflineTopicStats.addCursorDetails(cursorName, -1L, cursorLedgerId);
                                                }
                                            } finally {
                                                allCursorsDone.countDown();
                                            }
                                        }
                                    }, null);
                                };
                                bookKeeper.asyncOpenLedgerNoRecovery(cursorLedgerId, ManagedLedgerOfflineBacklog.this.digestType,
                                        ManagedLedgerOfflineBacklog.this.password, cursorLedgerOpenCb, null);
                            }

                            @Override
                            public void operationFailed(ManagedLedgerException.MetaStoreException metaStoreException) {
                                log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", managedLedgerName, cursorName, metaStoreException);
                                allCursorsDone.countDown();
                            }
                        });
                    } catch (Throwable th) {
                        // A cursor could not even be scheduled: release the outer waiter before propagating.
                        statsDone.countDown();
                        throw th;
                    }
                }

                try {
                    awaitCursorWork(allCursorsDone);
                } catch (Exception e) {
                    log.warn("[{}] Error reading subscription positions{}", managedLedgerName, e);
                } finally {
                    statsDone.countDown();
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException metaStoreException) {
                log.warn("[{}] Failed to get the cursors list", managedLedgerName, metaStoreException);
                statsDone.countDown();
            }
        });

        awaitCursorWork(statsDone);

        // Second chance, accurate mode only: read deferred cursor ledgers through BookKeeperAdmin.
        if (this.accurate && ledgerRetryMap.size() > 0) {
            ledgerRetryMap.forEach((cursorName, cursorLedgerId) -> {
                if (log.isDebugEnabled()) {
                    log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, cursorLedgerId);
                }
                final PositionImpl markDelete = tryGetMDPosition(bookKeeper, cursorLedgerId, cursorName);
                if (markDelete == null) {
                    log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position", managedLedgerName, cursorName, cursorLedgerId);
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", managedLedgerName, cursorName, cursorLedgerId, markDelete);
                }
                recordCursorBacklog(managedLedgerName, cursorName, cursorLedgerId, markDelete,
                        lastLedgerPosition, navigableMap, persistentOfflineTopicStats);
            });
        }
    }

    /** Waits on {@code latch}: without a limit in accurate mode, up to {@code META_READ_TIMEOUT_SECONDS} otherwise. */
    private void awaitCursorWork(CountDownLatch latch) throws InterruptedException {
        if (this.accurate) {
            latch.await();
        } else {
            latch.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Adds the number of entries in {@code (markDeletePosition, lastLedgerPosition]} to the total
     * backlog in {@code stats} and records it as the detail for {@code cursorName}.
     */
    private void recordCursorBacklog(String managedLedgerName, String cursorName, long cursorLedgerId, PositionImpl markDeletePosition, PositionImpl lastLedgerPosition, NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers, PersistentOfflineTopicStats stats) {
        final Range<PositionImpl> unread = Range.openClosed(markDeletePosition, lastLedgerPosition);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Calculating backlog for cursor {} using range {}", managedLedgerName, cursorName, unread);
        }
        final long backlog = getNumberOfEntries(unread, ledgers);
        // NOTE(review): stats is updated from callback invocations without synchronization; whether
        // those callbacks can run concurrently is not visible from this class — confirm.
        stats.messageBacklog += backlog;
        stats.addCursorDetails(cursorName, backlog, cursorLedgerId);
    }

    /**
     * Reads the entries of ledger {@code j} through a {@link BookKeeperAdmin} created over
     * {@code bookKeeper} and parses each one as a {@code PositionInfo}.
     *
     * <p>Failures while creating the admin, reading or parsing are logged and not propagated; the
     * position parsed from the last entry processed before the failure (if any) is still returned.
     *
     * @param bookKeeper client used to create the admin
     * @param j          id of the cursor ledger to read
     * @param str        cursor name, used for logging only
     * @return position parsed from the last entry that was read successfully, or {@code null} if no
     *         entry could be read and parsed
     */
    private PositionImpl tryGetMDPosition(BookKeeper bookKeeper, long j, String str) {
        BookKeeperAdmin bookKeeperAdmin = null;
        PositionImpl lastPosition = null;
        try {
            bookKeeperAdmin = new BookKeeperAdmin(bookKeeper);
            // NOTE(review): a last-entry argument of -1 presumably means "read to the end of the
            // ledger" — confirm against BookKeeperAdmin#readEntries.
            for (LedgerEntry ledgerEntry : bookKeeperAdmin.readEntries(j, 0L, -1L)) {
                final long entryId = ledgerEntry.getEntryId();
                if (log.isDebugEnabled()) {
                    log.debug(" Read entry {} from ledger {} for cursor {}", entryId, j, str);
                }
                // Each entry overwrites the previous result, so the last entry wins.
                lastPosition = new PositionImpl(MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry()));
                if (log.isDebugEnabled()) {
                    log.debug("Cursor {} read position {}", str, lastPosition);
                }
            }
        } catch (Exception e) {
            log.warn("Unable to determine LAC for ledgerId {} for cursor {}: {}", j, str, e);
        } finally {
            // Single close point for both the success and the failure path.
            if (bookKeeperAdmin != null) {
                try {
                    bookKeeperAdmin.close();
                } catch (Exception e) {
                    log.warn("Unable to close bk admin for ledgerId {} for cursor {}", j, str, e);
                }
            }
        }
        return lastPosition;
    }
}
