package com.linkedin.venice.router;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.tehuti.MetricsUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test that exercises router reads (batch-get, compute and single-get) after one
 * storage server has been stopped, and then checks the router's retry-related metrics.
 *
 * <p>{@link #setUp()} starts a cluster with long-tail retry thresholds of 1 ms, pushes
 * {@code RECORD_COUNT} records into a fresh store version, waits for that version to become
 * current and finally stops the first server. {@link #testRouterRetry()} then issues reads and
 * asserts that "no available replica aborted retry" counters are positive while "unhealthy
 * request" counters stay at zero.
 */
public class TestRouterRetry {
    /** Batch-get key limit configured on both the router and the store. */
    private static final int MAX_KEY_LIMIT = 10;
    /** Number of records written during the push in {@link #setUp()}. */
    private static final int RECORD_COUNT = 100;
    /** Number of times the read sequence is repeated in {@link #testRouterRetry()}. */
    private static final int REQUEST_ROUNDS = 100;

    private static final String KEY_SCHEMA_STR = "\"string\"";
    private static final String VALUE_FIELD_NAME = "int_field";
    private static final String VALUE_SCHEMA_STR = "{\n"
        + "\"type\": \"record\",\n"
        + "\"name\": \"test_value_schema\",\n"
        + "\"fields\": [\n"
        + "  {\"name\": \"" + VALUE_FIELD_NAME + "\", \"type\": \"int\"}]\n"
        + "}";
    private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(VALUE_SCHEMA_STR);
    private static final String KEY_PREFIX = "key_";
    /** A key that is never written, used to verify that missing keys are simply absent from results. */
    private static final String UNKNOWN_KEY = "unknown_key";

    VeniceClusterWrapper veniceCluster;
    private ControllerClient controllerClient;
    private VeniceWriter<Object, Object, Object> veniceWriter;
    private String routerAddr;
    private String storeVersionName;
    private int valueSchemaId;
    private String storeName;

    /**
     * Brings up the cluster, creates a store version, pushes {@code RECORD_COUNT} records and
     * stops the first storage server so that subsequent reads hit a cluster with a missing server.
     *
     * @throws ExecutionException if a write to the version topic fails
     * @throws InterruptedException if the thread is interrupted while waiting for a write
     */
    @BeforeClass(alwaysRun = true)
    public void setUp() throws VeniceClientException, ExecutionException, InterruptedException {
        Utils.thisIsLocalhost();

        Properties extraProperties = new Properties();
        extraProperties.put("router.long.tail.retry.for.single.get.threshold.ms", 1);
        extraProperties.put("router.max.key_count.in.multiget.req", MAX_KEY_LIMIT);
        extraProperties.put("router.long.tail.retry.for.batch.get.threshold.ms", "1-:1");
        extraProperties.put("router.smart.long.tail.retry.enabled", true);
        extraProperties.put("default.offline.push.strategy", OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION.toString());

        // NOTE(review): positional arguments are presumably (controllers, servers, routers,
        // replication factor, partition size, ssl flags...) - confirm against ServiceFactory.
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, 100, true, false, extraProperties);
        this.routerAddr = this.veniceCluster.getRandomRouterSslURL();
        this.storeVersionName = this.veniceCluster.getNewStoreVersion(KEY_SCHEMA_STR, VALUE_SCHEMA_STR).getKafkaTopic();
        this.storeName = Version.parseStoreFromKafkaTopicName(this.storeVersionName);
        this.valueSchemaId = 1;
        this.controllerClient = new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getAllControllersURLs());
        updateStore(0L, MAX_KEY_LIMIT);

        VeniceWriterOptions writerOptions = new VeniceWriterOptions.Builder(this.storeVersionName)
            .setKeySerializer(new VeniceAvroKafkaSerializer(KEY_SCHEMA_STR))
            .setValueSerializer(new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR))
            .build();
        this.veniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress())
            .createVeniceWriter(writerOptions);

        int pushedVersion = Version.parseVersionFromKafkaTopicName(this.storeVersionName);
        this.veniceWriter.broadcastStartOfPush(new HashMap<>());
        for (int i = 0; i < RECORD_COUNT; i++) {
            // Block on every write so the whole data set is produced before end-of-push.
            this.veniceWriter.put(KEY_PREFIX + i, expectedValue(i), this.valueSchemaId).get();
        }
        this.veniceWriter.broadcastEndOfPush(new HashMap<>());

        // Wait until the controller reports the pushed version as the store's current version.
        String controllerUrls = this.veniceCluster.getAllControllersURLs();
        TestUtils.waitForNonDeterministicCompletion(
            30L,
            TimeUnit.SECONDS,
            () -> ControllerClient.getStore(controllerUrls, this.veniceCluster.getClusterName(), this.storeName)
                .getStore()
                .getCurrentVersion() == pushedVersion);
        this.veniceCluster.refreshAllRouterMetaData();

        // Take the first server down; the test runs against the remaining server(s).
        this.veniceCluster.stopVeniceServer(this.veniceCluster.getVeniceServers().get(0).getPort());
    }

    /**
     * Releases everything created in {@link #setUp()}. The writer and the controller client are
     * closed before the cluster so they are not shut down against already-stopped services.
     */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(this.veniceWriter, this.controllerClient, this.veniceCluster);
    }

    /**
     * Updates the test store's read quota and batch-get limit and enables read computation.
     *
     * @param j read quota (in capacity units) to set on the store
     * @param i batch-get key limit to set on the store
     */
    private void updateStore(long j, int i) {
        this.controllerClient.updateStore(
            this.storeName,
            new UpdateStoreQueryParams().setReadQuotaInCU(j).setReadComputationEnabled(true).setBatchGetLimit(i));
    }

    /**
     * Repeatedly issues batch-get, compute and single-get requests through the router and then
     * verifies the router metrics: aborted-retry counters must be positive and unhealthy-request
     * counters must be zero.
     *
     * @throws ExecutionException if any read future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for a read
     */
    @Test(timeOut = 60000)
    public void testRouterRetry() throws ExecutionException, InterruptedException {
        int knownKeyCount = MAX_KEY_LIMIT - 1;

        // The request carries MAX_KEY_LIMIT keys: knownKeyCount existing ones plus one missing key.
        Set<String> keys = new HashSet<>();
        for (int i = 0; i < knownKeyCount; i++) {
            keys.add(KEY_PREFIX + i);
        }
        keys.add(UNKNOWN_KEY);

        try (AvroGenericStoreClient<String, GenericRecord> storeClient = ClientFactory.getAndStartGenericAvroClient(
            ClientConfig.defaultGenericClientConfig(this.storeName)
                .setVeniceURL(this.routerAddr)
                .setSslFactory(SslUtils.getVeniceLocalSslFactory()))) {
            for (int round = 0; round < REQUEST_ROUNDS; round++) {
                Map<String, GenericRecord> batchGetResult = storeClient.batchGet(keys).get();
                Assert.assertEquals(batchGetResult.size(), knownKeyCount);

                Map<String, ComputeGenericRecord> computeResult =
                    storeClient.compute().project(VALUE_FIELD_NAME).execute(keys).get();
                Assert.assertEquals(computeResult.size(), knownKeyCount);

                for (int i = 0; i < knownKeyCount; i++) {
                    String key = KEY_PREFIX + i;
                    Assert.assertEquals(batchGetResult.get(key), expectedValue(i));
                    Assert.assertEquals(computeResult.get(key).get(VALUE_FIELD_NAME), Integer.valueOf(i));
                }

                Assert.assertEquals(storeClient.get(KEY_PREFIX + 2).get(), expectedValue(2));
                Assert.assertNull(storeClient.get(UNKNOWN_KEY).get());
            }
        }

        double singleGetAborted = routerMetricSum(".total--no_available_replica_aborted_retry_request.Count");
        double batchGetAborted = routerMetricSum(".total--multiget_streaming_no_available_replica_aborted_retry_request.Count");
        double computeAborted = routerMetricSum(".total--compute_streaming_no_available_replica_aborted_retry_request.Count");
        Assert.assertTrue(singleGetAborted > 0.0d, "No available aborted retry request should be triggered for single-get");
        Assert.assertTrue(batchGetAborted > 0.0d, "No available aborted retry request should be triggered for batch-get streaming");
        Assert.assertTrue(computeAborted > 0.0d, "No available aborted retry request should be triggered for compute streaming");

        double singleGetUnhealthy = routerMetricSum(".total--unhealthy_request.Count");
        double batchGetUnhealthy = routerMetricSum(".total--multiget_streaming_unhealthy_request.Count");
        double computeUnhealthy = routerMetricSum(".total--compute_streaming_unhealthy_request.Count");
        Assert.assertEquals(Double.valueOf(singleGetUnhealthy), Double.valueOf(0.0d), "Unhealthy request for single-get is unexpected");
        Assert.assertEquals(Double.valueOf(batchGetUnhealthy), Double.valueOf(0.0d), "Unhealthy request for batch-get streaming is unexpected");
        Assert.assertEquals(Double.valueOf(computeUnhealthy), Double.valueOf(0.0d), "Unhealthy request for compute streaming is unexpected");
    }

    /** Builds the value record that {@link #setUp()} writes for the key with the given index. */
    private static GenericRecord expectedValue(int value) {
        GenericRecord record = new GenericData.Record(VALUE_SCHEMA);
        record.put(VALUE_FIELD_NAME, Integer.valueOf(value));
        return record;
    }

    /** Sums the named metric across all routers of the test cluster. */
    private double routerMetricSum(String metricName) {
        return MetricsUtils.getSum(metricName, this.veniceCluster.getVeniceRouters());
    }
}
