package alluxio.stress.cli;

import alluxio.ClientContext;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.master.MasterClientContext;
import alluxio.stress.CachingBlockMasterClient;
import alluxio.stress.rpc.BlockMasterBenchParameters;
import alluxio.stress.rpc.RpcTaskResult;
import alluxio.stress.rpc.TierAlias;
import alluxio.worker.block.BlockMasterClient;
import alluxio.worker.block.BlockMasterSync;
import alluxio.worker.block.BlockStoreLocation;
import com.beust.jcommander.ParametersDelegate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/stress/cli/RegisterWorkerBench.class */
public class RegisterWorkerBench extends RpcBench<BlockMasterBenchParameters> {
    private static final Logger LOG = LoggerFactory.getLogger(RegisterWorkerBench.class);
    private List<String> mTierAliases;
    private Map<String, Long> mCapacityMap;
    private Map<String, Long> mUsedMap;
    private List<LocationBlockIdListEntry> mLocationBlockIdList;

    @ParametersDelegate
    private BlockMasterBenchParameters mParameters = new BlockMasterBenchParameters();
    private final InstancedConfiguration mConf = InstancedConfiguration.defaults();
    private Deque<Long> mWorkerPool = new ArrayDeque();

    @Override // alluxio.stress.cli.Benchmark
    public String getBenchDescription() {
        return String.join("\n", (Iterable<? extends CharSequence>) ImmutableList.of("A benchmarking tool for the RegisterWorker unary RPC.", "The test will generate a specified number of blocks in the master (without associated files). Then it will trigger the specified number of simulated workers to register at once.", "Each simulated worker will have the specified number of blocks, in order to incur the controlled stress on the master side.", "", "Example:", "# 2 job workers will be chosen to run the benchmark", "# Each job worker runs 3 simulated workers", "# Each simulated worker has 3000 blocks on tier 0 and 10000 on tier 1", "# Each simulated worker sends the register RPC once", "$ bin/alluxio runClass alluxio.stress.cli.RegisterWorkerBench --concurrency 3 \\", "--cluster --cluster-limit 2 --tiers \"1000,1000,1000;5000,5000\"", "", new String[0]));
    }

    @Override // alluxio.stress.cli.Benchmark
    public void prepare() throws Exception {
        LOG.info("Task ID is {}", this.mBaseParameters.mId);
        this.mTierAliases = getTierAliases(this.mParameters.mTiers);
        this.mCapacityMap = Maps.toMap(this.mTierAliases, str -> {
            return Long.valueOf(RpcBenchPreparationUtils.CAPACITY);
        });
        this.mUsedMap = Maps.toMap(this.mTierAliases, str2 -> {
            return 0L;
        });
        Map<BlockStoreLocation, List<Long>> generateBlockIdOnTiers = RpcBenchPreparationUtils.generateBlockIdOnTiers(this.mParameters.mTiers);
        BlockMasterClient blockMasterClient = new BlockMasterClient(MasterClientContext.newBuilder(ClientContext.create(this.mConf)).build());
        this.mLocationBlockIdList = blockMasterClient.convertBlockListMapToProto(generateBlockIdOnTiers);
        if (!this.mBaseParameters.mDistributed) {
            LOG.info("Preparing blocks at the master");
            RpcBenchPreparationUtils.prepareBlocksInMaster(generateBlockIdOnTiers);
            LOG.info("Created all blocks at the master");
        }
        int i = this.mParameters.mConcurrency;
        this.mWorkerPool = RpcBenchPreparationUtils.prepareWorkerIds(blockMasterClient, i);
        Preconditions.checkState(this.mWorkerPool.size() == i, "Expecting %s workers but registered %s", i, this.mWorkerPool.size());
        LOG.info("Prepared worker IDs: {}", this.mWorkerPool);
    }

    public static void main(String[] strArr) {
        mainInternal(strArr, new RegisterWorkerBench());
    }

    private RpcTaskResult simulateRegisterWorker(BlockMasterClient blockMasterClient) {
        RpcTaskResult rpcTaskResult = new RpcTaskResult();
        if (this.mWorkerPool == null) {
            rpcTaskResult.addError("Worker ID pool is null");
            return rpcTaskResult;
        }
        if (this.mWorkerPool.isEmpty()) {
            rpcTaskResult.addError("No more worker IDs for use");
            return rpcTaskResult;
        }
        runOnce(blockMasterClient, rpcTaskResult, 0L, this.mWorkerPool.poll().longValue());
        return rpcTaskResult;
    }

    private static List<String> getTierAliases(Map<TierAlias, List<Integer>> map) {
        LOG.info("Simulate {} tiers with config {}", Integer.valueOf(map.size()), map);
        return (List) map.keySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList());
    }

    private void runOnce(BlockMasterClient blockMasterClient, RpcTaskResult rpcTaskResult, long j, long j2) {
        try {
            Instant now = Instant.now();
            if (this.mConf.getBoolean(PropertyKey.WORKER_REGISTER_LEASE_ENABLED)) {
                LOG.info("Acquiring lease for {}", Long.valueOf(j2));
                int i = 0;
                Iterator<LocationBlockIdListEntry> it = this.mLocationBlockIdList.iterator();
                while (it.hasNext()) {
                    i += it.next().getValue().getBlockIdCount();
                }
                blockMasterClient.acquireRegisterLeaseWithBackoff(j2, i, BlockMasterSync.getDefaultAcquireLeaseRetryPolicy());
                LOG.info("Lease acquired for {}", Long.valueOf(j2));
            }
            blockMasterClient.register(j2, this.mTierAliases, this.mCapacityMap, this.mUsedMap, ImmutableMap.of(), RpcBenchPreparationUtils.LOST_STORAGE, RpcBenchPreparationUtils.EMPTY_CONFIG);
            LOG.info("Worker {} registered", Long.valueOf(j2));
            RpcTaskResult.Point point = new RpcTaskResult.Point(Duration.between(now, Instant.now()).toMillis());
            rpcTaskResult.addPoint(point);
            LOG.debug("Iter {} took {}ns", Long.valueOf(j), Long.valueOf(point.mDurationMs));
        } catch (Exception e) {
            LOG.error("Failed to run iter {}", Long.valueOf(j), e);
            rpcTaskResult.addError(e.getMessage());
        }
    }

    @Override // alluxio.stress.cli.RpcBench
    public RpcTaskResult runRPC() throws Exception {
        RpcTaskResult simulateRegisterWorker = simulateRegisterWorker(new CachingBlockMasterClient(MasterClientContext.newBuilder(ClientContext.create(this.mConf)).build(), this.mLocationBlockIdList));
        LOG.info("Received task result {}", simulateRegisterWorker);
        LOG.info("Run finished");
        return simulateRegisterWorker;
    }

    @Override // alluxio.stress.cli.RpcBench
    public BlockMasterBenchParameters getParameters() {
        return this.mParameters;
    }
}
