package com.linkedin.venice.pushmonitor;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreCleaner;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pushmonitor/AbstractPushMonitorTest.class */
public abstract class AbstractPushMonitorTest {
    // Mocked accessor for push statuses; re-created in setUp() and stubbed/verified by the tests.
    private OfflinePushAccessor mockAccessor;
    // Push monitor under test; obtained from getPushMonitor() at the end of setUp().
    private AbstractPushMonitor monitor;
    // Mocked store repository; re-created in setUp().
    private ReadWriteStoreRepository mockStoreRepo;
    // Mocked routing data repository; re-created in setUp(). Protected — presumably read by subclasses (confirm).
    protected RoutingDataRepository mockRoutingDataRepo;
    // Mocked store cleaner; re-created in setUp(). Protected — presumably read by subclasses (confirm).
    protected StoreCleaner mockStoreCleaner;
    // Mocked push health stats; re-created in setUp() and verified in testOnRoutingDataDeleted().
    private AggPushHealthStats mockPushHealthStats;
    // Real lock manager for clusterName; re-created in setUp().
    protected ClusterLockManager clusterLockManager;
    // Unique cluster name shared by all tests of this class.
    private static final String clusterName = Utils.getUniqueString("test_cluster");
    // Source Kafka URL expected in the switchToRealTimeTopic verification of the hybrid store test.
    private static final String aggregateRealTimeSourceKafkaUrl = "aggregate-real-time-source-kafka-url";
    // Unique store name generated per test method in setUp().
    private String storeName;
    // Version topic for the store, built in setUp() as storeName + "_v1".
    private String topic;
    // Partition count passed to startMonitorOfflinePush() by the tests.
    private static final int numberOfPartition = 1;
    // Replication factor for the monitored pushes.
    private static final int replicationFactor = 3;
    // Mocks used by the incremental push tests; created in prepareForIncrementalPushStatusTest().
    private PushStatusStoreReader statusStoreReaderMock;
    private HelixCustomizedViewOfflinePushRepository customizedViewMock;
    // Incremental push version queried by the incremental push status tests.
    private static final String incrementalPushVersion = "IncPush_42";
    // Partition count and replication factor intended for the incremental push status tests.
    private static final int partitionCountForIncPushTests = 3;
    private static final int replicationFactorForIncPushTests = 3;

    /* loaded from: input_file:com/linkedin/venice/pushmonitor/AbstractPushMonitorTest$MockStoreCleaner.class */
    /**
     * {@link StoreCleaner} test double that simulates slow clean-up work.
     *
     * <p>{@link #deleteOneStoreVersion}, {@link #retireOldStoreVersions} and {@link #deleteHelixResource}
     * each acquire a store write lock from the supplied {@link ClusterLockManager}, sleep for
     * {@link #SIMULATED_WORK_MS} milliseconds while holding it, and then release it. The remaining
     * methods do no work.
     */
    protected class MockStoreCleaner implements StoreCleaner {
        /** How long each simulated clean-up keeps the store write lock, in milliseconds. */
        private static final long SIMULATED_WORK_MS = 1000L;

        private final ClusterLockManager clusterLockManager;

        /**
         * @param clusterLockManager lock manager the simulated clean-up operations take store write locks from
         */
        public MockStoreCleaner(ClusterLockManager clusterLockManager) {
            this.clusterLockManager = clusterLockManager;
        }

        /**
         * Simulates deleting a store version by holding the write lock of {@code lockedStore}.
         *
         * @param cluster       not used by this mock
         * @param lockedStore   store whose write lock is held while sleeping
         * @param versionNumber not used by this mock
         */
        public void deleteOneStoreVersion(String cluster, String lockedStore, int versionNumber) {
            sleepHoldingStoreWriteLock(lockedStore);
        }

        /**
         * Simulates retiring old store versions by holding the write lock of {@code lockedStore}.
         *
         * @param cluster     not used by this mock
         * @param lockedStore store whose write lock is held while sleeping
         * @param flag        not used by this mock
         * @param version     not used by this mock
         */
        public void retireOldStoreVersions(String cluster, String lockedStore, boolean flag, int version) {
            sleepHoldingStoreWriteLock(lockedStore);
        }

        /** Intentionally a no-op in this mock. */
        public void topicCleanupWhenPushComplete(String cluster, String store, int version) {
        }

        /**
         * @return always {@code true}; the arguments are ignored
         */
        public boolean containsHelixResource(String cluster, String resource) {
            return true;
        }

        /**
         * Simulates deleting a Helix resource. Unlike the other operations, the lock is taken on the
         * enclosing test's current {@code storeName} rather than on either argument.
         *
         * @param cluster  not used by this mock
         * @param resource not used by this mock
         */
        public void deleteHelixResource(String cluster, String resource) {
            sleepHoldingStoreWriteLock(AbstractPushMonitorTest.this.storeName);
        }

        /**
         * Acquires the write lock of the given store, sleeps for {@link #SIMULATED_WORK_MS} and releases
         * the lock again, also when the sleep is interrupted.
         */
        private void sleepHoldingStoreWriteLock(String lockedStore) {
            // try-with-resources skips close() for a null resource, matching the previous explicit null checks.
            try (AutoCloseableLock ignored = clusterLockManager.createStoreWriteLock(lockedStore)) {
                Thread.sleep(SIMULATED_WORK_MS);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the monitor under test, wired with a freshly mocked {@link RealTimeTopicSwitcher}.
     *
     * @return the monitor produced by {@link #getPushMonitor(RealTimeTopicSwitcher)}
     */
    protected AbstractPushMonitor getPushMonitor() {
        RealTimeTopicSwitcher mockedSwitcher = Mockito.mock(RealTimeTopicSwitcher.class);
        return getPushMonitor(mockedSwitcher);
    }

    /**
     * Factory hook implemented by concrete test classes: supplies the push monitor implementation to
     * exercise, given a store cleaner.
     *
     * @param storeCleaner store cleaner handed to the factory (how it is wired is up to the subclass)
     * @return the monitor instance to test
     */
    protected abstract AbstractPushMonitor getPushMonitor(StoreCleaner storeCleaner);

    /**
     * Factory hook implemented by concrete test classes: supplies the push monitor implementation to
     * exercise, given a real-time topic switcher. {@link #getPushMonitor()} calls this with a mock.
     *
     * @param realTimeTopicSwitcher topic switcher handed to the factory (how it is wired is up to the subclass)
     * @return the monitor instance to test
     */
    protected abstract AbstractPushMonitor getPushMonitor(RealTimeTopicSwitcher realTimeTopicSwitcher);

    /**
     * Gives every test method a fresh store name, version topic, set of mocks, lock manager and monitor.
     */
    @BeforeMethod
    public void setUp() {
        storeName = Utils.getUniqueString("test_store");
        topic = storeName + "_v1";

        // Collaborators are plain Mockito mocks; only the lock manager is a real object.
        mockAccessor = Mockito.mock(OfflinePushAccessor.class);
        mockStoreCleaner = Mockito.mock(StoreCleaner.class);
        mockStoreRepo = Mockito.mock(ReadWriteStoreRepository.class);
        mockRoutingDataRepo = Mockito.mock(RoutingDataRepository.class);
        mockPushHealthStats = Mockito.mock(AggPushHealthStats.class);
        clusterLockManager = new ClusterLockManager(clusterName);

        // Built last. NOTE(review): presumably the subclass factory reads the fields set above — confirm.
        monitor = getPushMonitor();
    }

    /**
     * Starting to monitor a push creates a STARTED push status, persists it, subscribes to partition
     * status and routing data changes, and rejects a second start for the same topic.
     */
    @Test
    public void testStartMonitorOfflinePush() {
        final int replicas = 3;
        monitor.startMonitorOfflinePush(
            topic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);

        OfflinePushStatus pushStatus = monitor.getOfflinePushOrThrow(topic);
        Assert.assertEquals(pushStatus.getCurrentStatus(), ExecutionStatus.STARTED);
        Assert.assertEquals(pushStatus.getKafkaTopic(), topic);
        Assert.assertEquals(pushStatus.getNumberOfPartition(), numberOfPartition);
        Assert.assertEquals(pushStatus.getReplicationFactor(), replicas);

        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).createOfflinePushStatusAndItsPartitionStatuses(pushStatus);
        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).subscribePartitionStatusChange(pushStatus, monitor);
        Mockito.verify(mockRoutingDataRepo, Mockito.atLeastOnce()).subscribeRoutingDataChange(getTopic(), monitor);

        ResourceAssignment mockedAssignment = Mockito.mock(ResourceAssignment.class);
        Mockito.doReturn(mockedAssignment).when(mockRoutingDataRepo).getResourceAssignment();

        try {
            monitor.startMonitorOfflinePush(
                topic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
            Assert.fail("Duplicated monitoring is not allowed. ");
        } catch (VeniceException expected) {
            // Expected: the topic is already being monitored.
        }
    }

    /**
     * Once a monitored push has been marked as ERROR, monitoring may be started again for the same
     * topic without an exception.
     */
    @Test
    public void testStartMonitorOfflinePushWhenThereIsAnExistingErrorPush() {
        final int replicas = 3;
        monitor.startMonitorOfflinePush(
            topic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);

        OfflinePushStatus pushStatus = monitor.getOfflinePushOrThrow(topic);
        Assert.assertEquals(pushStatus.getCurrentStatus(), ExecutionStatus.STARTED);
        Assert.assertEquals(pushStatus.getKafkaTopic(), topic);
        Assert.assertEquals(pushStatus.getNumberOfPartition(), numberOfPartition);
        Assert.assertEquals(pushStatus.getReplicationFactor(), replicas);

        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).createOfflinePushStatusAndItsPartitionStatuses(pushStatus);
        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).subscribePartitionStatusChange(pushStatus, monitor);
        Mockito.verify(mockRoutingDataRepo, Mockito.atLeastOnce()).subscribeRoutingDataChange(getTopic(), monitor);

        monitor.markOfflinePushAsError(topic, "mocked_error_push");
        Assert.assertEquals(monitor.getPushStatus(topic), ExecutionStatus.ERROR);

        // Must not throw: the existing push for this topic is in ERROR state.
        monitor.startMonitorOfflinePush(
            topic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
    }

    /**
     * Stopping the monitor deletes the stored push status, removes both subscriptions and makes the
     * push unknown to the monitor.
     */
    @Test
    public void testStopMonitorOfflinePush() {
        monitor.startMonitorOfflinePush(
            topic, numberOfPartition, 3, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        OfflinePushStatus pushStatus = monitor.getOfflinePushOrThrow(topic);

        monitor.stopMonitorOfflinePush(topic, true, false);

        String monitoredTopic = pushStatus.getKafkaTopic();
        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).deleteOfflinePushStatusAndItsPartitionStatuses(monitoredTopic);
        Mockito.verify(mockAccessor, Mockito.atLeastOnce()).unsubscribePartitionsStatusChange(pushStatus, monitor);
        Mockito.verify(mockRoutingDataRepo, Mockito.atLeastOnce()).unSubscribeRoutingDataChange(getTopic(), monitor);

        try {
            monitor.getOfflinePushOrThrow(topic);
            Assert.fail("Push status should be deleted by stopMonitorOfflinePush method");
        } catch (VeniceException expected) {
            // Expected: the push is no longer tracked.
        }
    }

    /**
     * Error pushes stay queryable after their monitoring is stopped, but only a bounded number of
     * them: after one more error push is stopped, the oldest one (version 0) is gone.
     */
    @Test
    public void testStopMonitorErrorOfflinePush() {
        final int initialErrorPushes = 5;
        final int replicas = 3;
        String testStore = getStoreName();

        for (int version = 0; version < initialErrorPushes; version++) {
            String versionTopic = Version.composeKafkaTopic(testStore, version);
            monitor.startMonitorOfflinePush(
                versionTopic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
            monitor.getOfflinePushOrThrow(versionTopic).updateStatus(ExecutionStatus.ERROR);
            monitor.stopMonitorOfflinePush(versionTopic, true, false);
        }

        // All of the stopped error pushes are still retained at this point.
        for (int version = 0; version < initialErrorPushes; version++) {
            Assert.assertNotNull(monitor.getOfflinePushOrThrow(Version.composeKafkaTopic(testStore, version)));
        }

        // One more error push (version 6) pushes the oldest one out.
        String newestTopic = Version.composeKafkaTopic(testStore, 6);
        monitor.startMonitorOfflinePush(
            newestTopic, numberOfPartition, replicas, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        monitor.getOfflinePushOrThrow(newestTopic).updateStatus(ExecutionStatus.ERROR);
        monitor.stopMonitorOfflinePush(newestTopic, true, false);

        try {
            monitor.getOfflinePushOrThrow(Version.composeKafkaTopic(testStore, 0));
            Assert.fail("Oldest error push should be collected.");
        } catch (VeniceException expected) {
            // Expected: version 0 has been collected.
        }
        Assert.assertNotNull(monitor.getOfflinePushOrThrow(newestTopic));
    }

    /**
     * {@code loadAllPushes()} picks up every push status returned by the accessor: three COMPLETED
     * pushes are served by the mocked accessor and must afterwards be reported as COMPLETED by the monitor.
     */
    @Test
    public void testLoadAllPushes() {
        final int pushCount = 3;
        ArrayList<OfflinePushStatus> storedStatuses = new ArrayList<>(pushCount);
        for (int version = 0; version < pushCount; version++) {
            OfflinePushStatus stored = new OfflinePushStatus(
                "testLoadAllPushes_v" + version,
                numberOfPartition,
                3,
                OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
            stored.setCurrentStatus(ExecutionStatus.COMPLETED);
            storedStatuses.add(stored);
        }

        Mockito.doReturn(storedStatuses).when(mockAccessor).loadOfflinePushStatusesAndPartitionStatuses();
        // Single-status lookups are answered from the same list, by Kafka topic; unknown topics yield null.
        Mockito.when(mockAccessor.getOfflinePushStatusAndItsPartitionStatuses(Mockito.anyString()))
            .thenAnswer(invocation -> {
                String requestedTopic = invocation.getArgument(0);
                for (OfflinePushStatus candidate : storedStatuses) {
                    if (candidate.getKafkaTopic().equals(requestedTopic)) {
                        return candidate;
                    }
                }
                return null;
            });

        monitor.loadAllPushes();

        for (int version = 0; version < pushCount; version++) {
            Assert.assertEquals(
                monitor.getOfflinePushOrThrow("testLoadAllPushes_v" + version).getCurrentStatus(),
                ExecutionStatus.COMPLETED);
        }
    }

    /**
     * Loading ten pushes (version 0 COMPLETED, versions 1-9 ERROR) must collect the oldest error
     * pushes: five statuses are deleted through the accessor, versions 1-5 become unknown to the
     * monitor, and version 0 plus versions 6-9 remain.
     */
    @Test
    public void testClearOldErrorVersion() {
        final int totalPushes = 10;
        // NOTE(review): presumably mirrors the monitor's retention limit for error pushes — confirm.
        final int pushesToKeep = 5;

        ArrayList<OfflinePushStatus> storedStatuses = new ArrayList<>(totalPushes);
        for (int version = 0; version < totalPushes; version++) {
            OfflinePushStatus stored = new OfflinePushStatus(
                "testLoadAllPushes_v" + version,
                numberOfPartition,
                3,
                OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
            // Only the very first push succeeded; every later one failed.
            stored.setCurrentStatus(version == 0 ? ExecutionStatus.COMPLETED : ExecutionStatus.ERROR);
            storedStatuses.add(stored);
        }

        Mockito.doReturn(storedStatuses).when(mockAccessor).loadOfflinePushStatusesAndPartitionStatuses();
        // Single-status lookups are answered from the same list, by Kafka topic; unknown topics yield null.
        Mockito.when(mockAccessor.getOfflinePushStatusAndItsPartitionStatuses(Mockito.anyString()))
            .thenAnswer(invocation -> {
                String requestedTopic = invocation.getArgument(0);
                for (OfflinePushStatus candidate : storedStatuses) {
                    if (candidate.getKafkaTopic().equals(requestedTopic)) {
                        return candidate;
                    }
                }
                return null;
            });

        monitor.loadAllPushes();

        Mockito.verify(mockAccessor, Mockito.times(totalPushes - pushesToKeep))
            .deleteOfflinePushStatusAndItsPartitionStatuses(ArgumentMatchers.<String>any());
        Assert.assertEquals(monitor.getPushStatus("testLoadAllPushes_v0"), ExecutionStatus.COMPLETED);

        // Versions 1..5 are the oldest error pushes and must be gone.
        for (int version = 1; version <= pushesToKeep; version++) {
            try {
                monitor.getOfflinePushOrThrow("testLoadAllPushes_v" + version);
                Assert.fail("Old error pushes should be collected after loading.");
            } catch (VeniceException expected) {
                // Expected: this error push has been collected.
            }
        }

        // Versions 6..9 are the most recent error pushes and must still be tracked.
        for (int version = pushesToKeep + 1; version < totalPushes; version++) {
            Assert.assertEquals(monitor.getPushStatus("testLoadAllPushes_v" + version), ExecutionStatus.ERROR);
        }
    }

    /**
     * A routing-data deletion leaves the push STARTED while the resource still exists in the ideal
     * state, and turns it into ERROR (recording one failed push) once it no longer does.
     */
    @Test
    public void testOnRoutingDataDeleted() {
        String versionTopic = getTopic();
        prepareMockStore(versionTopic);
        monitor.startMonitorOfflinePush(
            versionTopic, numberOfPartition, 3, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);

        // Resource still present in the ideal state: the deletion is ignored.
        Mockito.doReturn(true).when(mockRoutingDataRepo).doesResourcesExistInIdealState(versionTopic);
        monitor.onRoutingDataDeleted(versionTopic);
        Assert.assertEquals(monitor.getOfflinePushOrThrow(versionTopic).getCurrentStatus(), ExecutionStatus.STARTED);

        // Resource gone from the ideal state: the push fails.
        Mockito.doReturn(false).when(mockRoutingDataRepo).doesResourcesExistInIdealState(versionTopic);
        monitor.onRoutingDataDeleted(versionTopic);
        Assert.assertEquals(monitor.getOfflinePushOrThrow(versionTopic).getCurrentStatus(), ExecutionStatus.ERROR);

        Mockito.verify(mockPushHealthStats, Mockito.times(1))
            .recordFailedPush(ArgumentMatchers.eq(getStoreName()), ArgumentMatchers.anyLong());
    }

    /**
     * Creates fresh mocks for the push status store reader and the customized view repository and
     * stubs them with the given data for the current store/topic, {@code incrementalPushVersion}
     * and three partitions.
     *
     * @param pushStatuses           value returned by the reader's {@code getPartitionStatuses(...)}; may be {@code null}
     * @param completedReplicaCounts value returned by the view's {@code getCompletedStatusReplicas(...)}
     */
    private void prepareForIncrementalPushStatusTest(
        Map<Integer, Map<CharSequence, Integer>> pushStatuses, Map<Integer, Integer> completedReplicaCounts) {
        statusStoreReaderMock = Mockito.mock(PushStatusStoreReader.class);
        customizedViewMock = Mockito.mock(HelixCustomizedViewOfflinePushRepository.class);

        Mockito
            .when(
                statusStoreReaderMock.getPartitionStatuses(
                    getStoreName(),
                    Version.parseVersionFromVersionTopicName(getTopic()),
                    incrementalPushVersion,
                    3))
            .thenReturn(pushStatuses);
        Mockito
            .when(customizedViewMock.getCompletedStatusReplicas(getTopic(), 3))
            .thenReturn(completedReplicaCounts);
    }

    /**
     * Builds a mutable map reporting three replicas for each of the partitions 0, 1 and 2. The tests
     * feed it to the mocked {@code getCompletedStatusReplicas(...)} call.
     *
     * @return a new map from partition id to replica count
     */
    private Map<Integer, Integer> getCompletedStatusData() {
        final int partitionCount = 3;
        final int replicasPerPartition = 3;
        Map<Integer, Integer> completedReplicas = new HashMap<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            completedReplicas.put(partition, replicasPerPartition);
        }
        return completedReplicas;
    }

    /**
     * Builds mutable push status data in which the replicas {@code instance-0}, {@code instance-1}
     * and {@code instance-2} of each of the partitions 0, 1 and 2 all report the given status.
     *
     * @param executionStatus status whose numeric value is stored for every replica
     * @return a new map from partition id to (replica id to status value)
     */
    private Map<Integer, Map<CharSequence, Integer>> getPushStatusData(ExecutionStatus executionStatus) {
        final int partitionCount = 3;
        final int replicasPerPartition = 3;
        Map<Integer, Map<CharSequence, Integer>> statusesByPartition = new HashMap<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            Map<CharSequence, Integer> statusesByReplica = new HashMap<>();
            for (int replica = 0; replica < replicasPerPartition; replica++) {
                statusesByReplica.put("instance-" + replica, Integer.valueOf(executionStatus.getValue()));
            }
            statusesByPartition.put(partition, statusesByReplica);
        }
        return statusesByPartition;
    }

    /** Both the push status data and the completed-replica data are empty maps. */
    @Test(description = "Expect NOT_CREATED status when incremental push statuses are empty for all partitions")
    public void testGetIncrementalPushStatusWhenPushStatusIsEmptyForAllPartitions() {
        prepareForIncrementalPushStatusTest(Collections.emptyMap(), Collections.emptyMap());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.NOT_CREATED);
    }

    /** Every replica of every partition reports EOIP and every partition has three completed replicas. */
    @Test(description = "Expect EOIP status when numberOfReplicasInCompletedState == replicationFactor and all these replicas have seen EOIP")
    public void testCheckIncrementalPushStatusWhenAllPartitionReplicasHaveSeenEoip() {
        prepareForIncrementalPushStatusTest(
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED), getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * Partition 0 has only two completed replicas and its {@code instance-0} entry is missing from
     * the push status data; all other replicas report EOIP.
     */
    @Test(description = "Expect EOIP status when for just one partition numberOfReplicasInCompletedState == (replicationFactor - 1) and only one replica of that partition has not seen EOIP yet")
    public void testCheckIncrementalPushStatusWhenAllCompletedStateReplicasOfPartitionHaveSeenEoip() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        pushStatuses.get(0).remove("instance-0");

        Map<Integer, Integer> completedReplicas = getCompletedStatusData();
        completedReplicas.put(0, 2);

        prepareForIncrementalPushStatusTest(pushStatuses, completedReplicas);

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * Partition 0 has a single completed replica and its {@code instance-0} entry is missing from
     * the push status data; all other replicas report EOIP.
     */
    @Test(description = "Expect EOIP status when for one partition numberOfReplicasInCompletedState == (replicationFactor - 2) and only one replica of that partition has not seen EOIP yet")
    public void testCheckIncrementalPushStatusWhenNMinusOneReplicasOfPartitionHaveSeenEoip() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        pushStatuses.get(0).remove("instance-0");

        Map<Integer, Integer> completedReplicas = getCompletedStatusData();
        completedReplicas.put(0, 1);

        prepareForIncrementalPushStatusTest(pushStatuses, completedReplicas);

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * All partitions have three completed replicas, but the {@code instance-0} entry of partition 0
     * is missing from the push status data.
     */
    @Test(description = "Expect SOIP status when numberOfReplicasInCompletedState == replicationFactor and just one replica of one partition has not seen EOIP yet")
    public void testCheckIncrementalPushStatusWhenOneCompletedStateReplicaOfPartitionHasNotSeenEoip() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        pushStatuses.get(0).remove("instance-0");

        prepareForIncrementalPushStatusTest(pushStatuses, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * Partition 0 has a single completed replica, and two of its replicas ({@code instance-0} and
     * {@code instance-1}) still report SOIP instead of EOIP.
     */
    @Test(description = "Expect SOIP status when for one partition numberOfReplicasInCompletedState == (replicationFactor - 2) and two replicas of that partition have not seen EOIP yet")
    public void testCheckIncrementalPushStatusWhenNMinusTwoReplicasOfPartitionHaveSeenEoip() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        Map<CharSequence, Integer> partitionZero = pushStatuses.get(0);
        partitionZero.put("instance-0", Integer.valueOf(ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED.getValue()));
        partitionZero.put("instance-1", Integer.valueOf(ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED.getValue()));

        Map<Integer, Integer> completedReplicas = getCompletedStatusData();
        completedReplicas.put(0, 1);

        prepareForIncrementalPushStatusTest(pushStatuses, completedReplicas);

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * The push status data is reduced to a single entry: {@code instance-2} of partition 0 reporting
     * SOIP. Partitions 1 and 2 are removed from it entirely.
     */
    @Test(description = "Expect SOIP status when for one partition numberOfReplicasInCompletedState == replicationFactor and just one replica of only one partition has seen SOIP")
    public void testCheckIncrementalPushStatusOnlyOneReplicasHasSeenSOIP() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
        Map<CharSequence, Integer> partitionZero = pushStatuses.get(0);
        partitionZero.remove("instance-0");
        partitionZero.remove("instance-1");
        for (int partition = 1; partition < 3; partition++) {
            pushStatuses.remove(Integer.valueOf(partition));
        }

        prepareForIncrementalPushStatusTest(pushStatuses, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /**
     * The push status data is reduced to a single entry: {@code instance-2} of partition 0 reporting
     * EOIP. Partitions 1 and 2 are removed from it entirely.
     */
    @Test(description = "Expect SOIP status when for one partition numberOfReplicasInCompletedState == replicationFactor and just one replica of only one partition has seen EOIP")
    public void testCheckIncrementalPushStatusOnlyOneReplicasHasSeenEOIP() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        Map<CharSequence, Integer> partitionZero = pushStatuses.get(0);
        partitionZero.remove("instance-0");
        partitionZero.remove("instance-1");
        for (int partition = 1; partition < 3; partition++) {
            pushStatuses.remove(Integer.valueOf(partition));
        }

        prepareForIncrementalPushStatusTest(pushStatuses, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /** Partition 0 carries a fourth replica entry ({@code instance-3}) that also reports EOIP. */
    @Test(description = "Expect EOIP status when numberOfReplicasInCompletedState == (replicationFactor) and (replicationFactor + 1) replicas have seen EOIP")
    public void testCheckIncrementalPushStatusWhenNumberOfReplicasWithEoipAreMoreThanReplicationFactor() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        Map<CharSequence, Integer> partitionZero = pushStatuses.get(0);
        partitionZero.put("instance-3", Integer.valueOf(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue()));

        prepareForIncrementalPushStatusTest(pushStatuses, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
    }

    /** {@code instance-0} of partition 0 reports UNKNOWN while every other replica reports EOIP. */
    @Test(description = "Expect ERROR status when any one replica belonging to any partition has non-incremental push status")
    public void testCheckIncrementalPushStatusWhenAnyOfTheReplicaHasNonIncPushStatus() {
        Map<Integer, Map<CharSequence, Integer>> pushStatuses =
            getPushStatusData(ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED);
        Map<CharSequence, Integer> partitionZero = pushStatuses.get(0);
        partitionZero.put("instance-0", Integer.valueOf(ExecutionStatus.UNKNOWN.getValue()));

        prepareForIncrementalPushStatusTest(pushStatuses, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.ERROR);
    }

    /** The mocked status store reader returns {@code null} instead of a push status map. */
    @Test(description = "Expect NOT_CREATED status when push status map is null")
    public void testCheckIncrementalPushStatusWhenPushStatusMapisNull() {
        prepareForIncrementalPushStatusTest(null, getCompletedStatusData());

        ExecutionStatus observed = (ExecutionStatus) monitor
            .getIncrementalPushStatusFromPushStatusStore(
                getTopic(), incrementalPushVersion, customizedViewMock, statusStoreReaderMock, 3, 3)
            .getFirst();

        Assert.assertEquals(observed, ExecutionStatus.NOT_CREATED);
    }

    /**
     * Every incremental push version reported by the status store reader as supposedly ongoing must
     * show up (as a string) in the monitor's answer, and the reader must be queried exactly once.
     */
    @Test
    public void testGetOngoingIncrementalPushVersions() {
        // NOTE(review): 7 is presumably the numeric value of an in-progress incremental push status — confirm.
        final int reportedStatusValue = 7;
        Map<CharSequence, Integer> supposedlyOngoing = new HashMap<>();
        supposedlyOngoing.put("incPush1", reportedStatusValue);
        supposedlyOngoing.put("incPush2", reportedStatusValue);
        supposedlyOngoing.put("incPush3", reportedStatusValue);
        supposedlyOngoing.put("incPush4", reportedStatusValue);

        statusStoreReaderMock = Mockito.mock(PushStatusStoreReader.class);
        Mockito
            .when(
                statusStoreReaderMock.getSupposedlyOngoingIncrementalPushVersions(
                    ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))
            .thenReturn(supposedlyOngoing);

        Set<?> reportedVersions = monitor.getOngoingIncrementalPushVersions(getTopic(), statusStoreReaderMock);

        for (CharSequence expectedVersion : supposedlyOngoing.keySet()) {
            Assert.assertTrue(reportedVersions.contains(expectedVersion.toString()));
        }
        Mockito.verify(statusStoreReaderMock)
            .getSupposedlyOngoingIncrementalPushVersions(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());
    }

    /**
     * For a hybrid store the switch to the real-time topic must happen exactly once: not while no
     * replica has received end-of-push, once when the first replica has, and not again when a
     * second replica reports end-of-push afterwards.
     */
    @Test
    public void testOnPartitionStatusChangeForHybridStore() {
        String versionTopic = getTopic();
        Store store = prepareMockStore(versionTopic);
        store.setHybridStoreConfig(
            new HybridStoreConfigImpl(
                100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));

        RealTimeTopicSwitcher firstSwitcher = Mockito.mock(RealTimeTopicSwitcher.class);
        monitor.setRealTimeTopicSwitcher(firstSwitcher);
        monitor.startMonitorOfflinePush(
            versionTopic, numberOfPartition, 3, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);

        // Three replicas of partition 0, none of which has reported a status yet.
        ArrayList<ReplicaStatus> replicas = new ArrayList<>();
        for (int replica = 0; replica < 3; replica++) {
            replicas.add(new ReplicaStatus("test" + replica));
        }
        ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, replicas);

        Mockito.doReturn(true).when(mockRoutingDataRepo).containsKafkaTopic(versionTopic);
        Mockito.doReturn(new PartitionAssignment(versionTopic, numberOfPartition))
            .when(mockRoutingDataRepo)
            .getPartitionAssignments(versionTopic);

        // No replica has received end-of-push: nothing must be switched.
        monitor.onPartitionStatusChange(versionTopic, partitionStatus);
        Mockito.verify(firstSwitcher, Mockito.never())
            .switchToRealTimeTopic(
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<Store>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.anyList());
        Assert.assertEquals(
            monitor.getOfflinePushOrThrow(versionTopic).getCurrentStatus(),
            ExecutionStatus.STARTED,
            "Hybrid push is not ready to send SOBR.");

        // The first replica receives end-of-push: exactly one switch with the expected arguments.
        replicas.get(0).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED);
        monitor.onPartitionStatusChange(versionTopic, partitionStatus);
        Mockito.verify(firstSwitcher, Mockito.times(1))
            .switchToRealTimeTopic(
                (String) ArgumentMatchers.eq(Version.composeRealTimeTopic(store.getName())),
                ArgumentMatchers.eq(versionTopic),
                ArgumentMatchers.eq(store),
                ArgumentMatchers.eq(aggregateRealTimeSourceKafkaUrl),
                ArgumentMatchers.anyList());
        Assert.assertEquals(
            monitor.getOfflinePushOrThrow(versionTopic).getCurrentStatus(),
            ExecutionStatus.END_OF_PUSH_RECEIVED,
            "At least one replica already received end_of_push, so we send SOBR and update push status to END_OF_PUSH_RECEIVED");

        // A second replica receives end-of-push: a fresh switcher mock proves no further switch happens.
        replicas.get(1).updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED);
        RealTimeTopicSwitcher secondSwitcher = Mockito.mock(RealTimeTopicSwitcher.class);
        monitor.setRealTimeTopicSwitcher(secondSwitcher);
        monitor.onPartitionStatusChange(versionTopic, partitionStatus);
        Mockito.verify(secondSwitcher, Mockito.never())
            .switchToRealTimeTopic(
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<Store>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.anyList());
    }

    /**
     * Verifies that concurrent partition status updates for a hybrid store trigger the real-time
     * topic switch only once.
     *
     * <p>Eight worker threads each report partition 0 with three replicas that have all reached
     * {@code END_OF_PUSH_RECEIVED}. Once every worker has finished, the mocked
     * {@link RealTimeTopicSwitcher} must have received {@code switchToRealTimeTopic} as its only
     * interaction, and the push status must be {@code END_OF_PUSH_RECEIVED}.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the workers
     */
    @Test
    public void testOnPartitionStatusChangeForHybridStoreParallel() throws InterruptedException {
        final int threadCount = 8;
        final int replicaCount = 3;
        String topic = getTopic();
        Store store = prepareMockStore(topic);
        store.setHybridStoreConfig(
                new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));
        RealTimeTopicSwitcher topicSwitcher = Mockito.mock(RealTimeTopicSwitcher.class);
        this.monitor.setRealTimeTopicSwitcher(topicSwitcher);
        this.monitor.startMonitorOfflinePush(topic, numberOfPartition, replicaCount, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        Mockito.doReturn(true).when(this.mockRoutingDataRepo).containsKafkaTopic(topic);
        Mockito.doReturn(new PartitionAssignment(topic, numberOfPartition)).when(this.mockRoutingDataRepo).getPartitionAssignments(topic);

        // The loops advance by one: the number of workers and replicas must not depend on the
        // partition-count constant.
        Thread[] workers = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(() -> {
                ArrayList<ReplicaStatus> replicaStatuses = new ArrayList<>();
                for (int replica = 0; replica < replicaCount; replica++) {
                    ReplicaStatus replicaStatus = new ReplicaStatus("test" + replica);
                    replicaStatus.updateStatus(ExecutionStatus.END_OF_PUSH_RECEIVED);
                    replicaStatuses.add(replicaStatus);
                }
                this.monitor.onPartitionStatusChange(topic, new ReadOnlyPartitionStatus(0, replicaStatuses));
            });
            workers[i] = worker;
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        Mockito.verify(topicSwitcher, Mockito.only()).switchToRealTimeTopic(
                ArgumentMatchers.eq(Version.composeRealTimeTopic(store.getName())),
                ArgumentMatchers.eq(topic),
                ArgumentMatchers.eq(store),
                ArgumentMatchers.eq(aggregateRealTimeSourceKafkaUrl),
                ArgumentMatchers.anyList());
        Assert.assertEquals(this.monitor.getOfflinePushOrThrow(topic).getCurrentStatus(), ExecutionStatus.END_OF_PUSH_RECEIVED, "At least one replica already received end_of_push, so we send SOBR and update push status to END_OF_PUSH_RECEIVED");
    }

    /**
     * Checks that {@code onExternalViewChange} does not deadlock against a thread that holds the
     * cluster-level write lock and then stops all monitoring.
     *
     * <p>A background task (T1) takes the cluster write lock, sleeps for five seconds while holding
     * it, and calls {@code stopAllMonitoring()}. One second after submitting T1, the test thread (T2)
     * calls {@code onExternalViewChange}. If either call never returns, the 30 second TestNG timeout
     * fails the test.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or while the
     *         executor is being shut down
     */
    @Test(timeOut = 30000)
    public void testOnExternalViewChangeDeadlock() throws InterruptedException {
        ExecutorService lockHolderExecutor = Executors.newSingleThreadExecutor();
        try {
            AbstractPushMonitor pushMonitor = getPushMonitor(new MockStoreCleaner(this.clusterLockManager));
            prepareMockStore("test-lock_v1");
            HashMap stateToInstances = new HashMap();
            stateToInstances.put("ONLINE", Collections.singletonList(new Instance("test_instance", "a", numberOfPartition)));
            PartitionAssignment partitionAssignment = new PartitionAssignment("test-lock_v1", numberOfPartition);
            partitionAssignment.addPartition(new Partition(0, stateToInstances));
            ReplicaStatus replicaStatus = new ReplicaStatus("test_instance");
            replicaStatus.updateStatus(ExecutionStatus.COMPLETED);
            ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, Collections.singletonList(replicaStatus));
            pushMonitor.startMonitorOfflinePush("test-lock_v1", numberOfPartition, numberOfPartition, OfflinePushStrategy.WAIT_ALL_REPLICAS);
            pushMonitor.onPartitionStatusChange("test-lock_v1", partitionStatus);
            lockHolderExecutor.submit(() -> {
                System.out.println("T1 will acquire cluster level write lock");
                // try-with-resources releases the write lock on every exit path.
                try (AutoCloseableLock ignored = this.clusterLockManager.createClusterWriteLock()) {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Keep the interruption visible to whoever owns this worker thread.
                        Thread.currentThread().interrupt();
                    }
                    pushMonitor.stopAllMonitoring();
                }
                System.out.println("T1 released cluster level write lock");
            });
            // Give T1 time to take the write lock before the test thread proceeds.
            Thread.sleep(1000L);
            System.out.println("T2 will acquire cluster level read lock and store level write lock");
            pushMonitor.onExternalViewChange(partitionAssignment);
            System.out.println("T2 released cluster level read lock and store level write lock");
        } finally {
            // Shut the executor down exactly once, whether the test passed or failed.
            TestUtils.shutdownExecutor(lockHolderExecutor);
        }
    }

    /**
     * Checks that {@code onPartitionStatusChange} running on a background thread does not deadlock
     * against the test thread taking the cluster-level write lock and stopping all monitoring.
     *
     * <p>A background task (T1) reports a completed replica through {@code onPartitionStatusChange}.
     * One second after submitting it, the test thread (T2) takes the cluster write lock and calls
     * {@code stopAllMonitoring()}. If either side never returns, the 30 second TestNG timeout fails
     * the test.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or while the
     *         executor is being shut down
     */
    @Test(timeOut = 30000)
    public void testOnPartitionStatusChangeDeadLock() throws InterruptedException {
        ExecutorService statusChangeExecutor = Executors.newSingleThreadExecutor();
        try {
            AbstractPushMonitor pushMonitor = getPushMonitor(new MockStoreCleaner(this.clusterLockManager));
            prepareMockStore("test-lock_v1");
            HashMap stateToInstances = new HashMap();
            stateToInstances.put("ONLINE", Collections.singletonList(new Instance("test_instance", "a", numberOfPartition)));
            PartitionAssignment partitionAssignment = new PartitionAssignment("test-lock_v1", numberOfPartition);
            partitionAssignment.addPartition(new Partition(0, stateToInstances));
            Mockito.doReturn(true).when(this.mockRoutingDataRepo).containsKafkaTopic("test-lock_v1");
            Mockito.doReturn(partitionAssignment).when(this.mockRoutingDataRepo).getPartitionAssignments("test-lock_v1");
            ReplicaStatus replicaStatus = new ReplicaStatus("test_instance");
            replicaStatus.updateStatus(ExecutionStatus.COMPLETED);
            ReadOnlyPartitionStatus partitionStatus = new ReadOnlyPartitionStatus(0, Collections.singletonList(replicaStatus));
            pushMonitor.startMonitorOfflinePush("test-lock_v1", numberOfPartition, numberOfPartition, OfflinePushStrategy.WAIT_ALL_REPLICAS);
            statusChangeExecutor.submit(() -> {
                System.out.println("T1 will acquire cluster level read lock and store level write lock");
                pushMonitor.onPartitionStatusChange("test-lock_v1", partitionStatus);
                System.out.println("T1 released cluster level read lock and store level write lock");
            });
            // Give T1 time to start before the test thread takes the write lock.
            Thread.sleep(1000L);
            System.out.println("T2 will acquire cluster level write lock");
            // try-with-resources releases the write lock even when stopAllMonitoring() throws;
            // otherwise the lock would stay held and anything still waiting on it would hang.
            try (AutoCloseableLock ignored = this.clusterLockManager.createClusterWriteLock()) {
                pushMonitor.stopAllMonitoring();
            }
            System.out.println("T2 released cluster level write lock");
        } finally {
            // Shut the executor down exactly once, whether the test passed or failed.
            TestUtils.shutdownExecutor(statusChangeExecutor);
        }
    }

    /**
     * Checks that {@code getUncompletedPartitions} reports {@code numberOfPartition} entries right
     * after monitoring starts, and none once the push status has been updated to {@code COMPLETED}.
     */
    @Test
    public void testGetUncompletedPartitions() {
        final AbstractPushMonitor pushMonitor = this.monitor;
        final String pushTopic = this.topic;

        pushMonitor.startMonitorOfflinePush(pushTopic, numberOfPartition, 3, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        int uncompletedAfterStart = pushMonitor.getUncompletedPartitions(pushTopic).size();
        Assert.assertEquals(uncompletedAfterStart, numberOfPartition);

        pushMonitor.updatePushStatus(pushMonitor.getOfflinePushOrThrow(pushTopic), ExecutionStatus.COMPLETED, Optional.empty());
        boolean nothingLeftUncompleted = pushMonitor.getUncompletedPartitions(pushTopic).isEmpty();
        Assert.assertTrue(nothingLeftUncompleted);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a test store for the given version topic and registers it with the store repository
     * field {@code mockStoreRepo}.
     *
     * <p>The store name and version number are parsed from {@code str}. A version with status
     * {@code STARTED} is added to the store, and {@code mockStoreRepo.getStore(storeName)} is stubbed
     * to return that store.
     *
     * @param str the Kafka version topic name to derive the store name and version number from
     * @return the newly created store containing the parsed version
     */
    public Store prepareMockStore(String str) {
        String storeName = Version.parseStoreFromKafkaTopicName(str);
        int versionNumber = Version.parseVersionFromKafkaTopicName(str);

        Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis());
        VersionImpl startedVersion = new VersionImpl(storeName, versionNumber);
        startedVersion.setStatus(VersionStatus.STARTED);
        store.addVersion(startedVersion);

        Mockito.doReturn(store).when(this.mockStoreRepo).getStore(storeName);
        return store;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the offline push accessor held in the {@code mockAccessor} field.
     *
     * @return the current {@code mockAccessor} value
     */
    public OfflinePushAccessor getMockAccessor() {
        return mockAccessor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the push monitor under test held in the {@code monitor} field.
     *
     * @return the current {@code monitor} value
     */
    public AbstractPushMonitor getMonitor() {
        return monitor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the store repository held in the {@code mockStoreRepo} field.
     *
     * @return the current {@code mockStoreRepo} value
     */
    public ReadWriteStoreRepository getMockStoreRepo() {
        return mockStoreRepo;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the routing data repository held in the {@code mockRoutingDataRepo} field.
     *
     * @return the current {@code mockRoutingDataRepo} value
     */
    public RoutingDataRepository getMockRoutingDataRepo() {
        return mockRoutingDataRepo;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the store cleaner held in the {@code mockStoreCleaner} field.
     *
     * @return the current {@code mockStoreCleaner} value
     */
    public StoreCleaner getMockStoreCleaner() {
        return mockStoreCleaner;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the push health stats object held in the {@code mockPushHealthStats} field.
     *
     * @return the current {@code mockPushHealthStats} value
     */
    public AggPushHealthStats getMockPushHealthStats() {
        return mockPushHealthStats;
    }

    /**
     * Returns the store name held in the {@code storeName} field.
     *
     * @return the current {@code storeName} value
     */
    protected String getStoreName() {
        return storeName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the cluster name shared by the tests in this class.
     *
     * @return the value of the class-level {@code clusterName} field
     */
    public String getClusterName() {
        return AbstractPushMonitorTest.clusterName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the topic name held in the {@code topic} field.
     *
     * @return the current {@code topic} value
     */
    public String getTopic() {
        return topic;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the partition count shared by the tests in this class.
     *
     * @return the value of the class-level {@code numberOfPartition} field
     */
    public int getNumberOfPartition() {
        return AbstractPushMonitorTest.numberOfPartition;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the replication factor for these tests, which is fixed at 3.
     *
     * @return always {@code 3}
     */
    public int getReplicationFactor() {
        return 3;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the cluster lock manager held in the {@code clusterLockManager} field.
     *
     * @return the current {@code clusterLockManager} value
     */
    public ClusterLockManager getClusterLockManager() {
        return clusterLockManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the aggregate real-time source Kafka URL shared by the tests in this class.
     *
     * @return the value of the class-level {@code aggregateRealTimeSourceKafkaUrl} field
     */
    public String getAggregateRealTimeSourceKafkaUrl() {
        return AbstractPushMonitorTest.aggregateRealTimeSourceKafkaUrl;
    }
}
