package com.linkedin.venice.pubsub.adapter.kafka.producer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.adapter.PubSubProducerCallbackSimpleImpl;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.class */
public class ApacheKafkaProducerAdapterITest {
    private static final Logger LOGGER = LogManager.getLogger(ApacheKafkaProducerAdapterITest.class);
    private PubSubBrokerWrapper pubSubBrokerWrapper;
    private AdminClient kafkaAdminClient;
    private String topicName;
    private ApacheKafkaProducerAdapter producerAdapter;

    /* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest$SimpleBlockingCallback.class */
    private static class SimpleBlockingCallback implements PubSubProducerCallback {
        String idx;
        boolean block = true;
        boolean blockedSuccessfully = false;
        Lock mutex = new ReentrantLock();
        Condition blockCv = this.mutex.newCondition();
        Condition blockedSuccessfullyCv = this.mutex.newCondition();

        SimpleBlockingCallback(String str) {
            this.idx = str;
        }

        public void onCompletion(PubSubProduceResult pubSubProduceResult, Exception exc) {
            ApacheKafkaProducerAdapterITest.LOGGER.info("Blocking callback invoked. Executing thread will be blocked. KeyIdx: {}", this.idx);
            this.mutex.lock();
            while (this.block) {
                try {
                    try {
                        this.blockedSuccessfully = true;
                        this.blockedSuccessfullyCv.signal();
                        this.blockCv.await();
                    } catch (InterruptedException e) {
                        ApacheKafkaProducerAdapterITest.LOGGER.error("Something went wrong while waiting to receive unblock signal in blocking callback", e);
                        this.mutex.unlock();
                    }
                } finally {
                    this.mutex.unlock();
                }
            }
            ApacheKafkaProducerAdapterITest.LOGGER.info("Callback has unblocked executing thread. KeyIdx: {}", this.idx);
        }
    }

    @BeforeClass(alwaysRun = true)
    public void setupKafka() {
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        this.kafkaAdminClient = KafkaAdminClient.create(properties);
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() {
        this.kafkaAdminClient.close(Duration.ZERO);
        this.pubSubBrokerWrapper.close();
    }

    @BeforeMethod(alwaysRun = true)
    public void setupProducerAdapter() {
        this.topicName = Utils.getUniqueString("test-topic");
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        properties.put("kafka.client.id", this.topicName);
        this.producerAdapter = new ApacheKafkaProducerAdapter(new ApacheKafkaProducerConfig(properties));
    }

    @AfterMethod(alwaysRun = true)
    public void cleanUp() {
        if (this.producerAdapter != null) {
            this.producerAdapter.close(0, false);
        }
    }

    private void createTopic(String str, int i, int i2, Map<String, String> map) {
        try {
            this.kafkaAdminClient.createTopics(Collections.singleton(new NewTopic(str, i, (short) i2).configs(map))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Failed to create topic: {}", str, e);
            throw new RuntimeException(e);
        }
    }

    private KafkaKey getDummyKey() {
        return new KafkaKey(MessageType.PUT, Utils.getUniqueString("key-").getBytes());
    }

    private KafkaMessageEnvelope getDummyVal() {
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.producerMetadata = new ProducerMetadata();
        kafkaMessageEnvelope.producerMetadata.messageTimestamp = 0L;
        kafkaMessageEnvelope.producerMetadata.messageSequenceNumber = 0;
        kafkaMessageEnvelope.producerMetadata.segmentNumber = 0;
        kafkaMessageEnvelope.producerMetadata.producerGUID = new GUID();
        Put put = new Put();
        put.putValue = ByteBuffer.allocate(1024);
        put.replicationMetadataPayload = ByteBuffer.allocate(0);
        kafkaMessageEnvelope.payloadUnion = put;
        return kafkaMessageEnvelope;
    }

    @Test(timeOut = 90000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testProducerCloseDoesNotLeaveAnyFuturesIncomplete(boolean z) throws InterruptedException {
        createTopic(this.topicName, 1, 1, Collections.singletonMap("retention.ms", Long.toString(Long.MAX_VALUE)));
        KafkaKey dummyKey = getDummyKey();
        SimpleBlockingCallback simpleBlockingCallback = new SimpleBlockingCallback("m0Key");
        this.producerAdapter.sendMessage(this.topicName, (Integer) null, dummyKey, getDummyVal(), (PubSubMessageHeaders) null, simpleBlockingCallback);
        Future sendMessage = this.producerAdapter.sendMessage(this.topicName, (Integer) null, getDummyKey(), getDummyVal(), (PubSubMessageHeaders) null, new PubSubProducerCallbackSimpleImpl());
        simpleBlockingCallback.mutex.lock();
        while (!simpleBlockingCallback.blockedSuccessfully) {
            try {
                simpleBlockingCallback.blockedSuccessfullyCv.await();
            } finally {
            }
        }
        simpleBlockingCallback.mutex.unlock();
        Assert.assertFalse(sendMessage.isDone());
        LinkedHashMap linkedHashMap = new LinkedHashMap(100);
        for (int i = 2; i < 100; i++) {
            KafkaKey dummyKey2 = getDummyKey();
            PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl = new PubSubProducerCallbackSimpleImpl();
            linkedHashMap.put(pubSubProducerCallbackSimpleImpl, this.producerAdapter.sendMessage(this.topicName, (Integer) null, dummyKey2, getDummyVal(), (PubSubMessageHeaders) null, pubSubProducerCallbackSimpleImpl));
        }
        Thread thread = new Thread(() -> {
            this.producerAdapter.close(z ? Integer.MAX_VALUE : 0, z);
        });
        thread.start();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(thread.getState().equals(Thread.State.WAITING) || thread.getState().equals(Thread.State.TIMED_WAITING) || thread.getState().equals(Thread.State.BLOCKED) || thread.getState().equals(Thread.State.TERMINATED));
        });
        linkedHashMap.forEach((pubSubProducerCallbackSimpleImpl2, future) -> {
            Assert.assertFalse(pubSubProducerCallbackSimpleImpl2.isInvoked());
            Assert.assertFalse(future.isDone());
        });
        LOGGER.info("Unblocking producer's ioThread (sender)...");
        simpleBlockingCallback.mutex.lock();
        try {
            simpleBlockingCallback.block = false;
            simpleBlockingCallback.blockCv.signal();
            simpleBlockingCallback.mutex.unlock();
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                Assert.assertTrue(sendMessage.isDone());
            });
            thread.join();
            for (int i2 = 0; i2 < 10; i2++) {
                try {
                    this.producerAdapter.sendMessage(this.topicName, (Integer) null, getDummyKey(), getDummyVal(), (PubSubMessageHeaders) null, (PubSubProducerCallback) null);
                    Assert.fail("Sending records after producer has been closed should not succeed");
                } catch (Exception e) {
                    LOGGER.info("As expected an exception was received - {}", e.toString());
                }
            }
            for (Map.Entry entry : linkedHashMap.entrySet()) {
                PubSubProducerCallbackSimpleImpl pubSubProducerCallbackSimpleImpl3 = (PubSubProducerCallbackSimpleImpl) entry.getKey();
                Future future2 = (Future) entry.getValue();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertTrue(pubSubProducerCallbackSimpleImpl3.isInvoked());
                });
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertTrue(future2.isDone());
                });
                Assert.assertFalse(future2.isCancelled());
                if (z) {
                    try {
                        PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) future2.get();
                        Assert.assertNotNull(pubSubProduceResult);
                        Assert.assertNotEquals(Long.valueOf(pubSubProduceResult.getOffset()), -1);
                        Assert.assertNull(pubSubProducerCallbackSimpleImpl3.getException());
                    } catch (Exception e2) {
                        Assert.fail("When flush is enabled all messages should be sent to Kafka successfully");
                    }
                } else {
                    Assert.assertNotNull(pubSubProducerCallbackSimpleImpl3.getException());
                    Assert.assertNotNull(pubSubProducerCallbackSimpleImpl3.getException().getMessage());
                    Assert.assertTrue(pubSubProducerCallbackSimpleImpl3.getException().getMessage().contains("Producer is closed forcefully."));
                    try {
                        future2.get();
                        Assert.fail("Exceptionally completed future should throw an exception");
                    } catch (Exception e3) {
                        LOGGER.info("As expected an exception was received - {}", e3.toString());
                    }
                }
            }
        } finally {
        }
    }

    @Test(timeOut = 30000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testProducerCloseCanUnblockSendMessageCallerThread(boolean z) {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future<?> submit = newCachedThreadPool.submit(() -> {
            Thread.currentThread().setName("sendMessageThread");
            countDownLatch.countDown();
            try {
                this.producerAdapter.sendMessage("topic", (Integer) null, getDummyKey(), getDummyVal(), (PubSubMessageHeaders) null, (PubSubProducerCallback) null);
                LOGGER.error("Expectations were not met in thread: {}", Thread.currentThread().getName());
                Assert.fail("sendMessage on non-existent topic should have blocked the executing thread");
            } catch (VeniceException e) {
                LOGGER.info("As expected an exception has been received from sendMessage()", e);
                Assert.assertNotNull(e.getMessage(), "Exception thrown by sendMessage does not have a message");
                Assert.assertTrue(e.getMessage().contains("Got an error while trying to produce message into Kafka. Topic: 'topic'"));
                Assert.assertTrue(ExceptionUtils.recursiveMessageContains(e, "Producer closed while send in progress"));
                Assert.assertTrue(ExceptionUtils.recursiveMessageContains(e, "Requested metadata update after close"));
                LOGGER.info("All expectations were met in thread: {}", Thread.currentThread().getName());
            }
        });
        try {
            try {
                countDownLatch.await();
                Utils.sleep(50L);
                this.producerAdapter.close(5000, z);
                submit.get();
                newCachedThreadPool.shutdownNow();
            } catch (Exception e) {
                Assert.fail("Producer closing should have succeeded without an exception", e);
                newCachedThreadPool.shutdownNow();
            }
        } catch (Throwable th) {
            newCachedThreadPool.shutdownNow();
            throw th;
        }
    }
}
