package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest.class */
public class StorageUtilizationManagerTest {
    private static final long storeQuotaInBytes = 100;
    private static final long newStoreQuotaInBytes = 200;
    private static final int storePartitionCount = 10;
    private static final String storeName = "TestTopic";
    private static final String topic = Version.composeKafkaTopic(storeName, 1);
    private static final int storeVersion = Version.parseVersionFromKafkaTopicName(topic);
    private ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap;
    private AbstractStorageEngine storageEngine;
    private StatusReportAdapter statusReportAdapter;
    private Store store;
    private Version version;
    private StorageUtilizationManager quotaEnforcer;

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StorageUtilizationManagerTest$PartitionNumberMatcher.class */
    private static class PartitionNumberMatcher implements ArgumentMatcher<PartitionConsumptionState> {
        private final int expectedPartition;

        public PartitionNumberMatcher(int i) {
            this.expectedPartition = i;
        }

        public boolean matches(PartitionConsumptionState partitionConsumptionState) {
            return partitionConsumptionState.getPartition() == this.expectedPartition;
        }
    }

    @BeforeClass
    public void setUp() {
        this.storageEngine = (AbstractStorageEngine) Mockito.mock(AbstractStorageEngine.class);
        this.store = (Store) Mockito.mock(Store.class);
        this.version = (Version) Mockito.mock(Version.class);
    }

    @BeforeMethod
    public void buildNewQuotaEnforcer() {
        this.statusReportAdapter = (StatusReportAdapter) Mockito.mock(StatusReportAdapter.class);
        this.partitionConsumptionStateMap = new VeniceConcurrentHashMap();
        for (int i = 1; i <= 10; i++) {
            this.partitionConsumptionStateMap.put(Integer.valueOf(i), new PartitionConsumptionState(i, 1, (OffsetRecord) Mockito.mock(OffsetRecord.class), true));
        }
        Mockito.when(this.store.getName()).thenReturn(storeName);
        Mockito.when(Long.valueOf(this.store.getStorageQuotaInByte())).thenReturn(Long.valueOf(storeQuotaInBytes));
        Mockito.when(Integer.valueOf(this.store.getPartitionCount())).thenReturn(10);
        Mockito.when(this.store.getVersion(storeVersion)).thenReturn(Optional.of(this.version));
        Mockito.when(Boolean.valueOf(this.store.isHybridStoreDiskQuotaEnabled())).thenReturn(true);
        Mockito.when(this.version.getStatus()).thenReturn(VersionStatus.STARTED);
        this.quotaEnforcer = new StorageUtilizationManager(this.storageEngine, this.store, topic, 10, this.partitionConsumptionStateMap, true, true, this.statusReportAdapter, (str, i2) -> {
        }, (str2, i3) -> {
        });
    }

    @Test
    public void testDataUpdatedWithStoreChangeListener() throws Exception {
        Mockito.when(Long.valueOf(this.store.getStorageQuotaInByte())).thenReturn(Long.valueOf(newStoreQuotaInBytes));
        Mockito.when(this.version.getStatus()).thenReturn(VersionStatus.ONLINE);
        this.quotaEnforcer.handleStoreChanged(this.store);
        Assert.assertTrue(this.quotaEnforcer.isVersionOnline());
        Assert.assertEquals(this.quotaEnforcer.getStoreQuotaInBytes(), newStoreQuotaInBytes);
        Assert.assertEquals(this.quotaEnforcer.getPartitionQuotaInBytes(), 20L);
        for (int i = 1; i <= 10; i++) {
            ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(1))).reportQuotaNotViolated(this.partitionConsumptionStateMap.get(Integer.valueOf(i)));
        }
        ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(0))).reportQuotaViolated((PartitionConsumptionState) Mockito.any());
        addUsageToAllPartitions(20);
        ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(10))).reportQuotaViolated((PartitionConsumptionState) Mockito.any());
        for (int i2 = 1; i2 <= 10; i2++) {
            Assert.assertTrue(this.quotaEnforcer.isPartitionPausedIngestion(i2));
            ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter)).reportQuotaViolated(this.partitionConsumptionStateMap.get(Integer.valueOf(i2)));
        }
        Mockito.when(Boolean.valueOf(this.store.isHybridStoreDiskQuotaEnabled())).thenReturn(false);
        this.quotaEnforcer.handleStoreChanged(this.store);
        for (int i3 = 1; i3 <= 10; i3++) {
            Assert.assertFalse(this.quotaEnforcer.isPartitionPausedIngestion(i3));
            ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(2))).reportQuotaNotViolated(this.partitionConsumptionStateMap.get(Integer.valueOf(i3)));
        }
    }

    @Test
    public void testHybridStoreBatchPushExceededQuota() throws Exception {
        addUsageToAllPartitions(10);
    }

    @Test
    public void testHybridStorePushNotExceededQuota() throws Exception {
        addUsageToAllPartitions(5);
    }

    @Test
    public void testRTJobNotExceededQuota() throws Exception {
        setUpOnlineVersion();
        addUsageToAllPartitions(5);
        for (int i = 1; i <= 10; i++) {
            Assert.assertFalse(this.quotaEnforcer.isPartitionPausedIngestion(i));
        }
    }

    @Test
    public void testRTJobExceededQuota() throws Exception {
        setUpOnlineVersion();
        addUsageToAllPartitions(10);
        ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(10))).reportQuotaViolated((PartitionConsumptionState) Mockito.any());
        for (int i = 1; i <= 10; i++) {
            Assert.assertTrue(this.quotaEnforcer.isPartitionPausedIngestion(i));
            ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter)).reportQuotaViolated(this.partitionConsumptionStateMap.get(Integer.valueOf(i)));
        }
        addUsageToAllPartitions(0);
        for (int i2 = 1; i2 <= 10; i2++) {
            Assert.assertTrue(this.quotaEnforcer.isPartitionPausedIngestion(i2));
        }
        Mockito.when(Long.valueOf(this.store.getStorageQuotaInByte())).thenReturn(Long.valueOf(newStoreQuotaInBytes));
        this.quotaEnforcer.handleStoreChanged(this.store);
        for (int i3 = 1; i3 <= 10; i3++) {
            Assert.assertFalse(this.quotaEnforcer.isPartitionPausedIngestion(i3));
        }
    }

    @Test
    public void testReportCompletionForOnlineVersion() throws Exception {
        setUpOnlineVersion();
        addUsageToAllPartitions(10);
        for (int i = 1; i <= 10; i++) {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
            Mockito.when(Boolean.valueOf(partitionConsumptionState.isCompletionReported())).thenReturn(false);
            Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.STANDBY);
            this.partitionConsumptionStateMap.put(Integer.valueOf(i), partitionConsumptionState);
        }
        ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter, Mockito.times(10))).reportCompleted((PartitionConsumptionState) Mockito.any());
        for (int i2 = 1; i2 <= 10; i2++) {
            Assert.assertTrue(this.quotaEnforcer.isPartitionPausedIngestion(i2));
            ((StatusReportAdapter) Mockito.verify(this.statusReportAdapter)).reportCompleted((PartitionConsumptionState) Mockito.argThat(new PartitionNumberMatcher(i2)));
        }
    }

    private void addUsageToAllPartitions(int i) {
        for (int i2 = 1; i2 <= 10; i2++) {
            this.quotaEnforcer.enforcePartitionQuota(i2, i);
        }
    }

    private void setUpOnlineVersion() {
        Mockito.when(this.version.getStatus()).thenReturn(VersionStatus.ONLINE);
        this.quotaEnforcer.handleStoreChanged(this.store);
    }
}
