package com.linkedin.venice.writer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.PubSubSharedProducerAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.producer.SharedKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link SharedKafkaProducerAdapterFactory}.
 *
 * <p>{@link #testSharedProducerWithNonExistingTopic()} runs against the pub-sub broker started in
 * {@link #setUp()}; the remaining tests replace the underlying producer adapters with Mockito mocks.
 */
public class PubSubSharedProducerAdapterFactoryTest {
    private static final Logger LOGGER = LogManager.getLogger(PubSubSharedProducerAdapterFactoryTest.class);

    private PubSubBrokerWrapper pubSubBrokerWrapper;
    private TopicManager topicManager;
    private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    /** Starts the pub-sub broker and builds the topic manager and consumer factory shared by the tests. */
    @BeforeClass
    public void setUp() {
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker();
        this.pubSubConsumerAdapterFactory = IntegrationTestPushUtils.getVeniceConsumerFactory();
        this.topicManager = IntegrationTestPushUtils
            .getTopicManagerRepo(30000L, 100L, 0L, this.pubSubBrokerWrapper.getAddress(), this.pubSubTopicRepository)
            .getTopicManager();
    }

    /** Closes the topic manager and the broker; close failures are logged instead of propagated. */
    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.topicManager, this.pubSubBrokerWrapper});
    }

    /**
     * Produces 100 records to a topic that was created up front, then attempts to produce through the same
     * shared producer factory to "test-topic-2_v1", which this test never creates. No produce callback is
     * expected to fire for the second topic within one second, and the records written to the first topic
     * must be visible through a consumer afterwards.
     *
     * @throws Exception if producing, waiting, or any cleanup step fails
     */
    @Test(timeOut = 60000)
    public void testSharedProducerWithNonExistingTopic() throws Exception {
        PubSubTopic existingTopic = this.pubSubTopicRepository.getTopic("test-topic-1_v1");
        this.topicManager.createTopic(existingTopic, 1, 1, true);

        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        properties.put("partitioner.class", DefaultVenicePartitioner.class.getName());
        properties.put("kafka.buffer.memory", "16384");

        SharedKafkaProducerAdapterFactory sharedProducerFactory = null;
        try {
            sharedProducerFactory = TestUtils.getSharedKafkaProducerService(properties);
            VeniceWriterFactory writerFactory =
                TestUtils.getVeniceWriterFactoryWithSharedProducer(properties, sharedProducerFactory);

            // try-with-resources guarantees the writer is closed even if producing or awaiting fails.
            try (VeniceWriter<KafkaKey, byte[], byte[]> existingTopicWriter = writerFactory.createVeniceWriter(
                new VeniceWriterOptions.Builder(existingTopic.getName())
                    .setUseKafkaKeySerializer(true)
                    .setPartitionCount(1)
                    .build())) {
                CountDownLatch ackedRecords = new CountDownLatch(100);
                for (int i = 0; i < 100 && !Thread.interrupted(); i++) {
                    try {
                        existingTopicWriter.put(
                            new KafkaKey(MessageType.PUT, "topic1".getBytes()),
                            "topic1".getBytes(),
                            1,
                            (produceResult, error) -> {
                                if (error != null) {
                                    LOGGER.error("Error when producing to an existing topic: {}", existingTopic, error);
                                } else {
                                    LOGGER.info("produced offset test-topic-1: {}", produceResult.getOffset());
                                    ackedRecords.countDown();
                                }
                            });
                    } catch (VeniceException e) {
                        LOGGER.error("Exception: ", e);
                    }
                }
                // Only successful callbacks count down; the test-level timeout bounds this wait.
                ackedRecords.await();
            }

            Thread producerThread = null;
            VeniceWriter<KafkaKey, byte[], byte[]> missingTopicWriter = null;
            try {
                missingTopicWriter = writerFactory.createVeniceWriter(
                    new VeniceWriterOptions.Builder("test-topic-2_v1")
                        .setUseKafkaKeySerializer(true)
                        .setPartitionCount(1)
                        .build());
                AtomicInteger callbackInvocations = new AtomicInteger();
                // The lambda needs an effectively final reference; missingTopicWriter itself is reassigned above.
                final VeniceWriter<KafkaKey, byte[], byte[]> writerForThread = missingTopicWriter;
                // Produce on a separate thread so the test thread can observe the callback count independently.
                producerThread = new Thread(() -> {
                    for (int i = 0; i < 100 && !Thread.interrupted(); i++) {
                        try {
                            writerForThread.put(
                                new KafkaKey(MessageType.PUT, "topic2".getBytes()),
                                "topic2".getBytes(),
                                1,
                                (produceResult, error) -> callbackInvocations.getAndIncrement());
                        } catch (VeniceException e) {
                            LOGGER.error("Exception: ", e);
                        }
                    }
                });
                producerThread.start();
                Thread.sleep(1000L);
                Assert.assertEquals(callbackInvocations.get(), 0);
            } finally {
                if (producerThread != null) {
                    TestUtils.shutdownThread(producerThread);
                }
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{missingTopicWriter});
            }
        } finally {
            if (sharedProducerFactory != null) {
                sharedProducerFactory.close();
            }
        }

        // Verification runs after all producers are released, so a failed assertion cannot re-trigger cleanup.
        try (PubSubConsumerAdapter consumer = this.pubSubConsumerAdapterFactory.create(
            new VeniceProperties(properties),
            false,
            new KafkaPubSubMessageDeserializer(
                new KafkaValueSerializer(),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
            "")) {
            // NOTE(review): strictly greater than 100 — presumably the writer emits extra (control) messages
            // besides the 100 records; confirm against VeniceWriter.
            Assert.assertTrue(consumer.endOffset(new PubSubTopicPartitionImpl(existingTopic, 0)) > 100L);
        }
    }

    /**
     * Verifies that acquiring with an already-registered task name returns the same producer, that a released
     * producer is handed to the next new task, and that releasing the same task twice is harmless.
     *
     * @throws Exception if the factory fails to close
     */
    @Test
    public void testProducerReuse() throws Exception {
        SharedKafkaProducerAdapterFactory sharedProducerFactory = null;
        try {
            // NOTE(review): 8 is presumably the shared producer pool size, which the eight acquisitions
            // below would exhaust — confirm against the constructor.
            sharedProducerFactory = new SharedKafkaProducerAdapterFactory(
                getProperties(),
                8,
                mockProducerAdapterFactory(),
                new MetricsRepository(),
                Collections.emptySet());

            PubSubSharedProducerAdapter producerOfTask1 = sharedProducerFactory.acquireSharedProducer("task1");
            sharedProducerFactory.acquireSharedProducer("task2");
            sharedProducerFactory.acquireSharedProducer("task3");
            sharedProducerFactory.acquireSharedProducer("task4");
            PubSubSharedProducerAdapter producerOfTask5 = sharedProducerFactory.acquireSharedProducer("task5");
            sharedProducerFactory.acquireSharedProducer("task6");
            sharedProducerFactory.acquireSharedProducer("task7");
            sharedProducerFactory.acquireSharedProducer("task8");

            // Re-acquiring under the same task name must hand back the same producer.
            Assert.assertEquals(producerOfTask1, sharedProducerFactory.acquireSharedProducer("task1"));

            // After task5 releases its producer, a brand-new task is expected to receive that producer.
            sharedProducerFactory.releaseSharedProducer("task5");
            Assert.assertEquals(producerOfTask5, sharedProducerFactory.acquireSharedProducer("task9"));

            // Second release of task5 is intentional: releasing an already-released task must not throw.
            sharedProducerFactory.releaseSharedProducer("task5");
        } finally {
            if (sharedProducerFactory != null) {
                sharedProducerFactory.close();
            }
        }
    }

    /**
     * Verifies that the factory can be closed after a producer was acquired and released.
     *
     * @throws Exception if the factory fails to close
     */
    @Test
    public void testProducerClosing() throws Exception {
        SharedKafkaProducerAdapterFactory sharedProducerFactory = null;
        try {
            sharedProducerFactory = new SharedKafkaProducerAdapterFactory(
                getProperties(),
                8,
                mockProducerAdapterFactory(),
                new MetricsRepository(),
                Collections.emptySet());
            sharedProducerFactory.acquireSharedProducer("task1");
            sharedProducerFactory.releaseSharedProducer("task1");
        } finally {
            if (sharedProducerFactory != null) {
                sharedProducerFactory.close();
            }
        }
    }

    /**
     * Builds a producer adapter factory whose {@code create} returns a Mockito mock, so the shared factory
     * under test never builds a real producer adapter.
     *
     * @return a factory that produces mocked {@link ApacheKafkaProducerAdapter} instances
     */
    private static ApacheKafkaProducerAdapterFactory mockProducerAdapterFactory() {
        return new ApacheKafkaProducerAdapterFactory() {
            @Override
            public ApacheKafkaProducerAdapter create(VeniceProperties veniceProperties, String unused1, String unused2) {
                return Mockito.mock(ApacheKafkaProducerAdapter.class);
            }
        };
    }

    /**
     * Minimal configuration for the mock-backed tests.
     *
     * @return properties containing only a fixed local bootstrap address
     */
    private Properties getProperties() {
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", "127.0.0.1:9092");
        return properties;
    }
}
