package org.apache.bookkeeper.tools.perf.dlog;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/tools/perf/dlog/PerfReader.class */
/**
 * Perf tool that reads records from a set of distributedlog streams using a pool of reader threads.
 *
 * <p>{@link #execute(Namespace)} opens {@code flags.numLogs} logs, spreads them over
 * {@code flags.numThreads} threads, and each thread polls its logs in a loop, updating the
 * {@code recordsRead} and {@code bytesRead} counters inherited from {@link PerfReaderBase}.
 */
public class PerfReader extends PerfReaderBase {
    private static final Logger log = LoggerFactory.getLogger(PerfReader.class);

    /** Grace period given to reader threads after {@code shutdown()} before they are interrupted. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /**
     * Creates a reader perf tool.
     *
     * @param serviceURI service URI passed through to {@link PerfReaderBase}
     * @param flags command flags passed through to {@link PerfReaderBase}
     */
    public PerfReader(ServiceURI serviceURI, PerfReaderBase.Flags flags) {
        super(serviceURI, flags);
    }

    /**
     * Opens the logs, starts the reader threads and reports stats until {@code reportStats()} returns.
     *
     * <p>Whatever happens (failure to open a log, failure to start a thread, failure while reporting
     * stats), the executor is shut down and every log manager opened so far is closed asynchronously.
     *
     * @param namespace namespace used to open the logs
     * @throws Exception if opening a log, starting the readers or reporting stats fails
     */
    @Override // org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase
    protected void execute(Namespace namespace) throws Exception {
        List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(this.flags.numLogs);
        ExecutorService executor = null;
        try {
            // flags.logName is used as a format string; the log index is its only argument.
            for (int i = 0; i < this.flags.numLogs; i++) {
                managers.add(Pair.of(i, namespace.openLog(String.format(this.flags.logName, i))));
            }
            log.info("Successfully open {} logs", managers.size());

            // Print the aggregated stats when the JVM is shutting down.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isDone.set(true);
                printAggregatedStats(this.cumulativeRecorder);
            }));

            executor = Executors.newFixedThreadPool(this.flags.numThreads);
            for (int i = 0; i < this.flags.numThreads; i++) {
                final int threadIdx = i;
                // Log k is served by thread (k % numThreads).
                final List<DistributedLogManager> logsThisThread = managers.stream()
                    .filter(pair -> pair.getLeft() % this.flags.numThreads == threadIdx)
                    .map(Pair::getRight)
                    .collect(Collectors.toList());
                executor.submit(() -> {
                    try {
                        read(logsThisThread);
                    } catch (Exception e) {
                        log.error("Encountered error at reading records", e);
                    }
                });
            }
            log.info("Started {} read threads", this.flags.numThreads);
            reportStats();
        } finally {
            try {
                if (executor != null) {
                    shutdownAndAwait(executor);
                }
            } finally {
                // Close the managers even if waiting for the executor was interrupted.
                managers.forEach(pair -> pair.getRight().asyncClose());
            }
        }
    }

    /**
     * Stops accepting new tasks, waits up to {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds for running
     * tasks to finish, and calls {@code shutdownNow()} on the executor if they have not.
     */
    private static void shutdownAndAwait(ExecutorService executor) throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }

    /**
     * Opens a reader at {@link DLSN#InitialDLSN} on each log and polls the readers round-robin,
     * counting every record returned and its payload size.
     *
     * <p>The loop has no exit condition; this method only ends by throwing.
     *
     * @param logs the log managers this thread is responsible for
     * @throws UncheckedIOException if a reader cannot be opened
     * @throws Exception if reading from a log fails
     */
    void read(List<DistributedLogManager> logs) throws Exception {
        log.info("Read thread started with : logs = {}",
            logs.stream().map(DistributedLogManager::getStreamName).collect(Collectors.toList()));

        List<LogReader> readers = logs.stream()
            .map(manager -> {
                try {
                    return manager.openLogReader(DLSN.InitialDLSN);
                } catch (IOException e) {
                    log.error("Failed to open reader for log stream {}", manager.getStreamName(), e);
                    throw new UncheckedIOException(e);
                }
            })
            .collect(Collectors.toList());

        final int numLogs = logs.size();
        while (true) {
            for (int i = 0; i < numLogs; i++) {
                // NOTE(review): 'true' is presumably the non-blocking flag, so a null result
                // means "nothing available right now" — confirm against LogReader#readNext.
                LogRecordWithDLSN record = readers.get(i).readNext(true);
                if (null != record) {
                    this.recordsRead.increment();
                    this.bytesRead.add(record.getPayloadBuf().readableBytes());
                }
            }
        }
    }

    /**
     * Delegates to {@link PerfReaderBase#run()}. The decompiler marked this method as a
     * compiler-generated bridge; it is kept so the class's member set is unchanged.
     */
    @Override // org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase, java.lang.Runnable
    public /* bridge */ /* synthetic */ void run() {
        super.run();
    }
}
