package com.linkedin.venice.controller;

import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/controller/TestDelayedRebalance.class */
public class TestDelayedRebalance {
    private VeniceClusterWrapper cluster;
    int partitionSize = 1000;
    int replicaFactor = 2;
    int numberOfServer = 3;
    long testTimeOutMS = 10000;
    long delayRebalanceMS = this.testTimeOutMS * 2;
    int minActiveReplica = this.replicaFactor - 1;

    @BeforeMethod
    public void setUp() {
        this.cluster = ServiceFactory.getVeniceCluster(new VeniceClusterCreateOptions.Builder().numberOfControllers(1).numberOfServers(this.numberOfServer).numberOfRouters(1).replicationFactor(this.replicaFactor).partitionSize(this.partitionSize).rebalanceDelayMs(this.delayRebalanceMS).minActiveReplica(this.minActiveReplica).build());
    }

    @AfterMethod
    public void cleanUp() {
        this.cluster.close();
    }

    @Test(timeOut = 60000)
    public void testFailOneServerWithDelayedRebalance() throws InterruptedException {
        String createVersionAndPushData = createVersionAndPushData();
        int stopAServer = stopAServer(createVersionAndPushData);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 1);
        });
        this.cluster.restartVeniceServer(stopAServer);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 2);
        });
    }

    @Test(timeOut = 60000)
    public void testFailOneServerWithDelayedRebalanceTimeout() throws InterruptedException {
        String createVersionAndPushData = createVersionAndPushData();
        int stopAServer = stopAServer(createVersionAndPushData);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            this.cluster.refreshAllRouterMetaData();
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 1, "Right after taking down a server, the number of live instances should not have dropped to 0");
        });
        Thread.sleep(this.delayRebalanceMS / 2);
        Assert.assertEquals(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().getReadyToServeInstances(createVersionAndPushData, 0).size(), 1, "With delayed rebalance, helix should not move the partition to other machine during the delayed rebalance time.");
        Thread.sleep(this.delayRebalanceMS / 2);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().getReadyToServeInstances(createVersionAndPushData, 0).size(), 2);
        });
        Assert.assertNull(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().getPartitionAssignments(createVersionAndPushData).getPartition(0).getInstanceStatusById(Utils.getHelixNodeIdentifier(Utils.getHostName(), stopAServer)));
    }

    @Test
    public void testModifyDelayedRebalanceTime() {
        String createVersionAndPushData = createVersionAndPushData();
        this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().setDelayedRebalanceTime(this.cluster.getClusterName(), this.testTimeOutMS / 2);
        Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().getDelayedRebalanceTime(this.cluster.getClusterName()), this.testTimeOutMS / 2);
        stopAServer(createVersionAndPushData);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 1);
        });
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 2);
        });
    }

    @Test
    public void testDisableRebalanceTemporarily() throws InterruptedException {
        String createVersionAndPushData = createVersionAndPushData();
        this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().setDelayedRebalanceTime(this.cluster.getClusterName(), 0L);
        Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().getDelayedRebalanceTime(this.cluster.getClusterName()), 0L);
        Thread.sleep(this.testTimeOutMS);
        int stopAServer = stopAServer(createVersionAndPushData);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 2);
            Assert.assertNull(routingDataRepository.getPartitionAssignments(createVersionAndPushData).getPartition(0).getInstanceStatusById(Utils.getHelixNodeIdentifier(Utils.getHostName(), stopAServer)));
        });
    }

    @Test
    public void testEnableDelayedRebalance() throws InterruptedException {
        String createVersionAndPushData = createVersionAndPushData();
        this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().setDelayedRebalanceTime(this.cluster.getClusterName(), 0L);
        Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().getDelayedRebalanceTime(this.cluster.getClusterName()), 0L);
        this.cluster.getLeaderVeniceController().getVeniceHelixAdmin().setDelayedRebalanceTime(this.cluster.getClusterName(), this.delayRebalanceMS);
        Thread.sleep(this.testTimeOutMS);
        int stopAServer = stopAServer(createVersionAndPushData);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 1);
        });
        this.cluster.restartVeniceServer(stopAServer);
        TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, () -> {
            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
            Assert.assertTrue(routingDataRepository.containsKafkaTopic(createVersionAndPushData));
            Assert.assertEquals(routingDataRepository.getReadyToServeInstances(createVersionAndPushData, 0).size(), 2);
        });
        Assert.assertEquals(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().getPartitionAssignments(createVersionAndPushData).getPartition(0).getInstanceStatusById(Utils.getHelixNodeIdentifier(Utils.getHostName(), stopAServer)), ExecutionStatus.COMPLETED.name());
    }

    private int stopAServer(String str) {
        int port = ((Instance) this.cluster.getRandomVeniceRouter().getRoutingDataRepository().getReadyToServeInstances(str, 0).get(0)).getPort();
        this.cluster.stopVeniceServer(port);
        return port;
    }

    private String createVersionAndPushData() {
        String uniqueString = Utils.getUniqueString("TestDelayedRebalance");
        this.cluster.getNewStore(uniqueString);
        this.cluster.updateStore(uniqueString, new UpdateStoreQueryParams().setStorageQuotaInByte(1 * this.partitionSize));
        VersionCreationResponse newVersion = this.cluster.getNewVersion(uniqueString);
        Assert.assertFalse(newVersion.isError());
        String kafkaTopic = newVersion.getKafkaTopic();
        VeniceWriter<String, String, byte[]> veniceWriter = this.cluster.getVeniceWriter(kafkaTopic);
        try {
            veniceWriter.broadcastStartOfPush(new HashMap());
            veniceWriter.put("test", "test", 1);
            veniceWriter.broadcastEndOfPush(new HashMap());
            if (veniceWriter != null) {
                veniceWriter.close();
            }
            TestUtils.waitForNonDeterministicAssertion(this.testTimeOutMS, TimeUnit.MILLISECONDS, true, true, () -> {
                HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
                Assert.assertTrue(routingDataRepository.containsKafkaTopic(kafkaTopic));
                Assert.assertEquals(routingDataRepository.getReadyToServeInstances(kafkaTopic, 0).size(), 2);
            });
            return kafkaTopic;
        } catch (Throwable th) {
            if (veniceWriter != null) {
                try {
                    veniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
