package com.linkedin.venice.listener;

import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.stats.AggServerQuotaUsageStats;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/listener/ReadQuotaEnforcementHandlerListenerTest.class */
public class ReadQuotaEnforcementHandlerListenerTest {
    private String nodeId = "thisNodeId";

    @Test
    public void quotaEnforcementHandlerRegistersAsStoreChangeListener() {
        HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository = (HelixCustomizedViewOfflinePushRepository) Mockito.mock(HelixCustomizedViewOfflinePushRepository.class);
        ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        AggServerQuotaUsageStats aggServerQuotaUsageStats = (AggServerQuotaUsageStats) Mockito.mock(AggServerQuotaUsageStats.class);
        ArrayList arrayList = new ArrayList();
        ((ReadOnlyStoreRepository) Mockito.doAnswer(invocationOnMock -> {
            arrayList.add((StoreDataChangedListener) invocationOnMock.getArgument(0));
            return null;
        }).when(readOnlyStoreRepository)).registerStoreDataChangedListener((StoreDataChangedListener) Mockito.any());
        Assert.assertEquals(arrayList.get(0), new ReadQuotaEnforcementHandler(100L, readOnlyStoreRepository, CompletableFuture.completedFuture(helixCustomizedViewOfflinePushRepository), this.nodeId, aggServerQuotaUsageStats));
    }

    @Test
    public void quotaEnforcementHandlerStaysUpToDateWithStoreChanges() {
        HashSet hashSet = new HashSet();
        ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        ((ReadOnlyStoreRepository) Mockito.doAnswer(invocationOnMock -> {
            return getDummyStore((String) invocationOnMock.getArgument(0), Collections.EMPTY_LIST, 10L);
        }).when(readOnlyStoreRepository)).getStore(Mockito.anyString());
        HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository = (HelixCustomizedViewOfflinePushRepository) Mockito.mock(HelixCustomizedViewOfflinePushRepository.class);
        ((HelixCustomizedViewOfflinePushRepository) Mockito.doAnswer(invocationOnMock2 -> {
            hashSet.add((String) invocationOnMock2.getArgument(0));
            return null;
        }).when(helixCustomizedViewOfflinePushRepository)).subscribeRoutingDataChange(Mockito.anyString(), (RoutingDataRepository.RoutingDataChangedListener) Mockito.any());
        ((HelixCustomizedViewOfflinePushRepository) Mockito.doAnswer(invocationOnMock3 -> {
            hashSet.remove((String) invocationOnMock3.getArgument(0));
            return null;
        }).when(helixCustomizedViewOfflinePushRepository)).unSubscribeRoutingDataChange(Mockito.anyString(), (RoutingDataRepository.RoutingDataChangedListener) Mockito.any());
        ((HelixCustomizedViewOfflinePushRepository) Mockito.doAnswer(invocationOnMock4 -> {
            return getDummyPartitionAssignment((String) invocationOnMock4.getArgument(0), this.nodeId, helixCustomizedViewOfflinePushRepository);
        }).when(helixCustomizedViewOfflinePushRepository)).getPartitionAssignments(Mockito.anyString());
        ReadQuotaEnforcementHandler readQuotaEnforcementHandler = new ReadQuotaEnforcementHandler(100L, readOnlyStoreRepository, CompletableFuture.completedFuture(helixCustomizedViewOfflinePushRepository), this.nodeId, (AggServerQuotaUsageStats) Mockito.mock(AggServerQuotaUsageStats.class));
        Store dummyStore = getDummyStore("store1", Arrays.asList(1), 10L);
        dummyStore.setCurrentVersion(1);
        readQuotaEnforcementHandler.handleStoreCreated(dummyStore);
        Assert.assertTrue(hashSet.contains(Version.composeKafkaTopic(dummyStore.getName(), 1)), "After adding a store with version 1, the throttler should be subscribed to updates for that topic");
        Assert.assertTrue(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore.getName(), 1)), "After adding a store with version 1, the throttler should have a bucket for that topic");
        Store dummyStore2 = getDummyStore("store2", Arrays.asList(2, 3), 10L);
        dummyStore2.setCurrentVersion(3);
        readQuotaEnforcementHandler.handleStoreCreated(dummyStore2);
        Assert.assertTrue(hashSet.contains(Version.composeKafkaTopic(dummyStore2.getName(), 3)), "After adding a store with version 3, the throttler should be subscribed to updates for that topic");
        Assert.assertTrue(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore2.getName(), 3)), "After adding a store with version 3, the throttler should have a bucket for that topic");
        Store dummyStore3 = getDummyStore("store2", Arrays.asList(3, 4), 10L);
        dummyStore3.setCurrentVersion(4);
        readQuotaEnforcementHandler.handleStoreCreated(dummyStore3);
        Assert.assertTrue(hashSet.contains(Version.composeKafkaTopic(dummyStore3.getName(), 4)), "After adding a store with version 4, the throttler should be subscribed to updates for that topic");
        Assert.assertTrue(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore3.getName(), 4)), "After adding a store with version 4, the throttler should have a bucket for that topic");
        Assert.assertFalse(hashSet.contains(Version.composeKafkaTopic(dummyStore3.getName(), 2)), "After updating a store, the throttler should no longer be subscribed to retired topics");
        Assert.assertFalse(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore3.getName(), 2)), "After updating a store, the throttler should no longer have a bucket for that topic");
        readQuotaEnforcementHandler.handleStoreDeleted(dummyStore3.getName());
        for (Integer num : new Integer[]{2, 3, 4}) {
            int intValue = num.intValue();
            Assert.assertFalse(hashSet.contains(Version.composeKafkaTopic(dummyStore3.getName(), intValue)), "After deleting a store, the throttler should no longer be subscribed to retired topics");
            Assert.assertFalse(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore3.getName(), intValue)), "After deleting a store, the throttler should no longer have a bucket for retired topics");
        }
        Assert.assertTrue(hashSet.contains(Version.composeKafkaTopic(dummyStore.getName(), 1)), "After deleting a store, the throttler should still be subscribed to unrelated topics");
        Assert.assertTrue(readQuotaEnforcementHandler.listTopics().contains(Version.composeKafkaTopic(dummyStore.getName(), 1)), "After deleting a store, the throttler should still have buckets for unrelated topics");
    }

    private Store getDummyStore(String str, List<Integer> list, long j) {
        ZKStore zKStore = new ZKStore(str, "owner", System.currentTimeMillis(), PersistenceType.IN_MEMORY, RoutingStrategy.CONSISTENT_HASH, ReadStrategy.ANY_OF_ONLINE, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION, 1);
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            zKStore.addVersion(new VersionImpl(str, intValue, Version.composeKafkaTopic(str, intValue)));
        }
        zKStore.setReadQuotaInCU(j);
        return zKStore;
    }

    public static PartitionAssignment getDummyPartitionAssignment(String str, String str2, HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository) {
        PartitionAssignment partitionAssignment = (PartitionAssignment) Mockito.mock(PartitionAssignment.class);
        ((PartitionAssignment) Mockito.doReturn(str).when(partitionAssignment)).getTopic();
        Instance instance = new Instance(str2, "dummyHost", 1234);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ((Partition) Mockito.doReturn(0).when(partition)).getId();
        ((PartitionAssignment) Mockito.doReturn(Collections.singletonList(partition)).when(partitionAssignment)).getAllPartitions();
        ArrayList arrayList = new ArrayList();
        ReplicaState replicaState = (ReplicaState) Mockito.mock(ReplicaState.class);
        ((ReplicaState) Mockito.doReturn(instance.getNodeId()).when(replicaState)).getParticipantId();
        ((ReplicaState) Mockito.doReturn(ExecutionStatus.COMPLETED.name()).when(replicaState)).getVenicePushStatus();
        arrayList.add(replicaState);
        Mockito.when(helixCustomizedViewOfflinePushRepository.getReplicaStates(str, partition.getId())).thenReturn(arrayList);
        return partitionAssignment;
    }
}
