package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.pushmonitor.ReadOnlyPartitionStatus;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.MockTestStateModel;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helix/TestHelixCustomizedViewOfflinePushRepository.class */
public class TestHelixCustomizedViewOfflinePushRepository {
    // 1000, the same value the tests pass to Thread.sleep; presumably the settle time in milliseconds
    // granted to Helix/ZooKeeper before state is re-read (the constant itself is not referenced below).
    private static final int WAIT_TIME = 1000;
    // Helix participants created in setupHelix; each one reports replica status through accessor0/accessor1.
    private SafeHelixManager manager0;
    private SafeHelixManager manager1;
    // Standalone Helix controller started for the test cluster.
    private SafeHelixManager controller;
    // Admin client used to create the cluster and resource and to drop them again.
    private HelixAdmin admin;
    private String clusterName = "UnitTestCLuster";
    // Helix resource under test; named like version 1 of a store called "storeName".
    private String resourceName = "storeName_v1";
    // NOTE(review): this looks like a decompiler-substituted constant; presumably it equals "storeName"
    // so that it matches resourceName -- confirm against VeniceClusterWrapper.
    private String storeName = VeniceClusterWrapper.FORKED_PROCESS_STORE_NAME;
    // Address of the ZooKeeper server started for each test method.
    private String zkAddress;
    // Ports used to build the node identifiers of manager0, manager1 and the controller respectively.
    private int httpPort0;
    private int httpPort1;
    private int adminPort;
    // Partition ids 0, 1 and 2 of the three-partition resource (assigned in setupHelix).
    private int partitionId0;
    private int partitionId1;
    private int partitionId2;
    private ZkServerWrapper zkServerWrapper;
    // Repositories under test; both are built on top of readManager.
    private HelixHybridStoreQuotaRepository hybridStoreQuotaOnlyRepository;
    private HelixCustomizedViewOfflinePushRepository offlinePushOnlyRepository;
    // Spectator connection through which the repositories observe the cluster.
    private SafeHelixManager readManager;
    // Status writers bound to manager0 and manager1. They are constructed with different boolean flags
    // (false vs. true); NOTE(review): presumably this toggles hybrid-quota reporting -- confirm.
    private HelixPartitionStatusAccessor accessor0;
    private HelixPartitionStatusAccessor accessor1;

    /**
     * Builds a fresh Helix cluster on a new ZooKeeper server before every test method.
     *
     * <p>The cluster holds one resource ({@code resourceName}) with three partitions and two replicas,
     * a standalone controller, two participants and one spectator. Both repositories under test are
     * created on the spectator connection and refreshed, after which the participants publish their
     * initial replica and hybrid-quota statuses. The method returns once the offline-push repository
     * reports two replica states for partitions 0 and 1.
     *
     * @throws Exception if any part of the cluster bootstrap fails
     */
    @BeforeMethod(alwaysRun = true)
    public void setupHelix() throws Exception {
        // --- ZooKeeper + cluster definition ---
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();
        this.admin = new ZKHelixAdmin(this.zkAddress);
        this.admin.addCluster(this.clusterName);
        HelixConfigScope clusterScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this.clusterName).build();
        HashMap<String, String> clusterConfig = new HashMap<>();
        clusterConfig.put("allowParticipantAutoJoin", String.valueOf(true));
        this.admin.setConfig(clusterScope, clusterConfig);
        this.admin.addStateModelDef(this.clusterName, "MockTestStateModel", MockTestStateModel.getDefinition());
        this.admin.addResource(this.clusterName, this.resourceName, 3, "MockTestStateModel", IdealState.RebalanceMode.FULL_AUTO.toString());
        this.admin.rebalance(this.clusterName, this.resourceName, 2);
        HelixUtils.setupCustomizedStateConfig(this.admin, this.clusterName);

        this.partitionId0 = 0;
        this.partitionId1 = 1;
        this.partitionId2 = 2;

        // Read the clock once so the three ports are guaranteed to be consecutive and distinct;
        // separate clock reads could drift between the assignments.
        int portBase = 50000 + ((int) (System.currentTimeMillis() % 10000));
        this.httpPort0 = portBase;
        this.httpPort1 = portBase + 1;
        this.adminPort = portBase + 2;

        // --- controller, participants and spectator ---
        this.controller = new SafeHelixManager(HelixControllerMain.startHelixController(this.zkAddress, this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.adminPort), "STANDALONE"));
        this.manager0 = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.httpPort0), this.zkAddress, this.httpPort0, "MockTestStateModel");
        this.manager0.connect();
        Thread.sleep(WAIT_TIME);
        this.accessor0 = new HelixPartitionStatusAccessor(this.manager0.getOriginalManager(), this.manager0.getInstanceName(), false);
        this.manager1 = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.httpPort1), this.zkAddress, this.httpPort1, "MockTestStateModel");
        this.manager1.connect();
        Thread.sleep(WAIT_TIME);
        this.accessor1 = new HelixPartitionStatusAccessor(this.manager1.getOriginalManager(), this.manager1.getInstanceName(), true);
        this.readManager = new SafeHelixManager(HelixManagerFactory.getZKHelixManager(this.clusterName, "reader", InstanceType.SPECTATOR, this.zkAddress));
        this.readManager.connect();

        // --- repositories under test ---
        this.hybridStoreQuotaOnlyRepository = new HelixHybridStoreQuotaRepository(this.readManager);
        HelixReadWriteStoreRepository storeRepository = new HelixReadWriteStoreRepository(ZkClientFactory.newZkClient(this.zkAddress), new HelixAdapterSerializer(), this.clusterName, Optional.empty(), new ClusterLockManager(this.clusterName));
        Store testStore = TestUtils.createTestStore(this.storeName, "owner", System.currentTimeMillis());
        testStore.setPartitionCount(3);
        VersionImpl version = new VersionImpl(this.storeName, 1, "pushId");
        version.setPartitionCount(3);
        testStore.addVersion(version);
        storeRepository.addStore(testStore);
        this.offlinePushOnlyRepository = new HelixCustomizedViewOfflinePushRepository(this.readManager, storeRepository);
        this.hybridStoreQuotaOnlyRepository.refresh();
        this.offlinePushOnlyRepository.refresh();

        // --- initial statuses: each of partitions 0 and 1 has exactly one COMPLETED replica ---
        this.accessor0.updateReplicaStatus(this.resourceName, this.partitionId0, ExecutionStatus.COMPLETED);
        this.accessor0.updateReplicaStatus(this.resourceName, this.partitionId1, ExecutionStatus.END_OF_PUSH_RECEIVED);
        this.accessor1.updateReplicaStatus(this.resourceName, this.partitionId0, ExecutionStatus.END_OF_PUSH_RECEIVED);
        this.accessor1.updateReplicaStatus(this.resourceName, this.partitionId1, ExecutionStatus.COMPLETED);
        this.accessor0.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId0, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
        this.accessor0.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId1, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
        this.accessor1.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId1, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
        this.accessor1.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId2, HybridStoreQuotaStatus.QUOTA_VIOLATED);

        // Block until the spectator has observed both replicas of partitions 0 and 1.
        TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
            return this.offlinePushOnlyRepository.containsKafkaTopic(this.resourceName)
                    && this.offlinePushOnlyRepository.getReplicaStates(this.resourceName, this.partitionId0).size() == 2
                    && this.offlinePushOnlyRepository.getReplicaStates(this.resourceName, this.partitionId1).size() == 2;
        });
    }

    /**
     * Tears down everything {@link #setupHelix()} created.
     *
     * <p>Because this runs with {@code alwaysRun = true} it may execute after a setup that failed part
     * way through, so every field is null-checked. The admin client and the ZooKeeper server are
     * released in {@code finally} blocks so that a failure while disconnecting a manager or dropping
     * the cluster does not leak them into later tests.
     */
    @AfterMethod(alwaysRun = true)
    public void cleanupHelix() {
        try {
            if (this.manager0 != null) {
                this.manager0.disconnect();
            }
            if (this.manager1 != null) {
                this.manager1.disconnect();
            }
            if (this.readManager != null) {
                this.readManager.disconnect();
            }
            if (this.controller != null) {
                this.controller.disconnect();
            }
            if (this.admin != null) {
                this.admin.dropCluster(this.clusterName);
            }
        } finally {
            try {
                if (this.admin != null) {
                    this.admin.close();
                }
            } finally {
                if (this.zkServerWrapper != null) {
                    this.zkServerWrapper.close();
                }
            }
        }
    }

    /**
     * Checks the hybrid-quota view of the resource: it starts as violated (partition 2 was reported as
     * {@code QUOTA_VIOLATED} by accessor1 during setup), clears once accessor1 reports the partition as
     * not violated, and stays clear when accessor0 reports a violation.
     *
     * <p>Assertions use TestNG's {@code (actual, expected)} argument order so failure messages read
     * correctly.
     *
     * @throws Exception if the thread is interrupted while waiting for propagation
     */
    @Test
    public void testGetQuotaExceedStores() throws Exception {
        // A resource the repository has never seen reports UNKNOWN.
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridStoreQuotaStatus("FakeUnitTest"), HybridStoreQuotaStatus.UNKNOWN);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridQuotaViolatedStores().size(), 1);
        Assert.assertEquals((String) this.hybridStoreQuotaOnlyRepository.getHybridQuotaViolatedStores().get(0), this.resourceName);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridStoreQuotaStatus(this.resourceName), HybridStoreQuotaStatus.QUOTA_VIOLATED);

        // Clearing the only violated partition clears the whole resource.
        this.accessor1.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId2, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
        Thread.sleep(WAIT_TIME);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridQuotaViolatedStores().size(), 0);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridStoreQuotaStatus(this.resourceName), HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);

        // A violation reported through accessor0 is expected to leave the repository unchanged.
        // NOTE(review): presumably because accessor0 was built with its boolean flag set to false -- confirm.
        this.accessor0.updateHybridQuotaReplicaStatus(this.resourceName, this.partitionId1, HybridStoreQuotaStatus.QUOTA_VIOLATED);
        Thread.sleep(WAIT_TIME);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridQuotaViolatedStores().size(), 0);
        Assert.assertEquals(this.hybridStoreQuotaOnlyRepository.getHybridStoreQuotaStatus(this.resourceName), HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
    }

    /**
     * Follows the ready-to-serve instance list of partitions 0 and 1 through a sequence of changes:
     * the initial state (one COMPLETED replica each), both replicas COMPLETED, one participant
     * disconnected, and finally a replacement participant joining and reporting COMPLETED.
     *
     * <p>The replacement participant is disconnected in a {@code finally} block so a failed assertion
     * does not leave it connected.
     *
     * @throws Exception if connecting the replacement participant or waiting for propagation fails
     */
    @Test
    public void testGetInstances() throws Exception {
        // Initial state: partition 0 is served by manager0 only, partition 1 by manager1 only.
        List<Instance> partition0Ready = this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId0);
        Assert.assertEquals(partition0Ready.size(), 1);
        Instance partition0Instance = partition0Ready.get(0);
        Assert.assertEquals(partition0Instance.getHost(), Utils.getHostName());
        Assert.assertEquals(partition0Instance.getPort(), this.httpPort0);

        List<Instance> partition1Ready = this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId1);
        Assert.assertEquals(partition1Ready.size(), 1);
        Instance partition1Instance = partition1Ready.get(0);
        Assert.assertEquals(partition1Instance.getHost(), Utils.getHostName());
        Assert.assertEquals(partition1Instance.getPort(), this.httpPort1);

        // Complete the lagging replica of each partition; both participants must then be listed.
        this.accessor1.updateReplicaStatus(this.resourceName, this.partitionId0, ExecutionStatus.COMPLETED);
        awaitBothInitialParticipantsReady(this.partitionId0);
        this.accessor0.updateReplicaStatus(this.resourceName, this.partitionId1, ExecutionStatus.COMPLETED);
        awaitBothInitialParticipantsReady(this.partitionId1);

        // Losing manager0 leaves a single ready replica per partition.
        this.manager0.disconnect();
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId0).size(), 1);
            Assert.assertEquals(this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId1).size(), 1);
        });

        // A replacement participant that reports COMPLETED brings the count back to two.
        int replacementPort = this.httpPort0 + 10;
        SafeHelixManager replacement = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), replacementPort), this.zkAddress, replacementPort, "MockTestStateModel");
        replacement.connect();
        try {
            HelixPartitionStatusAccessor replacementAccessor = new HelixPartitionStatusAccessor(replacement.getOriginalManager(), replacement.getInstanceName(), true);
            replacementAccessor.updateReplicaStatus(this.resourceName, this.partitionId0, ExecutionStatus.COMPLETED);
            replacementAccessor.updateReplicaStatus(this.resourceName, this.partitionId1, ExecutionStatus.COMPLETED);
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId0).size(), 2);
                Assert.assertEquals(this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, this.partitionId1).size(), 2);
            });
        } finally {
            replacement.disconnect();
        }
    }

    /**
     * Waits up to ten seconds until exactly two instances are ready to serve the given partition and
     * their ports are {@code httpPort0} and {@code httpPort1}.
     */
    private void awaitBothInitialParticipantsReady(int partitionId) throws Exception {
        HashSet<Integer> expectedPorts = new HashSet<>(Arrays.asList(this.httpPort0, this.httpPort1));
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            List<Instance> ready = this.offlinePushOnlyRepository.getReadyToServeInstances(this.resourceName, partitionId);
            Assert.assertEquals(ready.size(), 2);
            HashSet<Integer> actualPorts = new HashSet<>();
            for (Instance instance : ready) {
                actualPorts.add(instance.getPort());
            }
            Assert.assertEquals(actualPorts, expectedPorts);
        });
    }

    /**
     * Verifies the replica states of partition 0: there are two of them, each carries the right
     * partition number and a participant id, and a replica is flagged ready-to-serve exactly when its
     * push status is COMPLETED.
     */
    @Test
    public void testGetReplicaStates() {
        List<ReplicaState> partition0States = this.offlinePushOnlyRepository.getReplicaStates(this.resourceName, this.partitionId0);
        Assert.assertEquals(partition0States.size(), 2, "Unexpected replication factor");
        partition0States.forEach(state -> {
            Assert.assertEquals(state.getPartition(), this.partitionId0, "Unexpected partition number");
            Assert.assertNotNull(state.getParticipantId(), "Participant id should not be null");
            boolean pushCompleted = state.getVenicePushStatus().equals(ExecutionStatus.COMPLETED.name());
            Assert.assertEquals(state.isReadyToServe(), pushCompleted);
        });
    }

    /**
     * The partition count of the resource is three both before and after one participant disconnects.
     *
     * <p>Assertions use TestNG's {@code (actual, expected)} argument order so failure messages read
     * correctly.
     *
     * @throws Exception if the thread is interrupted while waiting for propagation
     */
    @Test
    public void testGetNumberOfPartitions() throws Exception {
        // Queried twice on purpose, as in the original test; presumably the repeat covers a second
        // lookup of the same resource.
        Assert.assertEquals(this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName), 3);
        Assert.assertEquals(this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName), 3);
        this.manager0.disconnect();
        Thread.sleep(WAIT_TIME);
        Assert.assertEquals(this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName), 3);
        Assert.assertEquals(this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName), 3);
    }

    /**
     * Once the resource is dropped from the cluster and all replica statuses of partitions 0 and 1 are
     * deleted, asking for its partition count must raise a {@link VeniceException}.
     *
     * @throws Exception if the thread is interrupted while waiting for propagation
     */
    @Test
    public void testGetNumberOfPartitionsWhenResourceDropped() throws Exception {
        boolean presentBeforeDrop = this.admin.getResourcesInCluster(this.clusterName).contains(this.resourceName);
        Assert.assertTrue(presentBeforeDrop);
        // While the resource exists the lookup must succeed (the returned count is not inspected).
        this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName);
        Thread.sleep(WAIT_TIME);

        this.admin.dropResource(this.clusterName, this.resourceName);
        // Same order as before: accessor0 for partitions 0 and 1, then accessor1 for partitions 0 and 1.
        for (HelixPartitionStatusAccessor statusAccessor : Arrays.asList(this.accessor0, this.accessor1)) {
            for (int partitionId : new int[] {this.partitionId0, this.partitionId1}) {
                statusAccessor.deleteReplicaStatus(this.resourceName, partitionId);
            }
        }
        Thread.sleep(WAIT_TIME);

        boolean presentAfterDrop = this.admin.getResourcesInCluster(this.clusterName).contains(this.resourceName);
        Assert.assertFalse(presentAfterDrop);

        boolean lookupRejected = false;
        try {
            this.offlinePushOnlyRepository.getNumberOfPartitions(this.resourceName);
        } catch (VeniceException expected) {
            // This is the outcome the test wants: the resource no longer exists.
            lookupRejected = true;
        }
        if (!lookupRejected) {
            Assert.fail("Exception should be thrown because resource does not exist now.");
        }
    }

    /**
     * Inspects the partition assignment of the resource: two partitions are assigned, partition 0 has
     * a single COMPLETED instance (the one on {@code httpPort0}) and no working instances. After that
     * participant disconnects, partition 0 has no COMPLETED instance left.
     *
     * <p>Assertions use TestNG's {@code (actual, expected)} argument order so failure messages read
     * correctly.
     *
     * @throws Exception if the thread is interrupted while waiting for propagation
     */
    @Test
    public void testGetPartitions() throws Exception {
        String completed = ExecutionStatus.COMPLETED.name();

        PartitionAssignment assignmentBefore = this.offlinePushOnlyRepository.getPartitionAssignments(this.resourceName);
        Assert.assertEquals(assignmentBefore.getAssignedNumberOfPartitions(), 2);
        Assert.assertEquals(assignmentBefore.getPartition(this.partitionId0).getInstancesInState(completed).size(), 1);
        Assert.assertEquals(assignmentBefore.getPartition(this.partitionId0).getWorkingInstances().size(), 0);
        Instance completedInstance = (Instance) assignmentBefore.getPartition(this.partitionId0).getInstancesInState(completed).get(0);
        Assert.assertEquals(completedInstance.getHost(), Utils.getHostName());
        Assert.assertEquals(completedInstance.getPort(), this.httpPort0);
        // Re-fetch the assignment from the repository and check the working instances again.
        Assert.assertEquals(this.offlinePushOnlyRepository.getPartitionAssignments(this.resourceName).getPartition(this.partitionId0).getWorkingInstances().size(), 0);

        this.manager0.disconnect();
        Thread.sleep(WAIT_TIME);

        PartitionAssignment assignmentAfter = this.offlinePushOnlyRepository.getPartitionAssignments(this.resourceName);
        Assert.assertEquals(assignmentAfter.getAssignedNumberOfPartitions(), 2);
        Assert.assertEquals(assignmentAfter.getPartition(this.partitionId0).getInstancesInState(completed).size(), 0);
    }

    /**
     * Exercises routing-data listener registration. A listener that was only un-subscribed must not
     * be called; after subscribing it and deleting every replica status of partitions 0 and 1, only
     * the "routing data deleted" callback is expected to have fired.
     *
     * @throws Exception if the thread is interrupted while waiting for callbacks
     */
    @Test
    public void testListeners() throws Exception {
        // One flag per callback, in the order: external view, customized view, partition status, deleted.
        final int externalViewSlot = 0;
        final int customizedViewSlot = 1;
        final int partitionStatusSlot = 2;
        final int deletedSlot = 3;
        final boolean[] fired = new boolean[4];

        RoutingDataRepository.RoutingDataChangedListener listener = new RoutingDataRepository.RoutingDataChangedListener() {
            public void onExternalViewChange(PartitionAssignment partitionAssignment) {
                fired[externalViewSlot] = true;
            }

            public void onCustomizedViewChange(PartitionAssignment partitionAssignment) {
                fired[customizedViewSlot] = true;
            }

            public void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partitionStatus) {
                fired[partitionStatusSlot] = true;
            }

            public void onRoutingDataDeleted(String topic) {
                fired[deletedSlot] = true;
            }
        };

        // Phase 1: un-subscribe without a prior subscription; nothing may be delivered.
        this.offlinePushOnlyRepository.unSubscribeRoutingDataChange(this.resourceName, listener);
        Thread.sleep(WAIT_TIME);
        for (int slot = 0; slot < fired.length; slot++) {
            Assert.assertFalse(fired[slot], "Should not get notification after un-registering.");
        }

        // Phase 2: subscribe, then wipe all replica statuses (accessor0 first, then accessor1).
        this.offlinePushOnlyRepository.subscribeRoutingDataChange(this.resourceName, listener);
        for (HelixPartitionStatusAccessor statusAccessor : Arrays.asList(this.accessor0, this.accessor1)) {
            for (int partitionId : new int[] {this.partitionId0, this.partitionId1}) {
                statusAccessor.deleteReplicaStatus(this.resourceName, partitionId);
            }
        }
        Thread.sleep(WAIT_TIME);
        for (int slot = 0; slot < deletedSlot; slot++) {
            Assert.assertFalse(fired[slot], "Should not get notification after resource is deleted.");
        }
        Assert.assertTrue(fired[deletedSlot], "There is a resource deleted.");
    }
}
