package com.linkedin.venice.router.throttle;

import com.linkedin.venice.exceptions.QuotaExceededException;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutersClusterConfig;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.utils.TestUtils;
import java.util.Arrays;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/router/throttle/ReadRequestThrottlerTest.class */
public class ReadRequestThrottlerTest {
    private ReadOnlyStoreRepository storeRepository;
    private ZkRoutersClusterManager zkRoutersClusterManager;
    private RoutingDataRepository routingDataRepository;
    private AggRouterHttpRequestStats stats;
    private Store store;
    private long totalQuota;
    private int routerCount;
    private ReadRequestThrottler throttler;
    private static final long maxCapacity = 400;
    private VeniceRouterConfig routerConfig;

    @BeforeMethod
    public void setUp() {
        this.storeRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        this.zkRoutersClusterManager = (ZkRoutersClusterManager) Mockito.mock(ZkRoutersClusterManager.class);
        this.routingDataRepository = (RoutingDataRepository) Mockito.mock(RoutingDataRepository.class);
        this.routerConfig = (VeniceRouterConfig) Mockito.mock(VeniceRouterConfig.class);
        this.totalQuota = 1000L;
        this.routerCount = 5;
        this.store = TestUtils.createTestStore("testGetQuotaForStore", "test", System.currentTimeMillis());
        this.store.setReadQuotaInCU(this.totalQuota);
        this.store.setCurrentVersion(1);
        ((ReadOnlyStoreRepository) Mockito.doReturn(this.store).when(this.storeRepository)).getStore((String) Mockito.eq(this.store.getName()));
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(this.store)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(this.totalQuota)).when(this.storeRepository)).getTotalStoreReadQuota();
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isQuotaRebalanceEnabled();
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isThrottlingEnabled();
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isMaxCapacityProtectionEnabled();
        ((VeniceRouterConfig) Mockito.doReturn(true).when(this.routerConfig)).isPerRouterStorageNodeThrottlerEnabled();
        this.stats = (AggRouterHttpRequestStats) Mockito.mock(AggRouterHttpRequestStats.class);
        this.throttler = new ReadRequestThrottler(this.zkRoutersClusterManager, this.storeRepository, this.routingDataRepository, maxCapacity, this.stats, 0.0d, 0.0d, 1000L, 1000L, true);
    }

    @Test
    public void testCalculateStoreQuotaPerRouter() {
        Assert.assertEquals(this.throttler.calculateStoreQuotaPerRouter(this.totalQuota), this.totalQuota / this.routerCount);
        this.routerCount--;
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterCountChanged(this.routerCount);
        Assert.assertEquals(this.throttler.calculateStoreQuotaPerRouter(this.totalQuota), this.totalQuota / this.routerCount);
        this.routerCount /= 2;
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterCountChanged(this.routerCount);
        Assert.assertEquals(this.throttler.calculateStoreQuotaPerRouter(this.totalQuota), maxCapacity);
    }

    @Test
    public void testMayThrottleRead() {
        for (int i = 0; i < 10; i++) {
            try {
                this.throttler.mayThrottleRead(this.store.getName(), (int) ((this.totalQuota / this.routerCount) / 10), "test");
            } catch (QuotaExceededException e) {
                Assert.fail("Usage has not exceeded the quota.");
            }
        }
        try {
            this.throttler.mayThrottleRead(this.store.getName(), 10.0d, "test");
            Assert.fail("Usage has exceed the quota. Should get the QuotaExceededException.");
        } catch (QuotaExceededException e2) {
        }
    }

    @Test
    public void testOnRouterCountChanged() {
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / (this.routerCount - 1)), "test");
            Assert.fail("Usage has exceeded the quota.");
        } catch (QuotaExceededException e) {
        }
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount - 1)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterCountChanged(this.routerCount - 1);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / (this.routerCount - 1)), "test");
            ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordTotalQuota(this.totalQuota / (this.routerCount - 1));
            ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordQuota(this.store.getName(), this.totalQuota / (this.routerCount - 1));
        } catch (QuotaExceededException e2) {
            Assert.fail("Usage has not exceeded the quota.");
        }
        this.throttler.handleRouterCountChanged(((int) this.store.getReadQuotaInCU()) + 1);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / (((int) this.store.getReadQuotaInCU()) + 1)), "test");
        } catch (QuotaExceededException e3) {
            Assert.fail("Usage should not exceed the quota as we have non-zero quota amount.");
        }
    }

    @Test
    public void testOnStoreQuotaChanged() {
        long j = this.totalQuota + 200;
        try {
            this.throttler.mayThrottleRead(this.store.getName(), j / this.routerCount, "test");
            Assert.fail("Quota has not been updated.");
        } catch (QuotaExceededException e) {
        }
        this.store.setReadQuotaInCU(j);
        ((ReadOnlyStoreRepository) Mockito.doReturn(this.store).when(this.storeRepository)).getStore((String) Mockito.eq(this.store.getName()));
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(this.store)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(j)).when(this.storeRepository)).getTotalStoreReadQuota();
        this.throttler.handleStoreChanged(this.store);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordTotalQuota(j / this.routerCount);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordQuota(this.store.getName(), j / this.routerCount);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), j / this.routerCount, "test");
        } catch (QuotaExceededException e2) {
            Assert.fail("Quota has been updated. Usage does not exceed the new quota.", e2);
        }
    }

    @Test
    public void testOnStoreQuotaChangedWithMultiStores() {
        this.routerCount = 2;
        Store[] storeArr = new Store[3];
        for (int i = 0; i < 3; i++) {
            storeArr[i] = TestUtils.createTestStore("testOnStoreQuotaChangedWithMultiStores" + i, "test", System.currentTimeMillis());
            storeArr[i].setReadQuotaInCU(100 * (i + 1));
            storeArr[i].setCurrentVersion(1);
        }
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(storeArr)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(600L).when(this.storeRepository)).getTotalStoreReadQuota();
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        ((VeniceRouterConfig) Mockito.doReturn(500L).when(this.routerConfig)).getMaxReadCapacityCu();
        ((VeniceRouterConfig) Mockito.doReturn(true).when(this.routerConfig)).isPerRouterStorageNodeThrottlerEnabled();
        ReadRequestThrottler readRequestThrottler = new ReadRequestThrottler(this.zkRoutersClusterManager, this.storeRepository, this.routingDataRepository, this.stats, this.routerConfig);
        for (int i2 = 0; i2 < 3; i2++) {
            Assert.assertEquals(readRequestThrottler.getStoreReadThrottler("testOnStoreQuotaChangedWithMultiStores" + i2).getQuota(), storeArr[i2].getReadQuotaInCU() / this.routerCount);
        }
        long j = 600 + 400;
        storeArr[0].setReadQuotaInCU(storeArr[0].getReadQuotaInCU() + 400);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(j)).when(this.storeRepository)).getTotalStoreReadQuota();
        readRequestThrottler.handleStoreChanged(storeArr[0]);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordTotalQuota(j / this.routerCount);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordQuota(storeArr[0].getName(), storeArr[0].getReadQuotaInCU() / this.routerCount);
        for (int i3 = 0; i3 < 3; i3++) {
            Assert.assertEquals(readRequestThrottler.getStoreReadThrottler("testOnStoreQuotaChangedWithMultiStores" + i3).getQuota(), storeArr[i3].getReadQuotaInCU() / this.routerCount);
        }
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount - 1)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        readRequestThrottler.handleRouterCountChanged(this.routerCount - 1);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordTotalQuota(500L);
        ((AggRouterHttpRequestStats) Mockito.verify(this.stats, Mockito.atLeastOnce())).recordQuota(storeArr[0].getName(), storeArr[0].getReadQuotaInCU() / 2.0d);
        for (int i4 = 0; i4 < 3; i4++) {
            Assert.assertEquals(readRequestThrottler.getStoreReadThrottler("testOnStoreQuotaChangedWithMultiStores" + i4).getQuota(), storeArr[i4].getReadQuotaInCU() / 2);
        }
        long j2 = j - 250;
        storeArr[0].setReadQuotaInCU(storeArr[0].getReadQuotaInCU() - 250);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(j2)).when(this.storeRepository)).getTotalStoreReadQuota();
        readRequestThrottler.handleStoreChanged(storeArr[0]);
        for (int i5 = 0; i5 < 3; i5++) {
            Assert.assertEquals(readRequestThrottler.getStoreReadThrottler("testOnStoreQuotaChangedWithMultiStores" + i5).getQuota(), (storeArr[i5].getReadQuotaInCU() * 500) / j2);
        }
        storeArr[2].setReadQuotaInCU(storeArr[2].getReadQuotaInCU() - 250);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(j2 - 250)).when(this.storeRepository)).getTotalStoreReadQuota();
        readRequestThrottler.handleStoreChanged(storeArr[2]);
        for (int i6 = 0; i6 < 3; i6++) {
            Assert.assertEquals(readRequestThrottler.getStoreReadThrottler("testOnStoreQuotaChangedWithMultiStores" + i6).getQuota(), storeArr[i6].getReadQuotaInCU());
        }
    }

    @Test
    public void testOnStoreCreatedAndDeleted() {
        Store createTestStore = TestUtils.createTestStore("testOnStoreCreatedAndDeleted", "test", System.currentTimeMillis());
        createTestStore.setReadQuotaInCU(200L);
        createTestStore.setCurrentVersion(1);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(this.store, createTestStore)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(this.totalQuota + 200)).when(this.storeRepository)).getTotalStoreReadQuota();
        this.throttler.handleStoreChanged(createTestStore);
        Assert.assertEquals(this.throttler.getStoreReadThrottler("testOnStoreCreatedAndDeleted").getQuota(), 200 / this.routerCount);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(this.store)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(this.totalQuota)).when(this.storeRepository)).getTotalStoreReadQuota();
        this.throttler.handleStoreDeleted(createTestStore.getName());
        Assert.assertNull(this.throttler.getStoreReadThrottler("testOnStoreCreatedAndDeleted"));
        this.routerCount = 1;
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterCountChanged(this.routerCount);
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(this.store, createTestStore)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(Long.valueOf(this.totalQuota + 200)).when(this.storeRepository)).getTotalStoreReadQuota();
        this.throttler.handleStoreCreated(createTestStore);
        Assert.assertEquals(this.throttler.getStoreReadThrottler(this.store.getName()).getQuota(), (this.store.getReadQuotaInCU() * maxCapacity) / (this.totalQuota + 200));
        Assert.assertEquals(this.throttler.getStoreReadThrottler("testOnStoreCreatedAndDeleted").getQuota(), (200 * maxCapacity) / (this.totalQuota + 200));
        ((ReadOnlyStoreRepository) Mockito.doReturn(Arrays.asList(createTestStore)).when(this.storeRepository)).getAllStores();
        ((ReadOnlyStoreRepository) Mockito.doReturn(200L).when(this.storeRepository)).getTotalStoreReadQuota();
        this.throttler.handleStoreDeleted(this.store.getName());
        Assert.assertEquals(this.throttler.getStoreReadThrottler("testOnStoreCreatedAndDeleted").getQuota(), 200L);
    }

    @Test
    public void testOnCurrentVersionChanged() {
        this.store.setCurrentVersion(100);
        PartitionAssignment partitionAssignment = (PartitionAssignment) Mockito.mock(PartitionAssignment.class);
        String composeKafkaTopic = Version.composeKafkaTopic(this.store.getName(), 100);
        ((PartitionAssignment) Mockito.doReturn(composeKafkaTopic).when(partitionAssignment)).getTopic();
        ((RoutingDataRepository) Mockito.doReturn(true).when(this.routingDataRepository)).containsKafkaTopic((String) Mockito.eq(composeKafkaTopic));
        ((RoutingDataRepository) Mockito.doReturn(partitionAssignment).when(this.routingDataRepository)).getPartitionAssignments((String) Mockito.eq(composeKafkaTopic));
        ((PartitionAssignment) Mockito.doReturn(1).when(partitionAssignment)).getExpectedNumberOfPartitions();
        this.throttler.handleStoreChanged(this.store);
        Assert.assertEquals(this.throttler.getStoreReadThrottler(this.store.getName()).getCurrentVersion(), 100);
        ((RoutingDataRepository) Mockito.verify(this.routingDataRepository, Mockito.atLeastOnce())).unSubscribeRoutingDataChange((String) Mockito.eq(Version.composeKafkaTopic(this.store.getName(), 0)), (RoutingDataRepository.RoutingDataChangedListener) Mockito.eq(this.throttler));
        ((RoutingDataRepository) Mockito.verify(this.routingDataRepository, Mockito.atLeastOnce())).subscribeRoutingDataChange((String) Mockito.eq(Version.composeKafkaTopic(this.store.getName(), 100)), (RoutingDataRepository.RoutingDataChangedListener) Mockito.eq(this.throttler));
        this.store.setCurrentVersion(101);
        this.throttler.handleStoreChanged(this.store);
        ((RoutingDataRepository) Mockito.verify(this.routingDataRepository, Mockito.times(1))).unSubscribeRoutingDataChange((String) Mockito.eq(composeKafkaTopic), (RoutingDataRepository.RoutingDataChangedListener) Mockito.eq(this.throttler));
        this.store.setCurrentVersion(0);
        this.throttler.handleStoreChanged(this.store);
        ((RoutingDataRepository) Mockito.verify(this.routingDataRepository, Mockito.times(1))).unSubscribeRoutingDataChange((String) Mockito.eq(composeKafkaTopic), (RoutingDataRepository.RoutingDataChangedListener) Mockito.eq(this.throttler));
    }

    @Test
    public void testOnRoutingDataChanged() {
        PartitionAssignment partitionAssignment = (PartitionAssignment) Mockito.mock(PartitionAssignment.class);
        ((PartitionAssignment) Mockito.doReturn(1).when(partitionAssignment)).getExpectedNumberOfPartitions();
        ((PartitionAssignment) Mockito.doReturn(Version.composeKafkaTopic(this.store.getName(), 1)).when(partitionAssignment)).getTopic();
        this.throttler.onExternalViewChange(partitionAssignment);
        Assert.assertEquals(this.throttler.getStoreReadThrottler(this.store.getName()).getCurrentVersion(), 1);
    }

    @Test
    public void testDisableThrottling() {
        ((ZkRoutersClusterManager) Mockito.doReturn(false).when(this.zkRoutersClusterManager)).isThrottlingEnabled();
        for (int i = 0; i < 10; i++) {
            try {
                this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota * 10), "test");
            } catch (QuotaExceededException e) {
                Assert.fail("Throttling should be disabled.");
            }
        }
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isThrottlingEnabled();
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota * 10), "test");
            Assert.fail("Usage has exceed the quota. Should get the QuotaExceededException.");
        } catch (QuotaExceededException e2) {
        }
    }

    @Test
    public void testDisableQuotaRebalance() {
        ((ZkRoutersClusterManager) Mockito.doReturn(false).when(this.zkRoutersClusterManager)).isQuotaRebalanceEnabled();
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getExpectedRoutersCount();
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount - 1)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterCountChanged(this.routerCount - 1);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / (this.routerCount - 1)), "test");
            Assert.fail("As quota re-balance has been disabled, quota per router should not be increased with one router failure.");
        } catch (QuotaExceededException e) {
        }
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / this.routerCount), "test");
        } catch (QuotaExceededException e2) {
            Assert.fail("Usage does not exceed the quota, should not throttle the request.");
        }
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isQuotaRebalanceEnabled();
        this.throttler.handleRouterClusterConfigChanged((RoutersClusterConfig) null);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) (this.totalQuota / (this.routerCount - 1)), "test");
        } catch (QuotaExceededException e3) {
            Assert.fail("Usage does not exceed the quota, should not throttle the request.");
        }
    }

    @Test
    public void testDisableMaxCapacityProtection() {
        ((ZkRoutersClusterManager) Mockito.doReturn(false).when(this.zkRoutersClusterManager)).isMaxCapacityProtectionEnabled();
        this.routerCount = 1;
        ((ZkRoutersClusterManager) Mockito.doReturn(Integer.valueOf(this.routerCount)).when(this.zkRoutersClusterManager)).getLiveRoutersCount();
        this.throttler.handleRouterClusterConfigChanged((RoutersClusterConfig) null);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) this.totalQuota, "test");
        } catch (QuotaExceededException e) {
            Assert.fail("As router protection has been disable. Current usage does not exceed the quota, should not throttle the request.");
        }
        ((ZkRoutersClusterManager) Mockito.doReturn(true).when(this.zkRoutersClusterManager)).isMaxCapacityProtectionEnabled();
        this.throttler.handleRouterClusterConfigChanged((RoutersClusterConfig) null);
        try {
            this.throttler.mayThrottleRead(this.store.getName(), (int) this.totalQuota, "test");
            Assert.fail("As router protection has been enabled. Current usage exceeds the quota.");
        } catch (QuotaExceededException e2) {
        }
    }
}
