package com.linkedin.venice.controller;

import com.linkedin.venice.helix.CachedReadOnlyStoreRepository;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pushmonitor.OfflinePushStatus;
import com.linkedin.venice.pushmonitor.PushMonitor;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/controller/ErrorPartitionResetTaskTest.class */
/**
 * Tests for {@link ErrorPartitionResetTask}.
 *
 * <p>Each test builds a store with a current version, mocks the routing data so that the first
 * lookup returns an assignment containing replicas in the {@code ERROR} state and every later
 * lookup returns a second assignment, submits the task to a shared single-thread executor and
 * then verifies which {@link HelixAdminClient#resetPartition} calls were made.
 */
public class ErrorPartitionResetTaskTest {
    private static final long PROCESSING_CYCLE_DELAY = 100;
    private static final int ERROR_PARTITION_RESET_LIMIT = 1;
    private static final int PARTITION_COUNT = 3;
    /** Upper bound, in the same unit as {@link #PROCESSING_CYCLE_DELAY}, for Mockito timed verifications. */
    private static final long VERIFY_TIMEOUT = PROCESSING_CYCLE_DELAY * 3;

    // Shared by all tests in this class and shut down once in cleanUp(); it has a single thread,
    // so every test must close its task before the next test can run one.
    private final ExecutorService errorPartitionResetExecutorService = Executors.newSingleThreadExecutor();
    private final Instance[] instances = {
        new Instance("a", "a", 1),
        new Instance("b", "b", 2),
        new Instance("c", "c", 3),
        new Instance("d", "d", 4),
        new Instance("e", "e", 5)
    };
    private HelixAdminClient helixAdminClient;
    private CachedReadOnlyStoreRepository readOnlyStoreRepository;
    private HelixExternalViewRepository routingDataRepository;
    private PushMonitor pushMonitor;
    private MetricsRepository metricsRepository;

    /** Creates fresh mocks and a fresh metrics repository before every test method. */
    @BeforeMethod
    public void setUp() {
        this.helixAdminClient = Mockito.mock(HelixAdminClient.class);
        this.readOnlyStoreRepository = Mockito.mock(CachedReadOnlyStoreRepository.class);
        this.routingDataRepository = Mockito.mock(HelixExternalViewRepository.class);
        this.pushMonitor = Mockito.mock(PushMonitor.class);
        this.metricsRepository = new MetricsRepository();
    }

    /** Shuts down the shared executor once all tests in this class have run. */
    @AfterClass
    public void cleanUp() throws InterruptedException {
        TestUtils.shutdownExecutor(this.errorPartitionResetExecutorService);
    }

    /**
     * Partitions 0 and 1 start with instance "a" in {@code ERROR}; in the second assignment only
     * partition 0 still does. Expects exactly one reset call for instance "a" covering both
     * partitions, no reset for instances "b" and "c", and the reset metrics asserted below.
     */
    @Test
    public void testErrorPartitionReset() {
        String clusterName = Utils.getUniqueString("testCluster");
        Store store = getStoreWithCurrentVersion();
        String resourceName = getCurrentVersionTopic(store);

        Map<String, List<Instance>> errorStateInstances = new HashMap<>();
        errorStateInstances.put("ERROR", Arrays.asList(this.instances[0]));
        errorStateInstances.put("LEADER", Arrays.asList(this.instances[1]));
        errorStateInstances.put("STANDBY", Arrays.asList(this.instances[2]));
        Map<String, List<Instance>> healthyStateInstances = new HashMap<>();
        healthyStateInstances.put("LEADER", Arrays.asList(this.instances[0]));
        healthyStateInstances.put("STANDBY", Arrays.asList(this.instances[1], this.instances[2]));

        Partition errorPartition0 = new Partition(0, errorStateInstances);
        Partition errorPartition1 = new Partition(1, errorStateInstances);
        Partition healthyPartition2 = new Partition(2, healthyStateInstances);
        PartitionAssignment initialAssignment = new PartitionAssignment(resourceName, PARTITION_COUNT);
        initialAssignment.addPartition(errorPartition0);
        initialAssignment.addPartition(errorPartition1);
        initialAssignment.addPartition(healthyPartition2);

        // Second view: partition 1 has become healthy, partition 0 is still in error.
        Partition healthyPartition1 = new Partition(1, healthyStateInstances);
        PartitionAssignment followUpAssignment = new PartitionAssignment(resourceName, PARTITION_COUNT);
        followUpAssignment.addPartition(errorPartition0);
        followUpAssignment.addPartition(healthyPartition1);
        followUpAssignment.addPartition(healthyPartition2);

        OfflinePushStatus pushStatus =
            new OfflinePushStatus(resourceName, 3, 3, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        Mockito.doReturn(Arrays.asList(store)).when(this.readOnlyStoreRepository).getAllStores();
        Mockito.when(this.routingDataRepository.getPartitionAssignments(resourceName))
            .thenReturn(initialAssignment)
            .thenReturn(followUpAssignment);
        Mockito.when(this.pushMonitor.getOfflinePushOrThrow(resourceName)).thenReturn(pushStatus);

        ErrorPartitionResetTask task = getErrorPartitionResetTask(clusterName);
        this.errorPartitionResetExecutorService.submit(task);
        try {
            Mockito.verify(this.helixAdminClient, Mockito.timeout(VERIFY_TIMEOUT).times(1))
                .resetPartition(
                    clusterName,
                    this.instances[0].getNodeId(),
                    resourceName,
                    Arrays.asList(
                        HelixUtils.getPartitionName(resourceName, errorPartition0.getId()),
                        HelixUtils.getPartitionName(resourceName, errorPartition1.getId())));
            // NOTE(review): presumably each processing cycle starts with getAllStores(), so waiting
            // for the third call lets the first two cycles finish before the metrics are read.
            Mockito.verify(this.readOnlyStoreRepository, Mockito.timeout(VERIFY_TIMEOUT).times(3)).getAllStores();
            verifyNeverReset(clusterName, this.instances[1], resourceName);
            verifyNeverReset(clusterName, this.instances[2], resourceName);

            assertMetric(clusterName, "current_version_error_partition_reset_attempt.Total", 2.0d);
            assertMetric(clusterName, "current_version_error_partition_reset_attempt_errored.Count", 0.0d);
            assertMetric(clusterName, "current_version_error_partition_recovered_from_reset.Total", 1.0d);
            assertMetric(clusterName, "current_version_error_partition_unrecoverable_from_reset.Total", 1.0d);
        } finally {
            // Always close the task: a failed verification must not leave it occupying the
            // shared single-thread executor for the remaining tests.
            task.close();
        }
    }

    /**
     * Partition 0 starts with instances "a" and "b" in {@code ERROR} and three instances
     * {@code ONLINE}; the second assignment is fully {@code ONLINE}. Expects one reset call for
     * each of "a" and "b" on partition 0 and no reset for instances "c", "d" and "e".
     */
    @Test
    public void testErrorPartitionResetOnExcessErrorReplicas() {
        String clusterName = Utils.getUniqueString("testCluster");
        Store store = getStoreWithCurrentVersion();
        String resourceName = getCurrentVersionTopic(store);

        Map<String, List<Instance>> excessErrorInstances = new HashMap<>();
        excessErrorInstances.put("ERROR", Arrays.asList(this.instances[0], this.instances[1]));
        excessErrorInstances.put("ONLINE", Arrays.asList(this.instances[2], this.instances[3], this.instances[4]));
        Partition excessErrorPartition = new Partition(0, excessErrorInstances);

        Map<String, List<Instance>> onlineInstances = new HashMap<>();
        onlineInstances.put("ONLINE", Arrays.asList(this.instances[0], this.instances[1], this.instances[2]));

        PartitionAssignment initialAssignment = new PartitionAssignment(resourceName, PARTITION_COUNT);
        initialAssignment.addPartition(excessErrorPartition);
        initialAssignment.addPartition(new Partition(1, onlineInstances));
        initialAssignment.addPartition(new Partition(2, onlineInstances));

        PartitionAssignment followUpAssignment = new PartitionAssignment(resourceName, PARTITION_COUNT);
        for (int partitionId = 0; partitionId < PARTITION_COUNT; partitionId++) {
            followUpAssignment.addPartition(new Partition(partitionId, onlineInstances));
        }

        Mockito.doReturn(Arrays.asList(store)).when(this.readOnlyStoreRepository).getAllStores();
        Mockito.when(this.routingDataRepository.getPartitionAssignments(resourceName))
            .thenReturn(initialAssignment)
            .thenReturn(followUpAssignment);

        ErrorPartitionResetTask task = getErrorPartitionResetTask(clusterName);
        this.errorPartitionResetExecutorService.submit(task);
        try {
            List<String> expectedPartitionNames =
                Arrays.asList(HelixUtils.getPartitionName(resourceName, excessErrorPartition.getId()));
            Mockito.verify(this.helixAdminClient, Mockito.timeout(VERIFY_TIMEOUT).times(1))
                .resetPartition(clusterName, this.instances[0].getNodeId(), resourceName, expectedPartitionNames);
            Mockito.verify(this.helixAdminClient, Mockito.timeout(VERIFY_TIMEOUT).times(1))
                .resetPartition(clusterName, this.instances[1].getNodeId(), resourceName, expectedPartitionNames);
            verifyNeverReset(clusterName, this.instances[2], resourceName);
            verifyNeverReset(clusterName, this.instances[3], resourceName);
            verifyNeverReset(clusterName, this.instances[4], resourceName);
        } finally {
            // Always close the task: a failed verification must not leave it occupying the
            // shared single-thread executor for the remaining tests.
            task.close();
        }
    }

    /** Builds the task under test for the given cluster name, wired to this test's mocks and metrics. */
    private ErrorPartitionResetTask getErrorPartitionResetTask(String str) {
        return new ErrorPartitionResetTask(
            str,
            this.helixAdminClient,
            this.readOnlyStoreRepository,
            this.routingDataRepository,
            this.pushMonitor,
            this.metricsRepository,
            ERROR_PARTITION_RESET_LIMIT,
            PROCESSING_CYCLE_DELAY);
    }

    /** Returns a random store that has version 1 added and set as its current version. */
    private Store getStoreWithCurrentVersion() {
        Store randomStore = TestUtils.getRandomStore();
        randomStore.addVersion(new VersionImpl(randomStore.getName(), 1, "", PARTITION_COUNT));
        randomStore.setCurrentVersion(1);
        return randomStore;
    }

    /** Returns the Kafka topic name of the store's current version. */
    private static String getCurrentVersionTopic(Store store) {
        return ((Version) store.getVersion(store.getCurrentVersion()).get()).kafkaTopicName();
    }

    /** Verifies that no partition reset was requested for the given instance on the given resource. */
    private void verifyNeverReset(String clusterName, Instance instance, String resourceName) {
        Mockito.verify(this.helixAdminClient, Mockito.never())
            .resetPartition(
                Mockito.eq(clusterName),
                Mockito.eq(instance.getNodeId()),
                Mockito.eq(resourceName),
                Mockito.anyList());
    }

    /** Asserts the value of the metric named {@code .<clusterName>--<metricSuffix>}. */
    private void assertMetric(String clusterName, String metricSuffix, double expected) {
        String metricName = String.format(".%s--%s", clusterName, metricSuffix);
        Assert.assertEquals(this.metricsRepository.getMetric(metricName).value(), expected);
    }
}
