package org.apache.bookkeeper.tools.perf.journal;

import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.Stats;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.commons.configuration.CompositeConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Perf tool that benchmarks appending entries to one or more bookie journals.
 *
 * <p>Writer threads submit fixed-size entries to the journals while this class
 * periodically logs throughput and latency figures and, at the end of the run,
 * an aggregated latency summary.
 */
public class JournalWriter implements Runnable {
    // Command line flags describing the benchmark to run.
    private final Flags flags;
    private static final Logger log = LoggerFactory.getLogger(JournalWriter.class);
    // Formatter for the throughput figures in the periodic report.
    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
    // Formatter for the latency figures in the periodic and aggregated reports.
    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
    // Entries acknowledged since the last periodic report (reset by reportStats()).
    private final LongAdder recordsWritten = new LongAdder();
    // Bytes acknowledged since the last periodic report (reset by reportStats()).
    private final LongAdder bytesWritten = new LongAdder();
    // Latency recorder drained by every periodic report. The write callback records
    // microseconds, and the first constructor argument evaluates to 120,000,000.
    // NOTE(review): presumably that caps the trackable latency at 120 seconds - confirm.
    private final Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
    // Latency recorder read only by printAggregatedStats(); same configuration as above.
    private final Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
    // Set by markPerfDone() and by the JVM shutdown hook; polled by reportStats() to stop reporting.
    private final AtomicBoolean isDone = new AtomicBoolean(false);
    // Server configuration handed to the journals; the constructor layers the caller's
    // configuration into it and updateServerConf() overrides it from the flags.
    private final ServerConfiguration conf = new ServerConfiguration();

    /**
     * Command line flags accepted by the journal write benchmark.
     *
     * <p>Totals ({@code numEntries}, {@code numBytes}, {@code writeRate},
     * {@code maxOutstandingMB}, {@code numLedgers}) are divided across the
     * test threads before the writers are started.
     */
    public static class Flags extends CliFlags {

        @Parameter(names = {"-j", "--journal-dirs"}, description = "The list of journal directories, separated by comma", required = true)
        public List<String> journalDirs;

        @Parameter(names = {"-t", "--num-test-threads"}, description = "Num of test threads to append entries to journal")
        public int numTestThreads = 1;

        @Parameter(names = {"-nl", "--num-ledgers"}, description = "Num of ledgers to append entries to journal")
        public int numLedgers = 24;

        // A value of 0 (or less) disables rate limiting in the writer threads.
        @Parameter(names = {"-r", "--rate"}, description = "Write rate bytes/s across journals")
        public int writeRate = 0;

        @Parameter(names = {"-s", "--entry-size"}, description = "Entry size")
        public int recordSize = 1024;

        // The description used to read "Number of threads writing", which described a different
        // flag; this value bounds the bytes submitted to the journals but not yet acknowledged.
        @Parameter(names = {"-mob", "--max-outstanding-megabytes"}, description = "Max outstanding megabytes of entries waiting for journal acknowledgement, across all test threads")
        public long maxOutstandingMB = 200;

        @Parameter(names = {"-n", "--num-entries"}, description = "Number of entries to write in total. If 0, it will keep writing")
        public long numEntries = 0;

        @Parameter(names = {"-b", "--num-bytes"}, description = "Number of bytes to write in total. If 0, it will keep writing")
        public long numBytes = 0;

        @Parameter(names = {"-wb", "--write-buffer-size-kb"}, description = "Journal write buffer size")
        public int writeBufferSizeKB = 1024;

        @Parameter(names = {"--sync"}, description = "Journal sync enabled")
        public boolean journalSyncEnabled = false;

        @Parameter(names = {"-gci", "--group-commit-interval-ms"}, description = "Journal group commit interval in milliseconds")
        public int groupCommitIntervalMs = 1;

        // 524288 bytes = 512 KiB.
        @Parameter(names = {"-gcb", "--group-commit-max-bytes"}, description = "Journal group commit max buffered bytes")
        public int groupCommitMaxBytes = 524288;

        @Parameter(names = {"-q", "--journal-queue-size"}, description = "Journal queue size")
        public int journalQueueSize = 10000;

        @Parameter(names = {"-jt", "--num-journal-callback-threads"}, description = "Number of journal callback threads")
        public int numJournalCallbackThreads = 8;
    }

    /**
     * Creates a journal perf writer.
     *
     * @param compositeConfiguration configuration added to this writer's server configuration
     * @param flags command line flags describing the benchmark to run
     */
    public JournalWriter(CompositeConfiguration compositeConfiguration, Flags flags) {
        this.flags = flags;
        this.conf.addConfiguration(compositeConfiguration);
    }

    /**
     * Runs the benchmark via {@link #execute()}.
     *
     * <p>Failures are logged rather than propagated, because {@link Runnable#run()}
     * cannot throw checked exceptions. If the run is interrupted, the interrupt
     * status of the calling thread is restored before returning.
     */
    @Override
    public void run() {
        try {
            execute();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            log.error("Interrupted while running journal perf writer", e);
        } catch (Exception e) {
            log.error("Encountered exception at running journal perf writer", e);
        }
    }

    /**
     * Prepares the server configuration, starts the stats provider and one journal per
     * configured journal directory, runs the benchmark and finally tears everything down.
     *
     * <p>Everything created after the stats provider has been started is covered by a
     * single {@code finally} block, so journals that were already created are shut down
     * and the stats provider is stopped even when start-up fails part-way through.
     *
     * @throws IllegalArgumentException if no journal directory is configured
     * @throws Exception if start-up or the benchmark itself fails
     */
    void execute() throws Exception {
        log.info("Starting journal perf writer with config : {}",
            new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this.flags));
        Preconditions.checkArgument(
            this.flags.journalDirs != null && !this.flags.journalDirs.isEmpty(),
            "No journal dirs is provided");
        updateServerConf(this.conf, this.flags);
        log.info("Benchmark the journal perf with server config : {}", this.conf.asJson());
        Stats.loadStatsProvider(this.conf);
        Stats.get().start(this.conf);

        // Slots stay null until the corresponding journal has been constructed.
        final Journal[] journals = new Journal[this.flags.journalDirs.size()];
        try {
            final StatsLogger bookieStats = Stats.get().getStatsLogger("").scope("bookie");
            final ByteBufAllocator allocator = getAllocator(this.conf);
            final DiskChecker diskChecker =
                new DiskChecker(this.conf.getDiskUsageThreshold(), this.conf.getDiskUsageWarnThreshold());
            final LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(
                this.conf, this.conf.getLedgerDirs(), diskChecker, NullStatsLogger.INSTANCE);

            for (int idx = 0; idx < journals.length; idx++) {
                Journal journal = new Journal(idx, new File(this.flags.journalDirs.get(idx)), this.conf,
                    ledgerDirsManager, bookieStats.scope("journal"), allocator);
                journals[idx] = journal;
                journal.start();
            }
            execute(journals);
        } finally {
            try {
                for (Journal journal : journals) {
                    if (journal != null) {
                        journal.shutdown();
                    }
                }
            } finally {
                // Stop the stats provider even if a journal failed to shut down.
                Stats.get().stop();
            }
        }
    }

    /**
     * Runs the benchmark against already started journals.
     *
     * <p>Registers a JVM shutdown hook that prints the aggregated latency stats, schedules
     * a checkpoint of every journal every 30 seconds, starts {@code numTestThreads} writer
     * threads and then blocks in {@link #reportStats()} until the run is marked done or the
     * calling thread is interrupted. Both executors are shut down on every exit path.
     *
     * <p>The totals given on the command line are split across the writer threads:
     * <ul>
     *   <li>a positive entry/byte quota never truncates to 0, because the writers treat 0
     *       as "keep writing";</li>
     *   <li>a positive write rate is divided as a floating point value and floored at
     *       1 byte/s, because the writers request {@code (int) rate} permits up front;</li>
     *   <li>the outstanding-bytes cap is clamped to the {@code int} range instead of being
     *       truncated by a narrowing cast;</li>
     *   <li>each thread gets at least one ledger, because the writers pick a ledger with
     *       {@code nextInt(numLedgers)}.</li>
     * </ul>
     *
     * @param journalArr the started journals to write to
     * @throws IllegalArgumentException if the number of test threads is not positive
     * @throws Exception if waiting for the writer threads is interrupted or reporting fails
     */
    void execute(Journal[] journalArr) throws Exception {
        final int numThreads = this.flags.numTestThreads;
        Preconditions.checkArgument(numThreads > 0,
            "Number of test threads must be positive, but was %s", numThreads);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Print only if nobody has marked the run done yet: reading the cumulative
            // recorder drains it, so a second print would show misleading figures.
            if (this.isDone.compareAndSet(false, true)) {
                printAggregatedStats(this.cumulativeRecorder);
            }
        }));

        final long recordsPerThread = splitQuota(this.flags.numEntries, numThreads);
        final long bytesPerThread = splitQuota(this.flags.numBytes, numThreads);
        final double ratePerThread = this.flags.writeRate > 0
            ? Math.max(1.0d, this.flags.writeRate / (double) numThreads)
            : 0.0d;
        final long outstandingBytesPerThread = this.flags.maxOutstandingMB * 1024 * 1024 / numThreads;
        final int maxOutstandingPerThread =
            (int) Math.max(0L, Math.min((long) Integer.MAX_VALUE, outstandingBytesPerThread));
        final int ledgersPerThread = Math.max(1, this.flags.numLedgers / numThreads);

        final ScheduledExecutorService checkpointScheduler = Executors.newSingleThreadScheduledExecutor();
        final ExecutorService writerPool = Executors.newFixedThreadPool(numThreads);
        try {
            checkpointScheduler.scheduleAtFixedRate(() -> {
                for (Journal journal : journalArr) {
                    CheckpointSource.Checkpoint checkpoint = null;
                    try {
                        checkpoint = journal.newCheckpoint();
                        journal.checkpointComplete(checkpoint, true);
                    } catch (IOException | RuntimeException e) {
                        // An exception escaping a fixed-rate task cancels all future runs,
                        // so failures are logged and the next journal is still processed.
                        log.error("Failed to complete checkpoint {}", checkpoint, e);
                    }
                }
            }, 30L, 30L, TimeUnit.SECONDS);

            for (int i = 0; i < numThreads; i++) {
                final int threadIdx = i;
                writerPool.submit(() -> {
                    try {
                        write(threadIdx, journalArr, ledgersPerThread, ratePerThread,
                            maxOutstandingPerThread, recordsPerThread, bytesPerThread);
                    } catch (Throwable th) {
                        log.error("Encountered error at writing records", th);
                    }
                });
            }
            log.info("Started {} write threads", numThreads);
            reportStats();
        } finally {
            checkpointScheduler.shutdown();
            writerPool.shutdown();
            if (!writerPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                writerPool.shutdownNow();
            }
        }
    }

    /** Splits a total quota across threads; non-positive totals mean "unbounded" and yield 0. */
    private static long splitQuota(long total, int numThreads) {
        return total > 0 ? Math.max(1L, total / numThreads) : 0L;
    }

    /**
     * Body of one writer thread: appends fixed-size random entries to the journals in
     * round-robin order, picking a random ledger of this thread for every entry.
     *
     * <p>This method does not return normally. It loops until a quota is reached, at which
     * point {@link #markPerfDone()} terminates the JVM, or until an exception (for example
     * an interruption while waiting for permits) propagates to the caller.
     *
     * @param i index of this writer thread; used to derive ledger ids that do not overlap
     *          with those of other threads
     * @param journalArr journals to write to; must not be empty
     * @param i2 number of ledgers this thread writes to; must be positive
     * @param d write rate of this thread in bytes/s; values {@code <= 0} disable rate limiting
     * @param i3 maximum number of submitted but unacknowledged bytes; values {@code <= 0}
     *           disable the limit
     * @param j number of entries this thread submits before marking the run done;
     *          values {@code <= 0} mean unbounded
     * @param j2 number of bytes this thread submits before marking the run done;
     *           values {@code <= 0} mean unbounded
     * @throws IllegalArgumentException if {@code journalArr} is empty or {@code i2} is not positive
     * @throws Exception if the thread is interrupted while waiting
     */
    void write(int i, Journal[] journalArr, int i2, double d, int i3, long j, long j2) throws Exception {
        final int threadIdx = i;
        final int numLedgers = i2;
        final double writeRate = d;
        final int maxOutstandingBytes = i3;
        final long numRecords = j;
        final long numBytes = j2;

        log.info("Write thread {} started with : rate = {}, num records = {}, num bytes = {}, max outstanding bytes = {}",
            threadIdx, writeRate, numRecords, numBytes, maxOutstandingBytes);
        // Without these checks an empty journal array spins forever and a non-positive
        // ledger count fails later with an unrelated-looking error.
        Preconditions.checkArgument(journalArr.length > 0, "No journals to write to");
        Preconditions.checkArgument(numLedgers > 0,
            "Number of ledgers per write thread must be positive, but was %s", numLedgers);

        final RateLimiter limiter = writeRate > 0.0d ? RateLimiter.create(writeRate) : null;
        final Semaphore outstanding = maxOutstandingBytes > 0 ? new Semaphore(maxOutstandingBytes) : null;
        if (limiter != null) {
            // Up-front acquire of roughly one second worth of permits. RateLimiter rejects
            // requests for 0 permits, so rates below 1 byte/s ask for a single permit.
            limiter.acquire(Math.max(1, (int) writeRate));
        }

        final byte[] payload = new byte[this.flags.recordSize];
        ThreadLocalRandom.current().nextBytes(payload);
        final ByteBuf template = Unpooled.wrappedBuffer(payload);
        // Next entry id per ledger of this thread; a new long[] is already zero-filled.
        final long[] nextEntryIds = new long[numLedgers];

        long recordsSubmitted = 0;
        long bytesSubmitted = 0;
        while (true) {
            for (Journal journal : journalArr) {
                // Check the quotas before retaining another buffer for the next entry.
                if (numRecords > 0 && recordsSubmitted >= numRecords) {
                    markPerfDone();
                }
                if (numBytes > 0 && bytesSubmitted >= numBytes) {
                    markPerfDone();
                }

                final int ledgerSlot = ThreadLocalRandom.current().nextInt(numLedgers);
                // Widen before multiplying so large thread/ledger counts cannot overflow int.
                final long ledgerId = (long) threadIdx * numLedgers + ledgerSlot;
                final long entryId = nextEntryIds[ledgerSlot]++;
                final ByteBuf entry = template.retainedDuplicate();
                final int entrySize = entry.readableBytes();

                if (outstanding != null) {
                    outstanding.acquire(entrySize);
                }
                recordsSubmitted++;
                bytesSubmitted += entrySize;
                if (limiter != null) {
                    limiter.acquire(entrySize);
                }

                final long startNanos = System.nanoTime();
                journal.logAddEntry(ledgerId, entryId, entry, false, (rc, lid, eid, addr, ctx) -> {
                    entry.release();
                    if (rc != 0) {
                        // Any failed append aborts the whole benchmark.
                        log.warn("Error at writing records : {}", BookieException.create(rc));
                        Runtime.getRuntime().exit(-1);
                        return;
                    }
                    if (outstanding != null) {
                        outstanding.release(entrySize);
                    }
                    this.recordsWritten.increment();
                    this.bytesWritten.add(entrySize);
                    // Latencies are recorded in microseconds.
                    final long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
                    this.recorder.recordValue(latencyMicros);
                    this.cumulativeRecorder.recordValue(latencyMicros);
                }, null);
            }
        }
    }

    /**
     * Marks the benchmark as finished, prints the aggregated latency stats and exits the JVM.
     *
     * <p>Several writer threads can reach their quota at about the same time. Only the
     * first caller prints the banner and the stats: reading the cumulative recorder
     * drains it, so repeated prints would show near-empty figures. Every caller still
     * sleeps for five seconds before calling {@link System#exit(int)}, so a later caller
     * cannot cut the first caller's grace period short.
     *
     * @throws Exception if the thread is interrupted during the grace period
     */
    @SuppressFBWarnings({"DM_EXIT"})
    void markPerfDone() throws Exception {
        if (this.isDone.compareAndSet(false, true)) {
            log.info("------------------- DONE -----------------------");
            printAggregatedStats(this.cumulativeRecorder);
        }
        Thread.sleep(5000L);
        System.exit(0);
    }

    /**
     * Logs throughput and latency for every ten second interval.
     *
     * <p>Returns once the run has been marked done or the calling thread is interrupted
     * while sleeping. Latencies are recorded in microseconds and reported in milliseconds.
     */
    void reportStats() {
        long previousReportNanos = System.nanoTime();
        Histogram intervalHistogram = null;
        for (;;) {
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException interrupted) {
                // Interruption simply ends the reporting loop.
                return;
            }
            if (this.isDone.get()) {
                return;
            }

            final long nowNanos = System.nanoTime();
            final double elapsedSeconds = (nowNanos - previousReportNanos) / 1.0E9d;
            // Hand the previous histogram back to the recorder so it can be recycled.
            intervalHistogram = this.recorder.getIntervalHistogram(intervalHistogram);

            final String recordsPerSecond =
                throughputFormat.format(this.recordsWritten.sumThenReset() / elapsedSeconds);
            final String megabytesPerSecond =
                throughputFormat.format(((this.bytesWritten.sumThenReset() / elapsedSeconds) / 1024.0d) / 1024.0d);
            final String mean = dec.format(intervalHistogram.getMean() / 1000.0d);
            final String median = dec.format(intervalHistogram.getValueAtPercentile(50.0d) / 1000.0d);
            final String pct95 = dec.format(intervalHistogram.getValueAtPercentile(95.0d) / 1000.0d);
            final String pct99 = dec.format(intervalHistogram.getValueAtPercentile(99.0d) / 1000.0d);
            final String pct999 = dec.format(intervalHistogram.getValueAtPercentile(99.9d) / 1000.0d);
            final String pct9999 = dec.format(intervalHistogram.getValueAtPercentile(99.99d) / 1000.0d);
            final String max = dec.format(intervalHistogram.getMaxValue() / 1000.0d);

            log.info("Throughput written : {}  records/s --- {} MB/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
                recordsPerSecond, megabytesPerSecond, mean, median, pct95, pct99, pct999, pct9999, max);

            intervalHistogram.reset();
            previousReportNanos = nowNanos;
        }
    }

    /**
     * Copies the journal related flags into the server configuration, selects the
     * Prometheus stats provider and creates the current directories of the configured
     * ledger dirs when they are missing.
     *
     * <p>A directory that cannot be created is reported with a warning instead of being
     * ignored silently; the method still continues with the remaining directories.
     *
     * @param serverConfiguration configuration to update in place
     * @param flags command line flags to read the settings from
     */
    private static void updateServerConf(ServerConfiguration serverConfiguration, Flags flags) {
        serverConfiguration.setJournalWriteBufferSizeKB(flags.writeBufferSizeKB);
        serverConfiguration.setJournalMaxGroupWaitMSec(flags.groupCommitIntervalMs);
        serverConfiguration.setJournalBufferedWritesThreshold(flags.groupCommitMaxBytes);
        serverConfiguration.setNumJournalCallbackThreads(flags.numJournalCallbackThreads);
        serverConfiguration.setJournalQueueSize(flags.journalQueueSize);
        serverConfiguration.setJournalSyncData(flags.journalSyncEnabled);
        // The journal directories double as ledger directories for this benchmark.
        serverConfiguration.setLedgerDirNames(flags.journalDirs.toArray(new String[0]));
        serverConfiguration.setStatsProviderClass(PrometheusMetricsProvider.class);
        for (File dir : Bookie.getCurrentDirectories(serverConfiguration.getLedgerDirs())) {
            if (dir.mkdirs()) {
                log.info("Successfully created dir {}", dir);
            } else if (!dir.isDirectory()) {
                // mkdirs() also returns false when the directory already exists,
                // so only warn when it is really missing afterwards.
                log.warn("Failed to create dir {}", dir);
            }
        }
    }

    /**
     * Builds the buffer allocator handed to the journals, using the pooling, out-of-memory
     * and leak detection settings of the given server configuration.
     *
     * <p>The out-of-memory listener only logs the error.
     *
     * @param serverConfiguration configuration to read the allocator settings from
     * @return the configured allocator
     */
    private static ByteBufAllocator getAllocator(ServerConfiguration serverConfiguration) {
        return ByteBufAllocatorBuilder.create()
            .poolingPolicy(serverConfiguration.getAllocatorPoolingPolicy())
            .poolingConcurrency(serverConfiguration.getAllocatorPoolingConcurrency())
            .outOfMemoryPolicy(serverConfiguration.getAllocatorOutOfMemoryPolicy())
            .outOfMemoryListener(oom -> log.error("Unable to allocate memory, exiting bookie", oom))
            .leakDetectionPolicy(serverConfiguration.getAllocatorLeakDetectionPolicy())
            .build();
    }

    /**
     * Logs the latency distribution accumulated in the given recorder.
     *
     * <p>The interval histogram is fetched exactly once and used for every figure, so all
     * values come from the same snapshot. Fetching it drains the recorder, so a later call
     * only reports values recorded after this one. Latencies are recorded in microseconds
     * and reported in milliseconds.
     *
     * @param recorder recorder to read and drain
     */
    private static void printAggregatedStats(Recorder recorder) {
        final Histogram reportHist = recorder.getIntervalHistogram();
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
            dec.format(reportHist.getMean() / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(50.0d) / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(95.0d) / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(99.0d) / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(99.9d) / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(99.99d) / 1000.0d),
            dec.format(reportHist.getValueAtPercentile(99.999d) / 1000.0d),
            dec.format(reportHist.getMaxValue() / 1000.0d));
    }
}
