package com.linkedin.venice.controller;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.MockTestStateModelFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;

/* loaded from: input_file:com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.class */
class AbstractTestVeniceHelixAdmin {
    static final long LEADER_CHANGE_TIMEOUT_MS = 10000;
    static final long TOTAL_TIMEOUT_FOR_LONG_TEST_MS = 60000;
    static final long TOTAL_TIMEOUT_FOR_SHORT_TEST_MS = 10000;
    static final int DEFAULT_REPLICA_COUNT = 1;
    static final String KEY_SCHEMA = "\"string\"";
    static final String VALUE_SCHEMA = "\"string\"";
    static final int MAX_NUMBER_OF_PARTITION = 16;
    static String NODE_ID = "localhost_9985";
    static int SERVER_LISTENING_PORT = 9985;
    private static final Logger LOGGER = LogManager.getLogger(AbstractTestVeniceHelixAdmin.class);
    VeniceHelixAdmin veniceAdmin;
    String clusterName;
    VeniceControllerConfig controllerConfig;
    String zkAddress;
    ZkServerWrapper zkServerWrapper;
    PubSubBrokerWrapper pubSubBrokerWrapper;
    SafeHelixManager helixManager;
    VeniceProperties controllerProps;
    HelixMessageChannelStats helixMessageChannelStats;
    VeniceControllerMultiClusterConfig multiClusterConfig;
    String storeOwner = "Doge of Venice";
    Map<String, SafeHelixManager> helixManagerByNodeID = new ConcurrentHashMap();
    Map<String, MockTestStateModelFactory> stateModelFactoryByNodeID = new ConcurrentHashMap();
    final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    public void setupCluster() throws Exception {
        setupCluster(true, new MetricsRepository());
    }

    public void setupCluster(boolean z) throws Exception {
        setupCluster(z, new MetricsRepository());
    }

    public void setupCluster(boolean z, MetricsRepository metricsRepository) throws Exception {
        Utils.thisIsLocalhost();
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker();
        this.clusterName = Utils.getUniqueString("test-cluster");
        Properties controllerProperties = getControllerProperties(this.clusterName);
        if (!z) {
            controllerProperties.put("participant.message.store.enabled", false);
            controllerProperties.put("admin.helix.messaging.channel.enabled", true);
        }
        controllerProperties.put("unregister.metric.for.deleted.store.enabled", true);
        this.controllerProps = new VeniceProperties(controllerProperties);
        this.helixMessageChannelStats = new HelixMessageChannelStats(new MetricsRepository(), this.clusterName);
        this.controllerConfig = new VeniceControllerConfig(this.controllerProps);
        this.multiClusterConfig = TestUtils.getMultiClusterConfigFromOneCluster(this.controllerConfig);
        this.veniceAdmin = new VeniceHelixAdmin(this.multiClusterConfig, metricsRepository, D2TestUtils.getAndStartD2Client(this.zkAddress), this.pubSubTopicRepository, this.pubSubBrokerWrapper.getPubSubClientsFactory());
        this.veniceAdmin.initStorageCluster(this.clusterName);
        startParticipant();
        waitUntilIsLeader(this.veniceAdmin, this.clusterName, 10000L);
        if (z) {
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                Store store = this.veniceAdmin.getStore(this.clusterName, VeniceSystemStoreUtils.getParticipantStoreNameForCluster(this.clusterName));
                Assert.assertNotNull(store);
                Assert.assertEquals(store.getCurrentVersion(), 1);
            });
        }
    }

    public void cleanupCluster() {
        stopAllParticipants();
        try {
            this.veniceAdmin.stop(this.clusterName);
            this.veniceAdmin.close();
        } catch (Exception e) {
            LOGGER.warn(e);
        }
        this.zkServerWrapper.close();
        this.pubSubBrokerWrapper.close();
    }

    void startParticipant() throws Exception {
        startParticipant(false, NODE_ID);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void delayParticipantJobCompletion(boolean z) {
        Iterator<MockTestStateModelFactory> it = this.stateModelFactoryByNodeID.values().iterator();
        while (it.hasNext()) {
            it.next().setBlockTransition(z);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startParticipant(boolean z, String str) throws Exception {
        startParticipant(z, str, "LeaderStandby");
    }

    void startParticipant(boolean z, String str, String str2) throws Exception {
        MockTestStateModelFactory mockTestStateModelFactory;
        VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor(this.clusterName, new ZkClient(this.zkAddress), new HelixAdapterSerializer(), 3, 1000L);
        if (this.stateModelFactoryByNodeID.containsKey(str)) {
            mockTestStateModelFactory = this.stateModelFactoryByNodeID.get(str);
        } else {
            mockTestStateModelFactory = new MockTestStateModelFactory(veniceOfflinePushMonitorAccessor);
            this.stateModelFactoryByNodeID.put(str, mockTestStateModelFactory);
        }
        mockTestStateModelFactory.setBlockTransition(z);
        this.helixManager = TestUtils.getParticipant(this.clusterName, str, this.zkAddress, SERVER_LISTENING_PORT, mockTestStateModelFactory, str2);
        this.helixManager.connect();
        this.helixManagerByNodeID.put(str, this.helixManager);
        HelixUtils.setupInstanceConfig(this.clusterName, str, this.zkAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAllParticipants() {
        Iterator<String> it = this.stateModelFactoryByNodeID.keySet().iterator();
        while (it.hasNext()) {
            stopParticipant(it.next());
        }
        this.stateModelFactoryByNodeID.clear();
        this.helixManagerByNodeID.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopParticipant(String str) {
        if (this.helixManagerByNodeID.containsKey(str)) {
            this.helixManagerByNodeID.get(str).disconnect();
            this.helixManagerByNodeID.remove(str);
            this.stateModelFactoryByNodeID.remove(str).stopAllStateModelThreads();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Properties getControllerProperties(String str) throws IOException {
        Properties propertiesForControllerConfig = TestUtils.getPropertiesForControllerConfig();
        propertiesForControllerConfig.put("kafka.replication.factor", 1);
        propertiesForControllerConfig.put("zookeeper.address", this.zkAddress);
        propertiesForControllerConfig.put("cluster.name", str);
        propertiesForControllerConfig.put("kafka.bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        propertiesForControllerConfig.put("default.partition.max.count", Integer.valueOf(MAX_NUMBER_OF_PARTITION));
        propertiesForControllerConfig.put("default.partition.size", 10);
        propertiesForControllerConfig.put("cluster.to.d2", TestUtils.getClusterToD2String(Collections.singletonMap(str, "dummy_d2")));
        propertiesForControllerConfig.put("cluster.to.server.d2", TestUtils.getClusterToD2String(Collections.singletonMap(str, "dummy_server_d2")));
        propertiesForControllerConfig.put("controller.add.version.via.admin.protocol", true);
        propertiesForControllerConfig.put("admin.helix.messaging.channel.enabled", false);
        propertiesForControllerConfig.put("participant.message.store.enabled", true);
        propertiesForControllerConfig.put("topic.cleanup.send.concurrent.delete.requests.enabled", true);
        propertiesForControllerConfig.put("controller.system.schema.cluster.name", str);
        propertiesForControllerConfig.put("child.cluster.allowlist", "dc-0");
        propertiesForControllerConfig.put("controller.ssl.enabled", false);
        return propertiesForControllerConfig;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void waitUntilIsLeader(VeniceHelixAdmin veniceHelixAdmin, String str, long j) {
        waitForALeader(Collections.singletonList(veniceHelixAdmin), str, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void waitForALeader(List<VeniceHelixAdmin> list, String str, long j) {
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                break;
            }
            Iterator<VeniceHelixAdmin> it = list.iterator();
            while (it.hasNext()) {
                if (it.next().isLeaderControllerFor(str)) {
                    return;
                }
            }
            try {
                Thread.sleep(100);
                j2 = j3 + 100;
            } catch (InterruptedException e) {
            }
        }
        Assert.fail("No VeniceHelixAdmin became leader for cluster: " + str + " after timeout: " + j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VeniceHelixAdmin getLeader(List<VeniceHelixAdmin> list, String str) {
        for (VeniceHelixAdmin veniceHelixAdmin : list) {
            if (veniceHelixAdmin.isLeaderControllerFor(str)) {
                return veniceHelixAdmin;
            }
        }
        throw new VeniceException("no leader found for cluster: " + str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VeniceHelixAdmin getFollower(List<VeniceHelixAdmin> list, String str) {
        for (VeniceHelixAdmin veniceHelixAdmin : list) {
            if (!veniceHelixAdmin.isLeaderControllerFor(str)) {
                return veniceHelixAdmin;
            }
        }
        throw new VeniceException("no follower found for cluster: " + str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void verifyParticipantMessageStoreSetup() {
        String participantStoreNameForCluster = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(this.clusterName);
        TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
            Store store = this.veniceAdmin.getStore(this.clusterName, participantStoreNameForCluster);
            Assert.assertNotNull(store);
            Assert.assertEquals(store.getVersions().size(), 1);
        });
        TestUtils.waitForNonDeterministicAssertion(3L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(this.veniceAdmin.getRealTimeTopic(this.clusterName, participantStoreNameForCluster), Version.composeRealTimeTopic(participantStoreNameForCluster));
        });
    }
}
