package org.apache.giraph.benchmark;

import com.google.common.collect.Sets;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/benchmark/RandomMessageBenchmark.class */
public class RandomMessageBenchmark extends GiraphBenchmark {
    public static final String SUPERSTEP_COUNT = "giraph.randomMessageBenchmark.superstepCount";
    public static final String NUM_BYTES_PER_MESSAGE = "giraph.randomMessageBenchmark.numBytesPerMessage";
    public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
    public static final String NUM_MESSAGES_PER_EDGE = "giraph.randomMessageBenchmark.numMessagesPerEdge";
    public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
    public static final String AGG_SUPERSTEP_TOTAL_BYTES = "superstep total bytes sent";
    public static final String AGG_TOTAL_BYTES = "total bytes sent";
    public static final String AGG_SUPERSTEP_TOTAL_MESSAGES = "superstep total messages";
    public static final String AGG_TOTAL_MESSAGES = "total messages";
    public static final String AGG_SUPERSTEP_TOTAL_MILLIS = "superstep total millis";
    public static final String AGG_TOTAL_MILLIS = "total millis";
    public static final String WORKERS_NUM = "workers";
    private static final BenchmarkOption BYTES_PER_MESSAGE = new BenchmarkOption("b", "bytes", true, "Message bytes per memssage", "Need to set the number of message bytes (-b)");
    private static final BenchmarkOption MESSAGES_PER_EDGE = new BenchmarkOption("n", "number", true, "Number of messages per edge", "Need to set the number of messages per edge (-n)");
    private static final BenchmarkOption FLUSH_THREADS = new BenchmarkOption("f", "flusher", true, "Number of flush threads");
    private static final Logger LOG = Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);

    /* loaded from: input_file:org/apache/giraph/benchmark/RandomMessageBenchmark$RandomMessageBenchmarkMasterCompute.class */
    public static class RandomMessageBenchmarkMasterCompute extends DefaultMasterCompute {
        @Override // org.apache.giraph.master.DefaultMasterCompute, org.apache.giraph.master.MasterCompute
        public void initialize() throws InstantiationException, IllegalAccessException {
            registerAggregator(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_BYTES, LongSumAggregator.class);
            registerAggregator(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MESSAGES, LongSumAggregator.class);
            registerAggregator(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MILLIS, LongSumAggregator.class);
            registerAggregator(RandomMessageBenchmark.WORKERS_NUM, LongSumAggregator.class);
        }
    }

    /* loaded from: input_file:org/apache/giraph/benchmark/RandomMessageBenchmark$RandomMessageBenchmarkWorkerContext.class */
    public static class RandomMessageBenchmarkWorkerContext extends WorkerContext {
        private static final Logger LOG = Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
        private byte[] messageBytes;
        private int numMessagesPerEdge = -1;
        private int numSupersteps = -1;
        private final Random random = new Random(System.currentTimeMillis());
        private long startSuperstepMillis = 0;
        private long totalBytes = 0;
        private long totalMessages = 0;
        private long totalMillis = 0;

        @Override // org.apache.giraph.worker.WorkerContext
        public void preApplication() throws InstantiationException, IllegalAccessException {
            this.messageBytes = new byte[getContext().getConfiguration().getInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE, 16)];
            this.numMessagesPerEdge = getContext().getConfiguration().getInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE, 1);
            this.numSupersteps = getContext().getConfiguration().getInt(RandomMessageBenchmark.SUPERSTEP_COUNT, -1);
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void preSuperstep() {
            long j = getAggregatedValue(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_BYTES).get();
            long j2 = getAggregatedValue(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MESSAGES).get();
            long j3 = getAggregatedValue(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MILLIS).get();
            long j4 = getAggregatedValue(RandomMessageBenchmark.WORKERS_NUM).get();
            if (getSuperstep() == 0) {
                this.startSuperstepMillis = System.currentTimeMillis();
            } else {
                this.totalBytes += j;
                this.totalMessages += j2;
                this.totalMillis += j3;
                double d = ((((j * j4) * 1000.0d) / 1024.0d) / 1024.0d) / j3;
                double d2 = ((((this.totalBytes * j4) * 1000.0d) / 1024.0d) / 1024.0d) / this.totalMillis;
                double d3 = ((j2 * j4) * 1000.0d) / j3;
                double d4 = ((this.totalMessages * j4) * 1000.0d) / this.totalMillis;
                if (LOG.isInfoEnabled()) {
                    LOG.info("Outputing statistics for superstep " + getSuperstep());
                    LOG.info("superstep total bytes sent : " + j);
                    LOG.info("total bytes sent : " + this.totalBytes);
                    LOG.info("superstep total messages : " + j2);
                    LOG.info("total messages : " + this.totalMessages);
                    LOG.info("superstep total millis : " + j3);
                    LOG.info("total millis : " + this.totalMillis);
                    LOG.info("workers : " + j4);
                    LOG.info("Superstep megabytes / second = " + d);
                    LOG.info("Total megabytes / second = " + d2);
                    LOG.info("Superstep messages / second = " + d3);
                    LOG.info("Total messages / second = " + d4);
                    LOG.info("Superstep megabytes / second / worker = " + (d / j4));
                    LOG.info("Total megabytes / second / worker = " + (d2 / j4));
                    LOG.info("Superstep messages / second / worker = " + (d3 / j4));
                    LOG.info("Total messages / second / worker = " + (d4 / j4));
                }
            }
            aggregate(RandomMessageBenchmark.WORKERS_NUM, new LongWritable(1L));
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void postSuperstep() {
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis - this.startSuperstepMillis;
            this.startSuperstepMillis = currentTimeMillis;
            aggregate(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(j));
        }

        @Override // org.apache.giraph.worker.WorkerContext
        public void postApplication() {
        }

        public byte[] getMessageBytes() {
            return this.messageBytes;
        }

        public int getNumMessagePerEdge() {
            return this.numMessagesPerEdge;
        }

        public int getNumSupersteps() {
            return this.numSupersteps;
        }

        public void randomizeMessageBytes() {
            this.random.nextBytes(this.messageBytes);
        }
    }

    /* loaded from: input_file:org/apache/giraph/benchmark/RandomMessageBenchmark$RandomMessageVertex.class */
    public static class RandomMessageVertex extends Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
        @Override // org.apache.giraph.graph.Vertex
        public void compute(Iterable<BytesWritable> iterable) {
            RandomMessageBenchmarkWorkerContext randomMessageBenchmarkWorkerContext = (RandomMessageBenchmarkWorkerContext) getWorkerContext();
            if (getSuperstep() >= randomMessageBenchmarkWorkerContext.getNumSupersteps()) {
                voteToHalt();
                return;
            }
            for (int i = 0; i < randomMessageBenchmarkWorkerContext.getNumMessagePerEdge(); i++) {
                randomMessageBenchmarkWorkerContext.randomizeMessageBytes();
                sendMessageToAllEdges(new BytesWritable(randomMessageBenchmarkWorkerContext.getMessageBytes()));
                aggregate(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(randomMessageBenchmarkWorkerContext.getMessageBytes().length * getNumEdges()));
                aggregate(RandomMessageBenchmark.AGG_SUPERSTEP_TOTAL_MESSAGES, new LongWritable(getNumEdges()));
            }
        }
    }

    @Override // org.apache.giraph.benchmark.GiraphBenchmark
    public Set<BenchmarkOption> getBenchmarkOptions() {
        return Sets.newHashSet(new BenchmarkOption[]{BenchmarkOption.SUPERSTEPS, BenchmarkOption.VERTICES, BenchmarkOption.EDGES_PER_VERTEX, BYTES_PER_MESSAGE, MESSAGES_PER_EDGE, FLUSH_THREADS});
    }

    @Override // org.apache.giraph.benchmark.GiraphBenchmark
    protected void prepareConfiguration(GiraphConfiguration giraphConfiguration, CommandLine commandLine) {
        giraphConfiguration.setVertexClass(RandomMessageVertex.class);
        giraphConfiguration.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
        giraphConfiguration.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
        giraphConfiguration.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
        giraphConfiguration.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, BenchmarkOption.VERTICES.getOptionLongValue(commandLine));
        giraphConfiguration.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(commandLine));
        giraphConfiguration.setInt(SUPERSTEP_COUNT, BenchmarkOption.SUPERSTEPS.getOptionIntValue(commandLine));
        giraphConfiguration.setInt(NUM_BYTES_PER_MESSAGE, BYTES_PER_MESSAGE.getOptionIntValue(commandLine));
        giraphConfiguration.setInt(NUM_MESSAGES_PER_EDGE, MESSAGES_PER_EDGE.getOptionIntValue(commandLine));
        if (FLUSH_THREADS.optionTurnedOn(commandLine)) {
            giraphConfiguration.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS, FLUSH_THREADS.getOptionIntValue(commandLine));
        }
    }

    public static void main(String[] strArr) throws Exception {
        System.exit(ToolRunner.run(new RandomMessageBenchmark(), strArr));
    }
}
