package com.linkedin.venice.endToEnd;

import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestTopicWiseSharedConsumerPoolResilience.class */
/**
 * Integration test that runs several consecutive batch pushes against a cluster whose servers
 * are configured with the topic-wise shared consumer assignment strategy and a small consumer
 * pool ({@value #CONSUMER_POOL_SIZE} consumers per Kafka cluster).
 *
 * <p>The number of pushes ({@value #PUSH_COUNT}) is larger than the pool size; every push must
 * still become the current version, and the version two generations back must stop being alive.
 */
public class TestTopicWiseSharedConsumerPoolResilience {
    private static final Logger LOGGER = LogManager.getLogger(TestTopicWiseSharedConsumerPoolResilience.class);

    /** Value used for {@code server.consumer.pool.size.per.kafka.cluster}. */
    private static final int CONSUMER_POOL_SIZE = 3;

    /** Number of consecutive pushes performed by the test; deliberately larger than the pool size. */
    private static final int PUSH_COUNT = 5;

    /** How long to wait for a finished push to be reported as the store's current version. */
    private static final long VERSION_SWAP_TIMEOUT_SECONDS = 5L;

    /** How long to wait for the resource of a superseded version to stop being alive. */
    private static final long OLD_VERSION_RETIREMENT_TIMEOUT_SECONDS = 10L;

    private VeniceClusterWrapper veniceCluster;

    /**
     * Starts the Venice cluster shared by the tests of this class, with the server-side shared
     * consumer pool restricted to {@value #CONSUMER_POOL_SIZE} consumers and the topic-wise
     * assignment strategy selected.
     */
    @BeforeClass(alwaysRun = true)
    public void setUp() {
        Properties extraProperties = new Properties();
        extraProperties.setProperty("admin.helix.messaging.channel.enabled", "false");
        extraProperties.setProperty("participant.message.store.enabled", "true");
        // NOTE(review): presumably the maximal delay effectively disables participant-message
        // consumption for the duration of the test -- confirm against the server config docs.
        extraProperties.setProperty("participant.message.consumption.delay.ms", Integer.toString(Integer.MAX_VALUE));
        extraProperties.setProperty("server.consumer.pool.size.per.kafka.cluster", Integer.toString(CONSUMER_POOL_SIZE));
        extraProperties.setProperty("kafka.read.cycle.delay.ms", Integer.toString(1));
        extraProperties.setProperty(
                "server.shared.consumer.assignment.strategy",
                KafkaConsumerService.ConsumerAssignmentStrategy.TOPIC_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name());
        // NOTE(review): the positional arguments are assumed to describe the cluster topology
        // (controllers, servers, routers, replication factor, ...) -- verify against ServiceFactory.
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, 2, 1, 2, VeniceClusterWrapper.NUM_RECORDS, false, false, extraProperties);
    }

    /** Shuts the cluster down if {@link #setUp()} managed to create it. */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        if (this.veniceCluster != null) {
            this.veniceCluster.close();
        }
    }

    /**
     * Runs {@value #PUSH_COUNT} push jobs in a row against a freshly created store. After each
     * push the store's current version must equal the push index; from the third push onwards the
     * resource of the version two generations back must no longer be alive.
     *
     * @throws IOException if the Avro input file cannot be written
     */
    @Test
    public void testConsumerPoolShouldNotExhaustDuringRegularDataPushes() throws IOException {
        String storeName = Utils.getUniqueString("batch-store");
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        Properties vpjProperties = IntegrationTestPushUtils.defaultVPJProps(this.veniceCluster, inputDirPath, storeName);
        Admin admin = this.veniceCluster.getRandomVeniceController().getVeniceAdmin();

        // try-with-resources closes each resource exactly once, including on failure paths.
        try (ControllerClient controllerClient =
                IntegrationTestPushUtils.createStoreForJob(this.veniceCluster.getClusterName(), recordSchema, vpjProperties)) {
            for (int i = 1; i <= PUSH_COUNT; i++) {
                // Effectively-final copy so the polling lambda below can capture it.
                final int expectedVersion = i;
                long pushStartMs = System.currentTimeMillis();
                try (VenicePushJob pushJob = new VenicePushJob(Utils.getUniqueString("hybrid-job-" + expectedVersion), vpjProperties)) {
                    pushJob.run();
                    TestUtils.waitForNonDeterministicCompletion(
                            VERSION_SWAP_TIMEOUT_SECONDS,
                            TimeUnit.SECONDS,
                            () -> controllerClient.getStore((String) vpjProperties.get("venice.store.name"))
                                    .getStore()
                                    .getCurrentVersion() == expectedVersion);
                    LOGGER.info("**TIME** VPJ{} took {} ms", expectedVersion, System.currentTimeMillis() - pushStartMs);
                }

                // Performed after the push job has been closed, so a failure here cannot trigger
                // a second close() of the job.
                if (expectedVersion >= 3) {
                    String retiredTopic = Version.composeKafkaTopic(storeName, expectedVersion - 2);
                    TestUtils.waitForNonDeterministicCompletion(
                            OLD_VERSION_RETIREMENT_TIMEOUT_SECONDS,
                            TimeUnit.SECONDS,
                            () -> !admin.isResourceStillAlive(retiredTopic));
                }
            }
        }
    }
}
