package org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.TestableZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.quorum.BufferStats;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.TestByteBufAllocator;
import org.apache.pulsar.functions.runtime.shaded.org.hamcrest.Matchers;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/server/NettyServerCnxnTest.class */
/**
 * Tests for the Netty-based server connection implementation.
 *
 * <p>{@link #setUp()} selects {@code NettyServerCnxnFactory} through the
 * {@code ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY} system property and installs the
 * {@link TestByteBufAllocator}; {@link #tearDown()} removes the allocator again and asks it to
 * check for leaks.
 */
public class NettyServerCnxnTest extends ClientBase {
    private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnTest.class);

    /** Path of the znode created by every test in this class. */
    private static final String TEST_PATH = "/a";

    /**
     * Configures the Netty connection factory and the test buffer allocator, then delegates to
     * the base-class set-up.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    public void setUp() throws Exception {
        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.NettyServerCnxnFactory");
        NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
        super.setUp();
    }

    /**
     * Runs the base-class tear-down first, then clears the test allocator and checks it for
     * leaks.
     *
     * @throws Exception if the base-class tear-down fails
     */
    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        NettyServerCnxnFactory.clearTestAllocator();
        TestByteBufAllocator.checkForLeaks();
    }

    /**
     * Sends a close-session to every server connection and verifies that the number of alive
     * connections drops to zero and that the data tree's watch count goes from 1 back to 0.
     *
     * @throws Exception on any client, server or assertion failure
     */
    @Test(timeout = 40000)
    public void testSendCloseSession() throws Exception {
        Assert.assertTrue("Didn't instantiate ServerCnxnFactory with NettyServerCnxnFactory!", this.serverFactory instanceof NettyServerCnxnFactory);
        TestableZooKeeper zk = createClient();
        ZooKeeperServer server = getServer(this.serverFactory);
        try {
            // Make sure the client works, then register a watch on the new znode.
            zk.create(TEST_PATH, "test".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.assertNotNull("Didn't create znode:/a", zk.exists(TEST_PATH, true));
            Assert.assertEquals(1L, server.getZKDatabase().getDataTree().getWatchCount());

            Iterable<ServerCnxn> connections = this.serverFactory.getConnections();
            Assert.assertEquals("Mismatch in number of live connections!", 1L, this.serverFactory.getNumAliveConnections());
            for (ServerCnxn cnxn : connections) {
                cnxn.sendCloseSession();
            }

            LOG.info("Waiting for the channel disconnected event");
            // Poll once per second; give up once the accumulated wait exceeds CONNECTION_TIMEOUT.
            int waitedMs = 0;
            while (this.serverFactory.getNumAliveConnections() != 0) {
                Thread.sleep(1000L);
                waitedMs += 1000;
                if (waitedMs > CONNECTION_TIMEOUT) {
                    Assert.fail("The number of live connections should be 0");
                }
            }

            // The watch count must be back to zero once the connections are gone.
            Assert.assertEquals(0L, server.getZKDatabase().getDataTree().getWatchCount());
        } finally {
            // Close exactly once, whether the body succeeded or threw.
            zk.close();
        }
    }

    /**
     * Verifies that the server's client-response buffer statistics start at -1 and report a
     * positive last buffer size after a client request, and that the created data can be read
     * back.
     *
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if a client call or the close is interrupted
     * @throws KeeperException if a ZooKeeper operation fails
     */
    @Test
    public void testClientResponseStatsUpdate() throws IOException, InterruptedException, KeeperException {
        try (TestableZooKeeper zk = createClient()) {
            BufferStats clientResponseStats = this.serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
            Assert.assertThat("Last client response size should be initialized with INIT_VALUE", Integer.valueOf(clientResponseStats.getLastBufferSize()), Matchers.equalTo(-1));

            zk.create(TEST_PATH, "test".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Assert.assertThat("Last client response size should be greater than 0 after client request was performed", Integer.valueOf(clientResponseStats.getLastBufferSize()), Matchers.greaterThan(0));
            Assert.assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), zk.getData(TEST_PATH, (Watcher) null, (Stat) null));
        }
    }

    /**
     * Verifies that reads still return the expected data after receiving has been disabled on
     * the server connections and re-enabled later from the channel's event loop.
     *
     * <p>Two rounds are performed. In the first, a channel read is additionally scheduled one
     * second after receiving was disabled, i.e. before the re-enable at two seconds
     * (NOTE(review): presumably to check that a read triggered while throttled does not lose
     * data - confirm against the NettyServerCnxn implementation). In the second round only the
     * re-enable is scheduled.
     *
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if a client call or the close is interrupted
     * @throws KeeperException if a ZooKeeper operation fails
     */
    @Test
    public void testServerSideThrottling() throws IOException, InterruptedException, KeeperException {
        try (TestableZooKeeper zk = createClient()) {
            BufferStats clientResponseStats = this.serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
            Assert.assertThat("Last client response size should be initialized with INIT_VALUE", Integer.valueOf(clientResponseStats.getLastBufferSize()), Matchers.equalTo(-1));

            zk.create(TEST_PATH, "test".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Assert.assertThat("Last client response size should be greater than 0 after client request was performed", Integer.valueOf(clientResponseStats.getLastBufferSize()), Matchers.greaterThan(0));

            // Round 1: disable receiving, request a channel read after 1s, re-enable after 2s.
            for (ServerCnxn cnxn : this.serverFactory.cnxns) {
                final NettyServerCnxn nettyCnxn = (NettyServerCnxn) cnxn;
                nettyCnxn.disableRecv();
                nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        nettyCnxn.getChannel().read();
                    }
                }, 1L, TimeUnit.SECONDS);
                scheduleEnableRecv(nettyCnxn, 2L);
            }
            Assert.assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), zk.getData(TEST_PATH, (Watcher) null, (Stat) null));

            // Round 2: disable receiving and only re-enable it after 2s.
            for (ServerCnxn cnxn : this.serverFactory.cnxns) {
                NettyServerCnxn nettyCnxn = (NettyServerCnxn) cnxn;
                nettyCnxn.disableRecv();
                scheduleEnableRecv(nettyCnxn, 2L);
            }
            Assert.assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), zk.getData(TEST_PATH, (Watcher) null, (Stat) null));
        }
    }

    /**
     * Schedules {@code enableRecv()} for the given connection on its channel's event loop.
     *
     * @param cnxn the connection whose receiving should be re-enabled
     * @param delaySeconds delay, in seconds, before {@code enableRecv()} is invoked
     */
    private static void scheduleEnableRecv(final NettyServerCnxn cnxn, long delaySeconds) {
        cnxn.getChannel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                cnxn.enableRecv();
            }
        }, delaySeconds, TimeUnit.SECONDS);
    }
}
