package org.apache.bookkeeper.tools.perf.dlog;

import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.Unpooled;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/tools/perf/dlog/PerfWriter.class */
public class PerfWriter implements Runnable {
    private final byte[] payload;
    private final ServiceURI serviceURI;
    private final Flags flags;
    private static final Logger log = LoggerFactory.getLogger(PerfWriter.class);
    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
    private final LongAdder recordsWritten = new LongAdder();
    private final LongAdder bytesWritten = new LongAdder();
    private final Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
    private final Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
    private final AtomicBoolean isDone = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/bookkeeper/tools/perf/dlog/PerfWriter$Flags.class */
    public static class Flags extends CliFlags {

        @Parameter(names = {"-r", "--rate"}, description = "Write rate bytes/s across log streams")
        public int writeRate = 0;

        @Parameter(names = {"-rs", "--record-size"}, description = "Log record size")
        public int recordSize = 1024;

        @Parameter(names = {"-ln", "--log-name"}, description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
        public String logName = "test-log-%06d";

        @Parameter(names = {"-l", "--num-logs"}, description = "Number of log streams")
        public int numLogs = 1;

        @Parameter(names = {"-t", "--threads"}, description = "Number of threads writing")
        public int numThreads = 1;

        @Parameter(names = {"-mob", "--max-outstanding-megabytes"}, description = "Number of threads writing")
        public long maxOutstandingMB = 200;

        @Parameter(names = {"-n", "--num-records"}, description = "Number of records to write in total. If 0, it will keep writing")
        public long numRecords = 0;

        @Parameter(names = {"-mlss", "--max-log-segment-size"}, description = "Max log segment size")
        public int maxLogSegmentSize = 67108864;

        @Parameter(names = {"-b", "--num-bytes"}, description = "Number of bytes to write in total. If 0, it will keep writing")
        public long numBytes = 0;

        @Parameter(names = {"-e", "--ensemble-size"}, description = "Ledger ensemble size")
        public int ensembleSize = 1;

        @Parameter(names = {"-w", "--write-quorum-size"}, description = "Ledger write quorum size")
        public int writeQuorumSize = 1;

        @Parameter(names = {"-a", "--ack-quorum-size"}, description = "Ledger ack quorum size")
        public int ackQuorumSize = 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PerfWriter(ServiceURI serviceURI, Flags flags) {
        this.serviceURI = serviceURI;
        this.flags = flags;
        this.payload = new byte[flags.recordSize];
        ThreadLocalRandom.current().nextBytes(this.payload);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            execute();
        } catch (Exception e) {
            log.error("Encountered exception at running dlog perf writer", e);
        }
    }

    void execute() throws Exception {
        log.info("Starting dlog perf writer with config : {}", new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this.flags));
        Namespace build = NamespaceBuilder.newBuilder().conf(newDlogConf(this.flags)).uri(this.serviceURI.getUri()).build();
        Throwable th = null;
        try {
            try {
                execute(build);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    void execute(Namespace namespace) throws Exception {
        ArrayList arrayList = new ArrayList(this.flags.numLogs);
        for (int i = 0; i < this.flags.numLogs; i++) {
            arrayList.add(Pair.of(Integer.valueOf(i), namespace.openLog(String.format(this.flags.logName, Integer.valueOf(i)))));
        }
        log.info("Successfully open {} logs", Integer.valueOf(arrayList.size()));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.isDone.set(true);
            printAggregatedStats(this.cumulativeRecorder);
        }));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.flags.numThreads);
        for (int i2 = 0; i2 < this.flags.numThreads; i2++) {
            try {
                int i3 = i2;
                List list = (List) arrayList.stream().filter(pair -> {
                    return ((Integer) pair.getLeft()).intValue() % this.flags.numThreads == i3;
                }).map(pair2 -> {
                    return (DistributedLogManager) pair2.getRight();
                }).collect(Collectors.toList());
                long j = this.flags.numRecords / this.flags.numThreads;
                long j2 = this.flags.numBytes / this.flags.numThreads;
                double d = this.flags.writeRate / this.flags.numThreads;
                long j3 = ((this.flags.maxOutstandingMB * 1024) * 1024) / this.flags.numThreads;
                newFixedThreadPool.submit(() -> {
                    try {
                        write(list, d, (int) j3, j, j2);
                    } catch (Exception e) {
                        log.error("Encountered error at writing records", e);
                    }
                });
            } catch (Throwable th) {
                newFixedThreadPool.shutdown();
                if (!newFixedThreadPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                    newFixedThreadPool.shutdownNow();
                }
                arrayList.forEach(pair3 -> {
                    ((DistributedLogManager) pair3.getRight()).asyncClose();
                });
                throw th;
            }
        }
        log.info("Started {} write threads", Integer.valueOf(this.flags.numThreads));
        reportStats();
        newFixedThreadPool.shutdown();
        if (!newFixedThreadPool.awaitTermination(5L, TimeUnit.SECONDS)) {
            newFixedThreadPool.shutdownNow();
        }
        arrayList.forEach(pair32 -> {
            ((DistributedLogManager) pair32.getRight()).asyncClose();
        });
    }

    /* JADX WARN: Type inference failed for: r0v42, types: [org.apache.distributedlog.api.AsyncLogWriter] */
    void write(List<DistributedLogManager> list, double d, int i, long j, long j2) throws Exception {
        log.info("Write thread started with : logs = {}, rate = {}, num records = {}, num bytes = {}, max outstanding bytes = {}", new Object[]{list.stream().map(distributedLogManager -> {
            return distributedLogManager.getStreamName();
        }).collect(Collectors.toList()), Double.valueOf(d), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
        List list2 = (List) FutureUtils.result(FutureUtils.collect((List) list.stream().map(distributedLogManager2 -> {
            return distributedLogManager2.openAsyncLogWriter();
        }).collect(Collectors.toList())));
        long max = Math.max(0L, list2.stream().mapToLong(asyncLogWriter -> {
            return asyncLogWriter.getLastTxId();
        }).max().orElse(0L));
        RateLimiter create = d > 0.0d ? RateLimiter.create(d) : null;
        Semaphore semaphore = i > 0 ? new Semaphore(i) : null;
        if (create != null) {
            create.acquire((int) d);
        }
        long j3 = 0;
        long j4 = 0;
        int size = list.size();
        while (true) {
            for (int i2 = 0; i2 < size; i2++) {
                if (j > 0 && j3 >= j) {
                    markPerfDone();
                }
                if (j2 > 0 && j4 >= j2) {
                    markPerfDone();
                }
                if (null != semaphore) {
                    semaphore.acquire(this.payload.length);
                }
                j3++;
                j4 += this.payload.length;
                if (null != create) {
                    create.acquire(this.payload.length);
                }
                long nanoTime = System.nanoTime();
                ?? r0 = (AsyncLogWriter) list2.get(i2);
                long j5 = max + 1;
                max = r0;
                Semaphore semaphore2 = semaphore;
                r0.write(new LogRecord(j5, Unpooled.wrappedBuffer(this.payload))).thenAccept(dlsn -> {
                    if (null != semaphore2) {
                        semaphore2.release(this.payload.length);
                    }
                    this.recordsWritten.increment();
                    this.bytesWritten.add(this.payload.length);
                    long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime);
                    this.recorder.recordValue(micros);
                    this.cumulativeRecorder.recordValue(micros);
                }).exceptionally(th -> {
                    log.warn("Error at writing records", th);
                    System.exit(-1);
                    return null;
                });
            }
        }
    }

    @SuppressFBWarnings({"DM_EXIT"})
    void markPerfDone() throws Exception {
        log.info("------------------- DONE -----------------------");
        printAggregatedStats(this.cumulativeRecorder);
        this.isDone.set(true);
        Thread.sleep(5000L);
        System.exit(0);
    }

    void reportStats() {
        long nanoTime = System.nanoTime();
        Histogram histogram = null;
        while (true) {
            try {
                Thread.sleep(10000L);
                if (this.isDone.get()) {
                    return;
                }
                long nanoTime2 = System.nanoTime();
                double d = (nanoTime2 - nanoTime) / 1.0E9d;
                histogram = this.recorder.getIntervalHistogram(histogram);
                log.info("Throughput written : {}  records/s --- {} MB/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", new Object[]{throughputFormat.format(this.recordsWritten.sumThenReset() / d), throughputFormat.format(((this.bytesWritten.sumThenReset() / d) / 1024.0d) / 1024.0d), dec.format(histogram.getMean() / 1000.0d), dec.format(histogram.getValueAtPercentile(50.0d) / 1000.0d), dec.format(histogram.getValueAtPercentile(95.0d) / 1000.0d), dec.format(histogram.getValueAtPercentile(99.0d) / 1000.0d), dec.format(histogram.getValueAtPercentile(99.9d) / 1000.0d), dec.format(histogram.getValueAtPercentile(99.99d) / 1000.0d), dec.format(histogram.getMaxValue() / 1000.0d)});
                histogram.reset();
                nanoTime = nanoTime2;
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private static DistributedLogConfiguration newDlogConf(Flags flags) {
        return new DistributedLogConfiguration().setEnsembleSize(flags.ensembleSize).setWriteQuorumSize(flags.writeQuorumSize).setAckQuorumSize(flags.ackQuorumSize).setOutputBufferSize(524288).setPeriodicFlushFrequencyMilliSeconds(2).setWriteLockEnabled(false).setMaxLogSegmentBytes(flags.maxLogSegmentSize).setLogSegmentRollingIntervalMinutes(1).setExplicitTruncationByApplication(true);
    }

    private static void printAggregatedStats(Recorder recorder) {
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", new Object[]{dec.format(recorder.getIntervalHistogram().getMean() / 1000.0d), dec.format(r0.getValueAtPercentile(50.0d) / 1000.0d), dec.format(r0.getValueAtPercentile(95.0d) / 1000.0d), dec.format(r0.getValueAtPercentile(99.0d) / 1000.0d), dec.format(r0.getValueAtPercentile(99.9d) / 1000.0d), dec.format(r0.getValueAtPercentile(99.99d) / 1000.0d), dec.format(r0.getValueAtPercentile(99.999d) / 1000.0d), dec.format(r0.getMaxValue() / 1000.0d)});
    }
}
