package com.linkedin.venice.pushmonitor;

import com.linkedin.venice.controller.HelixAdminClient;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.CachedReadOnlyStoreRepository;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreCleaner;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pushmonitor.AbstractPushMonitorTest;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.class */
/**
 * Concrete {@link AbstractPushMonitorTest} for {@link PartitionStatusBasedPushMonitor}.
 *
 * <p>The two {@code getPushMonitor} overrides build the monitor under test from the mocks exposed by the base
 * class. The tests declared here cover {@code loadAllPushes()} for a push whose decider reports a terminal status,
 * and {@code onExternalViewChange(...)} when a replica is reported in ERROR.
 */
public class PartitionStatusBasedPushMonitorTest extends AbstractPushMonitorTest {
    /** Strategy used for every push created in this class, and the key under which deciders are swapped. */
    private static final OfflinePushStrategy STRATEGY = OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION;

    /** Instance id whose replica is marked ERROR in {@link #testOnExternalViewChangeDisablePartition()}. */
    private static final String DISABLED_INSTANCE_ID = "disabled_host";

    /** Helix admin mock handed to monitors built via {@link #getPushMonitor(StoreCleaner)}. */
    HelixAdminClient helixAdminClient = Mockito.mock(HelixAdminClient.class);

    /**
     * Builds a monitor wired to the base-class mocks, the given cleaner, a throw-away
     * {@link RealTimeTopicSwitcher} mock and this class's {@link #helixAdminClient}.
     *
     * @param storeCleaner cleaner the monitor should use
     * @return a new {@link PartitionStatusBasedPushMonitor}
     */
    @Override // com.linkedin.venice.pushmonitor.AbstractPushMonitorTest
    protected AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner) {
        return new PartitionStatusBasedPushMonitor(
                getClusterName(),
                getMockAccessor(),
                storeCleaner,
                getMockStoreRepo(),
                getMockRoutingDataRepo(),
                getMockPushHealthStats(),
                Mockito.mock(RealTimeTopicSwitcher.class),
                getClusterLockManager(),
                getAggregateRealTimeSourceKafkaUrl(),
                Collections.emptyList(),
                this.helixAdminClient,
                true,
                120000L);
    }

    /**
     * Builds a monitor wired to the base-class mocks and the given topic switcher. Unlike the
     * {@link StoreCleaner} overload, this one uses a fresh {@link HelixAdminClient} mock rather than
     * {@link #helixAdminClient}.
     *
     * @param realTimeTopicSwitcher topic switcher the monitor should use
     * @return a new {@link PartitionStatusBasedPushMonitor}
     */
    @Override // com.linkedin.venice.pushmonitor.AbstractPushMonitorTest
    protected AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher realTimeTopicSwitcher) {
        return new PartitionStatusBasedPushMonitor(
                getClusterName(),
                getMockAccessor(),
                getMockStoreCleaner(),
                getMockStoreRepo(),
                getMockRoutingDataRepo(),
                getMockPushHealthStats(),
                realTimeTopicSwitcher,
                getClusterLockManager(),
                getAggregateRealTimeSourceKafkaUrl(),
                Collections.emptyList(),
                Mockito.mock(HelixAdminClient.class),
                true,
                120000L);
    }

    /**
     * Loads a single push whose (mocked) decider reports COMPLETED and expects {@code loadAllPushes()} to update
     * the store, retire old store versions, and leave the push COMPLETED with current version 1.
     */
    @Test
    public void testLoadRunningPushWhichIsNotUpdateToDate() {
        String topic = getTopic();
        Store store = prepareMockStore(topic);
        OfflinePushStatus pushStatus =
                new OfflinePushStatus(topic, getNumberOfPartition(), getReplicationFactor(), STRATEGY);
        PartitionAssignment assignment = new PartitionAssignment(topic, getNumberOfPartition());
        try {
            stubLoadedPushWithDecision(topic, pushStatus, assignment, ExecutionStatus.COMPLETED);

            getMonitor().loadAllPushes();

            Mockito.verify(getMockStoreRepo(), Mockito.atLeastOnce()).updateStore(store);
            Mockito.verify(getMockStoreCleaner(), Mockito.atLeastOnce())
                    .retireOldStoreVersions(Mockito.anyString(), Mockito.anyString(), Mockito.eq(false), Mockito.anyInt());
            Assert.assertEquals(getMonitor().getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.COMPLETED);
            Assert.assertEquals(store.getCurrentVersion(), 1);
        } finally {
            restoreDefaultDeciderAndResetAccessor();
        }
    }

    /**
     * Loads a single push whose (mocked) decider reports ERROR while the store cleaner throws on version deletion,
     * and expects {@code loadAllPushes()} to still update the store, attempt the deletion, and leave the push in
     * ERROR.
     */
    @Test
    public void testLoadRunningPushWhichIsNotUpdateToDateAndDeletionError() {
        String topic = getTopic();
        Store store = prepareMockStore(topic);
        OfflinePushStatus pushStatus =
                new OfflinePushStatus(topic, getNumberOfPartition(), getReplicationFactor(), STRATEGY);
        PartitionAssignment assignment = new PartitionAssignment(topic, getNumberOfPartition());
        try {
            stubLoadedPushWithDecision(topic, pushStatus, assignment, ExecutionStatus.ERROR);
            Mockito.doThrow(new VeniceException("Could not delete."))
                    .when(getMockStoreCleaner())
                    .deleteOneStoreVersion(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt());

            getMonitor().loadAllPushes();

            Mockito.verify(getMockStoreRepo(), Mockito.atLeastOnce()).updateStore(store);
            Mockito.verify(getMockStoreCleaner(), Mockito.atLeastOnce())
                    .deleteOneStoreVersion(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt());
            Assert.assertEquals(getMonitor().getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.ERROR);
        } finally {
            restoreDefaultDeciderAndResetAccessor();
        }
    }

    /**
     * Feeds the monitor an external view in which partitions 0 and 1 have a replica on
     * {@value #DISABLED_INSTANCE_ID} marked ERROR, then expects the push to still evaluate to STARTED and the
     * disabled-partitions map to have been queried exactly once for that instance.
     */
    @Test(timeOut = 30000)
    public void testOnExternalViewChangeDisablePartition() {
        Instance[] instances = {
                new Instance("a", "a", 1),
                new Instance(DISABLED_INSTANCE_ID, "disabledHostName", 2),
                new Instance("b", "disabled_host", 3),
                new Instance("d", "d", 4),
                new Instance("e", "e", 5)};
        Store store = getStoreWithCurrentVersion();
        String currentVersionTopic = store.getVersion(store.getCurrentVersion()).get().kafkaTopicName();

        // Partitions 0 and 1 share this state map; partition 2 gets the LEADER/STANDBY one.
        Map<String, List<Instance>> errorLeaderOfflineStates = new HashMap<>();
        errorLeaderOfflineStates.put("ERROR", Collections.singletonList(instances[0]));
        errorLeaderOfflineStates.put("LEADER", Collections.singletonList(instances[1]));
        errorLeaderOfflineStates.put("OFFLINE", Collections.singletonList(instances[2]));
        Map<String, List<Instance>> leaderStandbyStates = new HashMap<>();
        leaderStandbyStates.put("LEADER", Collections.singletonList(instances[0]));
        leaderStandbyStates.put("STANDBY", Arrays.asList(instances[1], instances[2]));

        PartitionAssignment assignment = new PartitionAssignment(currentVersionTopic, 3);
        assignment.addPartition(new Partition(0, errorLeaderOfflineStates));
        assignment.addPartition(new Partition(1, errorLeaderOfflineStates));
        assignment.addPartition(new Partition(2, leaderStandbyStates));

        OfflinePushStatus pushStatus = new OfflinePushStatus(currentVersionTopic, 3, 3, STRATEGY);
        pushStatus.setPartitionStatus(partitionStatusWithErroredReplica(0));
        pushStatus.setPartitionStatus(partitionStatusWithErroredReplica(1));

        AbstractPushMonitor pushMonitor =
                getPushMonitor(new AbstractPushMonitorTest.MockStoreCleaner(this.clusterLockManager));

        Map<String, List<String>> disabledPartitions = new HashMap<>();
        String versionOneTopic = Version.composeKafkaTopic(store.getName(), 1);
        disabledPartitions.put(versionOneTopic, Arrays.asList(HelixUtils.getPartitionName(versionOneTopic, 0)));
        Mockito.doReturn(disabledPartitions)
                .when(this.helixAdminClient)
                .getDisabledPartitionsMap(Mockito.anyString(), Mockito.anyString());
        Mockito.doReturn(true).when(this.mockRoutingDataRepo).containsKafkaTopic(Mockito.anyString());
        Mockito.doReturn(assignment).when(this.mockRoutingDataRepo).getPartitionAssignments(Mockito.anyString());

        pushMonitor.startMonitorOfflinePush(currentVersionTopic, 3, 3, STRATEGY);
        pushMonitor.updatePushStatus(pushStatus, ExecutionStatus.STARTED, Optional.empty());
        pushMonitor.onExternalViewChange(assignment);

        Assert.assertEquals(
                PushStatusDecider.getDecider(pushStatus.getStrategy())
                        .checkPushStatusAndDetailsByPartitionsStatus(pushStatus, assignment, (DisableReplicaCallback) null)
                        .getFirst(),
                ExecutionStatus.STARTED);
        Mockito.verify(this.helixAdminClient, Mockito.times(1))
                .getDisabledPartitionsMap(Mockito.eq(getClusterName()), Mockito.eq(DISABLED_INSTANCE_ID));
    }

    /**
     * Stubs the accessor, the routing repository and the static decider registry so that {@code pushStatus} is the
     * only stored push for {@code topic} and the decider reports {@code decidedStatus} for it.
     *
     * <p>Callers must invoke {@link #restoreDefaultDeciderAndResetAccessor()} afterwards (in a {@code finally}),
     * because this replaces the decider registered for {@link #STRATEGY}.
     *
     * @param topic         kafka topic of the push
     * @param pushStatus    the stored push status returned by the accessor mock
     * @param assignment    partition assignment returned by the routing repository mock
     * @param decidedStatus status the mocked decider returns for {@code pushStatus} and {@code assignment}
     */
    private void stubLoadedPushWithDecision(
            String topic,
            OfflinePushStatus pushStatus,
            PartitionAssignment assignment,
            ExecutionStatus decidedStatus) {
        List<OfflinePushStatus> storedPushes = new ArrayList<>();
        storedPushes.add(pushStatus);
        Mockito.doReturn(storedPushes).when(getMockAccessor()).loadOfflinePushStatusesAndPartitionStatuses();

        Mockito.doReturn(true).when(getMockRoutingDataRepo()).containsKafkaTopic(Mockito.eq(topic));
        Mockito.doReturn(assignment).when(getMockRoutingDataRepo()).getPartitionAssignments(topic);

        PushStatusDecider mockDecider = Mockito.mock(PushStatusDecider.class);
        Mockito.doReturn(new Pair<>(decidedStatus, Optional.empty()))
                .when(mockDecider)
                .checkPushStatusAndDetailsByPartitionsStatus(pushStatus, assignment, (DisableReplicaCallback) null);
        PushStatusDecider.updateDecider(STRATEGY, mockDecider);

        // Per-topic lookups resolve against the same list; unknown topics yield null.
        Mockito.when(getMockAccessor().getOfflinePushStatusAndItsPartitionStatuses(Mockito.anyString()))
                .thenAnswer(invocation -> {
                    String requestedTopic = invocation.getArgument(0);
                    for (OfflinePushStatus candidate : storedPushes) {
                        if (candidate.getKafkaTopic().equals(requestedTopic)) {
                            return candidate;
                        }
                    }
                    return null;
                });
    }

    /**
     * Re-registers a real {@link WaitNMinusOnePushStatusDecider} for {@link #STRATEGY} and resets the accessor
     * mock. The decider is registered through a static call, so a mock left behind by a failed test would stay
     * registered for whatever runs next.
     */
    private void restoreDefaultDeciderAndResetAccessor() {
        PushStatusDecider.updateDecider(STRATEGY, new WaitNMinusOnePushStatusDecider());
        Mockito.reset(getMockAccessor());
    }

    /**
     * Creates a {@link PartitionStatus} with replicas "a", "c" and {@value #DISABLED_INSTANCE_ID}, the last one
     * updated to {@link ExecutionStatus#ERROR}.
     *
     * @param partitionId id of the partition the status describes
     * @return the populated partition status
     */
    private static PartitionStatus partitionStatusWithErroredReplica(int partitionId) {
        ReplicaStatus erroredReplica = new ReplicaStatus(DISABLED_INSTANCE_ID);
        erroredReplica.updateStatus(ExecutionStatus.ERROR);

        List<ReplicaStatus> replicas = new ArrayList<>(3);
        replicas.add(new ReplicaStatus("a"));
        replicas.add(new ReplicaStatus("c"));
        replicas.add(erroredReplica);

        PartitionStatus partitionStatus = new PartitionStatus(partitionId);
        partitionStatus.setReplicaStatuses(replicas);
        return partitionStatus;
    }

    /**
     * Returns a random store (from {@link TestUtils#getRandomStore()}) that has version 1 added and set as its
     * current version.
     */
    private Store getStoreWithCurrentVersion() {
        Store store = TestUtils.getRandomStore();
        // NOTE(review): the trailing "" and 3 are passed straight to VersionImpl; their meaning is not visible here.
        store.addVersion(new VersionImpl(store.getName(), 1, "", 3));
        store.setCurrentVersion(1);
        return store;
    }
}
