package org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZKParameterized;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.client.ZKClientConfig;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.server.ServerCnxn;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.runner.RunWith;
import org.apache.pulsar.functions.runtime.shaded.org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameterized.UseParametersRunnerFactory(ZKParameterized.RunnerFactory.class)
@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/RemoveWatchesTest.class */
public class RemoveWatchesTest extends ClientBase {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RemoveWatchesTest.class);
    private ZooKeeper zk1 = null;
    private ZooKeeper zk2 = null;
    private final boolean useAsync;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/RemoveWatchesTest$MyCallback.class */
    public class MyCallback implements AsyncCallback.VoidCallback {
        private final String path;
        private final int rc;
        private String eventPath;
        int eventRc;
        private CountDownLatch latch = new CountDownLatch(1);

        public MyCallback(int i, String str) {
            this.rc = i;
            this.path = str;
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback.VoidCallback
        public void processResult(int i, String str, Object obj) {
            System.out.println("latch:" + this.path + " " + str);
            this.eventPath = str;
            this.eventRc = i;
            this.latch.countDown();
        }

        public boolean matches() throws InterruptedException {
            return this.latch.await((long) (ClientBase.CONNECTION_TIMEOUT / 5), TimeUnit.MILLISECONDS) && this.path.equals(this.eventPath) && this.rc == this.eventRc;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/RemoveWatchesTest$MyWatcher.class */
    private class MyWatcher implements Watcher {
        private final String path;
        private String eventPath;
        private CountDownLatch latch;
        private List<Watcher.Event.EventType> eventsAfterWatchRemoval = new ArrayList();

        MyWatcher(String str, int i) {
            this.path = str;
            this.latch = new CountDownLatch(i);
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
        public void process(WatchedEvent watchedEvent) {
            RemoveWatchesTest.LOG.debug("Event path : {}, eventPath : {}" + new Object[]{this.path, watchedEvent.getPath()});
            this.eventPath = watchedEvent.getPath();
            if (this.latch.getCount() == 0 && watchedEvent.getType() != Watcher.Event.EventType.None) {
                this.eventsAfterWatchRemoval.add(watchedEvent.getType());
            }
            if (watchedEvent.getType() == Watcher.Event.EventType.ChildWatchRemoved || watchedEvent.getType() == Watcher.Event.EventType.DataWatchRemoved) {
                this.latch.countDown();
            }
        }

        public boolean matches() throws InterruptedException {
            if (this.latch.await(ClientBase.CONNECTION_TIMEOUT / 5, TimeUnit.MILLISECONDS)) {
                RemoveWatchesTest.LOG.debug("Client path : {} eventPath : {}", this.path, this.eventPath);
                return this.path.equals(this.eventPath);
            }
            RemoveWatchesTest.LOG.error("Failed waiting to remove the watches");
            return false;
        }

        public List<Watcher.Event.EventType> getEventsAfterWatchRemoval() {
            return this.eventsAfterWatchRemoval;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/RemoveWatchesTest$MyZooKeeper.class */
    private class MyZooKeeper extends ZooKeeper {
        private MyWatchManager myWatchManager;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/zookeeper/RemoveWatchesTest$MyZooKeeper$MyWatchManager.class */
        public class MyWatchManager extends ZooKeeper.ZKWatchManager {
            public int lastrc;

            public MyWatchManager(boolean z) {
                super(z);
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper.ZKWatchManager
            void containsWatcher(String str, Watcher watcher, Watcher.WatcherType watcherType) throws KeeperException.NoWatcherException {
            }

            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper.ZKWatchManager
            protected boolean removeWatches(Map<String, Set<Watcher>> map, Watcher watcher, String str, boolean z, int i, Set<Watcher> set) throws KeeperException {
                this.lastrc = i;
                return false;
            }
        }

        public MyZooKeeper(String str, int i, Watcher watcher) throws IOException {
            super(str, i, watcher, false);
        }

        @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper
        protected ZooKeeper.ZKWatchManager defaultWatchManager() {
            this.myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
            return this.myWatchManager;
        }

        public int getRemoveWatchesRC() {
            return this.myWatchManager.lastrc;
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase
    public void setUp() throws Exception {
        super.setUp();
        this.zk1 = createClient();
        this.zk2 = createClient();
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.test.ClientBase
    public void tearDown() throws Exception {
        if (this.zk1 != null) {
            this.zk1.close();
        }
        if (this.zk2 != null) {
            this.zk2.close();
        }
        super.tearDown();
    }

    public RemoveWatchesTest(boolean z) {
        this.useAsync = z;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }

    private void removeWatches(ZooKeeper zooKeeper, String str, Watcher watcher, Watcher.WatcherType watcherType, boolean z, KeeperException.Code code) throws InterruptedException, KeeperException {
        LOG.info("Sending removeWatches req using zk {} path: {} type: {} watcher: {} ", zooKeeper, str, watcherType, watcher);
        if (!this.useAsync) {
            zooKeeper.removeWatches(str, watcher, watcherType, z);
            return;
        }
        MyCallback myCallback = new MyCallback(code.intValue(), str);
        zooKeeper.removeWatches(str, watcher, watcherType, z, myCallback, null);
        Assert.assertTrue("Didn't succeeds removeWatch operation", myCallback.matches());
        if (KeeperException.Code.OK.intValue() != myCallback.rc) {
            throw KeeperException.create(KeeperException.Code.get(myCallback.rc));
        }
    }

    private void removeAllWatches(ZooKeeper zooKeeper, String str, Watcher.WatcherType watcherType, boolean z, KeeperException.Code code) throws InterruptedException, KeeperException {
        LOG.info("Sending removeWatches req using zk {} path: {} type: {} ", zooKeeper, str, watcherType);
        if (!this.useAsync) {
            zooKeeper.removeAllWatches(str, watcherType, z);
            return;
        }
        MyCallback myCallback = new MyCallback(code.intValue(), str);
        zooKeeper.removeAllWatches(str, watcherType, z, myCallback, null);
        Assert.assertTrue("Didn't succeeds removeWatch operation", myCallback.matches());
        if (KeeperException.Code.OK.intValue() != myCallback.rc) {
            throw KeeperException.create(KeeperException.Code.get(myCallback.rc));
        }
    }

    @Test(timeout = 90000)
    public void testRemoveSingleWatcher() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        this.zk1.create("/node2", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        MyWatcher myWatcher = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        MyWatcher myWatcher2 = new MyWatcher("/node2", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node2", myWatcher2));
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        Assert.assertEquals("Didn't find data watcher", 1L, this.zk2.getDataWatches().size());
        Assert.assertEquals("Didn't find data watcher", "/node2", this.zk2.getDataWatches().get(0));
        removeWatches(this.zk2, "/node2", myWatcher2, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", myWatcher2.matches());
        if (this.zk1 != null) {
            this.zk1.close();
            this.zk1 = null;
        }
        Assert.assertFalse("Shouldn't get NodeDeletedEvent after watch removal", myWatcher.getEventsAfterWatchRemoval().contains(Watcher.Event.EventType.NodeDeleted));
        Assert.assertEquals("Shouldn't get NodeDeletedEvent after watch removal", 0L, r0.size());
    }

    @Test(timeout = 90000)
    public void testMultipleDataWatchers() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Watcher myWatcher = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher2));
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        Assert.assertEquals("Didn't find data watcher", 1L, this.zk2.getDataWatches().size());
        Assert.assertEquals("Didn't find data watcher", "/node1", this.zk2.getDataWatches().get(0));
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", myWatcher2.matches());
        if (this.zk1 != null) {
            this.zk1.close();
            this.zk1 = null;
        }
        Assert.assertEquals("Shouldn't get NodeDeletedEvent after watch removal", 0L, myWatcher2.getEventsAfterWatchRemoval().size());
    }

    @Test(timeout = 90000)
    public void testMultipleChildWatchers() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 1);
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Children, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
        Assert.assertEquals("Didn't find child watcher", 1L, this.zk2.getChildWatches().size());
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher.matches());
        this.zk1.create("/node1/node2", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        int i = 30;
        while (i > 0 && myWatcher.getEventsAfterWatchRemoval().size() <= 0) {
            i--;
            Thread.sleep(100L);
        }
        Assert.assertEquals("Shouldn't get NodeChildrenChanged event", 0L, myWatcher2.getEventsAfterWatchRemoval().size());
    }

    @Test(timeout = 90000)
    public void testRemoveAllWatchers() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 2);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 2);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher2));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        this.zk1.create("/node1/child", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Assert.assertTrue("Didn't remove data watcher", myWatcher.matches());
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
    }

    @Test(timeout = 90000)
    public void testRemoveAllDataWatchers() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 1);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher2));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        this.zk1.create("/node1/child", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Assert.assertTrue("Didn't remove data watcher", myWatcher.matches());
        Assert.assertTrue("Didn't remove data watcher", myWatcher2.matches());
        int i = 10;
        while (i > 0 && (myWatcher.getEventsAfterWatchRemoval().size() <= 0 || myWatcher2.getEventsAfterWatchRemoval().size() <= 0)) {
            i--;
            Thread.sleep(1000L);
        }
        List<Watcher.Event.EventType> eventsAfterWatchRemoval = myWatcher.getEventsAfterWatchRemoval();
        Assert.assertEquals("Didn't get NodeChildrenChanged event", 1L, eventsAfterWatchRemoval.size());
        Assert.assertTrue("Didn't get NodeChildrenChanged event", eventsAfterWatchRemoval.contains(Watcher.Event.EventType.NodeChildrenChanged));
        List<Watcher.Event.EventType> eventsAfterWatchRemoval2 = myWatcher2.getEventsAfterWatchRemoval();
        Assert.assertEquals("Didn't get NodeChildrenChanged event", 1L, eventsAfterWatchRemoval2.size());
        Assert.assertTrue("Didn't get NodeChildrenChanged event", eventsAfterWatchRemoval2.contains(Watcher.Event.EventType.NodeChildrenChanged));
    }

    @Test(timeout = 90000)
    public void testRemoveAllChildWatchers() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 1);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher2));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Children, false, KeeperException.Code.OK);
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Children, false, KeeperException.Code.OK);
        this.zk1.setData("/node1", "test".getBytes(), -1);
        Assert.assertTrue("Didn't remove child watcher", myWatcher.matches());
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
        int i = 10;
        while (i > 0 && (myWatcher.getEventsAfterWatchRemoval().size() <= 0 || myWatcher2.getEventsAfterWatchRemoval().size() <= 0)) {
            i--;
            Thread.sleep(1000L);
        }
        List<Watcher.Event.EventType> eventsAfterWatchRemoval = myWatcher.getEventsAfterWatchRemoval();
        Assert.assertEquals("Didn't get NodeDataChanged event", 1L, eventsAfterWatchRemoval.size());
        Assert.assertTrue("Didn't get NodeDataChanged event", eventsAfterWatchRemoval.contains(Watcher.Event.EventType.NodeDataChanged));
        List<Watcher.Event.EventType> eventsAfterWatchRemoval2 = myWatcher2.getEventsAfterWatchRemoval();
        Assert.assertEquals("Didn't get NodeDataChanged event", 1L, eventsAfterWatchRemoval2.size());
        Assert.assertTrue("Didn't get NodeDataChanged event", eventsAfterWatchRemoval2.contains(Watcher.Event.EventType.NodeDataChanged));
    }

    @Test(timeout = 90000)
    public void testNoWatcherException() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 2);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 2);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNull("Didn't set data watches", this.zk2.exists("/node2", myWatcher2));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        MyWatcher myWatcher3 = new MyWatcher("/node1", 2);
        try {
            removeWatches(this.zk2, "/node1", myWatcher3, Watcher.WatcherType.Any, false, KeeperException.Code.NOWATCHER);
            Assert.fail("Should throw exception as given watcher doesn't exists");
        } catch (KeeperException.NoWatcherException e) {
        }
        try {
            removeWatches(this.zk2, "/node1", myWatcher3, Watcher.WatcherType.Children, false, KeeperException.Code.NOWATCHER);
            Assert.fail("Should throw exception as given watcher doesn't exists");
        } catch (KeeperException.NoWatcherException e2) {
        }
        try {
            removeWatches(this.zk2, "/node1", myWatcher3, Watcher.WatcherType.Data, false, KeeperException.Code.NOWATCHER);
            Assert.fail("Should throw exception as given watcher doesn't exists");
        } catch (KeeperException.NoWatcherException e3) {
        }
        try {
            removeWatches(this.zk2, "/nonexists", myWatcher3, Watcher.WatcherType.Data, false, KeeperException.Code.NOWATCHER);
            Assert.fail("Should throw exception as given watcher doesn't exists");
        } catch (KeeperException.NoWatcherException e4) {
        }
    }

    @Test(timeout = 90000)
    public void testRemoveAnyDataWatcher() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 1);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 2);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding data watcher {} on path {}", myWatcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher2));
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", myWatcher.matches());
        Assert.assertEquals("Didn't find child watcher", 1L, this.zk2.getChildWatches().size());
        Assert.assertEquals("Didn't find data watcher", 1L, this.zk2.getDataWatches().size());
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
    }

    @Test(timeout = 90000)
    public void testRemoveAnyChildWatcher() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 2);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
        Assert.assertEquals("Didn't find child watcher", 1L, this.zk2.getChildWatches().size());
        Assert.assertEquals("Didn't find data watcher", 1L, this.zk2.getDataWatches().size());
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove watchers", myWatcher.matches());
    }

    @Test(timeout = 90000)
    public void testRemoveWatcherWhenNoConnection() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 2);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        stopServer();
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Any, true, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
        Assert.assertFalse("Shouldn't remove data watcher", myWatcher.matches());
        try {
            removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.CONNECTIONLOSS);
            Assert.fail("Should throw exception as last watch removal requires server connection");
        } catch (KeeperException.ConnectionLossException e) {
        }
        Assert.assertFalse("Shouldn't remove data watcher", myWatcher.matches());
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, true, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", myWatcher.matches());
    }

    @Test(timeout = 90000)
    public void testManyPreNodeWatchers() throws Exception {
        ArrayList arrayList = new ArrayList(50);
        for (int i = 0; i < 50; i++) {
            String str = "/node" + i;
            MyWatcher myWatcher = new MyWatcher(str, 1);
            arrayList.add(myWatcher);
            LOG.info("Adding pre node watcher {} on path {}", myWatcher, str);
            this.zk1.exists(str, myWatcher);
        }
        Assert.assertEquals("Failed to add watchers!", 50, this.zk1.getExistWatches().size());
        for (int i2 = 0; i2 < 50; i2++) {
            MyWatcher myWatcher2 = (MyWatcher) arrayList.get(i2);
            removeWatches(this.zk1, "/node" + i2, myWatcher2, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
            Assert.assertTrue("Didn't remove data watcher", myWatcher2.matches());
        }
        Assert.assertEquals("Didn't remove watch references!", 0L, this.zk1.getExistWatches().size());
    }

    @Test(timeout = 90000)
    public void testManyChildWatchers() throws Exception {
        ArrayList arrayList = new ArrayList(50);
        for (int i = 0; i < 50; i++) {
            String str = "/node" + i;
            this.zk1.create(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            String str2 = str + "/";
        }
        for (int i2 = 0; i2 < 50; i2++) {
            String str3 = "/node" + i2;
            MyWatcher myWatcher = new MyWatcher("/node" + i2, 1);
            arrayList.add(myWatcher);
            LOG.info("Adding child watcher {} on path {}", myWatcher, str3);
            this.zk1.getChildren(str3, myWatcher);
            String str4 = str3 + "/";
        }
        Assert.assertEquals("Failed to add watchers!", 50, this.zk1.getChildWatches().size());
        for (int i3 = 0; i3 < 50; i3++) {
            MyWatcher myWatcher2 = (MyWatcher) arrayList.get(i3);
            removeWatches(this.zk1, "/node" + i3, myWatcher2, Watcher.WatcherType.Children, false, KeeperException.Code.OK);
            Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
        }
        Assert.assertEquals("Didn't remove watch references!", 0L, this.zk1.getChildWatches().size());
    }

    @Test(timeout = 90000)
    public void testManyDataWatchers() throws Exception {
        ArrayList arrayList = new ArrayList(50);
        for (int i = 0; i < 50; i++) {
            String str = "/node" + i;
            MyWatcher myWatcher = new MyWatcher("/node" + i, 1);
            arrayList.add(myWatcher);
            this.zk1.create(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info("Adding data watcher {} on path {}", myWatcher, str);
            this.zk1.getData(str, myWatcher, (Stat) null);
            String str2 = str + "/";
        }
        Assert.assertEquals("Failed to add watchers!", 50, this.zk1.getDataWatches().size());
        for (int i2 = 0; i2 < 50; i2++) {
            MyWatcher myWatcher2 = (MyWatcher) arrayList.get(i2);
            removeWatches(this.zk1, "/node" + i2, myWatcher2, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
            Assert.assertTrue("Didn't remove data watcher", myWatcher2.matches());
        }
        Assert.assertEquals("Didn't remove watch references!", 0L, this.zk1.getDataWatches().size());
    }

    @Test(timeout = 90000)
    public void testManyWatchersWhenNoConnection() throws Exception {
        ArrayList arrayList = new ArrayList(3);
        for (int i = 0; i < 3; i++) {
            String str = "/node" + i;
            this.zk1.create(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            String str2 = str + "/";
        }
        for (int i2 = 0; i2 < 3; i2++) {
            String str3 = "/node" + i2;
            MyWatcher myWatcher = new MyWatcher("/node" + i2, 2);
            arrayList.add(myWatcher);
            LOG.info("Adding child watcher {} on path {}", myWatcher, str3);
            this.zk1.getChildren(str3, myWatcher);
            String str4 = str3 + "/";
        }
        Assert.assertEquals("Failed to add watchers!", 3, this.zk1.getChildWatches().size());
        for (int i3 = 0; i3 < 3; i3++) {
            String str5 = "/node" + i3;
            MyWatcher myWatcher2 = (MyWatcher) arrayList.get(i3);
            LOG.info("Adding data watcher {} on path {}", myWatcher2, str5);
            this.zk1.getData(str5, myWatcher2, (Stat) null);
            String str6 = str5 + "/";
        }
        Assert.assertEquals("Failed to add watchers!", 3, this.zk1.getDataWatches().size());
        stopServer();
        for (int i4 = 0; i4 < 3; i4++) {
            MyWatcher myWatcher3 = (MyWatcher) arrayList.get(i4);
            removeWatches(this.zk1, "/node" + i4, myWatcher3, Watcher.WatcherType.Any, true, KeeperException.Code.OK);
            Assert.assertTrue("Didn't remove watcher", myWatcher3.matches());
        }
        Assert.assertEquals("Didn't remove watch references!", 0L, this.zk1.getChildWatches().size());
        Assert.assertEquals("Didn't remove watch references!", 0L, this.zk1.getDataWatches().size());
    }

    @Test(timeout = 90000)
    public void testChRootRemoveWatcher() throws Exception {
        this.zk1.create("/appsX", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        if (this.zk1 != null) {
            this.zk1.close();
        }
        if (this.zk2 != null) {
            this.zk2.close();
        }
        this.zk1 = createClient(this.hostPort + "/appsX");
        this.zk2 = createClient(this.hostPort + "/appsX");
        LOG.info("Creating child znode /node1 using chRoot client");
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        MyWatcher myWatcher = new MyWatcher("/node1", 2);
        MyWatcher myWatcher2 = new MyWatcher("/node1", 1);
        LOG.info("Adding data watcher {} on path {}", myWatcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", myWatcher));
        LOG.info("Adding child watcher {} on path {}", myWatcher, "/node1");
        this.zk2.getChildren("/node1", myWatcher2);
        LOG.info("Adding child watcher {} on path {}", myWatcher2, "/node1");
        this.zk2.getChildren("/node1", myWatcher);
        removeWatches(this.zk2, "/node1", myWatcher, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher.matches());
        Assert.assertEquals("Didn't find child watcher", 1L, this.zk2.getChildWatches().size());
        removeWatches(this.zk2, "/node1", myWatcher2, Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", myWatcher2.matches());
    }

    @Test(timeout = 90000)
    public void testNoWatcherServerException() throws InterruptedException, IOException, TimeoutException {
        ClientBase.CountdownWatcher countdownWatcher = new ClientBase.CountdownWatcher();
        MyZooKeeper myZooKeeper = new MyZooKeeper(this.hostPort, CONNECTION_TIMEOUT, countdownWatcher);
        boolean z = false;
        countdownWatcher.waitForConnected(CONNECTION_TIMEOUT);
        try {
            myZooKeeper.removeWatches("/nowatchhere", countdownWatcher, Watcher.WatcherType.Data, false);
        } catch (KeeperException e) {
            if (e.code().intValue() == KeeperException.Code.NOWATCHER.intValue()) {
                z = true;
            }
        }
        Assert.assertTrue("Server didn't return NOWATCHER", myZooKeeper.getRemoveWatchesRC() == KeeperException.Code.NOWATCHER.intValue());
        Assert.assertTrue("NoWatcherException didn't happen", z);
    }

    @Test(timeout = 90000)
    public void testRemoveAllNoWatcherException() throws IOException, InterruptedException, KeeperException {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        try {
            removeAllWatches(this.zk2, "/node1", Watcher.WatcherType.Any, false, KeeperException.Code.NOWATCHER);
            Assert.fail("Should throw exception as given watcher doesn't exists");
        } catch (KeeperException.NoWatcherException e) {
        }
    }

    @Test(timeout = 30000)
    public void testNullWatcherReference() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        try {
            if (this.useAsync) {
                this.zk1.removeWatches("/node1", null, Watcher.WatcherType.Data, false, null, null);
            } else {
                this.zk1.removeWatches("/node1", null, Watcher.WatcherType.Data, false);
            }
            Assert.fail("Must throw IllegalArgumentException as watcher is null!");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test(timeout = 90000)
    public void testRemoveWhenMultipleDataWatchesOnAPath() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Watcher watcher = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.1
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.DataWatchRemoved) {
                    countDownLatch2.countDown();
                }
            }
        };
        Watcher watcher2 = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.2
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    countDownLatch.countDown();
                }
            }
        };
        LOG.info("Adding data watcher {} on path {}", watcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher));
        LOG.info("Adding data watcher {} on path {}", watcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher2));
        removeWatches(this.zk2, "/node1", watcher, Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", countDownLatch2.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
        this.zk1.setData("/node1", "test".getBytes(), -1);
        LOG.info("Waiting for data watchers to be notified");
        Assert.assertTrue("Didn't get data watch notification!", countDownLatch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
    }

    @Test(timeout = 90000)
    public void testRemoveWhenMultipleChildWatchesOnAPath() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Watcher watcher = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.3
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.ChildWatchRemoved) {
                    countDownLatch2.countDown();
                }
            }
        };
        Watcher watcher2 = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.4
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    countDownLatch.countDown();
                }
            }
        };
        LOG.info("Adding child watcher {} on path {}", watcher, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher).size());
        LOG.info("Adding child watcher {} on path {}", watcher2, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher2).size());
        removeWatches(this.zk2, "/node1", watcher, Watcher.WatcherType.Children, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", countDownLatch2.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
        this.zk1.create("/node1/node2", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        LOG.info("Waiting for child watchers to be notified");
        Assert.assertTrue("Didn't get child watch notification!", countDownLatch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
    }

    @Test(timeout = 90000)
    public void testRemoveAllDataWatchesOnAPath() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final CountDownLatch countDownLatch2 = new CountDownLatch(2);
        Watcher watcher = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.5
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case DataWatchRemoved:
                        countDownLatch2.countDown();
                        return;
                    case NodeDataChanged:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        Watcher watcher2 = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.6
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case DataWatchRemoved:
                        countDownLatch2.countDown();
                        return;
                    case NodeDataChanged:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        LOG.info("Adding data watcher {} on path {}", watcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher));
        LOG.info("Adding data watcher {} on path {}", watcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher2));
        Assert.assertTrue("Server session is not a watcher", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Data));
        removeAllWatches(this.zk2, "/node1", Watcher.WatcherType.Data, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", countDownLatch2.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Server session is still a watcher after removal", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Data));
    }

    @Test(timeout = 90000)
    public void testRemoveAllChildWatchesOnAPath() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final CountDownLatch countDownLatch2 = new CountDownLatch(2);
        Watcher watcher = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.7
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case ChildWatchRemoved:
                        countDownLatch2.countDown();
                        return;
                    case NodeChildrenChanged:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        Watcher watcher2 = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.8
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case ChildWatchRemoved:
                        countDownLatch2.countDown();
                        return;
                    case NodeChildrenChanged:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        LOG.info("Adding child watcher {} on path {}", watcher, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher).size());
        LOG.info("Adding child watcher {} on path {}", watcher2, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher2).size());
        Assert.assertTrue("Server session is not a watcher", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Children));
        removeAllWatches(this.zk2, "/node1", Watcher.WatcherType.Children, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove child watcher", countDownLatch2.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Server session is still a watcher after removal", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Children));
    }

    @Test(timeout = 90000)
    public void testRemoveAllWatchesOnAPath() throws Exception {
        this.zk1.create("/node1", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final CountDownLatch countDownLatch2 = new CountDownLatch(4);
        Watcher watcher = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.9
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (watchedEvent.getType()) {
                    case DataWatchRemoved:
                    case ChildWatchRemoved:
                        countDownLatch2.countDown();
                        return;
                    case NodeDataChanged:
                    case NodeChildrenChanged:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        Watcher watcher2 = new Watcher() { // from class: org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.RemoveWatchesTest.10
            @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                switch (AnonymousClass11.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[watchedEvent.getType().ordinal()]) {
                    case 1:
                    case 3:
                        countDownLatch2.countDown();
                        return;
                    case 2:
                    case 4:
                        countDownLatch.countDown();
                        return;
                    default:
                        return;
                }
            }
        };
        LOG.info("Adding child watcher {} on path {}", watcher, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher).size());
        LOG.info("Adding child watcher {} on path {}", watcher2, "/node1");
        Assert.assertEquals("Didn't set child watches", 0L, this.zk2.getChildren("/node1", watcher2).size());
        LOG.info("Adding data watcher {} on path {}", watcher, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher));
        LOG.info("Adding data watcher {} on path {}", watcher2, "/node1");
        Assert.assertNotNull("Didn't set data watches", this.zk2.exists("/node1", watcher2));
        Assert.assertTrue("Server session is not a watcher", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Data));
        removeAllWatches(this.zk2, "/node1", Watcher.WatcherType.Any, false, KeeperException.Code.OK);
        Assert.assertTrue("Didn't remove data watcher", countDownLatch2.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Server session is still a watcher after removal", isServerSessionWatcher(this.zk2.getSessionId(), "/node1", Watcher.WatcherType.Data));
        Assert.assertEquals("Received watch notification after removal!", 2L, countDownLatch.getCount());
    }

    private boolean isServerSessionWatcher(long j, String str, Watcher.WatcherType watcherType) {
        HashSet<ServerCnxn> hashSet = new HashSet();
        CollectionUtils.addAll(hashSet, this.serverFactory.getConnections().iterator());
        for (ServerCnxn serverCnxn : hashSet) {
            if (serverCnxn.getSessionId() == j) {
                return getServer(this.serverFactory).getZKDatabase().getDataTree().containsWatcher(str, watcherType, serverCnxn);
            }
        }
        return false;
    }
}
