package com.linkedin.alpini.netty4.pool;

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.misc.Time;
import com.linkedin.alpini.base.monitoring.CallTracker;
import com.linkedin.alpini.base.monitoring.CallTrackerImpl;
import com.linkedin.alpini.consts.QOS;
import com.linkedin.alpini.netty4.handlers.HttpClientResponseHandler;
import com.linkedin.alpini.netty4.misc.BasicFullHttpResponse;
import com.linkedin.alpini.netty4.misc.NettyUtils;
import com.linkedin.alpini.netty4.pool.ChannelPoolManager;
import io.netty.bootstrap.InstrumentedBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import javax.annotation.Nonnull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.IRetryAnalyzer;
import org.testng.ITestResult;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/alpini/netty4/pool/TestChannelPoolManager.class */
public class TestChannelPoolManager {
    private Logger _log = LogManager.getLogger(getClass());
    long acquireTimeoutMillis = 1000;
    int maxConnections = 1;
    int maxPendingAcquires = 100;
    int maxWaitersPerPool = 100;
    boolean releaseHealthCheck = true;
    long healthCheckInterval = 10000;
    ChannelHealthChecker channelHealthChecker = new ChannelHealthChecker() { // from class: com.linkedin.alpini.netty4.pool.TestChannelPoolManager.1
        public Future<Boolean> isHealthy(Channel channel) {
            Promise newPromise = channel.eventLoop().newPromise();
            Promise newPromise2 = channel.eventLoop().newPromise();
            TestChannelPoolManager testChannelPoolManager = TestChannelPoolManager.this;
            HttpVersion httpVersion = HttpVersion.HTTP_1_1;
            HttpMethod httpMethod = HttpMethod.OPTIONS;
            ByteBuf byteBuf = Unpooled.EMPTY_BUFFER;
            Objects.requireNonNull(newPromise2);
            channel.writeAndFlush(new Request(httpVersion, httpMethod, "/", byteBuf, newPromise2::setSuccess)).addListener(future -> {
                if (future.isSuccess()) {
                    newPromise2.addListener(future -> {
                        if (!future.isSuccess()) {
                            TestChannelPoolManager.this._log.warn("unhealthy", future.cause());
                            newPromise.setSuccess(false);
                        } else if (!(future.getNow() instanceof FullHttpResponse)) {
                            TestChannelPoolManager.this._log.warn("bad response type");
                            newPromise.setSuccess(false);
                        } else {
                            int code = ((FullHttpResponse) future.getNow()).status().code();
                            TestChannelPoolManager.this._log.debug("Health check response code {}", Integer.valueOf(code));
                            newPromise.setSuccess(Boolean.valueOf(code >= 200 && code < 300));
                        }
                    });
                } else {
                    TestChannelPoolManager.this._log.warn("unhealthy", future.cause());
                    newPromise.setSuccess(false);
                }
            });
            return newPromise;
        }
    };
    CallTracker healthCheckerTracker = null;
    CallTracker connectCallTracker = new CallTrackerImpl();
    ChannelPoolResolver channelPoolResolver = new BasicDnsResolver();
    private Future<Channel> _cancelInAcquireResolved;
    private Future<Channel> _cancelInAcquireEventLoop;
    private Future<Channel> _cancelInAcquireEventLoopListener;
    private Future<Channel> _cancelInAcquireTrySuccess;

    /* loaded from: input_file:com/linkedin/alpini/netty4/pool/TestChannelPoolManager$Request.class */
    private class Request extends DefaultFullHttpRequest implements HttpClientResponseHandler.ResponseConsumer {
        private Consumer<Object> _responseConsumer;

        Request(HttpVersion httpVersion, HttpMethod httpMethod, String str, ByteBuf byteBuf, Consumer<Object> consumer) {
            super(httpVersion, httpMethod, str, byteBuf);
            this._responseConsumer = consumer;
        }

        public Consumer<Object> responseConsumer() {
            return this._responseConsumer;
        }
    }

    /* loaded from: input_file:com/linkedin/alpini/netty4/pool/TestChannelPoolManager$Retry.class */
    public static class Retry implements IRetryAnalyzer {
        int _attempts = 3;

        public boolean retry(ITestResult iTestResult) {
            if (iTestResult.isSuccess()) {
                return false;
            }
            int i = this._attempts;
            this._attempts = i - 1;
            if (i <= 0) {
                return false;
            }
            iTestResult.setStatus(4);
            try {
                Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(10000));
                return true;
            } catch (InterruptedException e) {
                return true;
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] poolModes() {
        return new Object[]{new Object[]{false, false, false, false}, new Object[]{true, false, false, false}, new Object[]{false, false, false, true}, new Object[]{true, false, false, true}};
    }

    @Test(groups = {"unit"}, retryAnalyzer = Retry.class, successPercentage = 30, dataProvider = "poolModes")
    public void testChannelPoolManagerNIO(boolean z, boolean z2, boolean z3, boolean z4) throws InterruptedException {
        NettyUtils.setMode("NIO");
        MultithreadEventLoopGroup newEventLoopGroup = NettyUtils.newEventLoopGroup(4, Executors.defaultThreadFactory());
        try {
            channelPoolManagerTest(newEventLoopGroup, z, z2, z3, z4);
            newEventLoopGroup.shutdownGracefully().await();
        } catch (Throwable th) {
            newEventLoopGroup.shutdownGracefully().await();
            throw th;
        }
    }

    @Test(groups = {"unit"}, retryAnalyzer = Retry.class, successPercentage = 30, dataProvider = "poolModes")
    public void testChannelPoolManagerEPOLL(boolean z, boolean z2, boolean z3, boolean z4) throws InterruptedException {
        NettyUtils.setMode("EPOLL");
        MultithreadEventLoopGroup newEventLoopGroup = NettyUtils.newEventLoopGroup(4, Executors.defaultThreadFactory());
        try {
            channelPoolManagerTest(newEventLoopGroup, z, z2, z3, z4);
            newEventLoopGroup.shutdownGracefully().await();
        } catch (Throwable th) {
            newEventLoopGroup.shutdownGracefully().await();
            throw th;
        }
    }

    private static void assertSupplierEquals(IntSupplier intSupplier, int i) {
        long nanoTime = Time.nanoTime() + TimeUnit.MILLISECONDS.toNanos(100L);
        while (intSupplier.getAsInt() != i && Time.nanoTime() < nanoTime) {
            Thread.yield();
        }
        Assert.assertEquals(intSupplier.getAsInt(), i);
    }

    private <E extends MultithreadEventLoopGroup> void channelPoolManagerTest(E e, boolean z, boolean z2, boolean z3, boolean z4) throws InterruptedException {
        FixedChannelPoolFactory fixedChannelPoolFactory = new FixedChannelPoolFactory(new InstrumentedBootstrap(this.connectCallTracker).channel(NettyUtils.socketChannel()).option(ChannelOption.ALLOW_HALF_CLOSURE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: com.linkedin.alpini.netty4.pool.TestChannelPoolManager.2
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                TestChannelPoolManager.this._log.debug("initChannel({})", socketChannel.id());
                socketChannel.pipeline().addLast(new ChannelHandler[]{new HttpClientCodec(), new HttpObjectAggregator(4096), new HttpClientResponseHandler()});
            }

            public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
                super.handlerAdded(channelHandlerContext);
            }
        }), this.acquireTimeoutMillis, this.maxConnections, this.maxPendingAcquires, this.releaseHealthCheck, this.healthCheckInterval, this.channelHealthChecker, this.healthCheckerTracker);
        fixedChannelPoolFactory.setUsingFastPool(z4);
        ChannelPoolManagerImpl channelPoolManagerImpl = new ChannelPoolManagerImpl(e, fixedChannelPoolFactory, this.channelPoolResolver, this.maxWaitersPerPool, z, z2, z3) { // from class: com.linkedin.alpini.netty4.pool.TestChannelPoolManager.3
            @Nonnull
            protected CallTracker createHostAcquireCallTracker(@Nonnull String str) {
                Assert.assertSame(super.createHostAcquireCallTracker(str), CallTracker.nullTracker());
                return CallTracker.create();
            }

            @Nonnull
            protected CallTracker createHostBusyCallTracker(@Nonnull String str) {
                Assert.assertSame(super.createHostBusyCallTracker(str), CallTracker.nullTracker());
                return CallTracker.create();
            }

            @Nonnull
            protected CallTracker createQueueAcquireCallTracker(@Nonnull String str) {
                Assert.assertSame(super.createQueueAcquireCallTracker(str), CallTracker.nullTracker());
                return CallTracker.create();
            }

            @Nonnull
            protected CallTracker createQueueBusyCallTracker(@Nonnull String str) {
                Assert.assertSame(super.createQueueBusyCallTracker(str), CallTracker.nullTracker());
                return CallTracker.create();
            }

            void debugAcquireResolved(Future<Channel> future) {
                Time.sleepUninterruptably(100L);
                super.debugAcquireResolved(future);
                if (future == TestChannelPoolManager.this._cancelInAcquireResolved) {
                    TestChannelPoolManager.this._cancelInAcquireResolved.cancel(false);
                }
            }

            void debugAcquireInEventLoop(Future<Channel> future) {
                Time.sleepUninterruptably(100L);
                super.debugAcquireInEventLoop(future);
                if (future == TestChannelPoolManager.this._cancelInAcquireEventLoop) {
                    TestChannelPoolManager.this._cancelInAcquireEventLoop.cancel(false);
                }
            }

            void debugAcquireInEventLoopListener(Future<Channel> future) {
                Time.sleepUninterruptably(100L);
                super.debugAcquireInEventLoopListener(future);
                if (future == TestChannelPoolManager.this._cancelInAcquireEventLoopListener) {
                    TestChannelPoolManager.this._cancelInAcquireEventLoopListener.cancel(false);
                }
            }

            void debugAcquireInTrySuccess(Future<Channel> future) {
                Time.sleepUninterruptably(100L);
                super.debugAcquireInTrySuccess(future);
                if (future == TestChannelPoolManager.this._cancelInAcquireTrySuccess) {
                    TestChannelPoolManager.this._cancelInAcquireTrySuccess.cancel(false);
                }
            }
        };
        try {
            Assert.assertEquals(channelPoolManagerImpl.openConnections(), 0);
            Assert.assertEquals(channelPoolManagerImpl.activeCount(), 0);
            try {
                Assert.fail("should not get here: " + channelPoolManagerImpl.acquire("bad host name and port", "fail", QOS.NORMAL));
            } catch (IllegalArgumentException e2) {
            }
            for (int i = 10; i > 0; i--) {
                Future acquire = channelPoolManagerImpl.acquire("localhost:79", "fail", QOS.NORMAL);
                Assert.assertTrue(acquire.await((long) (this.acquireTimeoutMillis * 1.1d), TimeUnit.MILLISECONDS));
                Assert.assertFalse(acquire.isSuccess());
                Assert.assertTrue(acquire.cause().getMessage().matches(".*Connection refused: localhost/.*"), acquire.cause().getMessage());
                Future acquire2 = channelPoolManagerImpl.acquire("localhost:79", "fail", QOS.HIGH);
                Assert.assertFalse(acquire2.await().isSuccess());
                Assert.assertTrue(acquire2.cause().getMessage().matches(".*Connection refused: localhost/.*"), acquire2.cause().getMessage());
            }
            Thread.sleep(100L);
            Assert.assertEquals(channelPoolManagerImpl.openConnections(), 0);
            Assert.assertEquals(channelPoolManagerImpl.activeCount(), 0);
            Channel dummyServer = dummyServer(e);
            try {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) dummyServer.localAddress();
                for (int i2 = 10; i2 > 0; i2--) {
                    Future acquire3 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "default", QOS.NORMAL);
                    Assert.assertTrue(acquire3.await().isSuccess());
                    ChannelPoolManager.PoolStats poolStats = (ChannelPoolManager.PoolStats) channelPoolManagerImpl.getPoolStats("localhost:" + inetSocketAddress.getPort()).orElseThrow(NullPointerException::new);
                    Assert.assertFalse(channelPoolManagerImpl.getQueueStats("default").isPresent());
                    Assert.assertEquals(poolStats.activeCount(), 1);
                    Assert.assertTrue(channelPoolManagerImpl.release((Channel) acquire3.getNow()).syncUninterruptibly().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future acquire4 = channelPoolManagerImpl.acquire(e.next(), "localhost:" + inetSocketAddress.getPort(), "default", QOS.HIGH);
                    Assert.assertTrue(acquire4.await().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 1);
                    Assert.assertTrue(channelPoolManagerImpl.release((Channel) acquire4.getNow()).syncUninterruptibly().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future acquire5 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.NORMAL);
                    Assert.assertTrue(acquire5.await().isSuccess());
                    Assert.assertFalse(channelPoolManagerImpl.getQueueStats("other").isPresent());
                    Assert.assertEquals(poolStats.activeCount(), 1);
                    Assert.assertTrue(channelPoolManagerImpl.release((Channel) acquire5.getNow()).syncUninterruptibly().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Assert.assertTrue(channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH).cancel(false));
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future<Channel> acquire6 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH);
                    this._cancelInAcquireResolved = acquire6;
                    Assert.assertTrue(acquire6.await(1L, TimeUnit.SECONDS));
                    Assert.assertTrue(acquire6.isCancelled());
                    Thread.sleep(100L);
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future<Channel> acquire7 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH);
                    this._cancelInAcquireEventLoop = acquire7;
                    Assert.assertTrue(acquire7.await(1L, TimeUnit.SECONDS));
                    Assert.assertTrue(acquire7.isCancelled());
                    Thread.sleep(100L);
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future<Channel> acquire8 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH);
                    this._cancelInAcquireEventLoopListener = acquire8;
                    Assert.assertTrue(acquire8.await(1L, TimeUnit.SECONDS));
                    Assert.assertTrue(acquire8.isCancelled());
                    Thread.sleep(100L);
                    Assert.assertEquals(poolStats.activeCount(), 0);
                    Future<Channel> acquire9 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH);
                    this._cancelInAcquireTrySuccess = acquire9;
                    Assert.assertTrue(acquire9.await(1L, TimeUnit.SECONDS));
                    Assert.assertTrue(acquire9.isCancelled());
                    Thread.sleep(100L);
                    Objects.requireNonNull(poolStats);
                    assertSupplierEquals(poolStats::activeCount, 0);
                    Future acquire10 = channelPoolManagerImpl.acquire("localhost:" + inetSocketAddress.getPort(), "other", QOS.HIGH);
                    Assert.assertTrue(acquire10.await().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 1);
                    Channel channel = (Channel) acquire10.getNow();
                    Assert.assertTrue(((Future) channel.eventLoop().submit(() -> {
                        return channelPoolManagerImpl.release(channel);
                    }).syncUninterruptibly().getNow()).syncUninterruptibly().isSuccess());
                    Assert.assertEquals(poolStats.activeCount(), 0);
                }
                Thread.sleep(100L);
                Assert.assertEquals(channelPoolManagerImpl.activeCount(), 0);
                Assert.assertTrue(channelPoolManagerImpl.openConnections() > 0);
                ChannelPoolManager.PoolStats poolStats2 = (ChannelPoolManager.PoolStats) channelPoolManagerImpl.getPoolStats("localhost:" + inetSocketAddress.getPort()).orElseThrow(NullPointerException::new);
                Assert.assertTrue(poolStats2.isHealthy());
                Assert.assertEquals(poolStats2.activeCount(), 0);
                Assert.assertFalse(channelPoolManagerImpl.getQueueStats("other").isPresent());
                channelPoolManagerImpl.getPoolStats().forEach((str, poolStats3) -> {
                    this._log.debug("Pool of '{}' = '{}'", str, poolStats3);
                });
                Assert.assertTrue(channelPoolManagerImpl.getQueueStats().isEmpty());
                Assert.assertEquals(poolStats2.closeErrorCount(), 0L);
                Assert.assertEquals(poolStats2.closeBadCount(), 0L);
                Assert.assertTrue(channelPoolManagerImpl.close("localhost:1").isSuccess());
                this._log.debug("Closing server channel");
                dummyServer.close().awaitUninterruptibly();
            } catch (Throwable th) {
                this._log.debug("Closing server channel");
                dummyServer.close().awaitUninterruptibly();
                throw th;
            }
        } finally {
            this._log.debug("Closing all");
            Future awaitUninterruptibly = channelPoolManagerImpl.closeAll().awaitUninterruptibly();
            if (!awaitUninterruptibly.isSuccess()) {
                this._log.warn("Exception in closeAll", awaitUninterruptibly.cause());
            }
            this._log.debug("Closing all done");
            Assert.assertEquals(channelPoolManagerImpl.openConnections(), 0);
        }
    }

    private <E extends MultithreadEventLoopGroup> Channel dummyServer(E e) {
        return new ServerBootstrap().channel(NettyUtils.serverSocketChannel()).group(e).localAddress(InetAddress.getLoopbackAddress(), 0).option(ChannelOption.ALLOW_HALF_CLOSURE, true).option(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<SocketChannel>() { // from class: com.linkedin.alpini.netty4.pool.TestChannelPoolManager.4
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec(), new HttpObjectAggregator(4096), new SimpleChannelInboundHandler<FullHttpRequest>() { // from class: com.linkedin.alpini.netty4.pool.TestChannelPoolManager.4.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
                        BasicFullHttpResponse basicFullHttpResponse = new BasicFullHttpResponse(fullHttpRequest, HttpResponseStatus.OK, Unpooled.copiedBuffer("Hello world", StandardCharsets.US_ASCII));
                        HttpUtil.setContentLength(basicFullHttpResponse, r0.readableBytes());
                        HttpUtil.setKeepAlive(basicFullHttpResponse, HttpUtil.isKeepAlive(fullHttpRequest));
                        channelHandlerContext.writeAndFlush(basicFullHttpResponse);
                    }
                }});
            }
        }).bind().syncUninterruptibly().channel();
    }
}
