package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import com.google.common.collect.Lists;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link UnloadScheduler#execute()}.
 *
 * <p>Every collaborator of the scheduler is a Mockito mock; only the {@link ServiceConfiguration}
 * returned by the mocked {@link LoadManagerContext} is a real object, so individual tests can
 * toggle load-balancer flags on it.
 */
@Test(groups = {"broker"})
public class UnloadSchedulerTest {
    private PulsarService pulsar;
    private ScheduledExecutorService loadManagerExecutor;

    /**
     * Builds the mocked context via {@link #getContext()} and switches load-balancer debug mode on.
     *
     * @return a mocked context backed by a real, mutable {@link ServiceConfiguration}
     */
    public LoadManagerContext setupContext() {
        LoadManagerContext context = getContext();
        context.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
        return context;
    }

    /** Creates a fresh single-threaded load-manager executor and a PulsarService mock exposing it. */
    @BeforeMethod
    public void setUp() {
        this.pulsar = Mockito.mock(PulsarService.class);
        this.loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(
                new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
        Mockito.doReturn(this.loadManagerExecutor).when(this.pulsar).getLoadManagerExecutor();
    }

    /** Stops the executor created in {@link #setUp()}. */
    @AfterMethod
    public void tearDown() {
        this.loadManagerExecutor.shutdown();
    }

    /**
     * A decision carrying an unload is published exactly once; a second run whose decision carries
     * no unload must not publish anything further.
     */
    @Test(timeOut = 30000)
    public void testExecuteSuccess() {
        LoadManagerContext context = setupContext();
        BrokerRegistry registry = context.brokerRegistry();
        ServiceUnitStateChannel channel = Mockito.mock(ServiceUnitStateChannel.class);
        UnloadManager unloadManager = Mockito.mock(UnloadManager.class);
        NamespaceUnloadStrategy unloadStrategy = Mockito.mock(NamespaceUnloadStrategy.class);

        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
        Mockito.doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2")))
                .when(registry).getAvailableBrokersAsync();
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(channel).publishUnloadEventAsync(ArgumentMatchers.any());
        Mockito.doReturn(CompletableFuture.completedFuture(null))
                .when(unloadManager).waitAsync(ArgumentMatchers.any(), ArgumentMatchers.any(),
                        ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.any());

        Unload unload = new Unload("broker-1", "bundle-1");
        UnloadDecision decision = new UnloadDecision();
        decision.setUnload(unload);
        decision.setLabel(UnloadDecision.Label.Success);
        Mockito.doReturn(Set.of(decision)).when(unloadStrategy)
                .findBundlesForUnloading(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        UnloadScheduler scheduler = newScheduler(context, channel, unloadManager, unloadStrategy);
        scheduler.execute();
        Mockito.verify(channel, Mockito.times(1)).publishUnloadEventAsync(ArgumentMatchers.eq(unload));

        // Second round: the strategy now yields a decision without an unload attached.
        Mockito.doReturn(Set.of(new UnloadDecision())).when(unloadStrategy)
                .findBundlesForUnloading(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        scheduler.execute();
        // The invocation count is cumulative, so it must still be one.
        Mockito.verify(channel, Mockito.times(1)).publishUnloadEventAsync(ArgumentMatchers.eq(unload));
    }

    /**
     * Five threads call {@code execute()} while each broker lookup takes one second to complete;
     * the registry must have been queried five times once all callers return.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the callers
     */
    @Test(timeOut = 30000)
    public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException {
        LoadManagerContext context = setupContext();
        BrokerRegistry registry = context.brokerRegistry();
        ServiceUnitStateChannel channel = Mockito.mock(ServiceUnitStateChannel.class);
        UnloadManager unloadManager = Mockito.mock(UnloadManager.class);
        NamespaceUnloadStrategy unloadStrategy = Mockito.mock(NamespaceUnloadStrategy.class);
        Mockito.doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();

        // A cached pool gives every slow lookup its own thread (as one pool per call did before)
        // while leaving a single executor to shut down afterwards.
        ExecutorService lookupExecutor = Executors.newCachedThreadPool();
        ExecutorService callers = Executors.newFixedThreadPool(5);
        try {
            Mockito.doAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    // Restore the flag so the pool thread still observes the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return Lists.newArrayList("broker-1", "broker-2");
            }, lookupExecutor)).when(registry).getAvailableBrokersAsync();

            UnloadScheduler scheduler = newScheduler(context, channel, unloadManager, unloadStrategy);
            CountDownLatch finished = new CountDownLatch(5);
            for (int i = 0; i < 5; i++) {
                callers.execute(() -> {
                    try {
                        scheduler.execute();
                    } finally {
                        // Count down even on failure so the test fails on the verify below
                        // instead of hanging until the TestNG timeout.
                        finished.countDown();
                    }
                });
            }
            finished.await();
            Mockito.verify(registry, Mockito.times(5)).getAvailableBrokersAsync();
        } finally {
            callers.shutdownNow();
            lookupExecutor.shutdownNow();
        }
    }

    /**
     * With the load balancer disabled, or enabled but with shedding disabled, the scheduler must
     * not even ask whether this broker owns the channel.
     */
    @Test(timeOut = 30000)
    public void testDisableLoadBalancer() {
        LoadManagerContext context = setupContext();
        ServiceConfiguration conf = context.brokerConfiguration();
        ServiceUnitStateChannel channel = Mockito.mock(ServiceUnitStateChannel.class);
        UnloadScheduler scheduler = newScheduler(context, channel,
                Mockito.mock(UnloadManager.class), Mockito.mock(NamespaceUnloadStrategy.class));

        conf.setLoadBalancerEnabled(false);
        scheduler.execute();
        Mockito.verify(channel, Mockito.never()).isChannelOwnerAsync();

        conf.setLoadBalancerEnabled(true);
        conf.setLoadBalancerSheddingEnabled(false);
        scheduler.execute();
        Mockito.verify(channel, Mockito.never()).isChannelOwnerAsync();
    }

    /**
     * When this broker is not the channel owner, the scheduler must stop before listing brokers.
     *
     * <p>The load balancer is deliberately left enabled: disabling it makes {@code execute()}
     * return before the ownership check (see {@link #testDisableLoadBalancer()}), which would let
     * this test pass without exercising the branch it is named after.
     */
    @Test(timeOut = 30000)
    public void testNotChannelOwner() {
        LoadManagerContext context = setupContext();
        ServiceUnitStateChannel channel = Mockito.mock(ServiceUnitStateChannel.class);
        Mockito.doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
        UnloadScheduler scheduler = newScheduler(context, channel,
                Mockito.mock(UnloadManager.class), Mockito.mock(NamespaceUnloadStrategy.class));

        scheduler.execute();

        // Prove the ownership check was actually reached before asserting on what follows it.
        Mockito.verify(channel, Mockito.atLeastOnce()).isChannelOwnerAsync();
        Mockito.verify(context.brokerRegistry(), Mockito.never()).getAvailableBrokersAsync();
    }

    /**
     * Creates a mocked {@link LoadManagerContext} that returns a new default
     * {@link ServiceConfiguration} and a mocked {@link BrokerRegistry}.
     *
     * @return the mocked context
     */
    public LoadManagerContext getContext() {
        LoadManagerContext context = Mockito.mock(LoadManagerContext.class);
        BrokerRegistry registry = Mockito.mock(BrokerRegistry.class);
        Mockito.doReturn(new ServiceConfiguration()).when(context).brokerConfiguration();
        Mockito.doReturn(registry).when(context).brokerRegistry();
        return context;
    }

    /**
     * Wires an {@link UnloadScheduler} from the given collaborators plus a mocked
     * {@link PulsarService}, the per-test executor, a new {@link UnloadCounter} and an empty
     * metrics reference.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private UnloadScheduler newScheduler(LoadManagerContext context,
                                         ServiceUnitStateChannel channel,
                                         UnloadManager unloadManager,
                                         NamespaceUnloadStrategy unloadStrategy) {
        // Kept raw as in the original test: the element type expected by the constructor is
        // not imported in this file.
        AtomicReference metricsRef = new AtomicReference();
        return new UnloadScheduler(Mockito.mock(PulsarService.class), this.loadManagerExecutor, unloadManager,
                context, channel, unloadStrategy, new UnloadCounter(), metricsRef);
    }
}
