package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.pubsub.adapter.PubSubProducerCallbackSimpleImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerCallbackTest.class */
public class ApacheKafkaProducerCallbackTest {
    @Test
    public void testOnCompletionWithNullExceptionShouldInvokeInternalCallbackWithNullException() {
        PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl = new PubSubProducerCallbackSimpleImpl();
        ApacheKafkaProducerCallback apacheKafkaProducerCallback = new ApacheKafkaProducerCallback(pubSubProducerCallbackSimpleImpl);
        RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("topicX", 42), 0L, 1L, 1676397545L, 1L, 11, 12);
        apacheKafkaProducerCallback.onCompletion(recordMetadata, (Exception) null);
        Assert.assertTrue(pubSubProducerCallbackSimpleImpl.isInvoked());
        Assert.assertNull(pubSubProducerCallbackSimpleImpl.getException());
        Assert.assertNotNull(pubSubProducerCallbackSimpleImpl.getProduceResult());
        PubSubProduceResult produceResult = pubSubProducerCallbackSimpleImpl.getProduceResult();
        Assert.assertEquals(produceResult.getTopic(), recordMetadata.topic());
        Assert.assertEquals(produceResult.getPartition(), recordMetadata.partition());
        Assert.assertEquals(produceResult.getOffset(), recordMetadata.offset());
    }

    @Test
    public void testOnCompletionWithNonNullExceptionShouldInvokeInternalCallbackWithNonNullException() {
        PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl = new PubSubProducerCallbackSimpleImpl();
        ApacheKafkaProducerCallback apacheKafkaProducerCallback = new ApacheKafkaProducerCallback(pubSubProducerCallbackSimpleImpl);
        RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("topicX", 42), -1L, -1L, -1L, -1L, -1, -1);
        UnknownTopicOrPartitionException unknownTopicOrPartitionException = new UnknownTopicOrPartitionException("Unknown topic: topicX");
        apacheKafkaProducerCallback.onCompletion(recordMetadata, unknownTopicOrPartitionException);
        Assert.assertTrue(pubSubProducerCallbackSimpleImpl.isInvoked());
        Assert.assertNotNull(pubSubProducerCallbackSimpleImpl.getException());
        Assert.assertEquals(pubSubProducerCallbackSimpleImpl.getException(), unknownTopicOrPartitionException);
        Assert.assertNotNull(pubSubProducerCallbackSimpleImpl.getProduceResult());
        PubSubProduceResult produceResult = pubSubProducerCallbackSimpleImpl.getProduceResult();
        Assert.assertEquals(produceResult.getTopic(), recordMetadata.topic());
        Assert.assertEquals(produceResult.getPartition(), recordMetadata.partition());
        Assert.assertEquals(produceResult.getOffset(), recordMetadata.offset());
    }

    @Test(expectedExceptions = {ExecutionException.class}, expectedExceptionsMessageRegExp = ".*Unknown topic: topicX.*")
    public void testOnCompletionWithNonNullExceptionShouldCompleteFutureExceptionally() throws ExecutionException, InterruptedException {
        PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl = new PubSubProducerCallbackSimpleImpl();
        ApacheKafkaProducerCallback apacheKafkaProducerCallback = new ApacheKafkaProducerCallback(pubSubProducerCallbackSimpleImpl);
        Future produceResultFuture = apacheKafkaProducerCallback.getProduceResultFuture();
        Assert.assertFalse(produceResultFuture.isDone());
        Assert.assertFalse(produceResultFuture.isCancelled());
        apacheKafkaProducerCallback.onCompletion(new RecordMetadata(new TopicPartition("topicX", 42), 0L, 1L, 1676397545L, 1L, 11, 12), new UnknownTopicOrPartitionException("Unknown topic: topicX"));
        Assert.assertTrue(pubSubProducerCallbackSimpleImpl.isInvoked());
        Assert.assertTrue(produceResultFuture.isDone());
        Assert.assertFalse(produceResultFuture.isCancelled());
        produceResultFuture.get();
    }

    @Test
    public void testOnCompletionWithNonNullExceptionShouldCompleteFuture() throws ExecutionException, InterruptedException {
        PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl = new PubSubProducerCallbackSimpleImpl();
        ApacheKafkaProducerCallback apacheKafkaProducerCallback = new ApacheKafkaProducerCallback(pubSubProducerCallbackSimpleImpl);
        Future produceResultFuture = apacheKafkaProducerCallback.getProduceResultFuture();
        Assert.assertFalse(produceResultFuture.isDone());
        Assert.assertFalse(produceResultFuture.isCancelled());
        apacheKafkaProducerCallback.onCompletion(new RecordMetadata(new TopicPartition("topicX", 42), 0L, 1L, 1676397545L, 1L, 11, 12), (Exception) null);
        Assert.assertTrue(pubSubProducerCallbackSimpleImpl.isInvoked());
        Assert.assertTrue(produceResultFuture.isDone());
        Assert.assertFalse(produceResultFuture.isCancelled());
        PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) produceResultFuture.get();
        Assert.assertNotNull(pubSubProduceResult);
        Assert.assertEquals(pubSubProduceResult.getTopic(), "topicX");
        Assert.assertEquals(pubSubProduceResult.getPartition(), 42);
    }
}
