package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import it.unimi.dsi.fastutil.objects.Object2DoubleMap;
import it.unimi.dsi.fastutil.objects.Object2DoubleMaps;
import it.unimi.dsi.fastutil.objects.Object2DoubleOpenHashMap;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.HealthStat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapter.class */
/**
 * {@link PubSubProducerAdapter} implementation that delegates to an Apache {@link KafkaProducer}.
 *
 * <p>Topic names passed to this adapter are rewritten with {@link #mapToPulsar(String)} before they
 * are handed to the underlying producer. Once {@link #close(int, boolean)} has completed, the
 * internal producer reference is dropped; {@link #getNumberOfPartitions(String)} and
 * {@link #sendMessage} then fail with a {@link VeniceException}, while {@link #flush()},
 * {@link #close(int, boolean)} and {@link #getMeasurableProducerMetrics()} become no-ops.
 */
public class ApacheKafkaProducerAdapter implements PubSubProducerAdapter {
    private static final Logger LOGGER = LogManager.getLogger(ApacheKafkaProducerAdapter.class);

    /** Underlying Kafka producer; set to {@code null} once the adapter has been closed. */
    private KafkaProducer<KafkaKey, KafkaMessageEnvelope> producer;
    private final ApacheKafkaProducerConfig producerConfig;

    /**
     * Rewrites a topic name by replacing every occurrence of {@link HealthStat#statFieldDelim}
     * with {@code "/"}.
     *
     * <p>NOTE(review): the delimiter constant comes from a Helix class, which looks like a
     * decompiler constant substitution for a plain string literal; confirm against the original
     * source before relying on it.
     *
     * @param str topic name to rewrite; must not be {@code null}
     * @return the rewritten topic name
     */
    public static final String mapToPulsar(String str) {
        return str.replace(HealthStat.statFieldDelim, "/");
    }

    /**
     * Inverse of {@link #mapToPulsar(TopicPartition)}: replaces every {@code "/"} in the topic name
     * with {@link HealthStat#statFieldDelim} and keeps the partition number.
     *
     * @param topicPartition topic-partition whose topic name should be rewritten
     * @return a new {@link TopicPartition} with the rewritten topic name and the same partition
     */
    public static final TopicPartition mapFromPulsar(TopicPartition topicPartition) {
        return new TopicPartition(topicPartition.topic().replace("/", HealthStat.statFieldDelim), topicPartition.partition());
    }

    /**
     * Applies {@link #mapToPulsar(String)} to the topic name of the given topic-partition.
     *
     * @param topicPartition topic-partition whose topic name should be rewritten
     * @return a new {@link TopicPartition} with the rewritten topic name and the same partition
     */
    public static final TopicPartition mapToPulsar(TopicPartition topicPartition) {
        return new TopicPartition(mapToPulsar(topicPartition.topic()), topicPartition.partition());
    }

    /**
     * Creates an adapter backed by a new {@link KafkaProducer} built from the producer properties
     * of the given config.
     *
     * @param apacheKafkaProducerConfig configuration supplying the producer properties and broker address
     */
    public ApacheKafkaProducerAdapter(ApacheKafkaProducerConfig apacheKafkaProducerConfig) {
        this(apacheKafkaProducerConfig, new KafkaProducer<>(apacheKafkaProducerConfig.getProducerProperties()));
    }

    /**
     * Creates an adapter around an already constructed producer (package-private).
     *
     * @param apacheKafkaProducerConfig configuration used to report the broker address
     * @param kafkaProducer producer that all operations are delegated to
     */
    ApacheKafkaProducerAdapter(ApacheKafkaProducerConfig apacheKafkaProducerConfig, KafkaProducer<KafkaKey, KafkaMessageEnvelope> kafkaProducer) {
        this.producerConfig = apacheKafkaProducerConfig;
        this.producer = kafkaProducer;
    }

    /**
     * Returns the number of partitions the underlying producer reports for the given topic.
     *
     * @param str topic name; it is rewritten with {@link #mapToPulsar(String)} before the lookup
     * @return the size of the partition list returned by the producer
     * @throws VeniceException if this adapter has already been closed
     */
    @Override
    @Deprecated
    public int getNumberOfPartitions(String str) {
        ensureProducerIsNotClosed();
        return this.producer.partitionsFor(mapToPulsar(str)).size();
    }

    /**
     * Hands a record to the underlying producer.
     *
     * @param str destination topic; rewritten with {@link #mapToPulsar(String)}
     * @param num destination partition passed to the {@link ProducerRecord}
     * @param kafkaKey record key
     * @param kafkaMessageEnvelope record value
     * @param pubSubMessageHeaders headers, converted to Kafka headers before sending
     * @param pubSubProducerCallback callback wrapped in an {@link ApacheKafkaProducerCallback}
     * @return the produce-result future exposed by the wrapping callback
     * @throws VeniceException if this adapter has already been closed, or if the producer throws
     *         while accepting the record (the original exception is kept as the cause)
     */
    @Override
    public Future<PubSubProduceResult> sendMessage(String str, Integer num, KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubMessageHeaders pubSubMessageHeaders, PubSubProducerCallback pubSubProducerCallback) {
        // Fail fast on a closed adapter before doing any per-record work.
        ensureProducerIsNotClosed();
        ProducerRecord<KafkaKey, KafkaMessageEnvelope> producerRecord = new ProducerRecord<>(
                mapToPulsar(str),
                num,
                kafkaKey,
                kafkaMessageEnvelope,
                ApacheKafkaUtils.convertToKafkaSpecificHeaders(pubSubMessageHeaders));
        ApacheKafkaProducerCallback apacheKafkaProducerCallback = new ApacheKafkaProducerCallback(pubSubProducerCallback);
        try {
            this.producer.send(producerRecord, apacheKafkaProducerCallback);
            return apacheKafkaProducerCallback.getProduceResultFuture();
        } catch (Exception e) {
            throw new VeniceException("Got an error while trying to produce message into Kafka. Topic: '" + producerRecord.topic() + "', partition: " + producerRecord.partition(), e);
        }
    }

    /**
     * Guards operations that need a live producer.
     *
     * @throws VeniceException if the producer reference has been cleared
     */
    private void ensureProducerIsNotClosed() {
        if (this.producer == null) {
            throw new VeniceException("The internal KafkaProducer has been closed");
        }
    }

    /** Flushes the underlying producer; does nothing if this adapter has already been closed. */
    @Override
    public void flush() {
        if (this.producer != null) {
            this.producer.flush();
        }
    }

    /**
     * Closes the underlying producer and drops the reference to it. Calling this on an already
     * closed adapter is a no-op.
     *
     * @param i timeout in milliseconds, applied to the optional flush and to the close call
     * @param z when {@code true}, the producer is flushed before it is closed
     */
    @Override
    public void close(int i, boolean z) {
        if (this.producer == null) {
            return;
        }
        try {
            if (z) {
                this.producer.flush(i, TimeUnit.MILLISECONDS);
                LOGGER.info("Flushed all the messages in producer before closing");
            }
        } finally {
            // Close even when the flush fails so the producer's resources are not leaked;
            // a flush failure still propagates to the caller.
            this.producer.close(Duration.ofMillis(i));
            this.producer = null;
        }
    }

    /**
     * Collects the producer metrics whose current value is a {@link Double}.
     *
     * <p>Entries are keyed by {@link MetricName#name()} only, so metrics that share a name overwrite
     * each other. A metric whose value cannot be read is logged and skipped.
     *
     * @return metric name to value map; an empty map if this adapter has already been closed
     */
    @Override
    public Object2DoubleMap<String> getMeasurableProducerMetrics() {
        if (this.producer == null) {
            return Object2DoubleMaps.emptyMap();
        }
        Map<MetricName, ? extends Metric> kafkaMetrics = this.producer.metrics();
        Object2DoubleMap<String> measurableMetrics = new Object2DoubleOpenHashMap<>(kafkaMetrics.size());
        for (Map.Entry<MetricName, ? extends Metric> entry : kafkaMetrics.entrySet()) {
            try {
                Object metricValue = entry.getValue().metricValue();
                if (metricValue instanceof Double) {
                    measurableMetrics.put(entry.getKey().name(), ((Double) metricValue).doubleValue());
                }
            } catch (Exception e) {
                // Best effort: one unreadable metric must not prevent reporting the others.
                LOGGER.warn("Caught exception: {} when attempting to get producer metrics. Incomplete metrics might be returned.", e.getMessage());
            }
        }
        return measurableMetrics;
    }

    /**
     * @return the broker address reported by the producer config
     */
    @Override
    public String getBrokerAddress() {
        return this.producerConfig.getBrokerAddress();
    }
}
