package com.linkedin.davinci.ingestion;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServer;
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests around the forked (isolated) ingestion process: that it stops after the main process
 * stops sending heartbeats, that the port it listens on can be released, and that the
 * configs generated for the forked process carry the customized values.
 *
 * <p>A single ZooKeeper server is started once for the class and shared by all tests.
 */
public class IsolatedIngestionServerTest {
    private static final int TIMEOUT_MS = 60000;
    /** JVM-wide system property mutated by {@link #testCustomizedConfigs()}. */
    private static final String APP_ENV_PROPERTY = "com.linkedin.app.env";
    /** Directory prefix tried first when invoking {@code lsof}. */
    private static final String LSOF_DIRECTORY = "/usr/sbin/";

    private ZkServerWrapper zkServerWrapper;

    /** Starts the ZooKeeper server whose address is placed in every generated config. */
    @BeforeClass
    public void setUp() {
        this.zkServerWrapper = ServiceFactory.getZkServer();
    }

    /** Closes the ZooKeeper server; runs even if setup or tests failed. */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.zkServerWrapper});
    }

    /**
     * Starts the forked ingestion process, waits (up to 10 seconds) for one heartbeat request to
     * succeed, then sends nothing further and expects the process to be dead within 30 seconds.
     * NOTE(review): the shutdown is presumably driven by the 10 second connection timeout set in
     * {@link #getConfigLoader(int)} - confirm against the server implementation.
     */
    @Test(timeOut = TIMEOUT_MS)
    public void testShutdownAfterHeartbeatTimeout() {
        VeniceConfigLoader configLoader = getConfigLoader(Utils.getFreePort());
        try (MainIngestionRequestClient client = new MainIngestionRequestClient(configLoader)) {
            Process forkedProcess = client.startForkedIngestionProcess(configLoader);
            try {
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertTrue(client.sendHeartbeatRequest());
                });
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertFalse(forkedProcess.isAlive());
                });
            } finally {
                // On a failed assertion the child may still be running; do not leak it.
                destroyIfAlive(forkedProcess);
            }
        }
    }

    /**
     * Verifies that exactly one process listening on the service port is the isolated ingestion
     * server and that its PID matches the forked process, then calls
     * {@link IsolatedIngestionUtils#releaseTargetPortBinding(int)} and expects {@code lsof} to
     * report nothing on that port within 5 seconds.
     */
    @Test(timeOut = TIMEOUT_MS)
    public void testReleaseTargetPortBinding() {
        int servicePort = Utils.getFreePort();
        VeniceConfigLoader configLoader = getConfigLoader(servicePort);
        try (MainIngestionRequestClient client = new MainIngestionRequestClient(configLoader)) {
            ForkedJavaProcess forkedProcess = client.startForkedIngestionProcess(configLoader);
            try {
                Assert.assertTrue(forkedProcess.isAlive());

                String lsofCommand = "lsof -t -i :" + servicePort;
                int isolatedServerCount = 0;
                for (String line : listPortOwners(lsofCommand).split("\n")) {
                    String pidText = line.trim();
                    if (pidText.isEmpty()) {
                        continue;
                    }
                    int pid = Integer.parseInt(pidText);
                    String processCommand = IsolatedIngestionUtils.executeShellCommand("ps -p " + pid + " -o command");
                    // Only count port owners whose command line names the isolated server class.
                    if (processCommand.contains(IsolatedIngestionServer.class.getName())) {
                        isolatedServerCount++;
                        Assert.assertEquals(pid, forkedProcess.pid());
                    }
                }
                Assert.assertEquals(isolatedServerCount, 1);

                IsolatedIngestionUtils.releaseTargetPortBinding(servicePort);
                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                    // Both lsof invocations must come back empty once the port is released.
                    Assert.assertEquals(IsolatedIngestionUtils.executeShellCommand(LSOF_DIRECTORY + lsofCommand), "");
                    Assert.assertEquals(IsolatedIngestionUtils.executeShellCommand(lsofCommand), "");
                });
            } finally {
                // If the test failed before the port was released, the child still holds it.
                destroyIfAlive(forkedProcess);
            }
        }
    }

    /**
     * Builds and saves the configs for the forked process and checks that the graceful drop time
     * is 10 (the {@code isolated.}-prefixed value from {@link #getConfigLoader(int)}, not the
     * plain value of 100) and that the local region name resolves to the app environment set
     * through the system property.
     *
     * @throws Exception if building, saving, or parsing the generated configs fails
     */
    @Test
    public void testCustomizedConfigs() throws Exception {
        String previousAppEnv = System.getProperty(APP_ENV_PROPERTY);
        System.setProperty(APP_ENV_PROPERTY, "ei-ltx1");
        try {
            VeniceConfigLoader configLoader = getConfigLoader(Utils.getFreePort());
            String savedConfigs = IsolatedIngestionUtils.buildAndSaveConfigsForForkedIngestionProcess(configLoader);
            VeniceProperties forkedProcessProperties = Utils.parseProperties(savedConfigs);
            Assert.assertEquals(forkedProcessProperties.getInt("server.partition.graceful.drop.time.in.seconds"), 10);
            Assert.assertEquals(RegionUtils.getLocalRegionName(forkedProcessProperties, false), "ei-ltx1");
        } finally {
            // System properties are JVM-wide; restore the prior state so later tests are unaffected.
            if (previousAppEnv == null) {
                System.clearProperty(APP_ENV_PROPERTY);
            } else {
                System.setProperty(APP_ENV_PROPERTY, previousAppEnv);
            }
        }
    }

    /**
     * Runs the given {@code lsof} command, first with the {@code /usr/sbin/} prefix and, if that
     * produces no output, again without the prefix.
     *
     * @param lsofCommand the un-prefixed {@code lsof} command line
     * @return the raw command output; may be empty
     */
    private static String listPortOwners(String lsofCommand) {
        String output = IsolatedIngestionUtils.executeShellCommand(LSOF_DIRECTORY + lsofCommand);
        if (output.isEmpty()) {
            output = IsolatedIngestionUtils.executeShellCommand(lsofCommand);
        }
        return output;
    }

    /**
     * Destroys the given process if it is still running; does nothing otherwise.
     *
     * @param process the forked process to clean up
     */
    private static void destroyIfAlive(Process process) {
        if (process.isAlive()) {
            process.destroy();
        }
    }

    /**
     * Creates a config loader pointing at the shared ZooKeeper server, with a unique cluster name
     * and the given isolated ingestion service port.
     *
     * @param i the port to use for {@code server.ingestion.isolation.service.port}
     * @return a loader built from the same properties for both constructor arguments
     */
    private VeniceConfigLoader getConfigLoader(int i) {
        String zkAddress = this.zkServerWrapper.getAddress();
        VeniceProperties properties = new PropertyBuilder()
                .put("cluster.name", Utils.getUniqueString())
                .put("zookeeper.address", zkAddress)
                .put("kafka.bootstrap.servers", "localhost:1234")
                .put("r2d2Client.zkHosts", zkAddress)
                .put("server.partition.graceful.drop.time.in.seconds", 100)
                .put("server.ingestion.isolation.connection.timeout.seconds", 10)
                .put("isolated.server.partition.graceful.drop.time.in.seconds", 10)
                .put("server.ingestion.isolation.service.port", Integer.valueOf(i))
                .build();
        return new VeniceConfigLoader(properties, properties);
    }
}
