package com.linkedin.venice.ingestion.control;

import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link RealTimeTopicSwitcher}.
 *
 * <p>The switcher under test is wired with Mockito mocks of {@link TopicManager} and
 * {@link VeniceWriterFactory}; the tests verify which calls it issues against those collaborators.
 */
public class RealTimeTopicSwitcherTest {
    private static final int KAFKA_RF_FOR_RT_TOPICS = 6;
    private static final int KAFKA_MIN_ISR_FOR_RT_TOPICS = 4;
    private static final String REAL_TIME_TOPIC_NAME = "testTopic_rt";
    private static final String VERSION_TOPIC_NAME = "testTopic_v1";

    private RealTimeTopicSwitcher leaderStorageNodeReplicator;
    private TopicManager mockTopicManager;
    private VeniceWriterFactory mockVeniceWriterFactory;
    private final String aggregateRealTimeSourceKafkaUrl = "aggregate-real-time-source-kafka-url";
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    /** Builds a fresh switcher around new mocks before every test so stubbing never leaks between tests. */
    @BeforeMethod
    public void setUp() {
        this.mockTopicManager = Mockito.mock(TopicManager.class);
        this.mockVeniceWriterFactory = Mockito.mock(VeniceWriterFactory.class);
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", "dummy");
        properties.put("kafka.replication.factor", "3");
        properties.put("kafka.replication.factor.rt.topics", Integer.toString(KAFKA_RF_FOR_RT_TOPICS));
        properties.put("kafka.min.in.sync.replicas.rt.topics", Integer.toString(KAFKA_MIN_ISR_FOR_RT_TOPICS));
        this.leaderStorageNodeReplicator = new RealTimeTopicSwitcher(
                this.mockTopicManager,
                this.mockVeniceWriterFactory,
                new VeniceProperties(properties),
                this.pubSubTopicRepository);
    }

    /**
     * Switching to the real-time topic for a hybrid store must broadcast exactly one topic-switch
     * message through the writer handed out by the factory.
     */
    @Test
    public void testPrepareAndStartReplication() {
        PubSubTopic realTimeTopic = this.pubSubTopicRepository.getTopic(REAL_TIME_TOPIC_NAME);
        PubSubTopic versionTopic = this.pubSubTopicRepository.getTopic(VERSION_TOPIC_NAME);
        HybridStoreConfig hybridStoreConfig = Mockito.mock(HybridStoreConfig.class);
        VersionImpl version = new VersionImpl(versionTopic.getStoreName(), 1, "test-id");
        Store store = mockHybridStore(versionTopic, version, hybridStoreConfig);
        VeniceWriter veniceWriter = Mockito.mock(VeniceWriter.class);
        stubOnlineTopicsAndWriter(realTimeTopic, versionTopic, veniceWriter);

        this.leaderStorageNodeReplicator.switchToRealTimeTopic(
                realTimeTopic.getName(),
                versionTopic.getName(),
                store,
                this.aggregateRealTimeSourceKafkaUrl,
                Collections.emptyList());

        Mockito.verify(veniceWriter).broadcastTopicSwitch(
                (List) ArgumentMatchers.any(),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.anyLong(),
                (Map) ArgumentMatchers.any());
    }

    /**
     * With native replication enabled on the version and an AGGREGATE data replication policy, the
     * topic-switch message must name the aggregate real-time source URL and the real-time topic.
     */
    @Test
    public void testPrepareAndStartReplicationWithNativeReplication() {
        PubSubTopic realTimeTopic = this.pubSubTopicRepository.getTopic(REAL_TIME_TOPIC_NAME);
        PubSubTopic versionTopic = this.pubSubTopicRepository.getTopic(VERSION_TOPIC_NAME);
        HybridStoreConfig hybridStoreConfig = Mockito.mock(HybridStoreConfig.class);
        VersionImpl version = new VersionImpl(versionTopic.getStoreName(), 1, "test-id");
        version.setNativeReplicationEnabled(true);
        Store store = mockHybridStore(versionTopic, version, hybridStoreConfig);
        Mockito.doReturn(DataReplicationPolicy.AGGREGATE).when(hybridStoreConfig).getDataReplicationPolicy();
        VeniceWriter veniceWriter = Mockito.mock(VeniceWriter.class);
        stubOnlineTopicsAndWriter(realTimeTopic, versionTopic, veniceWriter);

        this.leaderStorageNodeReplicator.switchToRealTimeTopic(
                realTimeTopic.getName(),
                versionTopic.getName(),
                store,
                this.aggregateRealTimeSourceKafkaUrl,
                Collections.emptyList());

        List<Object> expectedSourceClusters = new ArrayList<>();
        expectedSourceClusters.add(this.aggregateRealTimeSourceKafkaUrl);
        Mockito.verify(veniceWriter).broadcastTopicSwitch(
                (List) ArgumentMatchers.eq(expectedSourceClusters),
                ArgumentMatchers.eq(realTimeTopic.getName()),
                ArgumentMatchers.anyLong(),
                (Map) ArgumentMatchers.any());
    }

    /**
     * Exercises {@code transmitVersionSwapMessage} for three cases: a previous version without view
     * configs (nothing is broadcast), a next version the store does not know (an exception is
     * expected), and two versions that both carry view configs (the swap is broadcast).
     */
    @Test
    public void testSendVersionSwap() {
        final String storeName = "TestStore";
        // A version number the mocked store reports as absent. Kept as its own local so it is not
        // coupled to any unrelated Kafka setting that happens to share the same numeric value.
        final int missingVersionNumber = 4;

        // Left raw on purpose: only ViewConfigImpl is imported here, and the exact map type accepted
        // by setViewConfigs is declared outside this file.
        Map viewConfigs = new HashMap();
        viewConfigs.put("testView", new ViewConfigImpl("testClass", Collections.emptyMap()));
        VersionImpl versionWithoutViews = new VersionImpl(storeName, 1, "push1");
        VersionImpl previousVersion = new VersionImpl(storeName, 2, "push2");
        previousVersion.setViewConfigs(viewConfigs);
        VersionImpl nextVersion = new VersionImpl(storeName, 3, "push3");
        nextVersion.setViewConfigs(viewConfigs);

        PubSubTopic realTimeTopic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
        Store store = Mockito.mock(Store.class);
        Mockito.when(store.getName()).thenReturn(storeName);
        Mockito.when(store.getVersion(1)).thenReturn(Optional.of(versionWithoutViews));
        Mockito.when(store.getVersion(2)).thenReturn(Optional.of(previousVersion));
        Mockito.when(store.getVersion(3)).thenReturn(Optional.of(nextVersion));
        Mockito.when(store.getVersion(missingVersionNumber)).thenReturn(Optional.empty());

        TopicManager topicManager = Mockito.mock(TopicManager.class);
        Mockito.when(topicManager.containsTopic(realTimeTopic)).thenReturn(true);
        VeniceWriter veniceWriter = Mockito.mock(VeniceWriter.class);
        VeniceWriterFactory veniceWriterFactory = Mockito.mock(VeniceWriterFactory.class);
        ArgumentCaptor<VeniceWriterOptions> writerOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class);
        Mockito.when(veniceWriterFactory.createVeniceWriter(ArgumentMatchers.any(VeniceWriterOptions.class)))
                .thenReturn(veniceWriter);
        VeniceProperties veniceProperties = Mockito.mock(VeniceProperties.class);
        Mockito.when(veniceProperties.getString("kafka.bootstrap.servers"))
                .thenReturn(this.aggregateRealTimeSourceKafkaUrl);
        RealTimeTopicSwitcher realTimeTopicSwitcher =
                new RealTimeTopicSwitcher(topicManager, veniceWriterFactory, veniceProperties, this.pubSubTopicRepository);

        // Version 1 has no view configs, so no version swap message may be written.
        realTimeTopicSwitcher.transmitVersionSwapMessage(store, 1, 2);
        Mockito.verify(veniceWriter, Mockito.never())
                .broadcastVersionSwap(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.anyMap());

        // The store has no entry for the requested next version, so the call is expected to throw.
        Assert.assertThrows(() -> realTimeTopicSwitcher.transmitVersionSwapMessage(store, 3, missingVersionNumber));

        // Versions 2 and 3 both carry view configs, so this swap is expected to be broadcast.
        realTimeTopicSwitcher.transmitVersionSwapMessage(store, 2, 3);

        // NOTE(review): one successful swap is asserted to create two writers and broadcast twice;
        // presumably one per destination topic - confirm against RealTimeTopicSwitcher.
        Mockito.verify(veniceWriterFactory, Mockito.times(2)).createVeniceWriter(writerOptionsCaptor.capture());
        Assert.assertEquals(writerOptionsCaptor.getValue().getTopicName(), realTimeTopic.getName());

        ArgumentCaptor<String> previousTopicCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<String> nextTopicCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(veniceWriter, Mockito.times(2))
                .broadcastVersionSwap(previousTopicCaptor.capture(), nextTopicCaptor.capture(), ArgumentMatchers.anyMap());
        Assert.assertEquals(previousTopicCaptor.getValue(), previousVersion.kafkaTopicName());
        Assert.assertEquals(nextTopicCaptor.getValue(), nextVersion.kafkaTopicName());
    }

    /**
     * When the real-time topic is reported as not online, {@code ensurePreconditions} must create it
     * using the replication factor and min ISR configured specifically for real-time topics.
     */
    @Test
    public void testEnsurePreconditions() {
        PubSubTopic realTimeTopic = this.pubSubTopicRepository.getTopic(REAL_TIME_TOPIC_NAME);
        PubSubTopic versionTopic = this.pubSubTopicRepository.getTopic(VERSION_TOPIC_NAME);
        HybridStoreConfig hybridStoreConfig = Mockito.mock(HybridStoreConfig.class);
        VersionImpl version = new VersionImpl(versionTopic.getStoreName(), 1, "test-id");
        Store store = mockHybridStore(versionTopic, version, hybridStoreConfig);
        // Only the real-time topic is missing; the version topic is already online.
        Mockito.doReturn(false).when(this.mockTopicManager).containsTopicAndAllPartitionsAreOnline(realTimeTopic);
        Mockito.doReturn(true).when(this.mockTopicManager).containsTopicAndAllPartitionsAreOnline(versionTopic);

        this.leaderStorageNodeReplicator.ensurePreconditions(realTimeTopic, versionTopic, store, Optional.of(hybridStoreConfig));

        Optional<Integer> expectedMinIsr = Optional.of(KAFKA_MIN_ISR_FOR_RT_TOPICS);
        Mockito.verify(this.mockTopicManager).createTopic(
                ArgumentMatchers.eq(realTimeTopic),
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.eq(KAFKA_RF_FOR_RT_TOPICS),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.eq(false),
                ArgumentMatchers.eq(expectedMinIsr),
                ArgumentMatchers.eq(false));
    }

    /**
     * Creates a mocked hybrid {@link Store} that returns {@code version} for the version number
     * encoded in {@code versionTopic}, and stubs {@code hybridStoreConfig} with a 3600 second rewind
     * time and the REWIND_FROM_EOP buffer replay policy.
     */
    private Store mockHybridStore(PubSubTopic versionTopic, VersionImpl version, HybridStoreConfig hybridStoreConfig) {
        Store store = Mockito.mock(Store.class);
        Mockito.doReturn(true).when(store).isHybrid();
        Mockito.doReturn(hybridStoreConfig).when(store).getHybridStoreConfig();
        Mockito.doReturn(Optional.of(version))
                .when(store)
                .getVersion(Version.parseVersionFromKafkaTopicName(versionTopic.getName()));
        Mockito.doReturn(3600L).when(hybridStoreConfig).getRewindTimeInSeconds();
        Mockito.doReturn(BufferReplayPolicy.REWIND_FROM_EOP).when(hybridStoreConfig).getBufferReplayPolicy();
        return store;
    }

    /**
     * Stubs the shared topic manager so both topics are reported online with an empty partition
     * list, and makes the shared writer factory hand out {@code veniceWriter}.
     */
    private void stubOnlineTopicsAndWriter(PubSubTopic realTimeTopic, PubSubTopic versionTopic, VeniceWriter veniceWriter) {
        List<Object> noPartitions = new ArrayList<>();
        Mockito.doReturn(true).when(this.mockTopicManager).containsTopicAndAllPartitionsAreOnline(realTimeTopic);
        Mockito.doReturn(true).when(this.mockTopicManager).containsTopicAndAllPartitionsAreOnline(versionTopic);
        Mockito.doReturn(noPartitions).when(this.mockTopicManager).partitionsFor(realTimeTopic);
        Mockito.doReturn(noPartitions).when(this.mockTopicManager).partitionsFor(versionTopic);
        Mockito.doReturn(veniceWriter)
                .when(this.mockVeniceWriterFactory)
                .createVeniceWriter(ArgumentMatchers.any(VeniceWriterOptions.class));
    }
}
