package com.linkedin.venice.kafka;

import com.github.benmanes.caffeine.cache.Cache;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfPush;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.PubSubTopicConfiguration;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.systemstore.schemas.StoreProperties;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.MockInMemoryAdminAdapter;
import com.linkedin.venice.unit.kafka.consumer.MockInMemoryConsumer;
import com.linkedin.venice.unit.kafka.consumer.poll.RandomPollStrategy;
import com.linkedin.venice.unit.kafka.producer.MockInMemoryProducerAdapter;
import com.linkedin.venice.utils.AvroRecordUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.VeniceProperties;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/kafka/TopicManagerTest.class */
public class TopicManagerTest {
    private static final Logger LOGGER = LogManager.getLogger(TopicManagerTest.class);
    private static final int WAIT_TIME_IN_SECONDS = 10;
    protected static final long MIN_COMPACTION_LAG = 86400000;
    protected final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    protected TopicManager topicManager;
    private InMemoryKafkaBroker inMemoryKafkaBroker;

    @BeforeClass
    public void setUp() {
        createTopicManager();
        Cache cache = (Cache) Mockito.mock(Cache.class);
        Mockito.when(cache.getIfPresent(Mockito.any())).thenReturn((Object) null);
        this.topicManager.setTopicConfigCache(cache);
    }

    protected void createTopicManager() {
        this.inMemoryKafkaBroker = new InMemoryKafkaBroker("local");
        PubSubAdminAdapterFactory pubSubAdminAdapterFactory = (PubSubAdminAdapterFactory) Mockito.mock(ApacheKafkaAdminAdapterFactory.class);
        MockInMemoryAdminAdapter mockInMemoryAdminAdapter = new MockInMemoryAdminAdapter(this.inMemoryKafkaBroker);
        ((PubSubAdminAdapterFactory) Mockito.doReturn(mockInMemoryAdminAdapter).when(pubSubAdminAdapterFactory)).create((VeniceProperties) Mockito.any(), (PubSubTopicRepository) Mockito.eq(this.pubSubTopicRepository));
        MockInMemoryConsumer mockInMemoryConsumer = new MockInMemoryConsumer(this.inMemoryKafkaBroker, new RandomPollStrategy(), (PubSubConsumerAdapter) Mockito.mock(PubSubConsumerAdapter.class));
        mockInMemoryConsumer.setMockInMemoryAdminAdapter(mockInMemoryAdminAdapter);
        PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
        ((PubSubConsumerAdapterFactory) Mockito.doReturn(mockInMemoryConsumer).when(pubSubConsumerAdapterFactory)).create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), ArgumentMatchers.anyString());
        this.topicManager = TopicManagerRepository.builder().setPubSubProperties(str -> {
            return new VeniceProperties();
        }).setPubSubTopicRepository(this.pubSubTopicRepository).setLocalKafkaBootstrapServers("localhost:1234").setPubSubConsumerAdapterFactory(pubSubConsumerAdapterFactory).setPubSubAdminAdapterFactory(pubSubAdminAdapterFactory).setKafkaOperationTimeoutMs(500L).setTopicDeletionStatusPollIntervalMs(100L).setTopicMinLogCompactionLagMs(MIN_COMPACTION_LAG).build().getTopicManager();
    }

    protected PubSubProducerAdapter createPubSubProducerAdapter() {
        return new MockInMemoryProducerAdapter(this.inMemoryKafkaBroker);
    }

    @AfterClass
    public void cleanUp() throws IOException {
        this.topicManager.close();
    }

    protected void produceToKafka(PubSubTopic pubSubTopic, boolean z, long j) throws ExecutionException, InterruptedException {
        PubSubProducerAdapter createPubSubProducerAdapter = createPubSubProducerAdapter();
        byte[] bArr = {0, 1};
        KafkaKey kafkaKey = new KafkaKey(z ? MessageType.PUT : MessageType.CONTROL_MESSAGE, bArr);
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.producerMetadata = new ProducerMetadata();
        kafkaMessageEnvelope.producerMetadata.producerGUID = new GUID();
        kafkaMessageEnvelope.producerMetadata.messageTimestamp = j;
        kafkaMessageEnvelope.leaderMetadataFooter = new LeaderMetadata();
        kafkaMessageEnvelope.leaderMetadataFooter.hostName = "localhost";
        if (z) {
            Put put = new Put();
            put.putValue = ByteBuffer.wrap(new byte[]{0, 1});
            put.replicationMetadataPayload = ByteBuffer.wrap(bArr);
            kafkaMessageEnvelope.payloadUnion = put;
        } else {
            ControlMessage controlMessage = new ControlMessage();
            controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue();
            controlMessage.controlMessageUnion = new EndOfPush();
            controlMessage.debugInfo = Collections.emptyMap();
            kafkaMessageEnvelope.payloadUnion = controlMessage;
        }
        createPubSubProducerAdapter.sendMessage(pubSubTopic.getName(), 0, kafkaKey, kafkaMessageEnvelope, (PubSubMessageHeaders) null, (PubSubProducerCallback) Mockito.mock(PubSubProducerCallback.class)).get();
    }

    protected PubSubTopic getTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString(Thread.currentThread().getStackTrace()[2].getMethodName()));
        this.topicManager.createTopic(topic, 1, 1, false);
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
        });
        return topic;
    }

    @Test
    public void testGetProducerTimestampOfLastDataRecord() throws ExecutionException, InterruptedException {
        PubSubTopic topic = getTopic();
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 0);
        long currentTimeMillis = System.currentTimeMillis();
        produceToKafka(topic, true, currentTimeMillis - 1000);
        produceToKafka(topic, true, currentTimeMillis);
        Assert.assertEquals(this.topicManager.getProducerTimestampOfLastDataRecord(pubSubTopicPartitionImpl, 1), currentTimeMillis);
    }

    @Test
    public void testGetProducerTimestampOfLastDataRecordWithControlMessage() throws ExecutionException, InterruptedException {
        PubSubTopic topic = getTopic();
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 0);
        long currentTimeMillis = System.currentTimeMillis();
        produceToKafka(topic, true, currentTimeMillis);
        produceToKafka(topic, false, currentTimeMillis + 1000);
        Assert.assertEquals(this.topicManager.getProducerTimestampOfLastDataRecord(pubSubTopicPartitionImpl, 1), currentTimeMillis);
        for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
            currentTimeMillis += 1000;
            produceToKafka(topic, true, currentTimeMillis);
        }
        for (int i2 = 1; i2 <= 3; i2++) {
            produceToKafka(topic, false, currentTimeMillis + (i2 * 1000));
        }
        Assert.assertEquals(this.topicManager.getProducerTimestampOfLastDataRecord(pubSubTopicPartitionImpl, 1), currentTimeMillis);
    }

    @Test
    public void testGetProducerTimestampOfLastDataRecordOnEmptyTopic() {
        Assert.assertEquals(this.topicManager.getProducerTimestampOfLastDataRecord(new PubSubTopicPartitionImpl(getTopic(), 0), 1), -1L);
    }

    @Test
    public void testGetProducerTimestampOfLastDataRecordWithOnlyControlMessages() throws ExecutionException, InterruptedException {
        PubSubTopic topic = getTopic();
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
            produceToKafka(topic, false, currentTimeMillis);
            currentTimeMillis += 10;
        }
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 0);
        Assert.assertThrows(VeniceException.class, () -> {
            this.topicManager.getProducerTimestampOfLastDataRecord(pubSubTopicPartitionImpl, 1);
        });
    }

    @Test
    public void testCreateTopic() throws Exception {
        PubSubTopic topic = getTopic();
        this.topicManager.createTopic(topic, 1, 1, true);
        Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
        Assert.assertEquals(this.topicManager.getTopicRetention(topic), Long.MAX_VALUE);
        PubSubTopic topic2 = getTopic();
        this.topicManager.createTopic(topic2, 1, 1, false);
        Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic2));
        Assert.assertEquals(this.topicManager.getTopicRetention(topic2), 432000000L);
        Assert.assertEquals(1, this.topicManager.getReplicationFactor(topic2));
    }

    @Test
    public void testCreateTopicWhenTopicExists() throws Exception {
        PubSubTopic topic = getTopic();
        PubSubTopic topic2 = getTopic();
        this.topicManager.createTopic(topic, 1, 1, false);
        this.topicManager.createTopic(topic2, 1, 1, false);
        this.topicManager.updateTopicRetention(topic, 0L);
        this.topicManager.updateTopicRetention(topic2, 0L);
        Assert.assertEquals(this.topicManager.getTopicRetention(topic), 0L);
        Assert.assertEquals(this.topicManager.getTopicRetention(topic2), 0L);
        this.topicManager.createTopic(topic, 1, 1, true);
        Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
        Assert.assertEquals(this.topicManager.getTopicRetention(topic), Long.MAX_VALUE);
        this.topicManager.createTopic(topic2, 1, 1, false);
        Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic2));
        Assert.assertEquals(this.topicManager.getTopicRetention(topic2), 432000000L);
    }

    @Test
    public void testDeleteTopic() throws ExecutionException {
        PubSubTopic topic = getTopic();
        this.topicManager.ensureTopicIsDeletedAndBlock(topic);
        Assert.assertFalse(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
    }

    @Test
    public void testDeleteTopicWithRetry() throws ExecutionException {
        PubSubTopic topic = getTopic();
        this.topicManager.ensureTopicIsDeletedAndBlockWithRetry(topic);
        Assert.assertFalse(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
    }

    @Test
    public void testDeleteTopicWithTimeout() throws ExecutionException {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic("mockTopicName_v1");
        TopicManager topicManager = (TopicManager) Mockito.spy(this.topicManager);
        ((TopicManager) Mockito.doThrow(VeniceOperationAgainstKafkaTimedOut.class).when(topicManager)).ensureTopicIsDeletedAndBlock(topic);
        ((TopicManager) Mockito.doCallRealMethod().when(topicManager)).ensureTopicIsDeletedAndBlockWithRetry(topic);
        Assert.assertThrows(VeniceOperationAgainstKafkaTimedOut.class, () -> {
            topicManager.ensureTopicIsDeletedAndBlockWithRetry(topic);
        });
        ((TopicManager) Mockito.verify(topicManager, Mockito.times(3))).ensureTopicIsDeletedAndBlock(topic);
    }

    @Test
    public void testSyncDeleteTopic() throws ExecutionException {
        PubSubTopic topic = getTopic();
        this.topicManager.ensureTopicIsDeletedAndBlock(topic);
        Assert.assertFalse(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
    }

    @Test
    public void testGetLastOffsets() {
        Int2LongMap topicLatestOffsets = this.topicManager.getTopicLatestOffsets(getTopic());
        TestUtils.waitForNonDeterministicAssertion(2L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(topicLatestOffsets.containsKey(0), "single partition topic has an offset for partition 0");
            Assert.assertEquals(topicLatestOffsets.keySet().size(), 1, "single partition topic has only an offset for one partition");
            Assert.assertEquals(((Long) topicLatestOffsets.get(0)).longValue(), 0L, "new topic must end at partition 0");
        });
    }

    @Test
    public void testListOffsetsOnEmptyTopic() {
        ((KafkaConsumer) Mockito.doReturn(new HashMap()).when((KafkaConsumer) Mockito.mock(KafkaConsumer.class))).listTopics();
        Assert.assertEquals(this.topicManager.getTopicLatestOffsets(this.pubSubTopicRepository.getTopic("myTopic_v1")).size(), 0);
    }

    @Test
    public void testGetTopicConfig() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        PubSubTopicConfiguration topicConfig = this.topicManager.getTopicConfig(topic);
        Assert.assertTrue(topicConfig.retentionInMs().isPresent());
        Assert.assertTrue(((Long) topicConfig.retentionInMs().get()).longValue() > 0, "retention.ms should be positive");
    }

    @Test(expectedExceptions = {TopicDoesNotExistException.class})
    public void testGetTopicConfigWithUnknownTopic() {
        this.topicManager.getTopicConfig(this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic")));
    }

    @Test
    public void testUpdateTopicRetention() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        this.topicManager.updateTopicRetention(topic, 0L);
        PubSubTopicConfiguration topicConfig = this.topicManager.getTopicConfig(topic);
        Assert.assertTrue(topicConfig.retentionInMs().isPresent());
        Assert.assertTrue(((Long) topicConfig.retentionInMs().get()).longValue() == 0);
    }

    @Test
    public void testListAllTopics() {
        HashSet hashSet = new HashSet(this.topicManager.listTopics());
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        PubSubTopic topic3 = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        hashSet.add(topic);
        Assert.assertEquals(this.topicManager.listTopics(), hashSet);
        this.topicManager.createTopic(topic2, 1, 1, false);
        hashSet.add(topic2);
        Assert.assertEquals(this.topicManager.listTopics(), hashSet);
        this.topicManager.createTopic(topic3, 1, 1, false);
        hashSet.add(topic3);
        Assert.assertEquals(this.topicManager.listTopics(), hashSet);
    }

    @Test
    public void testGetAllTopicRetentions() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        PubSubTopic topic3 = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        this.topicManager.createTopic(topic2, 1, 1, false);
        this.topicManager.createTopic(topic3, 1, 1, false);
        this.topicManager.updateTopicRetention(topic3, 5000L);
        Map allTopicRetentions = this.topicManager.getAllTopicRetentions();
        Assert.assertTrue(allTopicRetentions.size() > 3, "There should be at least 3 topics, which were created by this test");
        Assert.assertEquals(((Long) allTopicRetentions.get(topic)).longValue(), Long.MAX_VALUE);
        Assert.assertEquals(((Long) allTopicRetentions.get(topic2)).longValue(), 432000000L);
        Assert.assertEquals(((Long) allTopicRetentions.get(topic3)).longValue(), 5000L);
        Assert.assertFalse(this.topicManager.isTopicTruncated(topic, 5000L), "Topic1 should not be deprecated because of unlimited retention policy");
        Assert.assertFalse(this.topicManager.isTopicTruncated(topic2, 5000L), "Topic2 should not be deprecated because of unknown retention policy");
        Assert.assertTrue(this.topicManager.isTopicTruncated(topic3, 5000L), "Topic3 should be deprecated because of low retention policy");
        Assert.assertFalse(this.topicManager.isRetentionBelowTruncatedThreshold(5000 + 1, 5000L));
        Assert.assertFalse(this.topicManager.isRetentionBelowTruncatedThreshold(Long.MIN_VALUE, 5000L));
        Assert.assertTrue(this.topicManager.isRetentionBelowTruncatedThreshold(5000 - 1, 5000L));
    }

    @Test
    public void testUpdateTopicCompactionPolicy() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        Assert.assertFalse(this.topicManager.isTopicCompactionEnabled(topic), "topic: " + topic + " should be with compaction disabled");
        this.topicManager.updateTopicCompactionPolicy(topic, true);
        Assert.assertTrue(this.topicManager.isTopicCompactionEnabled(topic), "topic: " + topic + " should be with compaction enabled");
        Assert.assertEquals(this.topicManager.getTopicMinLogCompactionLagMs(topic), MIN_COMPACTION_LAG);
        this.topicManager.updateTopicCompactionPolicy(topic, false);
        Assert.assertFalse(this.topicManager.isTopicCompactionEnabled(topic), "topic: " + topic + " should be with compaction disabled");
        Assert.assertEquals(this.topicManager.getTopicMinLogCompactionLagMs(topic), 0L);
    }

    @Test
    public void testGetConfigForNonExistingTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("non-existing-topic"));
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.getTopicConfig(topic);
        });
    }

    @Test
    public void testGetLatestOffsetForNonExistingTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("non-existing-topic"));
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.getPartitionLatestOffsetAndRetry(new PubSubTopicPartitionImpl(topic, 0), WAIT_TIME_IN_SECONDS);
        });
    }

    @Test
    public void testGetLatestProducerTimestampForNonExistingTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("non-existing-topic"));
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.getProducerTimestampOfLastDataRecord(new PubSubTopicPartitionImpl(topic, 0), WAIT_TIME_IN_SECONDS);
        });
    }

    @Test
    public void testGetAndUpdateTopicRetentionForNonExistingTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("non-existing-topic"));
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.getTopicRetention(topic);
        });
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.updateTopicRetention(topic, TimeUnit.DAYS.toMillis(1L));
        });
    }

    @Test
    public void testUpdateTopicCompactionPolicyForNonExistingTopic() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("non-existing-topic"));
        Assert.assertThrows(TopicDoesNotExistException.class, () -> {
            this.topicManager.updateTopicCompactionPolicy(topic, true);
        });
    }

    @Test
    public void testTimeoutOnGettingMaxOffset() throws IOException {
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic")), 0);
        PubSubAdminAdapter pubSubAdminAdapter = (PubSubAdminAdapter) Mockito.mock(PubSubAdminAdapter.class);
        ((PubSubAdminAdapter) Mockito.doReturn(true).when(pubSubAdminAdapter)).containsTopicWithPartitionCheckExpectationAndRetry((PubSubTopicPartition) Mockito.eq(pubSubTopicPartitionImpl), Mockito.anyInt(), Mockito.eq(true));
        PubSubConsumerAdapter pubSubConsumerAdapter = (PubSubConsumerAdapter) Mockito.mock(PubSubConsumerAdapter.class);
        ((PubSubConsumerAdapter) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(pubSubConsumerAdapter)).endOffsets((Collection) Mockito.any(), (Duration) Mockito.any());
        PubSubAdminAdapterFactory pubSubAdminAdapterFactory = (PubSubAdminAdapterFactory) Mockito.mock(PubSubAdminAdapterFactory.class);
        PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
        ((PubSubConsumerAdapterFactory) Mockito.doReturn(pubSubConsumerAdapter).when(pubSubConsumerAdapterFactory)).create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), ArgumentMatchers.anyString());
        ((PubSubAdminAdapterFactory) Mockito.doReturn(pubSubAdminAdapter).when(pubSubAdminAdapterFactory)).create((VeniceProperties) Mockito.any(), (PubSubTopicRepository) Mockito.eq(this.pubSubTopicRepository));
        TopicManager topicManager = TopicManagerRepository.builder().setPubSubProperties(str -> {
            return new VeniceProperties();
        }).setPubSubTopicRepository(this.pubSubTopicRepository).setLocalKafkaBootstrapServers("localhost:1234").setPubSubAdminAdapterFactory(pubSubAdminAdapterFactory).setPubSubConsumerAdapterFactory(pubSubConsumerAdapterFactory).setKafkaOperationTimeoutMs(30000L).setTopicDeletionStatusPollIntervalMs(100L).setTopicMinLogCompactionLagMs(MIN_COMPACTION_LAG).build().getTopicManager();
        try {
            Assert.assertThrows(VeniceOperationAgainstKafkaTimedOut.class, () -> {
                topicManager.getPartitionLatestOffsetAndRetry(pubSubTopicPartitionImpl, WAIT_TIME_IN_SECONDS);
            });
            if (topicManager != null) {
                topicManager.close();
            }
        } catch (Throwable th) {
            if (topicManager != null) {
                try {
                    topicManager.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testContainsTopicWithExpectationAndRetry() throws InterruptedException {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        Assert.assertFalse(this.topicManager.containsTopicWithExpectationAndRetry(topic, 3, true));
        this.topicManager.createTopic(topic, 1, 1, false);
        Assert.assertTrue(this.topicManager.containsTopicWithExpectationAndRetry(topic, 3, true));
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            countDownLatch.countDown();
            LOGGER.info("Thread started and it will create topic {} in {} second(s)", topic2, 1L);
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
            } catch (InterruptedException e) {
                Assert.fail("Got unexpected exception...", e);
            }
            this.topicManager.createTopic(topic2, 1, 1, false);
            LOGGER.info("Created this initially-not-exist topic: {}", topic2);
        });
        Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        Duration ofSeconds = Duration.ofSeconds(3L);
        Duration ofSeconds2 = Duration.ofSeconds(ofSeconds.getSeconds() + 1);
        Duration ofSeconds3 = Duration.ofSeconds(3 * ofSeconds2.getSeconds());
        Assert.assertFalse(runAsync.isDone());
        Assert.assertTrue(this.topicManager.containsTopicWithExpectationAndRetry(topic2, 3, true, ofSeconds, ofSeconds2, ofSeconds3));
        Assert.assertTrue(runAsync.isDone());
    }

    @Test
    public void testMinimumExpectedRetentionTime() {
        StoreProperties prefillAvroRecordWithDefaultValue = AvroRecordUtils.prefillAvroRecordWithDefaultValue(new StoreProperties());
        prefillAvroRecordWithDefaultValue.name = "storeName";
        prefillAvroRecordWithDefaultValue.owner = "owner";
        prefillAvroRecordWithDefaultValue.createdTime = System.currentTimeMillis();
        prefillAvroRecordWithDefaultValue.bootstrapToOnlineTimeoutInHours = 12;
        Assert.assertEquals(TopicManager.getExpectedRetentionTimeInMs(new ZKStore(prefillAvroRecordWithDefaultValue), new HybridStoreConfigImpl(172800L, 20000L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), 432000000L);
    }

    @Test
    public void testExpectedRetentionTime() {
        StoreProperties prefillAvroRecordWithDefaultValue = AvroRecordUtils.prefillAvroRecordWithDefaultValue(new StoreProperties());
        prefillAvroRecordWithDefaultValue.name = "storeName";
        prefillAvroRecordWithDefaultValue.owner = "owner";
        prefillAvroRecordWithDefaultValue.createdTime = System.currentTimeMillis();
        prefillAvroRecordWithDefaultValue.bootstrapToOnlineTimeoutInHours = 72;
        Assert.assertEquals(TopicManager.getExpectedRetentionTimeInMs(new ZKStore(prefillAvroRecordWithDefaultValue), new HybridStoreConfigImpl(172800L, 20000L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), 604800000L);
    }

    @Test
    public void testContainsTopicAndAllPartitionsAreOnline() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("a-new-topic"));
        Assert.assertFalse(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
        this.topicManager.createTopic(topic, 1, 1, true);
        Assert.assertTrue(this.topicManager.containsTopicAndAllPartitionsAreOnline(topic));
    }

    @Test
    public void testUpdateTopicMinISR() {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(TestUtils.getUniqueTopicString("topic"));
        this.topicManager.createTopic(topic, 1, 1, true);
        Assert.assertTrue(((Integer) this.topicManager.getTopicConfig(topic).minInSyncReplicas().get()).intValue() == 1);
        this.topicManager.updateTopicMinInSyncReplica(topic, 2);
        Assert.assertTrue(((Integer) this.topicManager.getTopicConfig(topic).minInSyncReplicas().get()).intValue() == 2);
    }
}
