package com.linkedin.venice.integration.utils;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

/* loaded from: input_file:com/linkedin/venice/integration/utils/ZkServerWrapper.class */
public class ZkServerWrapper extends ProcessWrapper {
    public static final String SERVICE_NAME = "Zookeeper";
    private static final int MAX_WAIT_TIME_DURING_STARTUP = 5000;
    private static final int TICK_TIME = 200;
    private static final int MAX_SESSION_TIMEOUT = 10000;
    private static final int NUM_CONNECTIONS = 5000;
    private static final String CLIENT_PORT_PROP = "clientPort";
    private static final String TICK_TIME_PROP = "tickTime";
    private static final String MAX_SESSION_TIMEOUT_PROP = "maxSessionTimeout";
    private static final String NUM_CONNECTIONS_PROP = "maxClientCnxns";
    private static final String DATA_DIR_PROP = "dataDir";
    private final String chroot;
    private final ServerConfig configuration;
    private ZkThread zkThread;
    private ZooKeeperServer zkServer;
    private static final Logger LOGGER = LogManager.getLogger(ZkServerWrapper.class);
    private static final ConcurrentLinkedQueue<String> CHROOTS = new ConcurrentLinkedQueue<>();
    private static ZkServerWrapper INSTANCE = null;
    private static ZooKeeper zooKeeper = null;
    private static boolean isSingleton = false;

    /* loaded from: input_file:com/linkedin/venice/integration/utils/ZkServerWrapper$ZkThread.class */
    private class ZkThread extends Thread {
        volatile Exception exception;

        private ZkThread() {
            this.exception = null;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ZkServerWrapper.LOGGER.info("Starting ZK server");
                FileTxnSnapLog fileTxnSnapLog = null;
                try {
                    fileTxnSnapLog = new FileTxnSnapLog(ZkServerWrapper.this.configuration.getDataLogDir(), ZkServerWrapper.this.configuration.getDataDir());
                    ZkServerWrapper.this.zkServer.setTxnLogFactory(fileTxnSnapLog);
                    ZkServerWrapper.this.zkServer.setTickTime(ZkServerWrapper.this.configuration.getTickTime());
                    ZkServerWrapper.this.zkServer.setMinSessionTimeout(ZkServerWrapper.this.configuration.getMinSessionTimeout());
                    ZkServerWrapper.this.zkServer.setMaxSessionTimeout(ZkServerWrapper.this.configuration.getMaxSessionTimeout());
                    ServerCnxnFactory createFactory = ServerCnxnFactory.createFactory();
                    createFactory.configure(ZkServerWrapper.this.configuration.getClientPortAddress(), ZkServerWrapper.this.configuration.getMaxClientCnxns());
                    createFactory.startup(ZkServerWrapper.this.zkServer);
                    createFactory.join();
                    if (ZkServerWrapper.this.zkServer.isRunning()) {
                        ZkServerWrapper.this.zkServer.shutdown();
                    }
                    if (fileTxnSnapLog != null) {
                        fileTxnSnapLog.close();
                    }
                } catch (Throwable th) {
                    if (ZkServerWrapper.this.zkServer.isRunning()) {
                        ZkServerWrapper.this.zkServer.shutdown();
                    }
                    if (fileTxnSnapLog != null) {
                        fileTxnSnapLog.close();
                    }
                    throw th;
                }
            } catch (Exception e) {
                this.exception = e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static StatefulServiceProvider<ZkServerWrapper> generateService() {
        return (str, file) -> {
            if (!isSingleton) {
                return createRealZkServerWrapper(str, Utils.getFreePort(), file);
            }
            synchronized (ZkServerWrapper.class) {
                if (INSTANCE == null) {
                    try {
                        INSTANCE = createRealZkServerWrapper(str, Utils.getFreePort(), file);
                        Runtime runtime = Runtime.getRuntime();
                        ZkServerWrapper zkServerWrapper = INSTANCE;
                        Objects.requireNonNull(zkServerWrapper);
                        runtime.addShutdownHook(new Thread(zkServerWrapper::close));
                        INSTANCE.start();
                    } catch (Exception e) {
                        INSTANCE = null;
                        throw e;
                    }
                }
            }
            String poll = CHROOTS.poll();
            if (poll == null) {
                CHROOTS.addAll(addPathsToZk(INSTANCE.getAddress(), 1));
                poll = CHROOTS.poll();
            }
            return new ZkServerWrapper(file, poll);
        };
    }

    private static ZkServerWrapper createRealZkServerWrapper(String str, int i, File file) {
        LOGGER.info("Creating ZkServerWrapper on port: {}", Integer.valueOf(i));
        Properties properties = new Properties();
        properties.setProperty(TICK_TIME_PROP, Integer.toString(TICK_TIME));
        properties.setProperty(MAX_SESSION_TIMEOUT_PROP, Integer.toString(MAX_SESSION_TIMEOUT));
        properties.setProperty(NUM_CONNECTIONS_PROP, Integer.toString(5000));
        properties.setProperty(CLIENT_PORT_PROP, Integer.toString(i));
        properties.setProperty(DATA_DIR_PROP, file.getAbsolutePath());
        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
        try {
            quorumPeerConfig.parseProperties(properties);
            ServerConfig serverConfig = new ServerConfig();
            serverConfig.readFrom(quorumPeerConfig);
            return new ZkServerWrapper(file, serverConfig);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private ZkServerWrapper(File file, ServerConfig serverConfig) {
        super(SERVICE_NAME, file);
        this.chroot = null;
        this.zkThread = new ZkThread();
        this.zkServer = new ZooKeeperServer();
        this.configuration = serverConfig;
    }

    private ZkServerWrapper(File file, String str) {
        super(SERVICE_NAME, file);
        this.chroot = str;
        this.zkThread = null;
        this.zkServer = null;
        this.configuration = null;
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    public String getHost() {
        return "localhost";
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    public int getPort() {
        return (!isSingleton || this == INSTANCE) ? this.configuration.getClientPortAddress().getPort() : INSTANCE.getPort();
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    public String getAddress() {
        return (!isSingleton || this == INSTANCE) ? getHost() + ":" + getPort() : INSTANCE.getAddress() + "/" + this.chroot;
    }

    public static boolean isSingleton() {
        return isSingleton;
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    protected void internalStart() throws Exception {
        if (!isSingleton || this == INSTANCE) {
            synchronized (ZkServerWrapper.class) {
                long currentTimeMillis = System.currentTimeMillis() + 5000;
                this.zkThread.start();
                while (!this.zkServer.isRunning() && this.zkThread.exception == null) {
                    if (currentTimeMillis < System.currentTimeMillis()) {
                        close();
                        throw new VeniceException("Unable to start ZK within the maximum allotted time (5000 ms).");
                    }
                    Thread.sleep(100L);
                }
                if (this.zkThread.exception != null) {
                    INSTANCE = null;
                    throw new VeniceException("ZooKeeper failed to start.", this.zkThread.exception);
                }
                LOGGER.info("ZK is running: {}", Boolean.valueOf(this.zkServer.isRunning()));
            }
        }
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    protected synchronized void internalStop() throws Exception {
        if (!isSingleton || this == INSTANCE) {
            synchronized (ZkServerWrapper.class) {
                this.zkThread.interrupt();
                this.zkThread.join();
                INSTANCE = null;
            }
        }
    }

    @Override // com.linkedin.venice.integration.utils.ProcessWrapper
    protected void newProcess() throws Exception {
        if (isSingleton) {
            throw new RuntimeException("newProcess is not implemented for singleton ZkServerWrappers");
        }
        this.zkThread = new ZkThread();
        this.zkServer = new ZooKeeperServer();
    }

    private static synchronized List<String> addPathsToZk(String str, int i) {
        if (zooKeeper != null && zooKeeper.getState() != ZooKeeper.States.CONNECTED) {
            try {
                zooKeeper.close();
                zooKeeper = null;
            } catch (InterruptedException e) {
                throw new VeniceException(e);
            }
        }
        if (zooKeeper == null) {
            try {
                zooKeeper = new ZooKeeper(str, MAX_SESSION_TIMEOUT, watchedEvent -> {
                });
                TestUtils.waitForNonDeterministicCompletion(5L, TimeUnit.SECONDS, () -> {
                    return zooKeeper.getState().equals(ZooKeeper.States.CONNECTED);
                });
            } catch (Exception e2) {
                throw new RuntimeException("Failed to initialize ZK client: " + str, e2);
            }
        }
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            String uniqueString = Utils.getUniqueString("test");
            try {
                zooKeeper.create("/" + uniqueString, (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                arrayList.add(uniqueString);
            } catch (Exception e3) {
                throw new RuntimeException("Failed to create paths on zookeeper at: " + str, e3);
            }
        }
        return arrayList;
    }

    public String toString() {
        return "ZkService[" + getServiceName() + "@" + getHost() + ":" + getPort() + "]";
    }
}
