package com.linkedin.davinci.ingestion;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.ingestion.main.MainIngestionMonitorService;
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus;
import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/ingestion/IsolatedIngestionBackendTest.class */
public class IsolatedIngestionBackendTest {
    @Test
    public void testBackendCanDirectCommandCorrectly() {
        MainIngestionMonitorService mainIngestionMonitorService = (MainIngestionMonitorService) Mockito.mock(MainIngestionMonitorService.class);
        try {
            IsolatedIngestionBackend isolatedIngestionBackend = (IsolatedIngestionBackend) Mockito.mock(IsolatedIngestionBackend.class);
            try {
                Mockito.when(isolatedIngestionBackend.getMainIngestionMonitorService()).thenReturn(mainIngestionMonitorService);
                KafkaStoreIngestionService kafkaStoreIngestionService = (KafkaStoreIngestionService) Mockito.mock(KafkaStoreIngestionService.class);
                ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
                Mockito.when(readOnlyStoreRepository.waitVersion(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any())).thenReturn(Pair.create((Store) Mockito.mock(Store.class), (Version) Mockito.mock(Version.class)));
                Mockito.when(kafkaStoreIngestionService.getMetadataRepo()).thenReturn(readOnlyStoreRepository);
                Mockito.when(Boolean.valueOf(kafkaStoreIngestionService.isPartitionConsuming("testTopic_v1", 0))).thenReturn(true);
                Mockito.when(isolatedIngestionBackend.getStoreIngestionService()).thenReturn(kafkaStoreIngestionService);
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).executeCommandWithRetry(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (IngestionCommandType) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), (Runnable) ArgumentMatchers.any());
                AtomicInteger atomicInteger = new AtomicInteger();
                Supplier supplier = () -> {
                    atomicInteger.set(1);
                    return true;
                };
                Runnable runnable = () -> {
                    atomicInteger.set(-1);
                };
                VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
                ((MainIngestionMonitorService) Mockito.doCallRealMethod().when(mainIngestionMonitorService)).cleanupTopicPartitionState("testTopic_v1", 0);
                ((MainIngestionMonitorService) Mockito.doCallRealMethod().when(mainIngestionMonitorService)).setVersionPartitionToLocalIngestion("testTopic_v1", 0);
                ((MainIngestionMonitorService) Mockito.doCallRealMethod().when(mainIngestionMonitorService)).setVersionPartitionToIsolatedIngestion("testTopic_v1", 0);
                Mockito.when(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0)).thenCallRealMethod();
                Mockito.when(mainIngestionMonitorService.getTopicIngestionStatusMap()).thenReturn(veniceConcurrentHashMap);
                Mockito.when(mainIngestionMonitorService.getForkProcessActionLock()).thenReturn(reentrantReadWriteLock);
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionIngesting("testTopic_v1", 0))).thenCallRealMethod();
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionHostedInMainProcess("testTopic_v1", 0))).thenCallRealMethod();
                atomicInteger.set(0);
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.NOT_EXIST);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.START_CONSUMPTION, supplier, runnable);
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.ISOLATED);
                Assert.assertEquals(atomicInteger.get(), 1);
                atomicInteger.set(0);
                mainIngestionMonitorService.cleanupTopicPartitionState("testTopic_v1", 0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.STOP_CONSUMPTION, supplier, runnable);
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.NOT_EXIST);
                Assert.assertEquals(atomicInteger.get(), -1);
                atomicInteger.set(0);
                mainIngestionMonitorService.setVersionPartitionToLocalIngestion("testTopic_v1", 0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.START_CONSUMPTION, supplier, runnable);
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.MAIN);
                Assert.assertEquals(atomicInteger.get(), -1);
                mainIngestionMonitorService.setVersionPartitionToIsolatedIngestion("testTopic_v1", 0);
                atomicInteger.set(0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.START_CONSUMPTION, supplier, runnable);
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.ISOLATED);
                Assert.assertEquals(atomicInteger.get(), 1);
                if (isolatedIngestionBackend != null) {
                    isolatedIngestionBackend.close();
                }
                if (mainIngestionMonitorService != null) {
                    mainIngestionMonitorService.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (mainIngestionMonitorService != null) {
                try {
                    mainIngestionMonitorService.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testBackendCanHandleErrorCorrectly() {
        MainIngestionMonitorService mainIngestionMonitorService = (MainIngestionMonitorService) Mockito.mock(MainIngestionMonitorService.class);
        try {
            IsolatedIngestionBackend isolatedIngestionBackend = (IsolatedIngestionBackend) Mockito.mock(IsolatedIngestionBackend.class);
            try {
                String str = "testTopic_v1";
                int i = 0;
                Mockito.when(isolatedIngestionBackend.getMainIngestionMonitorService()).thenReturn(mainIngestionMonitorService);
                KafkaStoreIngestionService kafkaStoreIngestionService = (KafkaStoreIngestionService) Mockito.mock(KafkaStoreIngestionService.class);
                ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
                Mockito.when(readOnlyStoreRepository.waitVersion(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any())).thenReturn(Pair.create((Store) Mockito.mock(Store.class), (Version) Mockito.mock(Version.class)));
                Mockito.when(kafkaStoreIngestionService.getMetadataRepo()).thenReturn(readOnlyStoreRepository);
                Mockito.when(Boolean.valueOf(kafkaStoreIngestionService.isPartitionConsuming("testTopic_v1", 0))).thenReturn(true);
                Mockito.when(isolatedIngestionBackend.getStoreIngestionService()).thenReturn(kafkaStoreIngestionService);
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).executeCommandWithRetry(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (IngestionCommandType) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), (Runnable) ArgumentMatchers.any());
                AtomicInteger atomicInteger = new AtomicInteger();
                Runnable runnable = () -> {
                    atomicInteger.set(-1);
                };
                VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
                ((MainIngestionMonitorService) Mockito.doCallRealMethod().when(mainIngestionMonitorService)).cleanupTopicPartitionState("testTopic_v1", 0);
                Mockito.when(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0)).thenCallRealMethod();
                Mockito.when(mainIngestionMonitorService.getTopicIngestionStatusMap()).thenReturn(veniceConcurrentHashMap);
                Mockito.when(mainIngestionMonitorService.getForkProcessActionLock()).thenReturn(reentrantReadWriteLock);
                MainTopicIngestionStatus mainTopicIngestionStatus = new MainTopicIngestionStatus("testTopic_v1");
                mainTopicIngestionStatus.setPartitionIngestionStatusToIsolatedIngestion(0);
                veniceConcurrentHashMap.put("testTopic_v1", mainTopicIngestionStatus);
                atomicInteger.set(0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.STOP_CONSUMPTION, () -> {
                    return false;
                }, runnable);
                Assert.assertEquals(atomicInteger.get(), -1);
                atomicInteger.set(0);
                veniceConcurrentHashMap.clear();
                Assert.assertThrows(VeniceTimeoutException.class, () -> {
                    isolatedIngestionBackend.executeCommandWithRetry(str, i, IngestionCommandType.START_CONSUMPTION, () -> {
                        mainIngestionMonitorService.setVersionPartitionToIsolatedIngestion(str, i);
                        throw new VeniceTimeoutException("Ingestion request failed due to timeout!");
                    }, runnable);
                });
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.NOT_EXIST);
                Assert.assertEquals(atomicInteger.get(), 0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.STOP_CONSUMPTION, () -> {
                    return false;
                }, runnable);
                Assert.assertEquals(atomicInteger.get(), -1);
                atomicInteger.set(0);
                veniceConcurrentHashMap.clear();
                Mockito.when(readOnlyStoreRepository.waitVersion(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (Duration) ArgumentMatchers.any())).thenReturn(Pair.create((Object) null, (Object) null));
                Assert.assertThrows(VeniceException.class, () -> {
                    isolatedIngestionBackend.executeCommandWithRetry(str, i, IngestionCommandType.START_CONSUMPTION, () -> {
                        mainIngestionMonitorService.setVersionPartitionToIsolatedIngestion(str, i);
                        throw new VeniceTimeoutException("Ingestion request failed due to timeout!");
                    }, runnable);
                });
                Assert.assertEquals(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0), MainPartitionIngestionStatus.NOT_EXIST);
                Assert.assertEquals(atomicInteger.get(), 0);
                isolatedIngestionBackend.executeCommandWithRetry("testTopic_v1", 0, IngestionCommandType.STOP_CONSUMPTION, () -> {
                    return false;
                }, runnable);
                Assert.assertEquals(atomicInteger.get(), -1);
                if (isolatedIngestionBackend != null) {
                    isolatedIngestionBackend.close();
                }
                if (mainIngestionMonitorService != null) {
                    mainIngestionMonitorService.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (mainIngestionMonitorService != null) {
                try {
                    mainIngestionMonitorService.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testIsolatedIngestionNotifierAsyncCompletionHandling() {
        IsolatedIngestionBackend isolatedIngestionBackend = (IsolatedIngestionBackend) Mockito.mock(IsolatedIngestionBackend.class);
        VeniceNotifier veniceNotifier = (VeniceNotifier) Mockito.mock(VeniceNotifier.class);
        VeniceConfigLoader veniceConfigLoader = (VeniceConfigLoader) Mockito.mock(VeniceConfigLoader.class);
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        Mockito.when(isolatedIngestionBackend.getCompletionHandlingExecutor()).thenReturn(Executors.newFixedThreadPool(10));
        Mockito.when(isolatedIngestionBackend.getIsolatedIngestionNotifier((VeniceNotifier) ArgumentMatchers.any())).thenCallRealMethod();
        Mockito.when(isolatedIngestionBackend.getConfigLoader()).thenReturn(veniceConfigLoader);
        Mockito.when(veniceConfigLoader.getStoreConfig("topic_v1")).thenReturn(veniceStoreVersionConfig);
        Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionIngesting("topic_v1", 0))).thenReturn(false);
        Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionIngesting("topic_v1", 1))).thenReturn(true);
        isolatedIngestionBackend.getIsolatedIngestionNotifier(veniceNotifier).completed("topic_v1", 0, 123L, "", Optional.empty());
        ((IsolatedIngestionBackend) Mockito.verify(isolatedIngestionBackend, Mockito.times(0))).getCompletionHandlingExecutor();
        isolatedIngestionBackend.getIsolatedIngestionNotifier(veniceNotifier).completed("topic_v1", 1, 123L, "", Optional.empty());
        ((IsolatedIngestionBackend) Mockito.verify(isolatedIngestionBackend, Mockito.times(1))).getCompletionHandlingExecutor();
    }

    @Test
    public void testBackendCanMaintainMetadataCorrectlyForDroppingPartition() {
        MainIngestionMonitorService mainIngestionMonitorService = (MainIngestionMonitorService) Mockito.mock(MainIngestionMonitorService.class);
        try {
            IsolatedIngestionBackend isolatedIngestionBackend = (IsolatedIngestionBackend) Mockito.mock(IsolatedIngestionBackend.class);
            try {
                int i = 0;
                VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
                Mockito.when(veniceStoreVersionConfig.getStoreVersionName()).thenReturn("testTopic_v1");
                MainIngestionRequestClient mainIngestionRequestClient = (MainIngestionRequestClient) Mockito.mock(MainIngestionRequestClient.class);
                Mockito.when(isolatedIngestionBackend.getMainIngestionRequestClient()).thenReturn(mainIngestionRequestClient);
                Mockito.when(isolatedIngestionBackend.getMainIngestionMonitorService()).thenReturn(mainIngestionMonitorService);
                VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                MainTopicIngestionStatus mainTopicIngestionStatus = new MainTopicIngestionStatus("testTopic_v1");
                mainTopicIngestionStatus.setPartitionIngestionStatusToLocalIngestion(0);
                veniceConcurrentHashMap.put("testTopic_v1", mainTopicIngestionStatus);
                ((MainIngestionMonitorService) Mockito.doCallRealMethod().when(mainIngestionMonitorService)).cleanupTopicPartitionState("testTopic_v1", 0);
                Mockito.when(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0)).thenCallRealMethod();
                Mockito.when(mainIngestionMonitorService.getTopicIngestionStatusMap()).thenReturn(veniceConcurrentHashMap);
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionHostedInMainProcess(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenCallRealMethod();
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionIngesting(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenCallRealMethod();
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).dropStoragePartitionGracefully((VeniceStoreVersionConfig) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean());
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).executeCommandWithRetry(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (IngestionCommandType) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), (Runnable) ArgumentMatchers.any());
                Mockito.when(Long.valueOf(mainIngestionMonitorService.getTopicPartitionCount("testTopic_v1"))).thenReturn(2L);
                Mockito.when(mainIngestionMonitorService.getForkProcessActionLock()).thenReturn(new ReentrantReadWriteLock());
                isolatedIngestionBackend.dropStoragePartitionGracefully(veniceStoreVersionConfig, 0, 180, false);
                ((IsolatedIngestionBackend) Mockito.verify(isolatedIngestionBackend, Mockito.times(1))).removeTopicPartitionLocally((VeniceStoreVersionConfig) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean());
                ((MainIngestionRequestClient) Mockito.verify(mainIngestionRequestClient, Mockito.times(1))).resetTopicPartition("testTopic_v1", 0);
                ((MainIngestionMonitorService) Mockito.verify(mainIngestionMonitorService, Mockito.times(1))).cleanupTopicPartitionState("testTopic_v1", 0);
                Assert.assertEquals(((MainTopicIngestionStatus) veniceConcurrentHashMap.get("testTopic_v1")).getPartitionIngestionStatus(0), MainPartitionIngestionStatus.NOT_EXIST);
                ((MainTopicIngestionStatus) veniceConcurrentHashMap.get("testTopic_v1")).setPartitionIngestionStatusToLocalIngestion(0);
                ((IsolatedIngestionBackend) Mockito.doThrow(new Throwable[]{new VeniceException("test")}).when(isolatedIngestionBackend)).removeTopicPartitionLocally((VeniceStoreVersionConfig) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean());
                Assert.assertThrows(() -> {
                    isolatedIngestionBackend.dropStoragePartitionGracefully(veniceStoreVersionConfig, i, 180, false);
                });
                ((IsolatedIngestionBackend) Mockito.verify(isolatedIngestionBackend, Mockito.times(2))).removeTopicPartitionLocally((VeniceStoreVersionConfig) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean());
                ((MainIngestionRequestClient) Mockito.verify(mainIngestionRequestClient, Mockito.times(1))).resetTopicPartition("testTopic_v1", 0);
                ((MainIngestionMonitorService) Mockito.verify(mainIngestionMonitorService, Mockito.times(1))).cleanupTopicPartitionState("testTopic_v1", 0);
                Assert.assertEquals(((MainTopicIngestionStatus) veniceConcurrentHashMap.get("testTopic_v1")).getPartitionIngestionStatus(0), MainPartitionIngestionStatus.MAIN);
                ((MainTopicIngestionStatus) veniceConcurrentHashMap.get("testTopic_v1")).setPartitionIngestionStatusToIsolatedIngestion(0);
                Mockito.when(Boolean.valueOf(mainIngestionRequestClient.removeTopicPartition("testTopic_v1", 0))).thenReturn(true);
                isolatedIngestionBackend.dropStoragePartitionGracefully(veniceStoreVersionConfig, 0, 180, false);
                ((MainIngestionRequestClient) Mockito.verify(mainIngestionRequestClient, Mockito.times(1))).removeTopicPartition("testTopic_v1", 0);
                ((MainIngestionRequestClient) Mockito.verify(mainIngestionRequestClient, Mockito.times(2))).resetTopicPartition("testTopic_v1", 0);
                ((MainIngestionMonitorService) Mockito.verify(mainIngestionMonitorService, Mockito.times(2))).cleanupTopicPartitionState("testTopic_v1", 0);
                Assert.assertEquals(((MainTopicIngestionStatus) veniceConcurrentHashMap.get("testTopic_v1")).getPartitionIngestionStatus(0), MainPartitionIngestionStatus.NOT_EXIST);
                if (isolatedIngestionBackend != null) {
                    isolatedIngestionBackend.close();
                }
                if (mainIngestionMonitorService != null) {
                    mainIngestionMonitorService.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (mainIngestionMonitorService != null) {
                try {
                    mainIngestionMonitorService.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testBackendShutdownSuccessfullyWithDivergedResourceMetadata() {
        MainIngestionMonitorService mainIngestionMonitorService = (MainIngestionMonitorService) Mockito.mock(MainIngestionMonitorService.class);
        try {
            IsolatedIngestionBackend isolatedIngestionBackend = (IsolatedIngestionBackend) Mockito.mock(IsolatedIngestionBackend.class);
            try {
                VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
                Mockito.when(veniceStoreVersionConfig.getStoreVersionName()).thenReturn("testTopic_v1");
                MainIngestionRequestClient mainIngestionRequestClient = (MainIngestionRequestClient) Mockito.mock(MainIngestionRequestClient.class);
                Mockito.when(isolatedIngestionBackend.getMainIngestionRequestClient()).thenReturn(mainIngestionRequestClient);
                Mockito.when(isolatedIngestionBackend.getMainIngestionMonitorService()).thenReturn(mainIngestionMonitorService);
                VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                MainTopicIngestionStatus mainTopicIngestionStatus = new MainTopicIngestionStatus("testTopic_v1");
                mainTopicIngestionStatus.setPartitionIngestionStatusToIsolatedIngestion(0);
                veniceConcurrentHashMap.put("testTopic_v1", mainTopicIngestionStatus);
                Mockito.when(mainIngestionMonitorService.getTopicPartitionIngestionStatus("testTopic_v1", 0)).thenCallRealMethod();
                Mockito.when(mainIngestionMonitorService.getTopicIngestionStatusMap()).thenReturn(veniceConcurrentHashMap);
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionHostedInMainProcess(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenCallRealMethod();
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isTopicPartitionIngesting(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenCallRealMethod();
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).stopConsumption((VeniceStoreVersionConfig) ArgumentMatchers.any(), ArgumentMatchers.anyInt());
                ((IsolatedIngestionBackend) Mockito.doCallRealMethod().when(isolatedIngestionBackend)).executeCommandWithRetry(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), (IngestionCommandType) ArgumentMatchers.any(), (Supplier) ArgumentMatchers.any(), (Runnable) ArgumentMatchers.any());
                Mockito.when(mainIngestionMonitorService.getForkProcessActionLock()).thenReturn(new ReentrantReadWriteLock());
                KafkaStoreIngestionService kafkaStoreIngestionService = (KafkaStoreIngestionService) Mockito.mock(KafkaStoreIngestionService.class);
                Mockito.when(Boolean.valueOf(kafkaStoreIngestionService.isPartitionConsuming(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenReturn(false);
                Mockito.when(isolatedIngestionBackend.getStoreIngestionService()).thenReturn(kafkaStoreIngestionService);
                Mockito.when(Boolean.valueOf(mainIngestionRequestClient.stopConsumption(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt()))).thenReturn(false);
                Mockito.when(Boolean.valueOf(isolatedIngestionBackend.isShuttingDown())).thenReturn(true);
                isolatedIngestionBackend.stopConsumption(veniceStoreVersionConfig, 0);
                ((KafkaStoreIngestionService) Mockito.verify(kafkaStoreIngestionService, Mockito.times(1))).isPartitionConsuming(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());
                ((MainIngestionRequestClient) Mockito.verify(mainIngestionRequestClient, Mockito.times(1))).stopConsumption(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt());
                if (isolatedIngestionBackend != null) {
                    isolatedIngestionBackend.close();
                }
                if (mainIngestionMonitorService != null) {
                    mainIngestionMonitorService.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (mainIngestionMonitorService != null) {
                try {
                    mainIngestionMonitorService.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
