package org.apache.distributedlog.auditor;

import dlshade.com.google.common.base.Objects;
import dlshade.com.google.common.base.Preconditions;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.AsyncCallback;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.BookKeeper;
import dlshade.org.apache.bookkeeper.client.BookKeeperAccessor;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.meta.LedgerManager;
import dlshade.org.apache.bookkeeper.net.NodeBase;
import dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import dlshade.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import dlshade.org.apache.commons.lang3.tuple.Pair;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.DLUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/auditor/DLAuditor.class */
public class DLAuditor {
    // Class-wide SLF4J logger; assigned in the static initializer at the bottom of the class.
    private static final Logger logger;
    // Configuration read for ZooKeeper session/retry settings, the ZK ACL id and the BookKeeper digest password.
    private final DistributedLogConfiguration conf;
    // Decompiler-materialised assertion flag: true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.distributedlog.auditor.DLAuditor$7, reason: invalid class name */
    /* loaded from: input_file:org/apache/distributedlog/auditor/DLAuditor$7.class */
    public class AnonymousClass7 implements BookkeeperInternalCallbacks.Processor<Long> {
        final /* synthetic */ AtomicLong val$numLedgers;
        final /* synthetic */ ExecutorService val$executorService;
        final /* synthetic */ BookKeeper val$bk;
        final /* synthetic */ AtomicLong val$totalBytes;
        final /* synthetic */ AtomicLong val$totalEntries;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.distributedlog.auditor.DLAuditor$7$1, reason: invalid class name */
        /* loaded from: input_file:org/apache/distributedlog/auditor/DLAuditor$7$1.class */
        public class AnonymousClass1 implements Runnable {
            final /* synthetic */ Long val$lid;
            final /* synthetic */ AsyncCallback.VoidCallback val$cb;

            AnonymousClass1(Long l, AsyncCallback.VoidCallback voidCallback) {
                this.val$lid = l;
                this.val$cb = voidCallback;
            }

            @Override // java.lang.Runnable
            public void run() {
                AnonymousClass7.this.val$bk.asyncOpenLedgerNoRecovery(this.val$lid.longValue(), BookKeeper.DigestType.CRC32, DLAuditor.this.conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8), new AsyncCallback.OpenCallback() { // from class: org.apache.distributedlog.auditor.DLAuditor.7.1.1
                    @Override // dlshade.org.apache.bookkeeper.client.AsyncCallback.OpenCallback
                    public void openComplete(int i, LedgerHandle ledgerHandle, Object obj) {
                        int i2;
                        if (0 == i) {
                            AnonymousClass7.this.val$totalBytes.addAndGet(ledgerHandle.getLength());
                            AnonymousClass7.this.val$totalEntries.addAndGet(ledgerHandle.getLastAddConfirmed() + 1);
                            i2 = i;
                        } else {
                            i2 = -9;
                        }
                        final int i3 = i2;
                        AnonymousClass7.this.val$executorService.submit(new Runnable() { // from class: org.apache.distributedlog.auditor.DLAuditor.7.1.1.1
                            @Override // java.lang.Runnable
                            public void run() {
                                AnonymousClass1.this.val$cb.processResult(i3, null, null);
                            }
                        });
                    }
                }, null);
            }
        }

        AnonymousClass7(AtomicLong atomicLong, ExecutorService executorService, BookKeeper bookKeeper, AtomicLong atomicLong2, AtomicLong atomicLong3) {
            this.val$numLedgers = atomicLong;
            this.val$executorService = executorService;
            this.val$bk = bookKeeper;
            this.val$totalBytes = atomicLong2;
            this.val$totalEntries = atomicLong3;
        }

        @Override // dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor
        public void process(Long l, AsyncCallback.VoidCallback voidCallback) {
            this.val$numLedgers.incrementAndGet();
            this.val$executorService.submit(new AnonymousClass1(l, voidCallback));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/auditor/DLAuditor$Action.class */
    /**
     * A unit of work applied to a single queued item by {@code executeAction}.
     *
     * @param <T> type of the item being processed
     */
    @FunctionalInterface
    public interface Action<T> {
        /**
         * Processes one item.
         *
         * @param item the item to process
         * @throws IOException if the item could not be processed
         */
        void execute(T item) throws IOException;
    }

    /**
     * Creates an auditor that reads its ZooKeeper and BookKeeper client settings from the given configuration.
     *
     * @param distributedLogConfiguration configuration stored as-is and consulted by every audit operation
     */
    public DLAuditor(DistributedLogConfiguration distributedLogConfiguration) {
        this.conf = distributedLogConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the writer ZooKeeper client of the namespace's driver.
     *
     * @param namespace namespace whose driver is expected to be a {@link BKNamespaceDriver}
     * @return the driver's writer ZooKeeper client
     */
    public ZooKeeperClient getZooKeeperClient(Namespace namespace) {
        final NamespaceDriver driver = namespace.getNamespaceDriver();
        // Decompiled form of an assert: the type check only runs when assertions are enabled.
        if (!$assertionsDisabled && !(driver instanceof BKNamespaceDriver)) {
            throw new AssertionError();
        }
        final BKNamespaceDriver bkDriver = (BKNamespaceDriver) driver;
        return bkDriver.getWriterZKC();
    }

    /**
     * Returns the reader BookKeeper client of the namespace's driver.
     *
     * @param namespace namespace whose driver is expected to be a {@link BKNamespaceDriver}
     * @return the driver's reader BookKeeper client
     */
    private BookKeeperClient getBookKeeperClient(Namespace namespace) {
        final NamespaceDriver driver = namespace.getNamespaceDriver();
        // Decompiled form of an assert: the type check only runs when assertions are enabled.
        if (!$assertionsDisabled && !(driver instanceof BKNamespaceDriver)) {
            throw new AssertionError();
        }
        final BKNamespaceDriver bkDriver = (BKNamespaceDriver) driver;
        return bkDriver.getReaderBKC();
    }

    /**
     * Checks that every uri resolves to the same ZooKeeper server string (ignoring case) and returns it.
     *
     * @param list uris to check; the first element is used as the reference and must exist
     * @return the ZooKeeper servers shared by all uris
     * @throws IllegalArgumentException if any uri resolves to a different server string
     */
    private String validateAndGetZKServers(List<URI> list) {
        final String expectedServers = BKNamespaceDriver.getZKServersFromDLUri(list.get(0));
        for (URI candidate : list) {
            final String candidateServers = BKNamespaceDriver.getZKServersFromDLUri(candidate);
            if (!expectedServers.equalsIgnoreCase(candidateServers)) {
                throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster");
            }
        }
        return expectedServers;
    }

    /**
     * Resolves the BookKeeper/DL config of every uri and verifies they all share the same ledgers
     * path and the same writer ZooKeeper servers.
     *
     * @param zooKeeperClient client used to resolve each uri's config
     * @param list uris to resolve; the first element provides the reference config
     * @return the config resolved for the first uri
     * @throws IOException if resolving a config fails
     * @throws IllegalArgumentException if a uri points at a different bookkeeper cluster
     */
    private BKDLConfig resolveBKDLConfig(ZooKeeperClient zooKeeperClient, List<URI> list) throws IOException {
        final BKDLConfig reference = BKDLConfig.resolveDLConfig(zooKeeperClient, list.get(0));
        for (URI uri : list) {
            final BKDLConfig other = BKDLConfig.resolveDLConfig(zooKeeperClient, uri);
            final boolean sameCluster = Objects.equal(reference.getBkLedgersPath(), other.getBkLedgersPath())
                    && Objects.equal(reference.getBkZkServersForWriter(), other.getBkZkServersForWriter());
            if (!sameCluster) {
                throw new IllegalArgumentException("Uris don't use same bookkeeper cluster");
            }
        }
        return reference;
    }

    /**
     * Collects the ledger ids known to BookKeeper and the ledger ids referenced by DistributedLog.
     *
     * <p>A dedicated ZooKeeper client, BookKeeper client and executor are created for the call and
     * released before it returns, whether it succeeds or fails.
     *
     * @param list uris to audit; must be non-empty and all point at the same ZooKeeper and bookkeeper cluster
     * @param list2 allocation pool paths, one list per uri at the same index
     * @return pair whose left side holds the ledgers found in BookKeeper and whose right side holds
     *         the ledgers found through DistributedLog
     * @throws IOException if resolving the configuration or collecting from either side fails
     * @throws IllegalArgumentException if no uri is given, if allocation paths are missing for a uri,
     *         or if the uris span different clusters
     */
    public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> list, List<List<String>> list2) throws IOException {
        Preconditions.checkArgument(list.size() > 0, "No uri provided to audit");
        // Fail fast: the DL side indexes list2 by uri position, and would otherwise only blow up
        // after the (expensive) BookKeeper scan has completed.
        Preconditions.checkArgument(list2.size() >= list.size(), "Allocation paths must be provided for every uri");
        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
                .name("DLAuditor-ZK")
                .zkServers(validateAndGetZKServers(list))
                .sessionTimeoutMs(this.conf.getZKSessionTimeoutMilliseconds())
                .retryPolicy(new BoundExponentialBackoffRetryPolicy(this.conf.getZKRetryBackoffStartMillis(), this.conf.getZKRetryBackoffMaxMillis(), Integer.MAX_VALUE))
                .zkAclId(this.conf.getZkAclId())
                .build();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, list);
            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
                    .name("DLAuditor-BK")
                    .dlConfig(this.conf)
                    .zkServers(bkdlConfig.getBkZkServersForWriter())
                    .ledgersPath(bkdlConfig.getBkLedgersPath())
                    .build();
            try {
                Set<Long> bkLedgers = collectLedgersFromBK(bkc, executorService);
                Set<Long> dlLedgers = collectLedgersFromDL(list, list2);
                return Pair.of(bkLedgers, dlLedgers);
            } finally {
                bkc.close();
            }
        } finally {
            try {
                zkc.close();
            } finally {
                // Always release the executor, even if closing the ZooKeeper client throws.
                executorService.shutdown();
            }
        }
    }

    /**
     * Enumerates every ledger id exposed by the BookKeeper ledger manager.
     *
     * <p>Blocks until the asynchronous enumeration has completed. Progress is logged every
     * 1000 collected ledgers.
     *
     * @param bookKeeperClient client whose ledger manager is scanned
     * @param executorService executor on which each per-ledger acknowledgement is issued
     * @return the collected ledger ids
     * @throws IOException if the enumeration fails or the calling thread is interrupted
     */
    private Set<Long> collectLedgersFromBK(BookKeeperClient bookKeeperClient, final ExecutorService executorService) throws IOException {
        LedgerManager ledgerManager = BookKeeperAccessor.getLedgerManager(bookKeeperClient.get());
        final Set<Long> ledgers = new HashSet<>();
        final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();

        BookkeeperInternalCallbacks.Processor<Long> collector = new BookkeeperInternalCallbacks.Processor<Long>() {
            @Override
            public void process(Long ledgerId, final AsyncCallback.VoidCallback ack) {
                // The set is a plain HashSet, so every access is guarded by its monitor.
                synchronized (ledgers) {
                    ledgers.add(ledgerId);
                    final int collected = ledgers.size();
                    if (collected % 1000 == 0) {
                        DLAuditor.logger.info("Collected {} ledgers", Integer.valueOf(collected));
                    }
                }
                // Acknowledge on the executor rather than on the thread that delivered the ledger id.
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        ack.processResult(0, null, null);
                    }
                });
            }
        };
        AsyncCallback.VoidCallback completion = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (rc != 0) {
                    doneFuture.completeExceptionally(BKException.create(rc));
                    return;
                }
                doneFuture.complete(null);
            }
        };
        // Trailing 0 / -9 are the success and failure result codes handed to the ledger manager
        // (presumably BKException.Code.OK and BKException.Code.ZKException -- verify).
        ledgerManager.asyncProcessLedgers(collector, completion, null, 0, -9);

        try {
            doneFuture.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on collecting ledgers : ", interrupted);
        } catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Failed to collect ledgers : ", cause);
        }
        logger.info("Collected total {} ledgers", Integer.valueOf(ledgers.size()));
        return ledgers;
    }

    /**
     * Collects every ledger id referenced by DistributedLog under the given uris, both from the
     * allocator pools and from the log segments of every stream.
     *
     * <p>One namespace and one worker thread are created per uri; all namespaces are closed and the
     * worker pool is shut down before the method returns.
     *
     * @param list uris to scan
     * @param list2 allocation pool paths, one list per uri at the same index
     * @return sorted set of the collected ledger ids
     * @throws IOException if any uri failed to be scanned or the calling thread is interrupted
     */
    private Set<Long> collectLedgersFromDL(List<URI> list, List<List<String>> list2) throws IOException {
        final Set<Long> ledgers = new TreeSet<>();
        List<Namespace> namespaces = new ArrayList<>(list.size());
        try {
            for (URI uri : list) {
                namespaces.add(NamespaceBuilder.newBuilder().conf(this.conf).uri(uri).build());
            }
            final CountDownLatch doneLatch = new CountDownLatch(list.size());
            final AtomicInteger numFailures = new AtomicInteger(0);
            ExecutorService executor = Executors.newFixedThreadPool(list.size());
            try {
                int index = 0;
                for (final Namespace namespace : namespaces) {
                    final URI uri = list.get(index);
                    final List<String> allocationPaths = list2.get(index);
                    index++;
                    executor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                DLAuditor.logger.info("Collecting ledgers from {} : {}", uri, allocationPaths);
                                DLAuditor.this.collectLedgersFromAllocator(uri, namespace, allocationPaths, ledgers);
                                synchronized (ledgers) {
                                    DLAuditor.logger.info("Collected {} ledgers from allocators for {} : {} ", new Object[]{Integer.valueOf(ledgers.size()), uri, ledgers});
                                }
                                DLAuditor.this.collectLedgersFromDL(uri, namespace, ledgers);
                            } catch (Throwable t) {
                                // Anything escaping the scan (not just IOException) must count as a
                                // failure; otherwise the result would be reported as complete.
                                numFailures.incrementAndGet();
                                DLAuditor.logger.info("Error to collect ledgers from DL : ", t);
                            } finally {
                                // Always release the waiter: an unchecked exception used to skip
                                // this and block await() below forever.
                                doneLatch.countDown();
                            }
                        }
                    });
                }
                try {
                    doneLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted on collecting ledgers from DL : ", e);
                    throw new DLInterruptedException("Interrupted on collecting ledgers from DL : ", e);
                }
                if (numFailures.get() > 0) {
                    throw new IOException(numFailures.get() + " errors to collect ledgers from DL");
                }
                return ledgers;
            } finally {
                executor.shutdown();
            }
        } finally {
            for (Namespace namespace : namespaces) {
                namespace.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the ledger ids currently held by the ledger allocator pools of a uri.
     *
     * <p>For every allocation path the pool nodes are listed from ZooKeeper; each pool is then scanned
     * (10 pools in parallel) and the ledger id stored in every non-empty child node is added to
     * {@code set}.
     *
     * @param uri uri whose path is the root of the allocation paths
     * @param namespace namespace providing the ZooKeeper client
     * @param list allocation path names relative to the uri path
     * @param set shared result set; additions are synchronized on the set itself
     * @throws IOException if ZooKeeper access fails or the calling thread is interrupted
     */
    public void collectLedgersFromAllocator(URI uri, final Namespace namespace, List<String> list, final Set<Long> set) throws IOException {
        LinkedBlockingQueue<String> poolQueue = new LinkedBlockingQueue<>();
        for (String allocationPath : list) {
            String rootPath = uri.getPath() + NodeBase.PATH_SEPARATOR_STR + allocationPath;
            try {
                for (String pool : getZooKeeperClient(namespace).get().getChildren(rootPath, false)) {
                    poolQueue.add(rootPath + NodeBase.PATH_SEPARATOR_STR + pool);
                }
            } catch (KeeperException e) {
                throw new ZKException("Failed to get list of pools from " + rootPath, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new DLInterruptedException("Interrupted on getting list of pools from " + rootPath, e);
            }
        }
        logger.info("Collecting ledgers from allocators for {} : {}", uri, poolQueue);
        executeAction(poolQueue, 10, new Action<String>() {
            @Override // org.apache.distributedlog.auditor.DLAuditor.Action
            public void execute(String poolPath) throws IOException {
                try {
                    collectLedgersFromPool(poolPath);
                } catch (KeeperException e) {
                    // Pass the exception itself (not just its code) so the cause is preserved,
                    // matching the pool-listing failure above.
                    throw new ZKException("Failed to collect ledgers from allocation pool " + poolPath, e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on collecting ledgers from allocation pool " + poolPath, e);
                }
            }

            /** Reads every child node of one pool and records the ledger id it stores, if any. */
            private void collectLedgersFromPool(String poolPath) throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
                for (String allocator : DLAuditor.this.getZooKeeperClient(namespace).get().getChildren(poolPath, false)) {
                    String allocatorPath = poolPath + NodeBase.PATH_SEPARATOR_STR + allocator;
                    byte[] data = DLAuditor.this.getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
                    if (null == data || data.length == 0) {
                        // Node stores no ledger id.
                        continue;
                    }
                    try {
                        long ledgerId = DLUtils.bytes2LogSegmentId(data);
                        synchronized (set) {
                            set.add(Long.valueOf(ledgerId));
                        }
                    } catch (NumberFormatException e) {
                        // Unparseable node content is reported and skipped rather than failing the scan.
                        DLAuditor.logger.warn("Invalid ledger found in allocator path {} : ", allocatorPath, e);
                    }
                }
            }
        });
        logger.info("Collected ledgers from allocators for {}.", uri);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the ledger ids of every stream in the namespace into {@code set}.
     *
     * <p>All stream names are enumerated first and then processed by 10 parallel workers.
     *
     * @param uri uri being scanned; used for logging only
     * @param namespace namespace whose streams are enumerated
     * @param set shared result set receiving the ledger ids
     * @throws IOException if enumerating the streams or reading any stream fails
     */
    public void collectLedgersFromDL(URI uri, final Namespace namespace, final Set<Long> set) throws IOException {
        logger.info("Enumerating {} to collect streams.", uri);
        Iterator<String> streams = namespace.getLogs();
        LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<>();
        while (streams.hasNext()) {
            streamQueue.add(streams.next());
        }
        // Log the collected names; the iterator is exhausted here and has no useful string form.
        logger.info("Collected {} streams from uri {} : {}", new Object[]{Integer.valueOf(streamQueue.size()), uri, streamQueue});
        executeAction(streamQueue, 10, new Action<String>() {
            @Override // org.apache.distributedlog.auditor.DLAuditor.Action
            public void execute(String stream) throws IOException {
                DLAuditor.this.collectLedgersFromStream(namespace, stream, set);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the log segment id of every segment of one stream.
     *
     * @param namespace namespace used to open the stream
     * @param str name of the stream
     * @param set shared result set; additions are synchronized on the set itself
     * @return the segment ids of this stream, in the order the segments were returned
     * @throws IOException if the stream cannot be opened or its segments cannot be read
     */
    public List<Long> collectLedgersFromStream(Namespace namespace, String str, Set<Long> set) throws IOException {
        final DistributedLogManager dlm = namespace.openLog(str);
        try {
            final List<Long> streamLedgers = new ArrayList<>();
            for (LogSegmentMetadata segment : dlm.getLogSegments()) {
                final Long segmentId = Long.valueOf(segment.getLogSegmentId());
                synchronized (set) {
                    set.add(segmentId);
                }
                streamLedgers.add(segmentId);
            }
            return streamLedgers;
        } finally {
            dlm.close();
        }
    }

    /**
     * Calculates the space used by every stream under the given uri.
     *
     * <p>A namespace is built for the call and closed exactly once before returning, whether the
     * calculation succeeds or fails.
     *
     * @param uri uri of the namespace to measure
     * @return map from stream name to the number of bytes used by that stream
     * @throws IOException if the namespace cannot be built or a stream cannot be measured
     */
    public Map<String, Long> calculateStreamSpaceUsage(URI uri) throws IOException {
        logger.info("Collecting stream space usage for {}.", uri);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(this.conf).uri(uri).build();
        try {
            return calculateStreamSpaceUsage(uri, namespace);
        } finally {
            namespace.close();
        }
    }

    /**
     * Measures every stream of an already opened namespace using 10 parallel workers.
     *
     * @param uri uri being measured; used for progress logging only
     * @param namespace namespace whose streams are enumerated and measured
     * @return map, sorted by stream name, from stream name to bytes used
     * @throws IOException if enumerating or measuring a stream fails
     */
    private Map<String, Long> calculateStreamSpaceUsage(final URI uri, final Namespace namespace) throws IOException {
        final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();
        for (Iterator<String> names = namespace.getLogs(); names.hasNext(); ) {
            pending.add(names.next());
        }
        final ConcurrentSkipListMap<String, Long> usageByStream = new ConcurrentSkipListMap<>();
        final AtomicInteger processed = new AtomicInteger(0);
        executeAction(pending, 10, stream -> {
            usageByStream.put(stream, Long.valueOf(calculateStreamSpaceUsage(namespace, stream)));
            // Emit a progress line every 1000 streams.
            if (processed.incrementAndGet() % 1000 == 0) {
                DLAuditor.logger.info("Calculated {} streams from uri {}.", Integer.valueOf(processed.get()), uri);
            }
        });
        return usageByStream;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sums the ledger lengths of all log segments of one stream.
     *
     * <p>Each segment's ledger is opened without recovery, its length is added to the total and the
     * handle is closed again.
     *
     * @param namespace namespace used to open the stream and to obtain the BookKeeper client
     * @param str name of the stream
     * @return total length in bytes of the stream's ledgers
     * @throws IOException if a ledger cannot be opened or the calling thread is interrupted
     */
    public long calculateStreamSpaceUsage(Namespace namespace, String str) throws IOException {
        final DistributedLogManager dlm = namespace.openLog(str);
        long totalBytes = 0;
        try {
            for (LogSegmentMetadata segment : dlm.getLogSegments()) {
                try {
                    final BookKeeper bk = getBookKeeperClient(namespace).get();
                    final byte[] password = this.conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8);
                    final LedgerHandle handle = bk.openLedgerNoRecovery(segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, password);
                    totalBytes += handle.getLength();
                    handle.close();
                } catch (BKException bke) {
                    logger.error("Failed to open ledger {} : ", Long.valueOf(segment.getLogSegmentId()), bke);
                    throw new IOException("Failed to open ledger " + segment.getLogSegmentId(), bke);
                } catch (InterruptedException ie) {
                    logger.warn("Interrupted on opening ledger {} : ", Long.valueOf(segment.getLogSegmentId()), ie);
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on opening ledger " + segment.getLogSegmentId(), ie);
                }
            }
            return totalBytes;
        } finally {
            dlm.close();
        }
    }

    /**
     * Calculates the total number of bytes stored in all ledgers of the bookkeeper cluster behind a uri.
     *
     * <p>A dedicated ZooKeeper client, BookKeeper client and executor are created for the call and
     * released before it returns, whether it succeeds or fails.
     *
     * @param uri uri whose bookkeeper cluster is measured
     * @return total length in bytes of all ledgers
     * @throws IOException if resolving the configuration or scanning the ledgers fails
     */
    public long calculateLedgerSpaceUsage(URI uri) throws IOException {
        List<URI> uris = Lists.newArrayList(uri);
        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
                .name("DLAuditor-ZK")
                .zkServers(validateAndGetZKServers(uris))
                .sessionTimeoutMs(this.conf.getZKSessionTimeoutMilliseconds())
                .retryPolicy(new BoundExponentialBackoffRetryPolicy(this.conf.getZKRetryBackoffStartMillis(), this.conf.getZKRetryBackoffMaxMillis(), Integer.MAX_VALUE))
                .zkAclId(this.conf.getZkAclId())
                .build();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris);
            logger.info("Resolved bookkeeper config : {}", bkdlConfig);
            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder()
                    .name("DLAuditor-BK")
                    .dlConfig(this.conf)
                    .zkServers(bkdlConfig.getBkZkServersForWriter())
                    .ledgersPath(bkdlConfig.getBkLedgersPath())
                    .build();
            try {
                return calculateLedgerSpaceUsage(bkc, executorService);
            } finally {
                bkc.close();
            }
        } finally {
            try {
                zkc.close();
            } finally {
                // Always release the executor, even if closing the ZooKeeper client throws.
                executorService.shutdown();
            }
        }
    }

    /**
     * Scans every ledger known to the ledger manager and sums up their lengths.
     *
     * <p>Blocks until the asynchronous scan has completed, then logs the number of ledgers, total
     * bytes and total entries.
     *
     * @param bookKeeperClient client providing the BookKeeper handle and its ledger manager
     * @param executorService executor used by the per-ledger processor
     * @return total length in bytes of all scanned ledgers
     * @throws IOException if the scan fails or the calling thread is interrupted
     */
    private long calculateLedgerSpaceUsage(BookKeeperClient bookKeeperClient, ExecutorService executorService) throws IOException {
        final AtomicLong totalBytes = new AtomicLong(0L);
        final AtomicLong totalEntries = new AtomicLong(0L);
        final AtomicLong numLedgers = new AtomicLong(0L);
        final LedgerManager ledgerManager = BookKeeperAccessor.getLedgerManager(bookKeeperClient.get());
        final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();

        AnonymousClass7 perLedger = new AnonymousClass7(numLedgers, executorService, bookKeeperClient.get(), totalBytes, totalEntries);
        AsyncCallback.VoidCallback completion = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (rc != 0) {
                    doneFuture.completeExceptionally(BKException.create(rc));
                    return;
                }
                doneFuture.complete(null);
            }
        };
        // Trailing 0 / -9 are the success and failure result codes handed to the ledger manager
        // (presumably BKException.Code.OK and BKException.Code.ZKException -- verify).
        ledgerManager.asyncProcessLedgers(perLedger, completion, null, 0, -9);

        try {
            doneFuture.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on calculating ledger space : ", interrupted);
        } catch (ExecutionException failed) {
            final Throwable cause = failed.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Failed to calculate ledger space : ", cause);
        }
        logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal entries = {}", new Object[]{Long.valueOf(numLedgers.get()), Long.valueOf(totalBytes.get()), Long.valueOf(totalEntries.get())});
        return totalBytes.get();
    }

    /**
     * No-op: this class keeps no long-lived resources in its fields; the clients and executors used
     * by the audit methods are created and released inside those methods.
     */
    public void close() {
    }

    /**
     * Drains the queue with {@code i} worker threads, applying {@code action} to every item.
     *
     * <p>The call returns once every item has been processed successfully, or fails as soon as the
     * first failure has been recorded. The worker pool is shut down before the method returns;
     * {@code shutdown()} does not cancel submitted workers, so they still finish draining the queue.
     *
     * @param linkedBlockingQueue items to process; drained by the workers
     * @param i number of worker threads
     * @param action work to apply to each item
     * @param <T> item type
     * @throws IOException if at least one item failed, or if the calling thread is interrupted
     *         while waiting (as {@link DLInterruptedException})
     */
    static <T> void executeAction(final LinkedBlockingQueue<T> linkedBlockingQueue, final int i, final Action<T> action) throws IOException {
        // Released either by the first failure or by the last worker finishing cleanly.
        final CountDownLatch failureLatch = new CountDownLatch(1);
        // Counts successfully processed items.
        final CountDownLatch doneLatch = new CountDownLatch(linkedBlockingQueue.size());
        final AtomicInteger numFailures = new AtomicInteger(0);
        final AtomicInteger completedThreads = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(i);
        try {
            for (int worker = 0; worker < i; worker++) {
                executor.submit(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        T item;
                        while (null != (item = linkedBlockingQueue.poll())) {
                            try {
                                action.execute(item);
                                doneLatch.countDown();
                            } catch (Throwable t) {
                                // submit() would silently park any unchecked exception in the
                                // discarded Future and the caller would wait forever; record
                                // every failure so the waiter is always released.
                                DLAuditor.logger.error("Failed to execute action on item '{}'", item, t);
                                numFailures.incrementAndGet();
                                failureLatch.countDown();
                            }
                        }
                        if (numFailures.get() == 0 && completedThreads.incrementAndGet() == i) {
                            failureLatch.countDown();
                        }
                    }
                });
            }
            try {
                failureLatch.await();
                if (numFailures.get() > 0) {
                    throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
                }
                doneLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted on executing action", e);
                throw new DLInterruptedException("Interrupted on executing action", e);
            }
        } finally {
            // Shut down only after ALL workers were submitted; shutting down inside the submit
            // loop makes every submit after the first one get rejected.
            executor.shutdown();
        }
    }

    // Static initializer: assigns the two static final fields declared at the top of the class.
    static {
        // True when the JVM has NOT enabled assertions for DLAuditor; guards the decompiled assert checks.
        $assertionsDisabled = !DLAuditor.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(DLAuditor.class);
    }
}
