package com.linkedin.venice.controller;

import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.integration.utils.HelixAsAServiceWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.LiveInstance;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/controller/TestHAASController.class */
public class TestHAASController {
    private Properties enableControllerClusterHAASProperties;
    private Properties enableControllerAndStorageClusterHAASProperties;

    /* loaded from: input_file:com/linkedin/venice/controller/TestHAASController$InitTask.class */
    private static class InitTask implements Callable<Void> {
        private final HelixAdminClient client;
        private final HashMap<String, String> helixClusterProperties = new HashMap<>();

        public InitTask(HelixAdminClient helixAdminClient) {
            this.client = helixAdminClient;
            this.helixClusterProperties.put("allowParticipantAutoJoin", String.valueOf(true));
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() {
            this.client.createVeniceControllerCluster(false);
            this.client.addClusterToGrandCluster("venice-controllers");
            for (int i = 0; i < 10; i++) {
                String str = "cluster-" + String.valueOf(i);
                this.client.createVeniceStorageCluster(str, new HashMap(), false);
                this.client.addClusterToGrandCluster(str);
                this.client.addVeniceStorageClusterToControllerCluster(str);
            }
            return null;
        }
    }

    @BeforeClass
    public void setUp() {
        this.enableControllerClusterHAASProperties = new Properties();
        this.enableControllerClusterHAASProperties.put("controller.cluster.leader.haas.enabled", String.valueOf(true));
        this.enableControllerClusterHAASProperties.put("controller.haas.super.cluster.name", HelixAsAServiceWrapper.HELIX_SUPER_CLUSTER_NAME);
        this.enableControllerAndStorageClusterHAASProperties = (Properties) this.enableControllerClusterHAASProperties.clone();
        this.enableControllerAndStorageClusterHAASProperties.put("venice.cluster.leader.haas.enabled", String.valueOf(true));
    }

    @Test(timeOut = 60000)
    public void testStartHAASHelixControllerAsControllerClusterLeader() {
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
        try {
            HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable = startAndWaitForHAASToBeAvailable(veniceCluster.getZk().getAddress());
            try {
                VeniceControllerWrapper addVeniceController = veniceCluster.addVeniceController(this.enableControllerClusterHAASProperties);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertTrue(addVeniceController.isLeaderControllerOfControllerCluster(), "The only Venice controller should be appointed the colo leader");
                });
                Assert.assertTrue(addVeniceController.isLeaderController(veniceCluster.getClusterName()), "The only Venice controller should be the leader of Venice cluster " + veniceCluster.getClusterName());
                veniceCluster.addVeniceServer(new Properties(), new Properties());
                NewStoreResponse assertCommand = TestUtils.assertCommand(veniceCluster.getNewStore(Utils.getUniqueString("venice-store")));
                veniceCluster.useControllerClient(controllerClient -> {
                    TestUtils.assertCommand(controllerClient.sendEmptyPushAndWait(assertCommand.getName(), Utils.getUniqueString(), 100L, 30000L));
                });
                String composeKafkaTopic = Version.composeKafkaTopic(assertCommand.getName(), 1);
                veniceCluster.useControllerClient(controllerClient2 -> {
                    TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                        Assert.assertEquals(TestUtils.assertCommand(controllerClient2.queryJobStatus(composeKafkaTopic)).getStatus(), ExecutionStatus.COMPLETED.toString());
                    });
                });
                if (startAndWaitForHAASToBeAvailable != null) {
                    startAndWaitForHAASToBeAvailable.close();
                }
                if (veniceCluster != null) {
                    veniceCluster.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testTransitionToHAASControllerAsControllerClusterLeader() {
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(3, 1, 0, 1);
        try {
            HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable = startAndWaitForHAASToBeAvailable(veniceCluster.getZk().getAddress());
            try {
                NewStoreResponse newStore = veniceCluster.getNewStore(Utils.getUniqueString("venice-store"));
                veniceCluster.useControllerClient(controllerClient -> {
                    TestUtils.assertCommand(controllerClient.sendEmptyPushAndWait(newStore.getName(), Utils.getUniqueString(), 100L, 1800000L));
                });
                List<VeniceControllerWrapper> veniceControllers = veniceCluster.getVeniceControllers();
                ArrayList arrayList = new ArrayList();
                for (VeniceControllerWrapper veniceControllerWrapper : veniceControllers) {
                    veniceCluster.stopVeniceController(veniceControllerWrapper.getPort());
                    veniceControllerWrapper.close();
                    arrayList.add(veniceCluster.addVeniceController(this.enableControllerClusterHAASProperties));
                }
                TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        VeniceControllerWrapper veniceControllerWrapper2 = (VeniceControllerWrapper) it.next();
                        if (veniceControllerWrapper2.isLeaderController(veniceCluster.getClusterName())) {
                            Assert.assertTrue(veniceControllerWrapper2.isLeaderControllerOfControllerCluster(), "The colo leader Venice controller should be the leader of the only Venice cluster");
                            return true;
                        }
                    }
                    return false;
                });
                String composeKafkaTopic = Version.composeKafkaTopic(newStore.getName(), 1);
                veniceCluster.useControllerClient(controllerClient2 -> {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        Assert.assertEquals(TestUtils.assertCommand(controllerClient2.queryJobStatus(composeKafkaTopic)).getStatus(), ExecutionStatus.COMPLETED.toString());
                    });
                });
                if (startAndWaitForHAASToBeAvailable != null) {
                    startAndWaitForHAASToBeAvailable.close();
                }
                if (veniceCluster != null) {
                    veniceCluster.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testStartHAASControllerAsStorageClusterLeader() {
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
        try {
            HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable = startAndWaitForHAASToBeAvailable(veniceCluster.getZk().getAddress());
            try {
                VeniceControllerWrapper addVeniceController = veniceCluster.addVeniceController(this.enableControllerAndStorageClusterHAASProperties);
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertTrue(addVeniceController.isLeaderController(veniceCluster.getClusterName()), "The only Venice controller should become the leader of the only Venice cluster");
                });
                veniceCluster.addVeniceServer(new Properties(), new Properties());
                veniceCluster.addVeniceServer(new Properties(), new Properties());
                NewStoreResponse newStore = veniceCluster.getNewStore(Utils.getUniqueString("venice-store"));
                veniceCluster.useControllerClient(controllerClient -> {
                    TestUtils.assertCommand(controllerClient.sendEmptyPushAndWait(newStore.getName(), Utils.getUniqueString(), 100L, 1800000L));
                });
                String composeKafkaTopic = Version.composeKafkaTopic(newStore.getName(), 1);
                veniceCluster.useControllerClient(controllerClient2 -> {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        Assert.assertEquals(TestUtils.assertCommand(controllerClient2.queryJobStatus(composeKafkaTopic)).getStatus(), ExecutionStatus.COMPLETED.toString());
                    });
                });
                if (startAndWaitForHAASToBeAvailable != null) {
                    startAndWaitForHAASToBeAvailable.close();
                }
                if (veniceCluster != null) {
                    veniceCluster.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testTransitionToHAASControllerAsStorageClusterLeader() {
        VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(3, 1, 0, 1);
        try {
            HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable = startAndWaitForHAASToBeAvailable(veniceCluster.getZk().getAddress());
            try {
                NewStoreResponse newStore = veniceCluster.getNewStore(Utils.getUniqueString("venice-store"));
                veniceCluster.useControllerClient(controllerClient -> {
                    TestUtils.assertCommand(controllerClient.sendEmptyPushAndWait(newStore.getName(), Utils.getUniqueString(), 100L, 1800000L));
                });
                List<VeniceControllerWrapper> veniceControllers = veniceCluster.getVeniceControllers();
                ArrayList arrayList = new ArrayList();
                LiveInstance clusterLeader = startAndWaitForHAASToBeAvailable.getClusterLeader(veniceCluster.getClusterName());
                Assert.assertNotNull(clusterLeader, "Could not find the cluster leader from HAAS!");
                Assert.assertFalse(clusterLeader.getId().startsWith(HelixAsAServiceWrapper.HELIX_INSTANCE_NAME_PREFIX), "The cluster leader should not start with: helix_controller_");
                for (VeniceControllerWrapper veniceControllerWrapper : veniceControllers) {
                    veniceCluster.stopVeniceController(veniceControllerWrapper.getPort());
                    veniceControllerWrapper.close();
                    arrayList.add(veniceCluster.addVeniceController(this.enableControllerAndStorageClusterHAASProperties));
                }
                TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, () -> {
                    LiveInstance clusterLeader2 = startAndWaitForHAASToBeAvailable.getClusterLeader(veniceCluster.getClusterName());
                    Assert.assertNotNull(clusterLeader2, "Could not find the cluster leader from HAAS after the rolling bounce of all controllers!");
                    Assert.assertTrue(clusterLeader2.getId().startsWith(HelixAsAServiceWrapper.HELIX_INSTANCE_NAME_PREFIX), "The cluster leader should start with: helix_controller_");
                });
                veniceCluster.useControllerClient(controllerClient2 -> {
                    String composeKafkaTopic = Version.composeKafkaTopic(newStore.getName(), 1);
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                        Assert.assertEquals(TestUtils.assertCommand(controllerClient2.queryJobStatus(composeKafkaTopic)).getStatus(), ExecutionStatus.COMPLETED.toString());
                    });
                });
                if (startAndWaitForHAASToBeAvailable != null) {
                    startAndWaitForHAASToBeAvailable.close();
                }
                if (veniceCluster != null) {
                    veniceCluster.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (veniceCluster != null) {
                try {
                    veniceCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 60000)
    public void testConcurrentClusterInitialization() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new DaemonThreadFactory("test-concurrent-cluster-init"));
        try {
            VeniceClusterWrapper veniceCluster = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
            try {
                HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable = startAndWaitForHAASToBeAvailable(veniceCluster.getZk().getAddress());
                try {
                    VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig = (VeniceControllerMultiClusterConfig) Mockito.mock(VeniceControllerMultiClusterConfig.class);
                    ((VeniceControllerMultiClusterConfig) Mockito.doReturn(startAndWaitForHAASToBeAvailable.getZkAddress()).when(veniceControllerMultiClusterConfig)).getZkAddress();
                    ((VeniceControllerMultiClusterConfig) Mockito.doReturn(HelixAsAServiceWrapper.HELIX_SUPER_CLUSTER_NAME).when(veniceControllerMultiClusterConfig)).getControllerHAASSuperClusterName();
                    ((VeniceControllerMultiClusterConfig) Mockito.doReturn("venice-controllers").when(veniceControllerMultiClusterConfig)).getControllerClusterName();
                    ((VeniceControllerMultiClusterConfig) Mockito.doReturn(3).when(veniceControllerMultiClusterConfig)).getControllerClusterReplica();
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < 3; i++) {
                        arrayList.add(new InitTask(new ZkHelixAdminClient(veniceControllerMultiClusterConfig, new MetricsRepository())));
                    }
                    Iterator it = threadPoolExecutor.invokeAll(arrayList).iterator();
                    while (it.hasNext()) {
                        ((Future) it.next()).get();
                    }
                    if (startAndWaitForHAASToBeAvailable != null) {
                        startAndWaitForHAASToBeAvailable.close();
                    }
                    if (veniceCluster != null) {
                        veniceCluster.close();
                    }
                } catch (Throwable th) {
                    if (startAndWaitForHAASToBeAvailable != null) {
                        try {
                            startAndWaitForHAASToBeAvailable.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            TestUtils.shutdownExecutor(threadPoolExecutor);
        }
    }

    private HelixAsAServiceWrapper startAndWaitForHAASToBeAvailable(String str) {
        HelixAsAServiceWrapper helixController = ServiceFactory.getHelixController(str);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertNotNull(helixController.getSuperClusterLeader(), "Helix super cluster doesn't have a leader yet");
        });
        return helixController;
    }
}
