package org.apache.bookkeeper.tools.perf.dlog;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.class */
public class PerfSegmentReader extends PerfReaderBase {
    private static final Logger log = LoggerFactory.getLogger(PerfSegmentReader.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader$Split.class */
    /**
     * Value object naming a range of entries inside one log segment.
     *
     * <p>Holds the log manager that owns the segment, the segment metadata, and the first and
     * last entry ids of the range. Equality, hash code and string form are derived from all
     * four components.
     */
    public static class Split {
        final DistributedLogManager manager;
        final LogSegmentMetadata segment;
        final long startEntryId;
        final long endEntryId;

        /**
         * @param manager      log manager the segment belongs to
         * @param segment      metadata of the segment to read
         * @param startEntryId first entry id of the range
         * @param endEntryId   last entry id of the range
         */
        public Split(DistributedLogManager manager, LogSegmentMetadata segment, long startEntryId, long endEntryId) {
            this.manager = manager;
            this.segment = segment;
            this.startEntryId = startEntryId;
            this.endEntryId = endEntryId;
        }

        /** @return the log manager the segment belongs to */
        public DistributedLogManager getManager() {
            return this.manager;
        }

        /** @return the metadata of the segment this split reads from */
        public LogSegmentMetadata getSegment() {
            return this.segment;
        }

        /** @return the first entry id of the range */
        public long getStartEntryId() {
            return this.startEntryId;
        }

        /** @return the last entry id of the range */
        public long getEndEntryId() {
            return this.endEntryId;
        }

        /**
         * Two splits are equal when both entry ids match and the managers and segments are
         * either both {@code null} or equal to each other.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Split)) {
                return false;
            }
            Split other = (Split) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            if (getStartEntryId() != other.getStartEntryId()) {
                return false;
            }
            if (getEndEntryId() != other.getEndEntryId()) {
                return false;
            }
            DistributedLogManager mine = getManager();
            DistributedLogManager theirs = other.getManager();
            boolean sameManager = (mine == null) ? (theirs == null) : mine.equals(theirs);
            if (!sameManager) {
                return false;
            }
            LogSegmentMetadata mySegment = getSegment();
            LogSegmentMetadata theirSegment = other.getSegment();
            if (mySegment == null) {
                return theirSegment == null;
            }
            return mySegment.equals(theirSegment);
        }

        /** Hook used by {@link #equals(Object)} so a subclass can refuse equality with this type. */
        protected boolean canEqual(Object obj) {
            return obj instanceof Split;
        }

        /** Combines all four components with multiplier 59; a {@code null} component contributes 43. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + Long.hashCode(getStartEntryId());
            result = result * 59 + Long.hashCode(getEndEntryId());
            DistributedLogManager mgr = getManager();
            result = result * 59 + (mgr != null ? mgr.hashCode() : 43);
            LogSegmentMetadata seg = getSegment();
            result = result * 59 + (seg != null ? seg.hashCode() : 43);
            return result;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PerfSegmentReader.Split(manager=");
            text.append(getManager());
            text.append(", segment=").append(getSegment());
            text.append(", startEntryId=").append(getStartEntryId());
            text.append(", endEntryId=").append(getEndEntryId());
            text.append(")");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a segment reader by handing both arguments to the {@link PerfReaderBase}
     * constructor; no additional state is initialised here.
     *
     * @param serviceURI service URI passed through to the base class
     * @param flags      reader flags passed through to the base class
     */
    public PerfSegmentReader(ServiceURI serviceURI, PerfReaderBase.Flags flags) {
        super(serviceURI, flags);
    }

    /**
     * Opens the configured logs, turns every log segment into read splits, and reads those
     * splits concurrently on a fixed pool of {@code flags.numThreads} threads while
     * {@code reportStats()} runs on the calling thread.
     *
     * <p>Splits are dealt out to the threads round-robin by their position in the split list.
     * A JVM shutdown hook is registered that marks the run as done and prints the aggregated
     * stats. The thread pool is shut down and every opened log manager is closed on every
     * exit path, including when opening logs or reporting stats throws.
     *
     * @param namespace namespace used to open the {@code flags.numLogs} logs
     * @throws Exception if opening a log, listing segments, or reporting stats fails
     */
    @Override
    protected void execute(Namespace namespace) throws Exception {
        List<DistributedLogManager> managers = new ArrayList<>(this.flags.numLogs);
        try {
            for (int i = 0; i < this.flags.numLogs; i++) {
                managers.add(namespace.openLog(String.format(this.flags.logName, i)));
            }
            log.info("Successfully open {} logs", managers.size());

            List<Split> splits = collectSplits(managers);

            // Print the aggregated stats even when the tool is interrupted.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.isDone.set(true);
                printAggregatedStats(this.cumulativeRecorder);
            }));

            int numThreads = this.flags.numThreads;
            ExecutorService executor = Executors.newFixedThreadPool(numThreads);
            try {
                // Deal splits out by position so each thread gets every numThreads-th split.
                List<List<Split>> splitsPerThread = new ArrayList<>(numThreads);
                for (int t = 0; t < numThreads; t++) {
                    splitsPerThread.add(new ArrayList<>());
                }
                int position = 0;
                for (Split split : splits) {
                    splitsPerThread.get(position % numThreads).add(split);
                    position++;
                }

                for (List<Split> assigned : splitsPerThread) {
                    executor.submit(() -> {
                        try {
                            read(assigned);
                        } catch (Exception e) {
                            log.error("Encountered error at reading records", e);
                        }
                    });
                }
                log.info("Started {} read threads", numThreads);
                reportStats();
            } finally {
                executor.shutdown();
                if (!executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            }
        } finally {
            managers.forEach(DistributedLogManager::asyncClose);
        }
    }

    /**
     * Lists the segments of every manager and expands each one into its read splits.
     *
     * @throws UncheckedIOException if listing the segments of a log fails
     */
    private List<Split> collectSplits(List<DistributedLogManager> managers) {
        List<Split> splits = new ArrayList<>();
        for (DistributedLogManager manager : managers) {
            List<LogSegmentMetadata> segments;
            try {
                segments = manager.getLogSegments();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            for (LogSegmentMetadata segment : segments) {
                splits.addAll(getNumSplits(manager, segment));
            }
        }
        return splits;
    }

    /**
     * Logs a one-line description of the given splits and then reads them one after another
     * on the calling thread.
     *
     * @param list splits assigned to this reader thread
     * @throws Exception declared for callers; a failure while reading a split surfaces as a
     *                   {@link RuntimeException} wrapping the original exception
     */
    void read(List<Split> list) throws Exception {
        List<String> descriptions = new ArrayList<>(list.size());
        for (Split s : list) {
            descriptions.add("(log = " + s.manager.getStreamName()
                + ", segment = " + s.segment.getLogSegmentSequenceNumber()
                + " [" + s.startEntryId + ", " + s.endEntryId + "])");
        }
        log.info("Read thread started with : splits = {}", descriptions);

        for (Split s : list) {
            try {
                readSegmentSplit(s);
            } catch (Exception cause) {
                throw new RuntimeException(cause);
            }
        }
    }

    /**
     * Reads one split: opens an entry reader on the split's segment at its start entry id,
     * consumes entries in batches, and counts every record and its payload bytes.
     *
     * <p>Reading stops once an entry whose id is at or beyond the split's end entry id has
     * been consumed. A negative end entry id disables that check, so reading continues until
     * the reader signals {@link EndOfLogSegmentException}, which is treated as normal
     * completion. The entry reader is closed on every exit path.
     *
     * @param split the segment range to read
     * @throws Exception if opening the reader or fetching a batch fails; an
     *                   {@link IOException} while decoding records is rethrown as
     *                   {@link UncheckedIOException}
     */
    void readSegmentSplit(Split split) throws Exception {
        LogSegmentEntryReader segmentReader = FutureUtils.result(
            split.manager.getNamespaceDriver()
                .getLogSegmentEntryStore(NamespaceDriver.Role.READER)
                .openReader(split.segment, split.getStartEntryId()));
        segmentReader.start();
        try {
            boolean reachedEnd = false;
            while (!reachedEnd) {
                // 100 is the batch size requested per call — presumably only a hint; confirm
                // against the LogSegmentEntryReader contract.
                List<Entry.Reader> entries = FutureUtils.result(segmentReader.readNext(100));
                // Every entry of the batch is consumed (and released) even after the end id
                // has been seen, so no fetched entry is left unreleased.
                for (Entry.Reader entry : entries) {
                    consumeRecords(entry);
                    if (split.getEndEntryId() >= 0 && entry.getEntryId() >= split.getEndEntryId()) {
                        reachedEnd = true;
                    }
                }
            }
        } catch (EndOfLogSegmentException e) {
            // Reached the end of the log segment: nothing more to read for this split.
        } finally {
            segmentReader.asyncClose();
        }
    }

    /**
     * Counts all records of one entry and releases the entry exactly once afterwards,
     * whether or not decoding succeeded.
     */
    private void consumeRecords(Entry.Reader entry) {
        try {
            LogRecordWithDLSN record;
            while ((record = entry.nextRecord()) != null) {
                this.recordsRead.increment();
                this.bytesRead.add(record.getPayloadBuf().readableBytes());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            entry.release();
        }
    }

    /**
     * Divides a log segment into at most {@code flags.numSplitsPerSegment} contiguous,
     * non-overlapping entry ranges that together cover entry ids {@code 0..lastEntryId}.
     *
     * <p>A single split covering the whole segment (start 0, end -1) is returned when only
     * one split is requested, or when the segment reports fewer than two entries. When the
     * segment has fewer entries than requested splits, one split per entry is produced so
     * that no range is empty. The last split absorbs any remainder of the division.
     *
     * @param distributedLogManager log manager owning the segment
     * @param logSegmentMetadata    metadata of the segment to split
     * @return the splits for this segment, in ascending entry-id order
     */
    List<Split> getNumSplits(DistributedLogManager distributedLogManager, LogSegmentMetadata logSegmentMetadata) {
        int requestedSplits = this.flags.numSplitsPerSegment;
        if (requestedSplits <= 1) {
            return Lists.newArrayList(new Split(distributedLogManager, logSegmentMetadata, 0L, -1L));
        }

        long lastEntryId = logSegmentMetadata.getLastEntryId();
        long numEntries = lastEntryId + 1;
        // Never create more splits than there are entries; otherwise a range would be empty
        // and its end id of (start - 1) could become -1, i.e. "read the whole segment".
        int numSplits = (int) Math.min((long) requestedSplits, numEntries);
        if (numSplits <= 1) {
            return Lists.newArrayList(new Split(distributedLogManager, logSegmentMetadata, 0L, -1L));
        }

        long entriesPerSplit = numEntries / numSplits;
        List<Split> splits = new ArrayList<>(numSplits);
        long nextStart = 0L;
        for (int i = 0; i < numSplits; i++) {
            boolean isLast = i == numSplits - 1;
            long end = isLast ? lastEntryId : nextStart + entriesPerSplit - 1;
            splits.add(new Split(distributedLogManager, logSegmentMetadata, nextStart, end));
            nextStart = end + 1;
        }
        return splits;
    }

    /**
     * Delegates directly to {@code super.run()} and adds no behavior of its own.
     *
     * <p>NOTE(review): the bridge/synthetic markers suggest this method was emitted by the
     * compiler and surfaced by the decompiler rather than written by hand — confirm before
     * relying on or removing it.
     */
    @Override // org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase, java.lang.Runnable
    public /* bridge */ /* synthetic */ void run() {
        super.run();
    }
}
