package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.pushmonitor.ReadOnlyPartitionStatus;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.utils.MockTestStateModel;
import com.linkedin.venice.utils.MockTestStateModelFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helix/TestHelixExternalViewRepository.class */
public class TestHelixExternalViewRepository {
    private static final int WAIT_TIME = 1000;
    private SafeHelixManager manager;
    private SafeHelixManager controller;
    private HelixAdmin admin;
    private String clusterName = "UnitTestCLuster";
    private String resourceName = "UnitTest";
    private String zkAddress;
    private int httpPort;
    private int adminPort;
    private ZkServerWrapper zkServerWrapper;
    private HelixExternalViewRepository repository;
    private SafeHelixManager readManager;

    @BeforeMethod(alwaysRun = true)
    public void setupHelix() throws Exception {
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();
        this.admin = new ZKHelixAdmin(this.zkAddress);
        this.admin.addCluster(this.clusterName);
        HelixConfigScope build = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this.clusterName).build();
        HashMap hashMap = new HashMap();
        hashMap.put("allowParticipantAutoJoin", String.valueOf(true));
        this.admin.setConfig(build, hashMap);
        this.admin.addStateModelDef(this.clusterName, "MockTestStateModel", MockTestStateModel.getDefinition());
        this.admin.addResource(this.clusterName, this.resourceName, 1, "MockTestStateModel", IdealState.RebalanceMode.FULL_AUTO.toString());
        this.admin.rebalance(this.clusterName, this.resourceName, 1);
        this.httpPort = 50000 + ((int) (System.currentTimeMillis() % 10000));
        this.adminPort = 50000 + ((int) (System.currentTimeMillis() % 10000)) + 1;
        this.controller = new SafeHelixManager(HelixControllerMain.startHelixController(this.zkAddress, this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.adminPort), "STANDALONE"));
        this.manager = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.httpPort), this.zkAddress, this.httpPort, "MockTestStateModel");
        this.manager.connect();
        Thread.sleep(1000L);
        this.readManager = new SafeHelixManager(HelixManagerFactory.getZKHelixManager(this.clusterName, "reader", InstanceType.SPECTATOR, this.zkAddress));
        this.readManager.connect();
        this.repository = new HelixExternalViewRepository(this.readManager);
        this.repository.refresh();
        TestUtils.waitForNonDeterministicCompletion(5L, TimeUnit.SECONDS, () -> {
            return this.repository.containsKafkaTopic(this.resourceName);
        });
    }

    @AfterMethod(alwaysRun = true)
    public void cleanupHelix() {
        this.manager.disconnect();
        this.readManager.disconnect();
        this.controller.disconnect();
        this.admin.dropCluster(this.clusterName);
        this.admin.close();
        this.zkServerWrapper.close();
    }

    @Test
    public void testGetInstances() throws Exception {
        List readyToServeInstances = this.repository.getReadyToServeInstances(this.resourceName, 0);
        Assert.assertEquals(1, readyToServeInstances.size());
        Instance instance = (Instance) readyToServeInstances.get(0);
        Assert.assertEquals(Utils.getHostName(), instance.getHost());
        Assert.assertEquals(this.httpPort, instance.getPort());
        this.manager.disconnect();
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(0, this.repository.getReadyToServeInstances(this.resourceName, 0).size());
        });
        Assert.assertEquals(0, this.repository.getReadyToServeInstances(this.resourceName, 0).size());
        int i = this.httpPort + 10;
        SafeHelixManager participant = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), i), this.zkAddress, i, "MockTestStateModel");
        participant.connect();
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            List readyToServeInstances2 = this.repository.getReadyToServeInstances(this.resourceName, 0);
            Assert.assertEquals(readyToServeInstances2.size(), 1);
            Assert.assertEquals(((Instance) readyToServeInstances2.get(0)).getPort(), i);
        });
        participant.disconnect();
    }

    @Test
    public void testGetReplicaStates() {
        List replicaStates = this.repository.getReplicaStates(this.resourceName, 0);
        Assert.assertEquals(replicaStates.size(), 1, "Unexpected replication factor");
        ReplicaState replicaState = (ReplicaState) replicaStates.iterator().next();
        Assert.assertEquals(replicaState.getPartition(), 0, "Unexpected partition number");
        Assert.assertNotNull(replicaState.getParticipantId(), "Participant id should not be null");
        Assert.assertEquals(replicaState.getExternalViewStatus(), "ONLINE");
        Assert.assertEquals(replicaState.isReadyToServe(), replicaState.getExternalViewStatus().equals("ONLINE"));
    }

    @Test
    public void testGetNumberOfPartitions() throws Exception {
        Assert.assertEquals(1, this.repository.getNumberOfPartitions(this.resourceName));
        this.manager.disconnect();
        Thread.sleep(1000L);
        Assert.assertEquals(1, this.repository.getNumberOfPartitions(this.resourceName));
    }

    @Test(groups = {"flaky"})
    public void testGetNumberOfPartitionsWhenResourceDropped() throws Exception {
        Assert.assertTrue(this.admin.getResourcesInCluster(this.clusterName).contains(this.resourceName));
        Thread.sleep(1000L);
        this.admin.dropResource(this.clusterName, this.resourceName);
        Thread.sleep(1000L);
        Assert.assertFalse(this.admin.getResourcesInCluster(this.clusterName).contains(this.resourceName));
        try {
            this.repository.getNumberOfPartitions(this.resourceName);
            Assert.fail("IAE should be thrown because resource does not exist now.");
        } catch (VeniceException e) {
        }
    }

    @Test
    public void testGetPartitions() throws Exception {
        PartitionAssignment partitionAssignments = this.repository.getPartitionAssignments(this.resourceName);
        Assert.assertEquals(1, partitionAssignments.getAssignedNumberOfPartitions());
        Assert.assertEquals(1, partitionAssignments.getPartition(0).getWorkingInstances().size());
        Assert.assertEquals(1, partitionAssignments.getPartition(0).getWorkingInstances().size());
        Instance instance = (Instance) partitionAssignments.getPartition(0).getWorkingInstances().get(0);
        Assert.assertEquals(Utils.getHostName(), instance.getHost());
        Assert.assertEquals(this.httpPort, instance.getPort());
        Assert.assertEquals((Instance) partitionAssignments.getPartition(0).getWorkingInstances().get(0), instance);
        this.manager.disconnect();
        Thread.sleep(1000L);
        Assert.assertEquals(0, this.repository.getPartitionAssignments(this.resourceName).getAssignedNumberOfPartitions());
    }

    @Test
    public void testListeners() throws Exception {
        final boolean[] zArr = {false};
        RoutingDataRepository.RoutingDataChangedListener routingDataChangedListener = new RoutingDataRepository.RoutingDataChangedListener() { // from class: com.linkedin.venice.helix.TestHelixExternalViewRepository.1
            public void onExternalViewChange(PartitionAssignment partitionAssignment) {
                zArr[0] = true;
            }

            public void onCustomizedViewChange(PartitionAssignment partitionAssignment) {
                zArr[0] = true;
            }

            public void onPartitionStatusChange(String str, ReadOnlyPartitionStatus readOnlyPartitionStatus) {
                zArr[0] = true;
            }

            public void onRoutingDataDeleted(String str) {
                zArr[0] = true;
            }
        };
        this.repository.subscribeRoutingDataChange(this.resourceName, routingDataChangedListener);
        this.manager.disconnect();
        Thread.sleep(1000L);
        Assert.assertEquals(zArr[0], true, "Can not get notification from repository.");
        zArr[0] = false;
        this.repository.unSubscribeRoutingDataChange(this.resourceName, routingDataChangedListener);
        this.manager.connect();
        Thread.sleep(1000L);
        Assert.assertEquals(zArr[0], false, "Should not get notification after un-registering.");
    }

    @Test
    public void testControllerChanged() throws Exception {
        Instance leaderController = this.repository.getLeaderController();
        Assert.assertEquals(leaderController.getHost(), Utils.getHostName());
        Assert.assertEquals(leaderController.getPort(), this.adminPort);
        int i = this.adminPort + 1;
        SafeHelixManager safeHelixManager = new SafeHelixManager(HelixManagerFactory.getZKHelixManager(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), i), InstanceType.CONTROLLER, this.zkAddress));
        safeHelixManager.connect();
        this.controller.disconnect();
        Thread.sleep(1000L);
        Instance leaderController2 = this.repository.getLeaderController();
        Assert.assertEquals(leaderController2.getHost(), Utils.getHostName());
        Assert.assertEquals(leaderController2.getPort(), i);
        safeHelixManager.disconnect();
    }

    @Test
    public void testNodeChanged() {
        Assert.assertTrue(this.repository.getReadyToServeInstances(this.resourceName, 0).size() > 0);
        Assert.assertTrue(this.repository.getPartitionAssignments(this.resourceName).getAssignedNumberOfPartitions() > 0);
        this.manager.disconnect();
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(this.repository.getReadyToServeInstances(this.resourceName, 0).size(), 0);
            Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getAssignedNumberOfPartitions(), 0);
        });
    }

    public void testGetBootstrapInstances() throws Exception {
        this.manager.disconnect();
        MockTestStateModelFactory mockTestStateModelFactory = new MockTestStateModelFactory(new VeniceOfflinePushMonitorAccessor(this.clusterName, new ZkClient(this.zkAddress), new HelixAdapterSerializer(), 3, 1000L));
        mockTestStateModelFactory.setBlockTransition(true);
        this.manager = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.httpPort + 1), this.zkAddress, this.httpPort + 1, mockTestStateModelFactory, "MockTestStateModel");
        this.manager.connect();
        Thread.sleep(1000L);
        Assert.assertEquals(this.repository.getReadyToServeInstances(this.resourceName, 0).size(), 0, "Transition should be delayed, so there is no online instance.");
        Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getAssignedNumberOfPartitions(), 1);
        Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getPartition(0).getWorkingInstances().size(), 1, "One bootstrap instance should be found");
        mockTestStateModelFactory.makeTransitionCompleted(this.resourceName, 0);
        Thread.sleep(1000L);
        Assert.assertEquals(this.repository.getReadyToServeInstances(this.resourceName, 0).size(), 1, "One online instance should be found");
        Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getAssignedNumberOfPartitions(), 1);
        Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getPartition(0).getWorkingInstances().size(), 1, "One online instance should be found");
        Assert.assertEquals(this.repository.getPartitionAssignments(this.resourceName).getPartition(0).getWorkingInstances().size(), 1, "One online instance should be found");
    }

    @Test
    public void testPartitionMove() throws Exception {
        this.admin.addResource(this.clusterName, "testPartitionMove", 6, "MockTestStateModel", IdealState.RebalanceMode.FULL_AUTO.toString());
        this.admin.rebalance(this.clusterName, "testPartitionMove", 1);
        SafeHelixManager participant = TestUtils.getParticipant(this.clusterName, Utils.getHelixNodeIdentifier(Utils.getHostName(), this.httpPort + WAIT_TIME), this.zkAddress, this.httpPort + WAIT_TIME, "MockTestStateModel");
        participant.connect();
        Thread.sleep(3000L);
        System.out.println(this.httpPort);
        for (int i = 0; i < 6; i++) {
            System.out.println(((Instance) this.repository.getReadyToServeInstances("testPartitionMove", i).get(0)).getNodeId());
        }
        this.admin.dropResource(this.clusterName, "testPartitionMove");
        Thread.sleep(3000L);
        participant.disconnect();
    }
}
