package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.utils.InMemoryLogAppender;
import com.linkedin.venice.utils.Utils;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.class */
public class LeaderProducerCallbackTest {
    @Test
    public void testOnCompletionWithNonNullException() {
        LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = (LeaderFollowerStoreIngestionTask) Mockito.mock(LeaderFollowerStoreIngestionTask.class);
        PubSubMessage pubSubMessage = (PubSubMessage) Mockito.mock(PubSubMessage.class);
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        LeaderProducedRecordContext leaderProducedRecordContext = (LeaderProducedRecordContext) Mockito.mock(LeaderProducedRecordContext.class);
        AggVersionedDIVStats aggVersionedDIVStats = (AggVersionedDIVStats) Mockito.mock(AggVersionedDIVStats.class);
        String uniqueString = Utils.getUniqueString("test-store");
        AtomicInteger atomicInteger = new AtomicInteger();
        Mockito.when(leaderFollowerStoreIngestionTask.getStoreName()).thenReturn(uniqueString);
        Mockito.when(leaderFollowerStoreIngestionTask.getVersionedDIVStats()).thenReturn(aggVersionedDIVStats);
        Mockito.when(pubSubMessage.getTopicName()).thenReturn("T1", new String[]{"T2", "T1", "T2", "T1", "T2"});
        Mockito.when(Integer.valueOf(pubSubMessage.getPartition())).thenReturn(1, new Integer[]{21, 1, 22, 1, 22});
        ((AggVersionedDIVStats) Mockito.doAnswer(invocationOnMock -> {
            return Integer.valueOf(atomicInteger.getAndIncrement());
        }).when(aggVersionedDIVStats)).recordLeaderProducerFailure(uniqueString, 0);
        InMemoryLogAppender build = new InMemoryLogAppender.Builder().build();
        build.start();
        LoggerContext context = LogManager.getContext(false);
        Configuration configuration = context.getConfiguration();
        try {
            configuration.addLoggerAppender(LogManager.getLogger(LeaderFollowerStoreIngestionTask.class), build);
            LeaderProducerCallback leaderProducerCallback = new LeaderProducerCallback(leaderFollowerStoreIngestionTask, pubSubMessage, partitionConsumptionState, leaderProducedRecordContext, 5, "dc-0.kafka.venice.org", 67454542L);
            for (int i = 0; i < 6; i++) {
                leaderProducerCallback.onCompletion((PubSubProduceResult) null, new VeniceException("Producer is closed forcefully"));
            }
            Assert.assertEquals(build.getLogs().stream().filter(str -> {
                return str.contains("Leader failed to send out message to version topic when consuming ");
            }).count(), 3L);
            Assert.assertEquals(atomicInteger.get(), 6);
            LoggerConfig loggerConfig = configuration.getLoggerConfig(LeaderFollowerStoreIngestionTask.class.getName());
            if (loggerConfig.getName().equals(LeaderFollowerStoreIngestionTask.class.getCanonicalName())) {
                loggerConfig.removeAppender(build.getName());
            }
            context.updateLoggers();
            build.stop();
        } catch (Throwable th) {
            LoggerConfig loggerConfig2 = configuration.getLoggerConfig(LeaderFollowerStoreIngestionTask.class.getName());
            if (loggerConfig2.getName().equals(LeaderFollowerStoreIngestionTask.class.getCanonicalName())) {
                loggerConfig2.removeAppender(build.getName());
            }
            context.updateLoggers();
            build.stop();
            throw th;
        }
    }
}
