package com.linkedin.venice.helixrebalance;

import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helixrebalance/LeaderFollowerThreadPoolTest.class */
public class LeaderFollowerThreadPoolTest {
    private static final Logger LOGGER = LogManager.getLogger(LeaderFollowerThreadPoolTest.class);
    private VeniceClusterWrapper cluster;
    private VeniceServerWrapper server0;
    private VeniceServerWrapper server1;
    int replicaFactor = 2;
    int partitionSize = 1000;
    int partitionNum = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    private boolean isBlockingTaskStarted = false;
    private String storeName;

    @BeforeMethod
    public void setUp() {
        this.cluster = ServiceFactory.getVeniceCluster(1, 0, 1, this.replicaFactor, this.partitionSize, false, false);
    }

    @AfterMethod
    public void cleanUp() {
        this.cluster.close();
    }

    @Test(timeOut = 120000)
    public void testLeaderFollowerDualThreadPool() throws Exception {
        commonTestProcedures(true);
        String createVersionAndPushData = createVersionAndPushData(this.storeName);
        TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceAdmin().getOffLinePushStatus(this.cluster.getClusterName(), createVersionAndPushData).getExecutionStatus(), ExecutionStatus.COMPLETED);
        });
    }

    @Test(timeOut = 150000)
    public void testLeaderFollowerSingleThreadPool() throws Exception {
        commonTestProcedures(false);
        try {
            createVersionAndPushData(this.storeName);
            Assert.fail("new version creation should have failed.");
        } catch (AssertionError e) {
            Assert.assertTrue(e.getMessage().contains("does not have enough replicas"));
        }
    }

    private void commonTestProcedures(boolean z) throws InterruptedException {
        setUpServers(z);
        this.storeName = Utils.getUniqueString("testLeaderFollowerThreadPools_" + (z ? "DualPool" : "SinglePool"));
        this.cluster.getNewStore(this.storeName);
        this.cluster.updateStore(this.storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(this.partitionNum * this.partitionSize));
        String createVersionAndPushData = createVersionAndPushData(this.storeName);
        TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.cluster.getLeaderVeniceController().getVeniceAdmin().getOffLinePushStatus(this.cluster.getClusterName(), createVersionAndPushData).getExecutionStatus(), ExecutionStatus.COMPLETED);
        });
        this.server0.getVeniceServer().getHelixParticipationService().getLeaderFollowerHelixStateTransitionThreadPool().submit(() -> {
            LOGGER.info("blocking task is running...");
            this.lock.lock();
            try {
                this.isBlockingTaskStarted = true;
                this.condition.signal();
                try {
                    Thread.sleep(120000L);
                } catch (InterruptedException e) {
                    LOGGER.warn(e.getMessage());
                }
            } finally {
                this.lock.unlock();
            }
        });
        waitBlockingTaskStarted();
    }

    private void waitBlockingTaskStarted() throws InterruptedException {
        this.lock.lock();
        while (!this.isBlockingTaskStarted) {
            try {
                this.condition.await(120L, TimeUnit.SECONDS);
            } finally {
                this.lock.unlock();
            }
        }
    }

    private String createVersionAndPushData(String str) {
        VersionCreationResponse newVersion = this.cluster.getNewVersion(str);
        String kafkaTopic = newVersion.getKafkaTopic();
        Assert.assertEquals(newVersion.getReplicas(), this.replicaFactor);
        Assert.assertEquals(newVersion.getPartitions(), this.partitionNum);
        VeniceWriter<String, String, byte[]> veniceWriter = this.cluster.getVeniceWriter(kafkaTopic);
        try {
            veniceWriter.broadcastStartOfPush(new HashMap());
            veniceWriter.put("test", "test", 1);
            veniceWriter.broadcastEndOfPush(new HashMap());
            if (veniceWriter != null) {
                veniceWriter.close();
            }
            return kafkaTopic;
        } catch (Throwable th) {
            if (veniceWriter != null) {
                try {
                    veniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void setUpServers(boolean z) {
        Properties properties = new Properties();
        properties.put("max.leader.follower.state.transition.thread.number", 1);
        if (z) {
            properties.put("max.future.version.leader.follower.state.transition.thread.number", 3);
            properties.put("leader.follower.state.transition.thread.pool.strategy", LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.DUAL_POOL_STRATEGY.name());
        } else {
            properties.put("leader.follower.state.transition.thread.pool.strategy", LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.SINGLE_POOL_STRATEGY.name());
        }
        this.server0 = this.cluster.addVeniceServer(new Properties(), properties);
        this.server1 = this.cluster.addVeniceServer(new Properties(), properties);
    }
}
