package org.apache.pulsar.client.impl;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link ConnectionHandler#grabCnx()}.
 *
 * <p>Each test builds a {@link ConnectionHandler} around a minimal {@link HandlerState} and a
 * connection callback, invokes {@code grabCnx()}, and then checks how often the callback (or the
 * handler's {@code duringConnect} flag) was exercised.
 */
@Test(groups = {"broker-impl"})
public class ConnectionHandlerTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConnectionHandlerTest.class);

    /** Topic name handed to the mocked handler state in most scenarios. */
    private static final String TOPIC = "my-topic";

    /** Upper bound used when waiting for an asynchronous connection attempt to finish. */
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(3L);

    /** Short backoff (1 ms initial, 1 s mandatory stop, 3 s max) shared by the tests. */
    private static final Backoff BACKOFF = new BackoffBuilder()
            .setInitialTime(1, TimeUnit.MILLISECONDS)
            .setMandatoryStop(1, TimeUnit.SECONDS)
            .setMax(3, TimeUnit.SECONDS)
            .create();

    // NOTE(review): this pool is created and shut down but no test in this class submits work to it.
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /**
     * Backoff whose three durations are all expressed in hours (1 h, 2 h, 1 h).
     * Presumably chosen so that no retry can fire while a test is waiting; confirm against
     * {@link Backoff}'s constructor contract.
     */
    private static class MockedBackoff extends Backoff {
        public MockedBackoff() {
            super(1L, TimeUnit.HOURS, 2L, TimeUnit.HOURS, 1L, TimeUnit.HOURS);
        }
    }

    /** Minimal {@link HandlerState} that only supplies a fixed handler name. */
    private static class MockedHandlerState extends HandlerState {
        public MockedHandlerState(PulsarClientImpl pulsarClientImpl, String str) {
            super(pulsarClientImpl, str);
        }

        String getHandlerName() {
            return "mocked";
        }
    }

    /**
     * Starts the mocked broker environment and the producer/consumer base fixtures.
     *
     * @throws Exception if either setup step fails
     */
    @Override
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Tears down the test environment and stops the thread pool.
     *
     * <p>{@code alwaysRun = true} mirrors {@link #setup()} so teardown is not skipped after an
     * earlier failure, and the {@code finally} block guarantees the pool is shut down even when
     * {@code internalCleanup()} throws.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        try {
            super.internalCleanup();
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * Creates ten independent handlers, calls {@code grabCnx()} on each, and blocks until the
     * connection callback of that handler has completed a future with the loop index.
     */
    @Test(timeOut = 30000)
    public void testSynchronousGrabCnx() {
        for (int i = 0; i < 10; i++) {
            final CompletableFuture<Integer> invoked = new CompletableFuture<>();
            final int index = i;
            final ConnectionHandler handler = new ConnectionHandler(
                    new MockedHandlerState(this.pulsarClient, TOPIC), BACKOFF, cnx -> {
                        invoked.complete(index);
                        return CompletableFuture.completedFuture(null);
                    });
            handler.grabCnx();
            // join() blocks until the callback ran; the class-level timeOut bounds the wait.
            Assert.assertEquals(invoked.join(), i);
        }
    }

    /**
     * Calls {@code grabCnx()} ten times on a single handler and asserts that the connection
     * callback ran exactly once.
     *
     * <p>NOTE(review): despite the test name, the ten calls are issued back-to-back from the test
     * thread; confirm whether they were meant to be dispatched through {@code executor}.
     */
    @Test
    public void testConcurrentGrabCnx() {
        final int numGrab = 10;
        final AtomicInteger callbackCount = new AtomicInteger(0);
        final ConnectionHandler handler = new ConnectionHandler(
                new MockedHandlerState(this.pulsarClient, TOPIC), BACKOFF, cnx -> {
                    callbackCount.incrementAndGet();
                    return CompletableFuture.completedFuture(null);
                });
        for (int i = 0; i < numGrab; i++) {
            handler.grabCnx();
        }
        // At least one connection attempt must reach the callback.
        Awaitility.await().atMost(CONNECT_TIMEOUT).until(() -> callbackCount.get() > 0);
        // The counter must never reach numGrab: waiting for it is expected to time out.
        Assert.assertThrows(ConditionTimeoutException.class,
                () -> Awaitility.await().atMost(Duration.ofMillis(500L))
                        .until(() -> callbackCount.get() == numGrab));
        Assert.assertEquals(callbackCount.get(), 1);
    }

    /**
     * Verifies that each {@code grabCnx()} call sets and clears the handler's
     * {@code duringConnect} flag exactly once, in three scenarios that share one spied flag:
     * a normal topic, a {@code null} topic, and a connection callback that returns a failed future.
     *
     * @throws IllegalAccessException if the {@code duringConnect} field cannot be written reflectively
     */
    @Test
    public void testDuringConnectInvokeCount() throws IllegalAccessException {
        // One spy is injected into all three handlers, so invocation counts accumulate: 1, 2, 3.
        final AtomicBoolean duringConnect = Mockito.spy(new AtomicBoolean());

        // 1. Regular topic, callback succeeds.
        final ConnectionHandler succeeding = new ConnectionHandler(
                new MockedHandlerState(this.pulsarClient, TOPIC), BACKOFF,
                cnx -> CompletableFuture.completedFuture(null));
        grabAndVerifyFlagCycles(succeeding, duringConnect, 1);

        // 2. Null topic with the hour-scale backoff.
        final ConnectionHandler withoutTopic = new ConnectionHandler(
                new MockedHandlerState(this.pulsarClient, null), new MockedBackoff(),
                cnx -> CompletableFuture.completedFuture(null));
        grabAndVerifyFlagCycles(withoutTopic, duringConnect, 2);

        // 3. Regular topic, callback reports a failure.
        final ConnectionHandler failing = new ConnectionHandler(
                new MockedHandlerState(this.pulsarClient, TOPIC), new MockedBackoff(),
                cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
        grabAndVerifyFlagCycles(failing, duringConnect, 3);
    }

    /**
     * Injects {@code spiedFlag} as the handler's {@code duringConnect} field, triggers one
     * {@code grabCnx()}, waits for the flag to read {@code false}, and verifies the cumulative
     * number of set/clear invocations recorded on the spy.
     *
     * @param handler        handler under test
     * @param spiedFlag      Mockito spy that replaces the handler's {@code duringConnect} field
     * @param expectedCycles total number of {@code compareAndSet(false, true)} and
     *                       {@code set(false)} calls expected on the spy so far
     * @throws IllegalAccessException if the reflective field write fails
     */
    private static void grabAndVerifyFlagCycles(ConnectionHandler handler, AtomicBoolean spiedFlag,
                                                int expectedCycles) throws IllegalAccessException {
        FieldUtils.writeField(handler, "duringConnect", spiedFlag, true);
        handler.grabCnx();
        Awaitility.await().atMost(CONNECT_TIMEOUT).until(() -> !spiedFlag.get());
        Mockito.verify(spiedFlag, Mockito.times(expectedCycles)).compareAndSet(false, true);
        Mockito.verify(spiedFlag, Mockito.times(expectedCycles)).set(false);
    }
}
