package com.linkedin.venice.client.store;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.client.utils.StoreClientTestUtils;
import com.linkedin.venice.compute.protocol.response.ComputeResponseRecordV1;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.MockD2ServerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/client/store/StoreClientPerfTest.class */
public class StoreClientPerfTest {
    /** Logger used for progress messages and for the final CSV report. */
    private static final Logger LOGGER = LogManager.getLogger(StoreClientPerfTest.class);
    /** Mock D2 server created in {@link #setUp()}; canned HTTP responses are registered on it per URI. */
    private MockD2ServerWrapper routerServer;
    /** Store name used to build every request URI and the metric-name prefix. */
    private String storeName = "test_store";
    /** Key schema (Avro JSON) served for the store's key-schema endpoint. */
    private String defaultKeySchemaStr = "\"string\"";
    /** D2 client started in {@link #setUp()} against the mock server's ZK address. */
    private D2Client d2Client;
    /** Unique D2 service name generated in {@link #setUp()} and shared by the mock server and the client config. */
    private String d2ServiceName;

    /**
     * Ordered list of the values that make up one row of the CSV report.
     *
     * <p>Both helpers append their value to the list and hand it back, so a value can be
     * recorded and logged in a single expression. The decompiler notes that the access
     * modifier of this class was originally {@code private}.
     */
    public static class ResultsContainer extends ArrayList<Object> {
        private static final long serialVersionUID = 1L;

        private ResultsContainer() {
        }

        /**
         * Appends {@code obj} to this row.
         *
         * @param obj value to record
         * @return the same {@code obj}, for inline use
         */
        public Object put(Object obj) {
            this.add(obj);
            return obj;
        }

        /**
         * Rounds the metric's current value with {@code Utils.round(value, 1)} and appends
         * the rounded value to this row.
         *
         * @param metric metric to read
         * @return the rounded value that was recorded
         */
        public double round(Metric metric) {
            final double rounded = Utils.round(metric.value(), 1);
            // Autoboxing stores the value as a Double, exactly like Double.valueOf(rounded).
            this.add(rounded);
            return rounded;
        }
    }

    /**
     * Thin subclass of {@link AvroComputeRequestBuilderV3} whose only purpose is to make
     * {@code getResultSchema()} callable from the test.
     *
     * <p>The decompiler notes that the access modifier of this class was originally
     * {@code private}.
     */
    public static class TestComputeRequestBuilder extends AvroComputeRequestBuilderV3<String> {
        /**
         * @param storeClient client forwarded unchanged to the parent builder
         * @param valueSchema schema forwarded unchanged to the parent builder
         */
        public TestComputeRequestBuilder(InternalAvroStoreClient storeClient, Schema valueSchema) {
            super(storeClient, valueSchema);
        }

        /**
         * Delegates to the parent implementation.
         *
         * @return the pair produced by the parent builder; its first element is the result schema
         */
        public Pair<Schema, String> getResultSchema() {
            final Pair<Schema, String> resultSchema = super.getResultSchema();
            return resultSchema;
        }
    }

    /**
     * Starts the mock D2 server under a freshly generated unique service name, then starts
     * a D2 client pointed at that server's ZK address.
     *
     * @throws Exception if the mock server or the D2 client cannot be started
     */
    @BeforeTest
    public void setUp() throws Exception {
        final String uniqueServiceName = Utils.getUniqueString(VeniceRouterWrapper.SERVICE_NAME);
        this.d2ServiceName = uniqueServiceName;

        final MockD2ServerWrapper mockRouter = ServiceFactory.getMockD2Server("Mock-router-server", uniqueServiceName);
        this.routerServer = mockRouter;

        this.d2Client = D2TestUtils.getAndStartD2Client(mockRouter.getZkAddress());
    }

    /**
     * Closes the mock router server created in {@link #setUp()}.
     *
     * <p>The server reference is checked first so that a set-up which never got as far as
     * creating the server does not surface here as a {@link NullPointerException}.
     *
     * @throws Exception if closing the mock server fails
     */
    @AfterTest
    public void cleanUp() throws Exception {
        // NOTE(review): the D2 client started in setUp() is not shut down here — confirm whether it should be.
        if (this.routerServer != null) {
            this.routerServer.close();
        }
    }

    /**
     * Resets the mock router and registers the canned metadata responses for the test store:
     * key schema, cluster discovery, the single value schema by id, and the value-schema listing.
     *
     * @param i   id under which the value schema is served
     * @param str value schema as an Avro JSON string
     * @throws IOException if one of the canned responses cannot be constructed
     */
    private void setupSchemaAndRequest(int i, String str) throws IOException {
        final String valueSchemaUri = "/value_schema/" + this.storeName;
        final Map<Integer, String> valueSchemasById = new HashMap<>();
        valueSchemasById.put(i, str);

        // Start from a clean slate so responses from a previous run cannot leak into this one.
        this.routerServer.clearResponseMapping();

        this.routerServer.addResponseForUri(
                "/key_schema/" + this.storeName,
                StoreClientTestUtils.constructHttpSchemaResponse(this.storeName, 1, this.defaultKeySchemaStr));
        this.routerServer.addResponseForUri(
                "/discover_cluster/" + this.storeName,
                StoreClientTestUtils.constructHttpClusterDiscoveryResponse(this.storeName, "test_cluster", this.d2ServiceName));
        this.routerServer.addResponseForUri(
                valueSchemaUri + "/" + i,
                StoreClientTestUtils.constructHttpSchemaResponse(this.storeName, i, str));
        this.routerServer.addResponseForUri(
                valueSchemaUri,
                StoreClientTestUtils.constructHttpMultiSchemaResponse(this.storeName, valueSchemasById));
    }

    /**
     * Disabled-by-default driver that runs the stress scenario for every combination of
     * request type (compute / batch get), fast-avro on/off and concurrency level (1, 2, 10).
     *
     * <p>One extra, unrecorded warm-up run is executed before the first recorded run. After
     * each recorded run the accumulated results are printed as CSV.
     */
    @Test(enabled = false)
    public void clientStressTest() throws InterruptedException, ExecutionException, IOException {
        ClientConfig baseConfig = ClientConfig.defaultGenericClientConfig(this.storeName)
                .setD2ServiceName(this.d2ServiceName)
                .setD2Client(this.d2Client);
        int[] concurrencyLevels = {1, 2, 10};
        boolean[] onOff = {true, false};
        int totalTests = onOff.length * 2 * concurrencyLevels.length;
        List<ResultsContainer> allResults = new ArrayList<>();
        int testNumber = 1;
        boolean warmedUp = false;

        for (boolean compute : onOff) {
            for (boolean fastAvro : onOff) {
                for (int concurrentCalls : concurrencyLevels) {
                    ClientConfig runConfig = ClientConfig.cloneConfig(baseConfig)
                            .setUseFastAvro(fastAvro)
                            .setMetricsRepository(new MetricsRepository());

                    if (!warmedUp) {
                        // The warm-up gets its own metrics repository so it cannot pollute the recorded run.
                        ClientConfig warmUpConfig = ClientConfig.cloneConfig(runConfig)
                                .setMetricsRepository(new MetricsRepository());
                        LOGGER.info("\n\n");
                        LOGGER.info("Warm up test.\n\n");
                        clientStressTest(warmUpConfig, concurrentCalls, compute);
                        warmedUp = true;
                        LOGGER.info("\n\n");
                        LOGGER.info("Warm up finished. Beginning real tests now.\n\n");
                    }

                    LOGGER.info("\n\n");
                    LOGGER.info("Test {}/{}\n\n", testNumber, totalTests);
                    allResults.add(clientStressTest(runConfig, concurrentCalls, compute));
                    testNumber++;

                    String requestLabel = compute ? "compute" : "batch get";
                    String fastAvroLabel = fastAvro ? "" : "out";
                    LOGGER.info("\n\n");
                    LOGGER.info("Finished {} requests with{} fast-avro at {} concurrentCallsPerBatch. All results so far:\n\n", requestLabel, fastAvroLabel, concurrentCalls);
                    printCSV(allResults);
                }
            }
        }
    }

    /**
     * Logs all collected results as one CSV document: a header line followed by one line
     * per {@link ResultsContainer}, with the container's values joined by commas in
     * insertion order.
     *
     * <p>The header lists exactly the values that {@code clientStressTest(ClientConfig, int, boolean)}
     * records, in the order it records them: request type, fast-avro flag, max concurrent
     * queries, total queries, throughput, six (Avg, p50, p99) timing triples and seven
     * latency figures. Columns that no row ever fills would shift every later value under
     * the wrong heading, so none are declared.
     *
     * @param list result rows to print; may be empty, in which case only the header is logged
     */
    private void printCSV(List<ResultsContainer> list) {
        final String[] columns = {
            "Request type",
            "Fast Avro",
            "Max concurrent queries",
            "Total queries",
            "Throughput",
            "Request serialization time Avg",
            "Request serialization time p50",
            "Request serialization time p99",
            "Request submission to response time Avg",
            "Request submission to response time p50",
            "Request submission to response time p99",
            "Response deserialization time Avg",
            "Response deserialization time p50",
            "Response deserialization time p99",
            "Response envelope deserialization time Avg",
            "Response envelope deserialization time p50",
            "Response envelope deserialization time p99",
            "Response records deserialization time Avg",
            "Response records deserialization time p50",
            "Response records deserialization time p99",
            "Response records deserialization submission time Avg",
            "Response records deserialization submission time p50",
            "Response records deserialization submission time p99",
            "Latency Avg",
            "Latency p50",
            "Latency p77",
            "Latency p90",
            "Latency p95",
            "Latency p99",
            "Latency p99.9"
        };

        StringBuilder csv = new StringBuilder();
        csv.append("\n\nCSV output:\n\n\n");
        csv.append(String.join(",", columns)).append("\n");
        for (ResultsContainer row : list) {
            csv.append(row.stream().map(Object::toString).collect(Collectors.joining(","))).append("\n");
        }
        LOGGER.info(csv.toString());
    }

    /**
     * Runs one stress scenario against the mock router and returns the collected figures.
     *
     * <p>Steps:
     * <ol>
     *   <li>start a generic Avro client from {@code clientConfig} (closed automatically on exit);</li>
     *   <li>register the schema responses plus canned {@code /storage/} and {@code /compute/}
     *       responses holding 1000 records each;</li>
     *   <li>issue 10000 queries, waiting for all in-flight queries every {@code i} submissions;</li>
     *   <li>assert that every query succeeded, then read the client metrics, log them and record
     *       them in a {@link ResultsContainer} in the column order used by the CSV report.</li>
     * </ol>
     *
     * @param clientConfig client configuration; its metrics repository is the one read at the end
     * @param i            maximum number of queries kept in flight; must be between 1 and 10000
     * @param z            {@code true} to issue compute requests, {@code false} for batch gets
     * @return the recorded values for this run
     * @throws IllegalArgumentException if {@code i} is outside the supported range
     * @throws IOException              if a canned response cannot be built or the client fails to close
     * @throws ExecutionException       if a query future or the final verification fails
     * @throws InterruptedException     if the thread is interrupted while waiting for queries
     */
    private ResultsContainer clientStressTest(ClientConfig clientConfig, int i, boolean z) throws IOException, ExecutionException, InterruptedException {
        final int concurrentCalls = i;
        final boolean compute = z;
        final int valueSchemaId = 1;
        final int keysPerQuery = 1000;
        final int totalQueries = 10000;
        // Third argument of TestWriteUtils.getRecordWithFloatArray; logged below as "bytes/value".
        // NOTE(review): presumably the float-array length rather than a byte count — confirm.
        final int valueSize = 800;
        // Defined once so the schema served by the mock router and the one used to build records cannot drift apart.
        final String valueSchemaStr = "{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"ManyFloats\",       \"fields\": [                  { \"name\": \"key\", \"type\": \"string\" },         { \"name\": \"value\", \"type\": {\"type\": \"array\", \"items\": \"float\"} },         { \"name\": \"age\", \"type\": \"int\" }  ]  } ";

        // Zero/negative values break the modulo and array size; values above totalQueries would
        // leave null slots in the in-flight array and make CompletableFuture.allOf throw.
        if (concurrentCalls < 1 || concurrentCalls > totalQueries) {
            throw new IllegalArgumentException("Max concurrent queries must be between 1 and " + totalQueries + ", got " + concurrentCalls);
        }

        // Raw client/serializer types are kept where decompilation lost the generic information.
        try (InternalAvroStoreClient storeClient = ClientFactory.getAndStartGenericAvroClient(clientConfig)) {
            MetricsRepository metricsRepository = clientConfig.getMetricsRepository();
            Schema valueSchema = new Schema.Parser().parse(valueSchemaStr);
            HashSet<String> keys = new HashSet<>();
            setupSchemaAndRequest(valueSchemaId, valueSchemaStr);

            RecordSerializer valueSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(valueSchema);
            List<Object> batchGetRecords = new ArrayList<>();

            // A throw-away builder is used only to learn the schema of the compute result.
            TestComputeRequestBuilder resultSchemaProbe = new TestComputeRequestBuilder(storeClient, storeClient.getLatestValueSchema());
            List<String> projectedFields = valueSchema.getFields().stream().map(field -> field.name()).collect(Collectors.toList());
            resultSchemaProbe.project(projectedFields);
            Schema computeResultSchema = resultSchemaProbe.getResultSchema().getFirst();
            RecordSerializer computeResultSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(computeResultSchema);
            RecordDeserializer computeResultDeserializer = SerializerDeserializerFactory.getAvroGenericDeserializer(computeResultSchema);
            List<Object> computeRecords = new ArrayList<>();
            LOGGER.debug("computeResultSchema : \n{}", computeResultSchema.toString(true));

            for (int keyIndex = 0; keyIndex < keysPerQuery; keyIndex++) {
                MultiGetResponseRecordV1 batchGetRecord = new MultiGetResponseRecordV1();
                batchGetRecord.keyIndex = keyIndex;
                batchGetRecord.schemaId = valueSchemaId;
                batchGetRecord.value = ByteBuffer.wrap(valueSerializer.serialize(TestWriteUtils.getRecordWithFloatArray(valueSchema, keyIndex, valueSize)));
                batchGetRecords.add(batchGetRecord);

                ComputeResponseRecordV1 computeRecord = new ComputeResponseRecordV1();
                computeRecord.keyIndex = keyIndex;
                GenericRecord computeResult = TestWriteUtils.getRecordWithFloatArray(computeResultSchema, keyIndex, valueSize);
                computeResult.put("__veniceComputationError__", new HashMap<>());
                if (keyIndex == 0) {
                    LOGGER.debug("computeResultRecord: {}", computeResult.toString());
                }
                computeRecord.value = ByteBuffer.wrap(computeResultSerializer.serialize(computeResult));
                computeRecords.add(computeRecord);
                // Sanity check: the canned compute payload must round-trip through the result schema.
                Assert.assertEquals((GenericRecord) computeResultDeserializer.deserialize(computeRecord.value), computeResult);

                keys.add("key" + keyIndex);
            }

            // NOTE(review): the first argument of constructStoreResponse is presumably the schema id (was the literal 1) — confirm.
            this.routerServer.addResponseForUri("/storage/" + this.storeName, StoreClientTestUtils.constructStoreResponse(valueSchemaId, SerializerDeserializerFactory.getAvroGenericSerializer(MultiGetResponseRecordV1.SCHEMA$).serializeObjects(batchGetRecords)));
            this.routerServer.addResponseForUri("/compute/" + this.storeName, StoreClientTestUtils.constructStoreResponse(valueSchemaId, SerializerDeserializerFactory.getAvroGenericSerializer(ComputeResponseRecordV1.SCHEMA$).serializeObjects(computeRecords)));

            CompletableFuture[] inFlight = new CompletableFuture[concurrentCalls];
            long startTimeMs = System.currentTimeMillis();
            AtomicInteger errorCount = new AtomicInteger(0);
            AtomicInteger successCount = new AtomicInteger(0);
            ResultsContainer results = new ResultsContainer();

            LOGGER.info("");
            LOGGER.info("=============================================================================================");
            LOGGER.info("Request Type:           {}", results.put(compute ? "compute" : "batch-get"));
            LOGGER.info("Fast Avro:              {}", results.put(Boolean.valueOf(clientConfig.isUseFastAvro())));
            LOGGER.info("Max concurrent queries: {}", results.put(Integer.valueOf(concurrentCalls)));
            LOGGER.info("Total queries:          {}", results.put(totalQueries));
            LOGGER.info("keys/query:             {}", keysPerQuery);
            LOGGER.info("bytes/value:            {}", valueSize);
            LOGGER.info("");

            ComputeRequestBuilder computeRequest = storeClient.compute().project(projectedFields);
            for (int query = 1; query <= totalQueries; query++) {
                inFlight[query % concurrentCalls] = (compute ? computeRequest.execute(keys).thenApply(Function.identity()) : storeClient.batchGet(keys).thenApply(Function.identity())).handle((response, error) -> {
                    if (error == null) {
                        Assert.assertEquals(response.size(), keysPerQuery, "Not enough records returned!");
                        successCount.getAndIncrement();
                        return null;
                    }
                    // Count every failure, but only log the first ten to keep the output readable.
                    if (errorCount.getAndIncrement() < 10) {
                        LOGGER.error("Query error!", error);
                    }
                    return null;
                });
                // Every slot has been (re)filled once the counter wraps around; drain before submitting more.
                if (query % concurrentCalls == 0) {
                    CompletableFuture.allOf(inFlight).get();
                }
            }

            return CompletableFuture.allOf(inFlight).thenApply(ignored -> {
                Assert.assertEquals(successCount.get(), totalQueries);
                Assert.assertEquals(errorCount.get(), 0);
                double elapsedMs = System.currentTimeMillis() - startTimeMs;

                Map<?, ?> metrics = metricsRepository.metrics();
                String metricPrefix = "." + this.storeName + "--" + (compute ? RequestType.COMPUTE : RequestType.MULTI_GET).getMetricPrefix();
                // Looks a metric up by its suffix, records its rounded value in the results row and returns it.
                // The call order below therefore defines the CSV column order.
                Function<String, Double> rounded = metricName -> results.round((Metric) metrics.get(metricPrefix + metricName));

                LOGGER.info("Throughput: {} queries/sec", results.put(new DecimalFormat("0.0").format(totalQueries / (elapsedMs / 1000.0d))));
                LOGGER.info("");
                LOGGER.info("Request serialization time                       (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("request_serialization_time.Avg"),
                        rounded.apply("request_serialization_time.50thPercentile"),
                        rounded.apply("request_serialization_time.99thPercentile"));
                LOGGER.info("Request submission to response time              (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("request_submission_to_response_handling_time.Avg"),
                        rounded.apply("request_submission_to_response_handling_time.50thPercentile"),
                        rounded.apply("request_submission_to_response_handling_time.99thPercentile"));
                LOGGER.info("Response deserialization time                    (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("response_deserialization_time.Avg"),
                        rounded.apply("response_deserialization_time.50thPercentile"),
                        rounded.apply("response_deserialization_time.99thPercentile"));
                LOGGER.info("Response envelope deserialization time           (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("response_envelope_deserialization_time.Avg"),
                        rounded.apply("response_envelope_deserialization_time.50thPercentile"),
                        rounded.apply("response_envelope_deserialization_time.99thPercentile"));
                LOGGER.info("Response records deserialization time            (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("response_records_deserialization_time.Avg"),
                        rounded.apply("response_records_deserialization_time.50thPercentile"),
                        rounded.apply("response_records_deserialization_time.99thPercentile"));
                LOGGER.info("Response records deserialization submission time (Avg, p50, p99) : {} ms, \t{} ms, \t{} ms.",
                        rounded.apply("response_records_deserialization_submission_to_start_time.Avg"),
                        rounded.apply("response_records_deserialization_submission_to_start_time.50thPercentile"),
                        rounded.apply("response_records_deserialization_submission_to_start_time.99thPercentile"));
                LOGGER.info("Latency                    (Avg, p50, p77, p90, p95, p99, p99.9) : {} ms, \t{} ms, \t{} ms, \t{} ms, \t{} ms, \t{} ms, \t{} ms.",
                        rounded.apply("healthy_request_latency.Avg"),
                        rounded.apply("healthy_request_latency.50thPercentile"),
                        rounded.apply("healthy_request_latency.77thPercentile"),
                        rounded.apply("healthy_request_latency.90thPercentile"),
                        rounded.apply("healthy_request_latency.95thPercentile"),
                        rounded.apply("healthy_request_latency.99thPercentile"),
                        rounded.apply("healthy_request_latency.99_9thPercentile"));
                LOGGER.info("");
                return results;
            }).get();
        }
    }
}
