package com.linkedin.venice.hadoop;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.RepushInfo;
import com.linkedin.venice.controllerapi.RepushInfoResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/TestKafkaFormatTopicAutoDiscover.class */
public class TestKafkaFormatTopicAutoDiscover {
    private static final String JOB_ID = "some-job-ID";
    private static final String STORE_NAME = "store-name";
    private static final String STORE_NAME_2 = "store-name-2";

    @Test(expectedExceptions = {UndefinedPropertyException.class}, expectedExceptionsMessageRegExp = "Missing required property 'venice.store.name'.")
    public void testMissingStoreNameInConfig() {
        new VenicePushJob(JOB_ID, getJobProperties(Collections.emptyMap())).close();
    }

    @Test
    public void testNoUserProvidedTopicNameAndSingleColoVersion() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(Collections.emptyMap(), 1));
        Mockito.when(controllerClient.getRepushInfo(STORE_NAME, Optional.empty())).thenReturn(getMockRepushResponse(1));
        configureClusterDiscoveryControllerClient(controllerClient);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(Collections.singletonMap("venice.store.name", STORE_NAME)));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, Version.composeKafkaTopic(STORE_NAME, 1));
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUserProvidedEpochRewind() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockHybridStoreResponse(Collections.emptyMap(), 1, BufferReplayPolicy.REWIND_FROM_SOP));
        Mockito.when(controllerClient.getRepushInfo(STORE_NAME, Optional.empty())).thenReturn(getMockRepushResponse(1));
        configureClusterDiscoveryControllerClient(controllerClient);
        HashMap hashMap = new HashMap();
        hashMap.put("venice.store.name", STORE_NAME);
        hashMap.put("rewind.epoch.time.in.seconds.override", "1637016606");
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.validateRemoteHybridSettings();
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUserProvidedEpochRewindWithInvalidRemotePolicy() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockHybridStoreResponse(Collections.emptyMap(), 1, BufferReplayPolicy.REWIND_FROM_EOP));
        Mockito.when(controllerClient.getRepushInfo(STORE_NAME, Optional.empty())).thenReturn(getMockRepushResponse(1));
        configureClusterDiscoveryControllerClient(controllerClient);
        HashMap hashMap = new HashMap();
        hashMap.put("venice.store.name", STORE_NAME);
        hashMap.put("rewind.epoch.time.in.seconds.override", "1637016606");
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            venicePushJob.setControllerClient(controllerClient);
            Objects.requireNonNull(venicePushJob);
            Assert.assertThrows(venicePushJob::validateRemoteHybridSettings);
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testNoUserProvidedTopicNameAndMultiColoVersion() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        HashMap hashMap = new HashMap(3);
        hashMap.put("colo-0", 1);
        hashMap.put("colo-1", 1);
        hashMap.put("colo-2", 1);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(hashMap, -1));
        Mockito.when(controllerClient.getRepushInfo(STORE_NAME, Optional.empty())).thenReturn(getMockRepushResponse(1));
        configureClusterDiscoveryControllerClient(controllerClient);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(Collections.singletonMap("venice.store.name", STORE_NAME)));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, Version.composeKafkaTopic(STORE_NAME, 1));
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testNoUserProvidedTopicNameAndMultiColoVersionMismatch() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        HashMap hashMap = new HashMap(3);
        hashMap.put("colo-0", 1);
        hashMap.put("colo-1", 1);
        hashMap.put("colo-2", 2);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(hashMap, -1));
        Mockito.when(controllerClient.getRepushInfo(STORE_NAME, Optional.empty())).thenReturn(getMockRepushResponse(2));
        configureClusterDiscoveryControllerClient(controllerClient);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(Collections.singletonMap("venice.store.name", STORE_NAME)));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, Version.composeKafkaTopic(STORE_NAME, 2));
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUserProvidedTopicNameAndMultiColoVersionMismatch() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        HashMap hashMap = new HashMap(3);
        hashMap.put("colo-0", 1);
        hashMap.put("colo-1", 1);
        hashMap.put("colo-2", 2);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(hashMap, -1));
        configureClusterDiscoveryControllerClient(controllerClient);
        String composeKafkaTopic = Version.composeKafkaTopic(STORE_NAME, 3);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("venice.store.name", STORE_NAME);
        hashMap2.put("kafka.input.topic", composeKafkaTopic);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap2));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, composeKafkaTopic);
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(expectedExceptions = {IllegalArgumentException.class}, expectedExceptionsMessageRegExp = "Store user-provided name mismatch with the derived store name.*")
    public void testUserProvidedTopicNameNotValid() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(Collections.emptyMap(), -1));
        configureClusterDiscoveryControllerClient(controllerClient);
        String composeKafkaTopic = Version.composeKafkaTopic(STORE_NAME_2, 3);
        HashMap hashMap = new HashMap();
        hashMap.put("venice.store.name", STORE_NAME);
        hashMap.put("kafka.input.topic", composeKafkaTopic);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUserProvidedTopicNameMatchDiscoveredTopicName() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        HashMap hashMap = new HashMap(3);
        hashMap.put("colo-0", 1);
        hashMap.put("colo-1", 1);
        hashMap.put("colo-2", 1);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(hashMap, -1));
        configureClusterDiscoveryControllerClient(controllerClient);
        String composeKafkaTopic = Version.composeKafkaTopic(STORE_NAME, 1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("venice.store.name", STORE_NAME);
        hashMap2.put("kafka.input.topic", composeKafkaTopic);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap2));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, composeKafkaTopic);
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(expectedExceptions = {IllegalArgumentException.class}, expectedExceptionsMessageRegExp = "Store user-provided name mismatch with the derived store name.*")
    public void testUserProvidedTopicNameMismatchDiscoveredTopicName() {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        HashMap hashMap = new HashMap(3);
        hashMap.put("colo-0", 1);
        hashMap.put("colo-1", 1);
        hashMap.put("colo-2", 1);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(getMockStoreResponse(hashMap, -1));
        configureClusterDiscoveryControllerClient(controllerClient);
        String composeKafkaTopic = Version.composeKafkaTopic(STORE_NAME_2, 1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("venice.store.name", STORE_NAME);
        hashMap2.put("kafka.input.topic", composeKafkaTopic);
        VenicePushJob venicePushJob = new VenicePushJob(JOB_ID, getJobProperties(hashMap2));
        try {
            venicePushJob.setControllerClient(controllerClient);
            venicePushJob.initKIFRepushDetails();
            Assert.assertEquals(venicePushJob.getPushJobSetting().kafkaInputTopic, composeKafkaTopic);
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void configureClusterDiscoveryControllerClient(ControllerClient controllerClient) {
        D2ServiceDiscoveryResponse d2ServiceDiscoveryResponse = (D2ServiceDiscoveryResponse) Mockito.mock(D2ServiceDiscoveryResponse.class);
        Mockito.when(Boolean.valueOf(d2ServiceDiscoveryResponse.isError())).thenReturn(false);
        Mockito.when(d2ServiceDiscoveryResponse.getCluster()).thenReturn("some-cluster");
        Mockito.when(controllerClient.discoverCluster(STORE_NAME)).thenReturn(d2ServiceDiscoveryResponse);
    }

    private StoreResponse getMockStoreResponse(Map<String, Integer> map, int i) {
        StoreResponse storeResponse = (StoreResponse) Mockito.mock(StoreResponse.class);
        StoreInfo storeInfo = (StoreInfo) Mockito.mock(StoreInfo.class);
        Mockito.when(storeInfo.getColoToCurrentVersions()).thenReturn(map);
        Mockito.when(Integer.valueOf(storeInfo.getCurrentVersion())).thenReturn(Integer.valueOf(i));
        Mockito.when(storeResponse.getStore()).thenReturn(storeInfo);
        return storeResponse;
    }

    private StoreResponse getMockHybridStoreResponse(Map<String, Integer> map, int i, BufferReplayPolicy bufferReplayPolicy) {
        StoreResponse storeResponse = (StoreResponse) Mockito.mock(StoreResponse.class);
        HybridStoreConfig hybridStoreConfig = (HybridStoreConfig) Mockito.mock(HybridStoreConfig.class);
        StoreInfo storeInfo = (StoreInfo) Mockito.mock(StoreInfo.class);
        Mockito.when(hybridStoreConfig.getBufferReplayPolicy()).thenReturn(bufferReplayPolicy);
        Mockito.when(storeInfo.getHybridStoreConfig()).thenReturn(hybridStoreConfig);
        Mockito.when(storeInfo.getColoToCurrentVersions()).thenReturn(map);
        Mockito.when(Integer.valueOf(storeInfo.getCurrentVersion())).thenReturn(Integer.valueOf(i));
        Mockito.when(storeResponse.getStore()).thenReturn(storeInfo);
        return storeResponse;
    }

    private RepushInfoResponse getMockRepushResponse(int i) {
        RepushInfoResponse repushInfoResponse = (RepushInfoResponse) Mockito.mock(RepushInfoResponse.class);
        RepushInfo repushInfo = (RepushInfo) Mockito.mock(RepushInfo.class);
        Version version = (Version) Mockito.mock(Version.class);
        Mockito.when(Integer.valueOf(version.getNumber())).thenReturn(Integer.valueOf(i));
        Mockito.when(repushInfo.getVersion()).thenReturn(version);
        Mockito.when(repushInfo.getKafkaBrokerUrl()).thenReturn("kafkaUrl");
        Mockito.when(repushInfoResponse.getRepushInfo()).thenReturn(repushInfo);
        return repushInfoResponse;
    }

    private Properties getJobProperties(Map<String, String> map) {
        Properties properties = new Properties();
        Objects.requireNonNull(properties);
        map.forEach(properties::setProperty);
        properties.setProperty("multi.region", "false");
        properties.setProperty("source.grid.fabric", "child_region");
        properties.setProperty("d2.zk.hosts.child_region", "child.zk.com:1234");
        properties.setProperty("ssl.key.password.property.name", "something");
        properties.setProperty("ssl.key.store.password.property.name", "something");
        properties.setProperty("ssl.key.store.property.name", "something");
        properties.setProperty("ssl.trust.store.property.name", "something");
        properties.setProperty("source.kafka", "true");
        properties.setProperty("kafka.input.broker.url", "some-kafka-input-broker-url");
        properties.setProperty("venice.discover.urls", "some-venice-URL");
        return properties;
    }
}
