package alluxio.stress.cli;

import alluxio.ClientContext;
import alluxio.conf.InstancedConfiguration;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.grpc.Metric;
import alluxio.master.MasterClientContext;
import alluxio.stress.CachingBlockMasterClient;
import alluxio.stress.rpc.BlockMasterBenchParameters;
import alluxio.stress.rpc.RpcTaskResult;
import alluxio.util.FormatUtils;
import alluxio.worker.block.BlockMasterClient;
import alluxio.worker.block.BlockStoreLocation;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/stress/cli/WorkerHeartbeatBench.class */
public class WorkerHeartbeatBench extends RpcBench<BlockMasterBenchParameters> {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerHeartbeatBench.class);
    private static final List<Metric> EMPTY_METRICS = ImmutableList.of();
    private static final List<Long> EMPTY_REMOVED_BLOCKS = ImmutableList.of();

    @ParametersDelegate
    private BlockMasterBenchParameters mParameters = new BlockMasterBenchParameters();
    private final InstancedConfiguration mConf = InstancedConfiguration.defaults();
    private Deque<Long> mWorkerPool = new ArrayDeque();
    private List<LocationBlockIdListEntry> mLocationBlockIdList;

    @Override // alluxio.stress.cli.RpcBench
    public RpcTaskResult runRPC() throws Exception {
        RpcTaskResult rpcTaskResult = new RpcTaskResult();
        if (this.mWorkerPool == null) {
            rpcTaskResult.addError("Worker ID pool is null");
            return rpcTaskResult;
        }
        if (this.mWorkerPool.isEmpty()) {
            rpcTaskResult.addError("No more worker IDs for use");
            return rpcTaskResult;
        }
        long longValue = this.mWorkerPool.poll().longValue();
        LOG.info("Acquired worker ID {}", Long.valueOf(longValue));
        CachingBlockMasterClient cachingBlockMasterClient = new CachingBlockMasterClient(MasterClientContext.newBuilder(ClientContext.create(this.mConf)).build(), this.mLocationBlockIdList);
        long parseTimeSize = FormatUtils.parseTimeSize(this.mParameters.mDuration);
        Instant now = Instant.now();
        Instant plus = now.plus(parseTimeSize, (TemporalUnit) ChronoUnit.MILLIS);
        LOG.info("Test start time {}, end time {}", now, plus);
        RpcTaskResult simulateBlockHeartbeat = simulateBlockHeartbeat(cachingBlockMasterClient, longValue, plus);
        LOG.info("Test finished with results: {}", simulateBlockHeartbeat);
        return simulateBlockHeartbeat;
    }

    @Override // alluxio.stress.cli.RpcBench
    public BlockMasterBenchParameters getParameters() {
        return this.mParameters;
    }

    private RpcTaskResult simulateBlockHeartbeat(BlockMasterClient blockMasterClient, long j, Instant instant) {
        RpcTaskResult rpcTaskResult = new RpcTaskResult();
        while (Instant.now().isBefore(instant)) {
            Instant now = Instant.now();
            try {
                LOG.debug("Received command from heartbeat {}", blockMasterClient.heartbeat(j, RpcBenchPreparationUtils.CAPACITY_MEM, RpcBenchPreparationUtils.USED_MEM_EMPTY, EMPTY_REMOVED_BLOCKS, ImmutableMap.of(), RpcBenchPreparationUtils.LOST_STORAGE, EMPTY_METRICS));
                RpcTaskResult.Point point = new RpcTaskResult.Point(Duration.between(now, Instant.now()).toMillis());
                LOG.debug("Iter {} took {}ms", 0L, Long.valueOf(point.mDurationMs));
                rpcTaskResult.addPoint(point);
            } catch (Exception e) {
                LOG.error("Failed to run blockHeartbeat {}", 0L, e);
                rpcTaskResult.addError(e.getMessage());
            }
        }
        return rpcTaskResult;
    }

    @Override // alluxio.stress.cli.Benchmark
    public String getBenchDescription() {
        return String.join("\n", (Iterable<? extends CharSequence>) ImmutableList.of("A benchmarking tool for the WorkerHeartbeat RPC.", "The test will generate a specified number of blocks in the master (without associated files). The test will also register the simulated workers with the master. Then it will keep generating heartbeats with the specified load and sending heartbeats to the master nonstop, until the specified time has elapsed.", "", "Example:", "# 2 job workers will be chosen to run the benchmark", "# Each job worker runs 3 threads each simulating one worker", "# Each worker will have 3000 blocks on tier 0 and 10000 blocks on tier 1", "# Keep sending heartbeats for 30s", "$ bin/alluxio runClass alluxio.stress.cli.WorkerHeartbeatBench --concurrency 3 \\", "--cluster --cluster-limit 2 --tiers \"1000,1000,1000;5000,5000\" --duration 30s", ""));
    }

    @Override // alluxio.stress.cli.Benchmark
    public void prepare() throws Exception {
        LOG.info("Task ID is {}", this.mBaseParameters.mId);
        Map<BlockStoreLocation, List<Long>> generateBlockIdOnTiers = RpcBenchPreparationUtils.generateBlockIdOnTiers(this.mParameters.mTiers);
        BlockMasterClient blockMasterClient = new BlockMasterClient(MasterClientContext.newBuilder(ClientContext.create(this.mConf)).build());
        this.mLocationBlockIdList = blockMasterClient.convertBlockListMapToProto(generateBlockIdOnTiers);
        if (!this.mBaseParameters.mDistributed) {
            LOG.info("Preparing block IDs at the master");
            RpcBenchPreparationUtils.prepareBlocksInMaster(generateBlockIdOnTiers);
            LOG.info("Created all blocks at the master");
        }
        int i = this.mParameters.mConcurrency;
        LOG.info("Register {} simulated workers for the test", Integer.valueOf(i));
        this.mWorkerPool = RpcBenchPreparationUtils.prepareWorkerIds(blockMasterClient, i);
        Preconditions.checkState(this.mWorkerPool.size() == i, "Expecting %s workers but registered %s", i, this.mWorkerPool.size());
        RpcBenchPreparationUtils.registerWorkers(blockMasterClient, this.mWorkerPool);
        LOG.info("All workers registered with the master {}", this.mWorkerPool);
    }

    public static void main(String[] strArr) {
        mainInternal(strArr, new WorkerHeartbeatBench());
    }
}
