package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import it.unimi.dsi.fastutil.objects.Object2DoubleMap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.TimeoutException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterTest.class */
public class ApacheKafkaProducerAdapterTest {
    private KafkaProducer<KafkaKey, KafkaMessageEnvelope> kafkaProducerMock;
    private ApacheKafkaProducerConfig producerConfigMock;
    private static final String TOPIC_NAME = "test-topic";
    private final KafkaKey testKafkaKey = new KafkaKey(MessageType.DELETE, "key".getBytes());
    private final KafkaMessageEnvelope testKafkaValue = new KafkaMessageEnvelope();

    @BeforeMethod
    public void setupMocks() {
        this.kafkaProducerMock = (KafkaProducer) Mockito.mock(KafkaProducer.class);
        this.producerConfigMock = (ApacheKafkaProducerConfig) Mockito.mock(ApacheKafkaProducerConfig.class);
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "The internal KafkaProducer has been closed")
    public void testEnsureProducerIsNotClosedThrowsExceptionWhenProducerIsClosed() {
        ApacheKafkaProducerAdapter apacheKafkaProducerAdapter = new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock);
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).close((Duration) ArgumentMatchers.any());
        apacheKafkaProducerAdapter.close(10, false);
        apacheKafkaProducerAdapter.sendMessage(TOPIC_NAME, 0, this.testKafkaKey, this.testKafkaValue, (PubSubMessageHeaders) null, (PubSubProducerCallback) null);
    }

    @Test
    public void testGetNumberOfPartitions() {
        Mockito.when(this.kafkaProducerMock.partitionsFor(TOPIC_NAME)).thenReturn(new ArrayList());
        Assert.assertEquals(new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).getNumberOfPartitions(TOPIC_NAME), 0);
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Got an error while trying to produce message into Kafka.*")
    public void testSendMessageThrowsAnExceptionOnTimeout() {
        ((KafkaProducer) Mockito.doThrow(TimeoutException.class).when(this.kafkaProducerMock)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).sendMessage(TOPIC_NAME, 42, this.testKafkaKey, this.testKafkaValue, (PubSubMessageHeaders) null, (PubSubProducerCallback) null);
    }

    @Test
    public void testSendMessageInteractionWithInternalProducer() {
        ApacheKafkaProducerAdapter apacheKafkaProducerAdapter = new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock);
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(this.kafkaProducerMock.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.isNull())).thenReturn(future);
        Assert.assertNotNull(apacheKafkaProducerAdapter.sendMessage(TOPIC_NAME, 42, this.testKafkaKey, this.testKafkaValue, (PubSubMessageHeaders) null, (PubSubProducerCallback) null));
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.never())).send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.isNull());
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.times(1))).send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class));
        PubSubProducerCallback pubSubProducerCallback = (PubSubProducerCallback) Mockito.mock(PubSubProducerCallback.class);
        Mockito.when(this.kafkaProducerMock.send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class))).thenReturn(future);
        Assert.assertNotNull(apacheKafkaProducerAdapter.sendMessage(TOPIC_NAME, 42, this.testKafkaKey, this.testKafkaValue, (PubSubMessageHeaders) null, pubSubProducerCallback));
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.never())).send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.isNull());
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.times(2))).send((ProducerRecord) ArgumentMatchers.any(ProducerRecord.class), (Callback) ArgumentMatchers.any(Callback.class));
    }

    @Test
    public void testCloseInvokesProducerFlushAndClose() {
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).flush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).close(10, true);
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.times(1))).flush(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.times(1))).close((Duration) ArgumentMatchers.any(Duration.class));
    }

    @Test
    public void testFlushInvokesInternalProducerFlushIfProducerIsNotClosed() {
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).flush();
        new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).flush();
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.times(1))).flush();
    }

    @Test
    public void testFlushDoesNotInvokeInternalProducerFlushIfProducerIs1Closed() {
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).flush();
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).close();
        ApacheKafkaProducerAdapter apacheKafkaProducerAdapter = new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock);
        apacheKafkaProducerAdapter.close(10, false);
        apacheKafkaProducerAdapter.flush();
        ((KafkaProducer) Mockito.verify(this.kafkaProducerMock, Mockito.never())).flush();
    }

    @Test
    public void testGetMeasurableProducerMetricsReturnsEmptyMapWhenProducerIsClosed() {
        ((KafkaProducer) Mockito.doNothing().when(this.kafkaProducerMock)).close();
        ApacheKafkaProducerAdapter apacheKafkaProducerAdapter = new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock);
        apacheKafkaProducerAdapter.close(10, false);
        Object2DoubleMap measurableProducerMetrics = apacheKafkaProducerAdapter.getMeasurableProducerMetrics();
        Assert.assertNotNull(measurableProducerMetrics, "Returned metrics cannot be null");
        Assert.assertEquals(measurableProducerMetrics.size(), 0, "Should return empty metrics when producer is closed");
    }

    @Test
    public void testGetMeasurableProducerMetricsReturnsMetricsOfTypeDouble() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        MetricName metricName = new MetricName("metric-1", "g1", "desc", new HashMap());
        Metric metric = (Metric) Mockito.mock(Metric.class);
        ((Metric) Mockito.doReturn(Double.valueOf(20.23d)).when(metric)).metricValue();
        linkedHashMap.put(metricName, metric);
        MetricName metricName2 = new MetricName("metric-2", "g1", "desc", new HashMap());
        Metric metric2 = (Metric) Mockito.mock(Metric.class);
        ((Metric) Mockito.doReturn(314).when(metric2)).metricValue();
        linkedHashMap.put(metricName2, metric2);
        MetricName metricName3 = new MetricName("metric-3", "g1", "desc", new HashMap());
        Metric metric3 = (Metric) Mockito.mock(Metric.class);
        ((Metric) Mockito.doThrow(new Throwable[]{new NullPointerException("Failed to extract value of metric-3")}).when(metric3)).metricValue();
        linkedHashMap.put(metricName3, metric3);
        ((KafkaProducer) Mockito.doReturn(linkedHashMap).when(this.kafkaProducerMock)).metrics();
        Object2DoubleMap measurableProducerMetrics = new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).getMeasurableProducerMetrics();
        Assert.assertNotNull(measurableProducerMetrics, "Returned metrics cannot be null");
        Assert.assertEquals(measurableProducerMetrics.size(), 1);
        Assert.assertTrue(measurableProducerMetrics.containsKey("metric-1"));
        Assert.assertEquals(measurableProducerMetrics.get("metric-1"), Double.valueOf(20.23d));
    }

    @Test
    public void testGetBrokerAddress() {
        Mockito.when(this.producerConfigMock.getBrokerAddress()).thenReturn("venice.kafka.db:2023");
        Assert.assertEquals(new ApacheKafkaProducerAdapter(this.producerConfigMock, this.kafkaProducerMock).getBrokerAddress(), "venice.kafka.db:2023");
        ((ApacheKafkaProducerConfig) Mockito.verify(this.producerConfigMock, Mockito.times(1))).getBrokerAddress();
    }
}
