package com.linkedin.alpini.netty4.pool;

import com.linkedin.alpini.base.misc.Time;
import com.linkedin.alpini.netty4.handlers.Log4J2LoggingHandler;
import com.linkedin.venice.utils.TestUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.handler.logging.LogLevel;
import io.netty.util.concurrent.Future;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/alpini/netty4/pool/TestFixedFastChannelPoolImpl.class */
/**
 * TestNG tests for {@link FixedFastChannelPoolImpl}, run against a server bound on Netty's
 * in-JVM {@code Local} transport.
 *
 * <p>Every test is executed twice through the {@code useQueueSizeForAcquiredChannelCount}
 * data provider; the boolean is handed to the pool through the supplier that is the last
 * constructor argument.
 */
public class TestFixedFastChannelPoolImpl {
    private static final Logger LOG = LogManager.getLogger(TestFixedFastChannelPoolImpl.class);

    /**
     * Value passed as the pool's first size argument. Both tests expect the pool to report
     * exactly this many connected channels once the first acquire/release has completed.
     */
    private static final int MIN_CONNECTIONS = 10;

    /** Single-threaded event loop group shared by the server, the client bootstrap and the pool calls. */
    private EventLoopGroup _eventLoopGroup;

    @BeforeClass(groups = {"unit"})
    public void beforeClass() {
        _eventLoopGroup = new NioEventLoopGroup(1);
    }

    @AfterClass(groups = {"unit"})
    public void afterClass() {
        // beforeClass may have failed before the group was created.
        if (_eventLoopGroup != null) {
            _eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Supplies both values of the flag that each test forwards to the pool.
     *
     * @return one single-element row per boolean value
     */
    @DataProvider
    public Object[][] useQueueSizeForAcquiredChannelCount() {
        return new Object[][] {{true}, {false}};
    }

    /**
     * Checks that the pool starts with no connections, settles at {@link #MIN_CONNECTIONS}
     * connected channels after being used, keeps that number across further acquires, keeps its
     * acquired-channel count consistent when an acquired channel is closed before being
     * released, and closes every channel when the pool itself is closed.
     *
     * @param useQueueSizeForAcquiredChannelCount flag forwarded to the pool under test
     * @throws Exception if binding, acquiring or closing is interrupted or fails
     */
    @Test(groups = {"unit"}, dataProvider = "useQueueSizeForAcquiredChannelCount")
    public void testMinPoolSize(boolean useQueueSizeForAcquiredChannelCount) throws Exception {
        LocalAddress address = new LocalAddress("testMinPoolSize");
        Bootstrap clientBootstrap = newClientBootstrap(address);
        ChannelFuture serverFuture = bindLocalServer(address);
        try {
            LongAdder created = new LongAdder();
            LongAdder closed = new LongAdder();
            FixedFastChannelPoolImpl pool = createNewFixedChannelPool(
                useQueueSizeForAcquiredChannelCount, clientBootstrap, MIN_CONNECTIONS, 100, 100, created, closed);

            // Nothing has been acquired yet, so the pool must not report any connection.
            TestUtils.waitForNonDeterministicAssertion(1L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(pool.getConnectedChannels(), 0);
            });

            releaseOnEventLoop(pool, acquireOnEventLoop(pool));
            releaseOnEventLoop(pool, acquireOnEventLoop(pool));

            awaitConnectedChannels(pool, MIN_CONNECTIONS, created, closed);
            Assert.assertEquals(created.intValue(), MIN_CONNECTIONS);
            Assert.assertEquals(closed.intValue(), 0);

            // A further acquire must be served without creating or closing any channel.
            Channel reused = acquireOnEventLoop(pool);
            Assert.assertEquals(pool.getConnectedChannels(), MIN_CONNECTIONS);
            Assert.assertEquals(created.intValue(), MIN_CONNECTIONS);
            Assert.assertEquals(closed.intValue(), 0);
            releaseOnEventLoop(pool, reused);

            // Acquire a channel, close it while it is still acquired, then release it, checking
            // the acquired-channel count at each step. Any failure inside the callbacks is
            // routed into this future so that join() rethrows it on the test thread.
            CompletableFuture<Void> closeWhileAcquired = new CompletableFuture<>();
            pool.acquire().addListener(acquireFuture -> {
                try {
                    if (!acquireFuture.isSuccess()) {
                        closeWhileAcquired.completeExceptionally(acquireFuture.cause());
                        return;
                    }
                    assertAcquiredChannelCount(pool, "Before close", 1);
                    Channel channel = (Channel) acquireFuture.getNow();
                    // The follow-up checks are dispatched to the channel's event loop rather
                    // than run directly inside the close listener.
                    channel.close().addListener(closeFuture -> channel.eventLoop().execute(() -> {
                        try {
                            assertAcquiredChannelCount(pool, "After close", 0);
                            pool.release(channel).addListener(releaseFuture -> {
                                try {
                                    assertAcquiredChannelCount(pool, "After release", 0);
                                    closeWhileAcquired.complete(null);
                                } catch (Throwable t) {
                                    closeWhileAcquired.completeExceptionally(t);
                                }
                            });
                        } catch (Throwable t) {
                            closeWhileAcquired.completeExceptionally(t);
                        }
                    }));
                } catch (Throwable t) {
                    closeWhileAcquired.completeExceptionally(t);
                }
            });
            closeWhileAcquired.join();

            LOG.info("closing pool {}", pool);
            CompletableFuture.runAsync(pool::close, _eventLoopGroup.next()).join();
            awaitConnectedChannels(pool, 0, created, closed);
            // One more close than MIN_CONNECTIONS: the channel closed explicitly above counts too.
            // NOTE(review): presumably the pool opened a replacement for it - confirm.
            Assert.assertEquals(closed.intValue(), MIN_CONNECTIONS + 1);
        } finally {
            serverFuture.channel().close().sync();
        }
    }

    /**
     * Checks that closing the pool closes every channel it created.
     *
     * @param useQueueSizeForAcquiredChannelCount flag forwarded to the pool under test
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(groups = {"unit"}, dataProvider = "useQueueSizeForAcquiredChannelCount")
    public void testPoolClose(boolean useQueueSizeForAcquiredChannelCount) throws InterruptedException {
        LocalAddress address = new LocalAddress("testPoolClose");
        Bootstrap clientBootstrap = newClientBootstrap(address);
        ChannelFuture serverFuture = bindLocalServer(address);
        try {
            LongAdder created = new LongAdder();
            LongAdder closed = new LongAdder();
            FixedFastChannelPoolImpl pool = createNewFixedChannelPool(
                useQueueSizeForAcquiredChannelCount, clientBootstrap, MIN_CONNECTIONS, 100, 100, created, closed);

            // Give the pool a moment; it must still report no connection before first use.
            Time.sleep(100L);
            Assert.assertEquals(pool.getConnectedChannels(), 0);

            releaseOnEventLoop(pool, acquireOnEventLoop(pool));

            awaitConnectedChannels(pool, MIN_CONNECTIONS, created, closed);
            Assert.assertEquals(created.intValue(), MIN_CONNECTIONS);
            Assert.assertEquals(closed.intValue(), 0);

            LOG.info("closing pool {}", pool);
            CompletableFuture.runAsync(pool::close, _eventLoopGroup.next()).join();
            awaitConnectedChannels(pool, 0, created, closed);
            Assert.assertEquals(closed.intValue(), MIN_CONNECTIONS);
        } finally {
            serverFuture.channel().close().sync();
        }
    }

    /** Builds a client bootstrap on the shared event loop group that connects to {@code address}. */
    private Bootstrap newClientBootstrap(LocalAddress address) {
        return new Bootstrap().group(_eventLoopGroup).channel(LocalChannel.class).remoteAddress(address);
    }

    /**
     * Binds a local-transport server on {@code address} whose listening channel and accepted
     * channels each get a DEBUG logging handler, and waits for the bind to finish.
     *
     * @return the completed bind future; its channel must be closed by the caller
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    private ChannelFuture bindLocalServer(LocalAddress address) throws InterruptedException {
        Log4J2LoggingHandler listenLogger = new Log4J2LoggingHandler("listen", LogLevel.DEBUG);
        Log4J2LoggingHandler serverLogger = new Log4J2LoggingHandler("server", LogLevel.DEBUG);
        ServerBootstrap serverBootstrap = new ServerBootstrap()
            .group(_eventLoopGroup)
            .channel(LocalServerChannel.class)
            .handler(listenLogger)
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(serverLogger);
                }
            });
        return serverBootstrap.bind(address).sync();
    }

    /**
     * Invokes {@code pool.acquire()} on an event loop thread, waits on the calling thread for
     * the returned future to finish and asserts that it succeeded.
     *
     * @return the acquired channel
     * @throws InterruptedException if interrupted while waiting for the acquire
     */
    private Channel acquireOnEventLoop(FixedFastChannelPoolImpl pool) throws InterruptedException {
        Future<Channel> acquired = CompletableFuture.supplyAsync(pool::acquire, _eventLoopGroup.next()).join();
        acquired.sync();
        Assert.assertTrue(acquired.isSuccess());
        return acquired.getNow();
    }

    /** Schedules {@code pool.release(channel)} on the event loop group without waiting for it. */
    private void releaseOnEventLoop(FixedFastChannelPoolImpl pool, Channel channel) {
        _eventLoopGroup.execute(() -> {
            pool.release(channel);
        });
    }

    /**
     * Waits up to one second for the pool to report {@code expected} connected channels, logging
     * both counters on each attempt, then asserts the same value once more.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void awaitConnectedChannels(
        FixedFastChannelPoolImpl pool, int expected, LongAdder created, LongAdder closed)
        throws InterruptedException {
        TestUtils.waitForNonDeterministicAssertion(1L, TimeUnit.SECONDS, () -> {
            LOG.info("created={}, closed={}", created, closed);
            Assert.assertEquals(pool.getConnectedChannels(), expected);
        });
        Assert.assertEquals(pool.getConnectedChannels(), expected);
    }

    /**
     * Waits up to one second for the pool's acquired-channel count to equal {@code expected},
     * logging the observed count under {@code phase} on each attempt.
     *
     * @throws Exception if the wait is interrupted or otherwise fails
     */
    private void assertAcquiredChannelCount(FixedFastChannelPoolImpl pool, String phase, int expected)
        throws Exception {
        TestUtils.waitForNonDeterministicAssertion(1L, TimeUnit.SECONDS, () -> {
            int acquiredChannelCount = pool.acquiredChannelCount();
            LOG.info(phase + " :{}", acquiredChannelCount);
            Assert.assertEquals(acquiredChannelCount, expected);
        });
    }

    /**
     * Creates the pool under test with a handler that logs every pool event and counts channel
     * creations and closures.
     *
     * <p>NOTE(review): the three int parameter names are inferred from how the tests use them
     * (the pool settles at the first value); confirm them against the
     * {@code FixedFastChannelPoolImpl} constructor.
     *
     * @param useQueueSizeForAcquiredChannelCount value returned by the supplier given to the pool
     * @param bootstrap client bootstrap the pool connects with
     * @param minConnections first size argument forwarded to the pool
     * @param maxConnections second size argument forwarded to the pool
     * @param maxPendingAcquires third size argument forwarded to the pool
     * @param createdCounter incremented once for every channel the pool creates
     * @param closedCounter incremented once for every created channel whose close future completes
     * @return the new pool
     */
    private FixedFastChannelPoolImpl createNewFixedChannelPool(
        boolean useQueueSizeForAcquiredChannelCount,
        Bootstrap bootstrap,
        int minConnections,
        int maxConnections,
        int maxPendingAcquires,
        LongAdder createdCounter,
        LongAdder closedCounter) {
        ChannelPoolHandler countingHandler = new ChannelPoolHandler() {
            @Override
            public void channelReleased(Channel channel) throws Exception {
                LOG.info("channelReleased {}", channel.id());
            }

            @Override
            public void channelAcquired(Channel channel) throws Exception {
                LOG.info("channelAcquired {}", channel.id());
            }

            @Override
            public void channelCreated(Channel channel) throws Exception {
                LOG.info("channelCreated {}", channel.id());
                channel.closeFuture().addListener(closeFuture -> {
                    closedCounter.increment();
                });
                createdCounter.increment();
            }
        };
        return new FixedFastChannelPoolImpl(
            bootstrap,
            countingHandler,
            ChannelHealthChecker.ACTIVE,
            FixedChannelPool.AcquireTimeoutAction.FAIL,
            100L,
            minConnections,
            maxConnections,
            maxPendingAcquires,
            true,
            1,
            () -> useQueueSizeForAcquiredChannelCount);
    }
}
