package com.linkedin.venice.throttle;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.router.throttle.ReadRequestThrottler;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.Metric;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/throttle/TestRouterReadQuotaThrottler.class */
public class TestRouterReadQuotaThrottler {
    private VeniceClusterWrapper cluster;
    private int numberOfRouter = 2;
    private String storeName;
    private int currentVersion;

    @BeforeClass(alwaysRun = true)
    public void setUp() throws Exception {
        this.cluster = ServiceFactory.getVeniceCluster(1, 1, this.numberOfRouter);
        VersionCreationResponse newStoreVersion = this.cluster.getNewStoreVersion();
        Assert.assertFalse(newStoreVersion.isError());
        this.storeName = newStoreVersion.getName();
        this.currentVersion = newStoreVersion.getVersion();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(this.cluster.getKafka().getAddress());
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"string\"");
        try {
            VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer("\"string\"");
            try {
                VeniceWriter createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(newStoreVersion.getKafkaTopic()).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).build());
                try {
                    String uniqueString = Utils.getUniqueString("key");
                    String uniqueString2 = Utils.getUniqueString("value");
                    createVeniceWriter.broadcastStartOfPush(new HashMap());
                    createVeniceWriter.put(uniqueString, uniqueString2, 1).get();
                    createVeniceWriter.broadcastEndOfPush(new HashMap());
                    if (createVeniceWriter != null) {
                        createVeniceWriter.close();
                    }
                    veniceAvroKafkaSerializer2.close();
                    veniceAvroKafkaSerializer.close();
                    TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
                        if (ControllerClient.getStore(this.cluster.getAllControllersURLs(), this.cluster.getClusterName(), this.storeName).getStore().getCurrentVersion() == 0) {
                            return false;
                        }
                        this.cluster.refreshAllRouterMetaData();
                        return true;
                    });
                } catch (Throwable th) {
                    if (createVeniceWriter != null) {
                        try {
                            createVeniceWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            try {
                veniceAvroKafkaSerializer.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        this.cluster.close();
    }

    @Test(groups = {"flaky"}, timeOut = 60000)
    public void testReadRequestBeThrottled() throws InterruptedException {
        long seconds = TimeUnit.MILLISECONDS.toSeconds(ReadRequestThrottler.DEFAULT_STORE_QUOTA_TIME_WINDOW);
        long j = 10;
        this.cluster.getLeaderVeniceController().getVeniceAdmin().updateStore(this.cluster.getClusterName(), this.storeName, new UpdateStoreQueryParams().setReadQuotaInCU(10L));
        TestUtils.waitForNonDeterministicCompletion(3L, TimeUnit.SECONDS, () -> {
            Store store = this.cluster.getRandomVeniceRouter().getMetaDataRepository().getStore(this.storeName);
            return store.getCurrentVersion() == this.currentVersion && store.getReadQuotaInCU() == j;
        });
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.cluster.getRandomRouterURL()));
        for (int i = 0; i < (10 / this.numberOfRouter) * seconds; i++) {
            try {
                andStartGenericAvroClient.get("empty-key").get();
            } catch (ExecutionException e) {
                Assert.fail("Usage has not exceeded the quota.");
            }
        }
        for (int i2 = 0; i2 < 5; i2++) {
            try {
                andStartGenericAvroClient.get("empty-key").get();
            } catch (ExecutionException e2) {
            }
        }
        Assert.fail("Usage has exceeded the quota, should get the QuotaExceededException.");
        this.cluster.stopVeniceRouter(this.cluster.getRandomVeniceRouter().getPort());
        TestUtils.waitForNonDeterministicCompletion(3L, TimeUnit.SECONDS, () -> {
            return this.cluster.getRandomVeniceRouter().getRoutersClusterManager().getLiveRoutersCount() == 1;
        });
        AvroGenericStoreClient andStartGenericAvroClient2 = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.cluster.getRandomRouterURL()));
        for (int i3 = 0; i3 < 10 * seconds; i3++) {
            try {
                andStartGenericAvroClient2.get("empty-key").get();
            } catch (ExecutionException e3) {
                Assert.fail("Usage has not exceeded the quota.");
            }
        }
        for (int i4 = 0; i4 < 5; i4++) {
            try {
                andStartGenericAvroClient2.get("empty-key").get();
            } catch (ExecutionException e4) {
                return;
            }
        }
        Assert.fail("Usage has exceeded the quota, should get the QuotaExceededException.");
    }

    @Test(priority = 1, groups = {"flaky"}, timeOut = 60000)
    public void testNoopThrottlerCanReportPerRouterStoreQuota() {
        Properties properties = new Properties();
        properties.put("router.enable.read.throttling", "false");
        Iterator<VeniceRouterWrapper> it = this.cluster.getVeniceRouters().iterator();
        while (it.hasNext()) {
            this.cluster.removeVeniceRouter(it.next().getPort());
            this.cluster.addVeniceRouter(properties);
        }
        Map metrics = this.cluster.getRandomVeniceRouter().getMetricsRepository().metrics();
        ControllerClient controllerClient = new ControllerClient(this.cluster.getClusterName(), this.cluster.getAllControllersURLs());
        Assert.assertFalse(controllerClient.getStore(this.storeName).isError());
        double readQuotaInCU = r0.getStore().getReadQuotaInCU() / this.numberOfRouter;
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(metrics.containsKey("." + this.storeName + "--read_quota_per_router.Gauge"));
            Assert.assertEquals(((Metric) metrics.get("." + this.storeName + "--read_quota_per_router.Gauge")).value(), readQuotaInCU, 1.0E-4d);
        });
        controllerClient.updateStore(this.storeName, new UpdateStoreQueryParams().setReadQuotaInCU(1000000L));
        Assert.assertFalse(controllerClient.getStore(this.storeName).isError());
        double readQuotaInCU2 = r0.getStore().getReadQuotaInCU() / this.numberOfRouter;
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(metrics.containsKey("." + this.storeName + "--read_quota_per_router.Gauge"));
            Assert.assertEquals(((Metric) metrics.get("." + this.storeName + "--read_quota_per_router.Gauge")).value(), readQuotaInCU2, 1.0E-4d);
        });
    }
}
