package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import java.util.function.BiConsumer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerConfigTest.class */
public class ApacheKafkaProducerConfigTest {
    private static final String KAFKA_BROKER_ADDR = "kafka.broker.com:8181";
    private static final String PRODUCER_NAME = "sender-store_v1";
    private static final String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"foo\" password=\"bar\"\n";
    private static final String SASL_MECHANISM = "PLAIN";

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Required property: kafka.bootstrap.servers is missing.*")
    public void testConfiguratorThrowsAnExceptionWhenBrokerAddressIsMissing() {
        new ApacheKafkaProducerConfig(VeniceProperties.empty(), (String) null, PRODUCER_NAME, true);
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*requiredConfigKey: 'key.serializer', requiredConfigValue:.*")
    public void testValidateAndUpdatePropertiesShouldThrowAnErrorIfKeySerIsIncorrect() {
        Properties properties = new Properties();
        properties.put("kafka.key.serializer", Object.class.getName());
        new ApacheKafkaProducerConfig(new VeniceProperties(properties), KAFKA_BROKER_ADDR, PRODUCER_NAME, true);
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*requiredConfigKey: 'value.serializer', requiredConfigValue:.*")
    public void testValidateAndUpdatePropertiesShouldThrowAnErrorIfValSerIsIncorrect() {
        Properties properties = new Properties();
        properties.put("kafka.key.serializer", KafkaKeySerializer.class.getName());
        properties.put("kafka.value.serializer", Object.class.getName());
        new ApacheKafkaProducerConfig(new VeniceProperties(properties), KAFKA_BROKER_ADDR, PRODUCER_NAME, true);
    }

    @Test
    public void testValidateOrPopulatePropCanFillMissingConfigs() {
        Properties producerProperties = new ApacheKafkaProducerConfig(VeniceProperties.empty(), KAFKA_BROKER_ADDR, PRODUCER_NAME, true).getProducerProperties();
        Assert.assertTrue(producerProperties.containsKey("key.serializer"));
        Assert.assertTrue(producerProperties.containsKey("value.serializer"));
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Failed to load the specified class: ThisIsBogusClass for key: key.serializer")
    public void testValidateClassPropFailsToLoadGarbageClass() {
        Properties properties = new Properties();
        properties.put("kafka.key.serializer", "ThisIsBogusClass");
        new ApacheKafkaProducerConfig(new VeniceProperties(properties), KAFKA_BROKER_ADDR, PRODUCER_NAME, false);
    }

    @Test
    public void testGetBrokerAddress() {
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", KAFKA_BROKER_ADDR);
        ApacheKafkaProducerConfig apacheKafkaProducerConfig = new ApacheKafkaProducerConfig(properties);
        Assert.assertNotNull(apacheKafkaProducerConfig);
        Assert.assertEquals(apacheKafkaProducerConfig.getBrokerAddress(), KAFKA_BROKER_ADDR);
        Assert.assertFalse(apacheKafkaProducerConfig.getProducerProperties().containsKey("client.id"));
        ApacheKafkaProducerConfig apacheKafkaProducerConfig2 = new ApacheKafkaProducerConfig(new VeniceProperties(properties), "overridden.addr", PRODUCER_NAME, false);
        Assert.assertEquals(apacheKafkaProducerConfig2.getBrokerAddress(), "overridden.addr");
        Assert.assertEquals(apacheKafkaProducerConfig2.getProducerProperties().getProperty("client.id"), PRODUCER_NAME);
    }

    @Test
    public void testGetBrokerAddressReturnsSslAddrIfKafkaSslIsEnabled() {
        Properties properties = new Properties();
        properties.put("ssl.to.kakfa", true);
        properties.put("kafka.bootstrap.servers", KAFKA_BROKER_ADDR);
        properties.put("ssl.kafka.bootstrap.servers", "ssl.kafka.broker.com:8182");
        Assert.assertEquals(new ApacheKafkaProducerConfig(properties).getBrokerAddress(), "ssl.kafka.broker.com:8182");
    }

    @Test
    public void testSaslConfiguration() {
        Properties properties = new Properties();
        properties.put("ssl.to.kakfa", true);
        properties.put("kafka.bootstrap.servers", KAFKA_BROKER_ADDR);
        properties.put("ssl.kafka.bootstrap.servers", "ssl.kafka.broker.com:8182");
        properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
        properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
        properties.put("kafka.security.protocol", "SASL_SSL");
        Properties producerProperties = new ApacheKafkaProducerConfig(properties).getProducerProperties();
        Assert.assertEquals(SASL_JAAS_CONFIG, producerProperties.get("sasl.jaas.config"));
        Assert.assertEquals(SASL_MECHANISM, producerProperties.get("sasl.mechanism"));
        Assert.assertEquals("SASL_SSL", producerProperties.get("security.protocol"));
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "stripPrefix")
    public static Object[][] stripPrefix() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    @Test(dataProvider = "stripPrefix")
    public void testCopySaslConfiguration(boolean z) {
        Properties properties = new Properties();
        properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
        properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
        properties.put("kafka.security.protocol", "SASL_SSL");
        testCopy(z, properties, (properties2, properties3) -> {
            ApacheKafkaProducerConfig.copyKafkaSASLProperties(new VeniceProperties(properties2), properties3, z);
        });
        testCopy(z, properties, (properties4, properties5) -> {
            ApacheKafkaProducerConfig.copyKafkaSASLProperties(properties4, properties5, z);
        });
    }

    private static void testCopy(boolean z, Properties properties, BiConsumer<Properties, Properties> biConsumer) {
        Properties properties2 = new Properties();
        biConsumer.accept(properties, properties2);
        if (z) {
            Assert.assertEquals(SASL_JAAS_CONFIG, properties2.get("sasl.jaas.config"));
            Assert.assertEquals(SASL_MECHANISM, properties2.get("sasl.mechanism"));
            Assert.assertEquals("SASL_SSL", properties2.get("security.protocol"));
        } else {
            Assert.assertEquals(SASL_JAAS_CONFIG, properties2.get("kafka.sasl.jaas.config"));
            Assert.assertEquals(SASL_MECHANISM, properties2.get("kafka.sasl.mechanism"));
            Assert.assertEquals("SASL_SSL", properties2.get("kafka.security.protocol"));
        }
    }
}
