package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.ConfigConstants;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.xerces.impl.xs.SchemaSymbols;
import org.jboss.netty.channel.ChannelPipelineCoverage;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerConfig.class */
/**
 * Builds the {@link Properties} handed to an Apache Kafka producer from Venice-level configuration.
 *
 * <p>The constructor takes the {@code kafka.}-namespaced entries of the supplied
 * {@link VeniceProperties}, sets the broker address, fills in (or, in strict mode, validates) the
 * settings the producer relies on, and finally lets {@link KafkaSSLUtils} copy SSL settings over.
 */
public class ApacheKafkaProducerConfig {
    private static final Logger LOGGER = LogManager.getLogger(ApacheKafkaProducerConfig.class);

    /** Prefix of every Venice property that is forwarded to the Kafka producer. */
    public static final String KAFKA_CONFIG_PREFIX = "kafka.";
    public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    public static final String KAFKA_PRODUCER_RETRIES_CONFIG = "kafka.retries";
    public static final String KAFKA_LINGER_MS = "kafka.linger.ms";
    public static final String KAFKA_BATCH_SIZE = "kafka.batch.size";
    public static final String KAFKA_BUFFER_MEMORY = "kafka.buffer.memory";
    public static final String KAFKA_CLIENT_ID = "kafka.client.id";
    public static final String KAFKA_KEY_SERIALIZER = "kafka.key.serializer";
    public static final String KAFKA_VALUE_SERIALIZER = "kafka.value.serializer";
    public static final String KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS = "kafka.delivery.timeout.ms";
    public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = "kafka.request.timeout.ms";
    public static final String SSL_KAFKA_BOOTSTRAP_SERVERS = "ssl.kafka.bootstrap.servers";
    /**
     * Flag selecting the SSL bootstrap servers. NOTE: the key really is spelled "kakfa"; it is a
     * runtime config key, so the spelling is kept as-is because existing configs may depend on it.
     */
    public static final String SSL_TO_KAFKA = "ssl.to.kakfa";
    public static final String KAFKA_SASL_JAAS_CONFIG = "sasl.jaas.config";
    public static final String KAFKA_SASL_MECHANISM = "sasl.mechanism";

    /** Placeholder written to the log instead of a sensitive property value. */
    private static final String REDACTED_VALUE = "<redacted>";

    private final Properties producerProperties;

    /**
     * Creates a config from plain properties, resolving the broker address from the properties
     * themselves, leaving the client id untouched and enforcing strict validation.
     *
     * @param properties Venice-level properties containing the {@code kafka.}-prefixed settings
     */
    public ApacheKafkaProducerConfig(Properties properties) {
        this(new VeniceProperties(properties), null, null, true);
    }

    /**
     * Creates a config from Venice properties.
     *
     * @param veniceProperties source properties
     * @param brokerAddressToOverride broker address to use; when {@code null} the address is resolved
     *        via {@link #getPubsubBrokerAddress(VeniceProperties)}
     * @param producerName value for the producer's {@code client.id}; ignored when {@code null}
     * @param strictConfigs when {@code true}, a required setting that is present with a value other
     *        than the required one causes a {@link VeniceException}
     * @throws VeniceException if a required property is missing, a strict check fails, or a
     *         configured serializer class cannot be loaded
     */
    public ApacheKafkaProducerConfig(
            VeniceProperties veniceProperties,
            String brokerAddressToOverride,
            String producerName,
            boolean strictConfigs) {
        String brokerAddress =
                brokerAddressToOverride != null ? brokerAddressToOverride : getPubsubBrokerAddress(veniceProperties);
        this.producerProperties = veniceProperties.clipAndFilterNamespace(KAFKA_CONFIG_PREFIX).toProperties();
        this.producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        validateAndUpdateProperties(this.producerProperties, strictConfigs);
        if (producerName != null) {
            this.producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, producerName);
        }
        if (KafkaSSLUtils.validateAndCopyKafkaSSLConfig(veniceProperties, this.producerProperties)) {
            LOGGER.info("Will initialize an SSL Kafka producer");
        } else {
            LOGGER.info("Will initialize a non-SSL Kafka producer");
        }
        // Log a redacted copy: the effective properties may carry SSL passwords or a JAAS config.
        LOGGER.info("Kafka producer properties: {}", redactSecrets(this.producerProperties));
    }

    /**
     * @return the producer properties held by this config (the live instance, not a copy)
     */
    public Properties getProducerProperties() {
        return this.producerProperties;
    }

    /**
     * Resolves the broker address from the given properties. When {@link #SSL_TO_KAFKA} parses to
     * {@code true}, {@link #SSL_KAFKA_BOOTSTRAP_SERVERS} is used; otherwise
     * {@link #KAFKA_BOOTSTRAP_SERVERS} is used.
     *
     * @param veniceProperties properties to read the address from
     * @return the configured bootstrap servers
     * @throws VeniceException if the selected bootstrap-servers property is missing
     */
    public static String getPubsubBrokerAddress(VeniceProperties veniceProperties) {
        boolean sslToKafka = Boolean.parseBoolean(veniceProperties.getString(SSL_TO_KAFKA, "false"));
        String bootstrapServersKey = sslToKafka ? SSL_KAFKA_BOOTSTRAP_SERVERS : KAFKA_BOOTSTRAP_SERVERS;
        checkProperty(veniceProperties, bootstrapServersKey);
        return veniceProperties.getString(bootstrapServersKey);
    }

    /** Throws a {@link VeniceException} when {@code key} is absent from {@code veniceProperties}. */
    private static void checkProperty(VeniceProperties veniceProperties, String key) {
        if (!veniceProperties.containsKey(key)) {
            throw new VeniceException("Invalid properties for Kafka producer factory. Required property: " + key + " is missing.");
        }
    }

    /**
     * @return the {@code bootstrap.servers} value stored in the producer properties
     */
    public String getBrokerAddress() {
        return this.producerProperties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
    }

    /**
     * Validates the required producer settings and fills in defaults for the optional ones.
     *
     * @param properties producer properties, modified in place
     * @param strictConfigs whether a conflicting value for a required setting is an error
     */
    private static void validateAndUpdateProperties(Properties properties, boolean strictConfigs) {
        validateClassProp(properties, strictConfigs, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaKeySerializer.class.getName());
        validateClassProp(properties, strictConfigs, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaValueSerializer.class.getName());
        // Required values: a single in-flight request per connection and acks from all replicas.
        validateOrPopulateProp(properties, strictConfigs, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        validateOrPopulateProp(properties, strictConfigs, ProducerConfig.ACKS_CONFIG, "all");

        // The remaining settings are defaults only: a value supplied by the caller is kept.
        if (!properties.containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
            properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");
        }
        if (!properties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
            properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        }
        // containsKey, not contains: Hashtable.contains looks at values, so it never found this key.
        if (!properties.containsKey(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)) {
            // NOTE(review): the default comes from a constant named for linger.ms - confirm it is
            // the intended retry backoff value.
            properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, ConfigConstants.DEFAULT_KAFKA_LINGER_MS);
        }
        if (!properties.containsKey(ProducerConfig.MAX_BLOCK_MS_CONFIG)) {
            properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
        }
        if (properties.containsKey(ProducerConfig.COMPRESSION_TYPE_CONFIG)) {
            LOGGER.info("Compression type explicitly specified by config: {}", properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        } else {
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        }
    }

    /**
     * Sets {@code requiredConfigKey} to {@code requiredConfigValue} when it has no string value yet.
     * When a different value is already present, throws in strict mode and keeps the existing value
     * otherwise.
     *
     * @throws VeniceException if strict mode is on and the existing value differs from the required one
     */
    private static void validateOrPopulateProp(
            Properties properties,
            boolean strictConfigs,
            String requiredConfigKey,
            String requiredConfigValue) {
        String actualConfigValue = properties.getProperty(requiredConfigKey);
        if (actualConfigValue == null) {
            properties.setProperty(requiredConfigKey, requiredConfigValue);
            return;
        }
        if (strictConfigs && !actualConfigValue.equals(requiredConfigValue)) {
            throw new VeniceException("The Kafka Producer must use certain configuration settings in order to work properly. requiredConfigKey: '" + requiredConfigKey + "', requiredConfigValue: '" + requiredConfigValue + "', actualConfigValue: '" + actualConfigValue + "'.");
        }
    }

    /**
     * Applies {@link #validateOrPopulateProp} to a class-name setting and then replaces the class
     * name stored under {@code requiredConfigKey} with the loaded {@link Class} object.
     *
     * @throws VeniceException if the class name cannot be resolved
     */
    private static void validateClassProp(
            Properties properties,
            boolean strictConfigs,
            String requiredConfigKey,
            String requiredConfigValue) {
        validateOrPopulateProp(properties, strictConfigs, requiredConfigKey, requiredConfigValue);
        String className = properties.getProperty(requiredConfigKey);
        if (className == null) {
            return;
        }
        try {
            properties.put(requiredConfigKey, Class.forName(className));
        } catch (ClassNotFoundException e) {
            throw new VeniceException("Failed to load the specified class: " + className + " for key: " + requiredConfigKey, e);
        }
    }

    /**
     * Returns a copy of {@code properties} that is safe to log: values of keys containing
     * "password" or ending with {@link #KAFKA_SASL_JAAS_CONFIG} are replaced by a placeholder.
     * The input is not modified.
     */
    private static Properties redactSecrets(Properties properties) {
        Properties redacted = new Properties();
        for (Object key : properties.keySet()) {
            String name = String.valueOf(key);
            boolean sensitive = name.contains("password") || name.endsWith(KAFKA_SASL_JAAS_CONFIG);
            redacted.put(key, sensitive ? REDACTED_VALUE : properties.get(key));
        }
        return redacted;
    }
}
