package com.linkedin.venice.fastclient;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.fastclient.meta.StoreMetadataFetchMode;
import com.linkedin.venice.fastclient.schema.TestValueSchema;
import com.linkedin.venice.fastclient.utils.AbstractClientEndToEndSetup;
import com.linkedin.venice.fastclient.utils.ClientTestUtils;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.utils.TestUtils;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.class */
public class AvroStoreClientEndToEndTest extends AbstractClientEndToEndSetup {
    protected void runTest(ClientConfig.ClientConfigBuilder clientConfigBuilder, boolean z, int i, Optional<AvroGenericStoreClient> optional, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception {
        runTest(clientConfigBuilder, z, i, metricsRepository -> {
        }, metricsRepository2 -> {
        }, null, optional, storeMetadataFetchMode);
    }

    protected void runTest(ClientConfig.ClientConfigBuilder clientConfigBuilder, boolean z, int i, Consumer<MetricsRepository> consumer, Consumer<MetricsRepository> consumer2, MetricsRepository metricsRepository, Optional<AvroGenericStoreClient> optional, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception {
        MetricsRepository metricsRepository2 = new MetricsRepository();
        AvroGenericStoreClient avroGenericStoreClient = null;
        AvroGenericStoreClient avroGenericStoreClient2 = null;
        try {
            AvroGenericStoreClient<String, GenericRecord> genericFastClient = getGenericFastClient(clientConfigBuilder, metricsRepository2, storeMetadataFetchMode);
            AvroGenericStoreClient<String, Object> genericFastVsonClient = getGenericFastVsonClient(clientConfigBuilder.clone(), new MetricsRepository(), optional, storeMetadataFetchMode);
            if (!z) {
                for (int i2 = 0; i2 < 100; i2++) {
                    String str = "key_" + i2;
                    Assert.assertEquals(((Integer) ((GenericRecord) genericFastClient.get(str).get()).get("int_field")).intValue(), i2);
                    Object obj = genericFastVsonClient.get(str).get();
                    Assert.assertTrue(obj instanceof Map, "VsonClient should return Map, but got" + obj.getClass());
                    Assert.assertEquals(((Integer) ((Map) obj).get("int_field")).intValue(), i2);
                }
            } else if (i == 2) {
                for (int i3 = 0; i3 < 99; i3++) {
                    String str2 = "key_" + i3;
                    String str3 = "key_" + (i3 + 1);
                    HashSet hashSet = new HashSet();
                    hashSet.add(str2);
                    hashSet.add(str3);
                    Map map = (Map) genericFastClient.batchGet(hashSet).get();
                    Assert.assertEquals(map.size(), 2);
                    Assert.assertEquals(((Integer) ((GenericRecord) map.get(str2)).get("int_field")).intValue(), i3);
                    Assert.assertEquals(((Integer) ((GenericRecord) map.get(str3)).get("int_field")).intValue(), i3 + 1);
                    Map map2 = (Map) genericFastVsonClient.batchGet(hashSet).get();
                    Assert.assertEquals(map2.size(), 2);
                    Object obj2 = map2.get(str2);
                    Assert.assertTrue(obj2 instanceof Map, "VsonClient should return Map, but got " + obj2.getClass());
                    Assert.assertEquals(((Integer) ((Map) obj2).get("int_field")).intValue(), i3);
                    Object obj3 = map2.get(str3);
                    Assert.assertTrue(obj3 instanceof Map, "VsonClient should return Map, but got " + obj3.getClass());
                    Assert.assertEquals(((Integer) ((Map) obj3).get("int_field")).intValue(), i3 + 1);
                }
            } else {
                if (i != 100) {
                    throw new VeniceException("unsupported batchGetKeySize: " + i);
                }
                HashSet hashSet2 = new HashSet();
                for (int i4 = 0; i4 < 100; i4++) {
                    hashSet2.add("key_" + i4);
                }
                Map map3 = (Map) genericFastClient.batchGet(hashSet2).get();
                Assert.assertEquals(map3.size(), 100);
                for (int i5 = 0; i5 < 100; i5++) {
                    Assert.assertEquals(((Integer) ((GenericRecord) map3.get("key_" + i5)).get("int_field")).intValue(), i5);
                }
                Map map4 = (Map) genericFastVsonClient.batchGet(hashSet2).get();
                Assert.assertEquals(map4.size(), 100);
                for (int i6 = 0; i6 < 100; i6++) {
                    Object obj4 = map4.get("key_" + i6);
                    Assert.assertTrue(obj4 instanceof Map, "VsonClient should return Map, but got" + obj4.getClass());
                    Assert.assertEquals(((Integer) ((Map) obj4).get("int_field")).intValue(), i6);
                }
            }
            consumer.accept(metricsRepository2);
            consumer2.accept(metricsRepository);
            if (genericFastClient != null) {
                genericFastClient.close();
            }
            if (genericFastVsonClient != null) {
                genericFastVsonClient.close();
            }
            MetricsRepository metricsRepository3 = new MetricsRepository();
            AvroSpecificStoreClient<String, TestValueSchema> specificFastClient = getSpecificFastClient(clientConfigBuilder.clone(), metricsRepository3, TestValueSchema.class, storeMetadataFetchMode);
            try {
                if (!z) {
                    for (int i7 = 0; i7 < 100; i7++) {
                        Assert.assertEquals(((TestValueSchema) specificFastClient.get("key_" + i7).get()).int_field, i7);
                    }
                } else if (i == 2) {
                    for (int i8 = 0; i8 < 99; i8++) {
                        String str4 = "key_" + i8;
                        String str5 = "key_" + (i8 + 1);
                        HashSet hashSet3 = new HashSet();
                        hashSet3.add(str4);
                        hashSet3.add(str5);
                        Map map5 = (Map) specificFastClient.batchGet(hashSet3).get();
                        Assert.assertEquals(map5.size(), 2);
                        Assert.assertEquals(((TestValueSchema) map5.get(str4)).int_field, i8);
                        Assert.assertEquals(((TestValueSchema) map5.get(str5)).int_field, i8 + 1);
                    }
                } else {
                    if (i != 100) {
                        throw new VeniceException("unsupported batchGetKeySize: " + i);
                    }
                    HashSet hashSet4 = new HashSet();
                    for (int i9 = 0; i9 < 100; i9++) {
                        hashSet4.add("key_" + i9);
                    }
                    Map map6 = (Map) specificFastClient.batchGet(hashSet4).get();
                    Assert.assertEquals(map6.size(), 100);
                    for (int i10 = 0; i10 < 100; i10++) {
                        Assert.assertEquals(((TestValueSchema) map6.get("key_" + i10)).int_field, i10);
                    }
                }
                consumer.accept(metricsRepository3);
                if (specificFastClient != null) {
                    specificFastClient.close();
                }
            } catch (Throwable th) {
                if (specificFastClient != null) {
                    specificFastClient.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (0 != 0) {
                avroGenericStoreClient.close();
            }
            if (0 != 0) {
                avroGenericStoreClient2.close();
            }
            throw th2;
        }
    }

    @Test(dataProvider = "FastClient-Four-Boolean-A-Number-Store-Metadata-Fetch-Mode", timeOut = 60000)
    public void testFastClientGet(boolean z, boolean z2, boolean z3, boolean z4, int i, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception {
        Consumer<MetricsRepository> consumer;
        ClientConfig.ClientConfigBuilder routingPendingRequestCounterInstanceBlockThreshold = new ClientConfig.ClientConfigBuilder().setStoreName(this.storeName).setR2Client(this.r2Client).setSpeculativeQueryEnabled(z2).setDualReadEnabled(z).setRoutingPendingRequestCounterInstanceBlockThreshold(100);
        if (z3) {
            routingPendingRequestCounterInstanceBlockThreshold.setMaxAllowedKeyCntInBatchGetReq(100).setUseStreamingBatchGetAsDefault(z4);
        }
        AvroGenericStoreClient<String, GenericRecord> avroGenericStoreClient = null;
        AvroSpecificStoreClient<String, TestValueSchema> avroSpecificStoreClient = null;
        AvroGenericStoreClient<String, Object> avroGenericStoreClient2 = null;
        MetricsRepository metricsRepository = new MetricsRepository();
        try {
            if (z) {
                avroGenericStoreClient = getGenericThinClient(metricsRepository);
                routingPendingRequestCounterInstanceBlockThreshold.setGenericThinClient(avroGenericStoreClient);
                avroSpecificStoreClient = getSpecificThinClient();
                routingPendingRequestCounterInstanceBlockThreshold.setSpecificThinClient(avroSpecificStoreClient);
                avroGenericStoreClient2 = getGenericVsonThinClient();
                consumer = metricsRepository2 -> {
                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                        Assert.assertTrue(((Metric) metricsRepository2.metrics().get(new StringBuilder().append(".").append(this.storeName).append(z3 ? "--multiget_streaming_" : "--").append("request_key_count.Rate").toString())).value() > 0.0d, "Thin client metrics should be incremented when dual read is enabled");
                    });
                };
            } else {
                consumer = metricsRepository3 -> {
                    metricsRepository3.metrics().forEach((str, metric) -> {
                        Assert.assertTrue(metric.value() == 0.0d, "Thin client metrics should not be incremented when dual read is disabled");
                    });
                };
            }
            runTest(routingPendingRequestCounterInstanceBlockThreshold, z3, i, metricsRepository4 -> {
                validateMetrics(metricsRepository4, z4, i, i);
            }, consumer, metricsRepository, z ? Optional.of(avroGenericStoreClient2) : Optional.empty(), storeMetadataFetchMode);
            if (avroGenericStoreClient != null) {
                avroGenericStoreClient.close();
            }
            if (avroSpecificStoreClient != null) {
                avroSpecificStoreClient.close();
            }
            if (avroGenericStoreClient2 != null) {
                avroGenericStoreClient2.close();
            }
        } catch (Throwable th) {
            if (avroGenericStoreClient != null) {
                avroGenericStoreClient.close();
            }
            if (avroSpecificStoreClient != null) {
                avroSpecificStoreClient.close();
            }
            if (avroGenericStoreClient2 != null) {
                avroGenericStoreClient2.close();
            }
            throw th;
        }
    }

    @Test(expectedExceptions = {VeniceClientException.class, ExecutionException.class}, expectedExceptionsMessageRegExp = ".* metadata is not ready, attempting to re-initialize", dataProvider = "FastClient-Three-Boolean-And-A-Number", timeOut = 60000)
    public void testFastClientWithoutServers(boolean z, boolean z2, boolean z3, int i) throws Exception {
        Iterator<VeniceServerWrapper> it = this.veniceCluster.getVeniceServers().iterator();
        while (it.hasNext()) {
            this.veniceCluster.stopVeniceServer(it.next().getPort());
        }
        ClientConfig.ClientConfigBuilder maxAllowedKeyCntInBatchGetReq = new ClientConfig.ClientConfigBuilder().setStoreName(this.storeName).setR2Client(this.r2Client).setSpeculativeQueryEnabled(z3).setDualReadEnabled(z2).setMaxAllowedKeyCntInBatchGetReq(100);
        AvroGenericStoreClient<String, GenericRecord> avroGenericStoreClient = null;
        AvroSpecificStoreClient<String, TestValueSchema> avroSpecificStoreClient = null;
        AvroGenericStoreClient<String, Object> avroGenericStoreClient2 = null;
        MetricsRepository metricsRepository = new MetricsRepository();
        if (z2) {
            try {
                avroGenericStoreClient = getGenericThinClient(metricsRepository);
                maxAllowedKeyCntInBatchGetReq.setGenericThinClient(avroGenericStoreClient);
                avroSpecificStoreClient = getSpecificThinClient();
                maxAllowedKeyCntInBatchGetReq.setSpecificThinClient(avroSpecificStoreClient);
                avroGenericStoreClient2 = getGenericVsonThinClient();
            } catch (Throwable th) {
                if (avroGenericStoreClient != null) {
                    avroGenericStoreClient.close();
                }
                if (avroSpecificStoreClient != null) {
                    avroSpecificStoreClient.close();
                }
                if (avroGenericStoreClient2 != null) {
                    avroGenericStoreClient2.close();
                }
                throw th;
            }
        }
        runTest(maxAllowedKeyCntInBatchGetReq, z, i, z2 ? Optional.of(avroGenericStoreClient2) : Optional.empty(), StoreMetadataFetchMode.SERVER_BASED_METADATA);
        if (avroGenericStoreClient != null) {
            avroGenericStoreClient.close();
        }
        if (avroSpecificStoreClient != null) {
            avroSpecificStoreClient.close();
        }
        if (avroGenericStoreClient2 != null) {
            avroGenericStoreClient2.close();
        }
    }

    @Test(dataProvider = "fastClientHTTPVariantsAndStoreMetadataFetchModes", timeOut = 60000)
    public void testFastClientGetWithDifferentHTTPVariants(ClientTestUtils.FastClientHTTPVariant fastClientHTTPVariant, StoreMetadataFetchMode storeMetadataFetchMode) throws Exception {
        ClientConfig.ClientConfigBuilder dualReadEnabled = new ClientConfig.ClientConfigBuilder().setStoreName(this.storeName).setR2Client(ClientTestUtils.getR2Client(fastClientHTTPVariant)).setDualReadEnabled(false);
        Consumer<MetricsRepository> consumer = metricsRepository -> {
            metricsRepository.metrics().forEach((str, metric) -> {
                if (str.contains("long_tail_retry_request")) {
                    Assert.assertTrue(metric.value() == 0.0d, "Long tail retry should not be triggered");
                }
            });
        };
        runTest(dualReadEnabled, false, 2, consumer, metricsRepository2 -> {
        }, null, Optional.empty(), storeMetadataFetchMode);
        runTest(dualReadEnabled, true, 2, consumer, metricsRepository3 -> {
        }, null, Optional.empty(), storeMetadataFetchMode);
    }

    @Test(dataProvider = "FastClient-One-Boolean", timeOut = 60000)
    public void testFastClientWithLongTailRetry(boolean z) throws Exception {
        String str;
        String str2;
        ClientConfig.ClientConfigBuilder r2Client = new ClientConfig.ClientConfigBuilder().setStoreName(this.storeName).setR2Client(this.r2Client);
        if (z) {
            r2Client.setMaxAllowedKeyCntInBatchGetReq(100).setUseStreamingBatchGetAsDefault(true);
        }
        if (z) {
            str = "--multiget_";
            str2 = "batch Get";
            r2Client.setLongTailRetryEnabledForBatchGet(true).setLongTailRetryThresholdForBatchGetInMicroSeconds(1);
        } else {
            str = "--";
            str2 = "single Get";
            r2Client.setLongTailRetryEnabledForSingleGet(true).setLongTailRetryThresholdForSingleGetInMicroSeconds(1);
        }
        String str3 = str;
        String str4 = str2;
        runTest(r2Client, z, 100, metricsRepository -> {
            Assert.assertTrue(((Metric) metricsRepository.metrics().get(new StringBuilder().append(".").append(this.storeName).append(str3).append("long_tail_retry_request.OccurrenceRate").toString())).value() > 0.0d, "Long tail retry for " + str4 + " should be triggered");
        }, metricsRepository2 -> {
        }, null, Optional.empty(), StoreMetadataFetchMode.SERVER_BASED_METADATA);
    }
}
