package com.linkedin.venice.kafka;

import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestMockTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/TopicManagerIntegrationTest.class */
/**
 * Runs the {@link TopicManagerTest} suite against a broker obtained from
 * {@link ServiceFactory#getPubSubBroker}, and adds a concurrency smoke test that
 * invokes several read-only {@code TopicManager} calls from many threads at once.
 */
public class TopicManagerIntegrationTest extends TopicManagerTest {
    private PubSubBrokerWrapper pubSubBrokerWrapper;

    /**
     * Releases the broker and the topic manager created by {@link #createTopicManager()}.
     *
     * <p>Both fields are null-checked so that a failure during setup does not surface as a
     * secondary {@link NullPointerException} here, and the topic manager is closed in a
     * {@code finally} block so it is still released if closing the broker throws.
     */
    @AfterClass
    public void tearDown() {
        try {
            if (this.pubSubBrokerWrapper != null) {
                this.pubSubBrokerWrapper.close();
            }
        } finally {
            if (this.topicManager != null) {
                this.topicManager.close();
            }
        }
    }

    /**
     * Starts a pub-sub broker configured with a {@link TestMockTime} and builds the
     * {@code topicManager} under test, pointed at that broker's address.
     */
    protected void createTopicManager() {
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setMockTime(new TestMockTime()).build());
        // NOTE(review): the three numeric arguments are passed through unchanged; their
        // meaning is defined by IntegrationTestPushUtils.getTopicManagerRepo — confirm there.
        this.topicManager = IntegrationTestPushUtils.getTopicManagerRepo(500L, 100L, 86400000L, this.pubSubBrokerWrapper.getAddress(), new PubSubTopicRepository()).getTopicManager();
    }

    /**
     * Builds a Kafka-backed producer adapter that targets the broker started by
     * {@link #createTopicManager()}.
     *
     * @return a new {@link ApacheKafkaProducerAdapter} using the Venice key/value serializers
     */
    protected PubSubProducerAdapter createPubSubProducerAdapter() {
        Properties properties = new Properties();
        properties.put("kafka.key.serializer", KafkaKeySerializer.class.getName());
        properties.put("kafka.value.serializer", KafkaValueSerializer.class.getName());
        properties.put("kafka.bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        return new ApacheKafkaProducerAdapter(new ApacheKafkaProducerConfig(properties));
    }

    /**
     * Produces a mix of records to partition 0 of a fresh topic, then issues 50 concurrent
     * {@code TopicManager} read calls and waits for all of them to finish.
     *
     * <p>The test passes as long as no submitted call throws; any failure inside a task is
     * rethrown from {@link Future#get()} as an {@link ExecutionException}.
     *
     * @throws ExecutionException if any of the concurrent calls fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testRaceCondition() throws ExecutionException, InterruptedException {
        final int recordCount = 100;
        final int taskCount = 50;

        PubSubTopic topic = getTopic();
        PubSubTopicPartitionImpl partition = new PubSubTopicPartitionImpl(topic, 0);

        long timestamp = System.currentTimeMillis();
        // The assertion below expects the first record's timestamp, so the record produced
        // with `false` is presumably a non-data (control) record — see produceToKafka.
        produceToKafka(topic, true, timestamp);
        produceToKafka(topic, false, timestamp + 1000);
        Assert.assertEquals(this.topicManager.getProducerTimestampOfLastDataRecord(partition, 1), timestamp);

        // Append 100 more `true` records spaced one second apart, followed by three `false` records.
        for (int i = 0; i < recordCount; i++) {
            timestamp += 1000;
            produceToKafka(topic, true, timestamp);
        }
        for (int i = 1; i <= 3; i++) {
            produceToKafka(topic, false, timestamp + (i * 1000));
        }

        // One second before the timestamp of the last `true` record produced above.
        final long lookupTimestamp = timestamp - 1000;

        Runnable[] operations = {
            () -> this.topicManager.getPartitionOffsetByTime(partition, lookupTimestamp),
            () -> this.topicManager.getProducerTimestampOfLastDataRecord(partition, 1),
            () -> this.topicManager.partitionsFor(topic),
            () -> this.topicManager.getPartitionEarliestOffsetAndRetry(partition, 1),
            () -> this.topicManager.getPartitionLatestOffsetAndRetry(partition, 1),
            () -> this.topicManager.getTopicLatestOffsets(topic)
        };

        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        try {
            Future<?>[] futures = new Future<?>[taskCount];
            // Cycle through the operations so every call type runs concurrently with the others.
            for (int i = 0; i < taskCount; i++) {
                futures[i] = executor.submit(operations[i % operations.length]);
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            // Always release the worker threads; on failure this also interrupts tasks still running.
            executor.shutdownNow();
        }
    }
}
