package org.apache.distributedlog.impl;

import com.google.common.collect.Sets;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/distributedlog/impl/TestZKNamespaceWatcher.class */
/**
 * Tests for {@link ZKNamespaceWatcher}: verifies that registered
 * {@link NamespaceListener}s are notified when logs are created or deleted
 * under a namespace, and that notifications keep arriving after the
 * ZooKeeper session used by the watcher has been expired.
 */
public class TestZKNamespaceWatcher extends TestDistributedLogBase {
    /** Session timeout handed to the test ZooKeeper client and to the session-expiry helper. */
    private static final int ZK_SESSION_TIMEOUT_MS = 2000;

    /** Upper bound on the number of notifications a single test can wait on. */
    private static final int MAX_TRACKED_UPDATES = 10;

    @Rule
    public TestName runtime = new TestName();
    protected final DistributedLogConfiguration baseConf = new DistributedLogConfiguration();
    protected ZooKeeperClient zkc;
    protected OrderedScheduler scheduler;

    /**
     * Builds the ZooKeeper client and the single-threaded scheduler that are
     * passed to every watcher created by the tests.
     */
    @Override
    @Before
    public void setup() throws Exception {
        this.zkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(createDLMURI("/"))
                .sessionTimeoutMs(ZK_SESSION_TIMEOUT_MS)
                .build();
        this.scheduler = OrderedScheduler.newSchedulerBuilder()
                .name("test-zk-namespace-watcher")
                .numThreads(1)
                .build();
    }

    /**
     * Releases the ZooKeeper client and the scheduler. The scheduler is shut
     * down in a {@code finally} block so that a failure while closing the
     * client does not leak the scheduler thread.
     */
    @Override
    @After
    public void teardown() throws Exception {
        try {
            if (null != this.zkc) {
                this.zkc.close();
            }
        } finally {
            if (null != this.scheduler) {
                this.scheduler.shutdown();
            }
        }
    }

    /**
     * Records the notifications delivered to a {@link NamespaceListener}.
     *
     * <p>The n-th notification (1-based) stores the received log names as the
     * latest snapshot and then releases latch {@code n - 1}. Only the most
     * recent snapshot is retained.
     */
    private static final class UpdateRecorder {
        private final CountDownLatch[] latches;
        private final AtomicInteger numUpdates = new AtomicInteger(0);
        private final AtomicReference<Set<String>> latestLogs = new AtomicReference<Set<String>>(null);

        /** Listener to register with the watcher under test. */
        final NamespaceListener listener = new NamespaceListener() {
            @Override
            public void onStreamsChanged(Iterator<String> streams) {
                Set<String> snapshot = Sets.newHashSet(streams);
                int updateNumber = numUpdates.incrementAndGet();
                // Publish the snapshot before releasing the latch so that a
                // thread returning from awaitUpdate() observes it.
                latestLogs.set(snapshot);
                // Notifications beyond the tracked range have no latch to
                // release; ignore them instead of throwing on the callback thread.
                if (updateNumber <= latches.length) {
                    latches[updateNumber - 1].countDown();
                }
            }
        };

        /**
         * @param maxUpdates number of notifications that can be awaited
         */
        UpdateRecorder(int maxUpdates) {
            this.latches = new CountDownLatch[maxUpdates];
            for (int i = 0; i < maxUpdates; i++) {
                this.latches[i] = new CountDownLatch(1);
            }
        }

        /**
         * Blocks until the notification with the given 0-based index has arrived.
         *
         * @param index 0-based index of the notification to wait for
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void awaitUpdate(int index) throws InterruptedException {
            this.latches[index].await();
        }

        /**
         * @return the log names carried by the most recent notification, or
         *         {@code null} if no notification has been received yet
         */
        Set<String> latestLogs() {
            return this.latestLogs.get();
        }
    }

    /**
     * Creates a persistent znode named {@code logName} under the path of {@code uri}.
     */
    private void createLogInNamespace(URI uri, String logName) throws Exception {
        String logPath = uri.getPath() + "/" + logName;
        Utils.zkCreateFullPathOptimistic(this.zkc, logPath, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * Deletes the znode named {@code logName} under the path of {@code uri},
     * regardless of its version.
     */
    private void deleteLogInNamespace(URI uri, String logName) throws Exception {
        String logPath = uri.getPath() + "/" + logName;
        this.zkc.get().delete(logPath, -1);
    }

    /**
     * Creates the namespace znode for the currently running test method and
     * returns its URI.
     */
    private URI createTestNamespace() throws Exception {
        URI uri = createDLMURI("/" + this.runtime.getMethodName());
        this.zkc.get().create(uri.getPath(), new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        return uri;
    }

    /**
     * Builds a watcher over {@code uri} using a fresh configuration derived
     * from {@link #baseConf} plus the shared client and scheduler.
     */
    private ZKNamespaceWatcher newWatcher(URI uri) {
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.addConfiguration(this.baseConf);
        return new ZKNamespaceWatcher(conf, uri, this.zkc, this.scheduler);
    }

    /**
     * Registers a listener and checks the notified log set after each
     * namespace change: initial (empty) state, two regular log creations, one
     * creation of a dot-prefixed name, and one deletion.
     */
    @Test(timeout = 60000)
    public void testNamespaceListener() throws Exception {
        URI uri = createTestNamespace();
        ZKNamespaceWatcher watcher = newWatcher(uri);
        UpdateRecorder recorder = new UpdateRecorder(MAX_TRACKED_UPDATES);
        watcher.registerListener(recorder.listener);

        Set<String> expectedLogs = new HashSet<String>();

        // First notification arrives on registration, before any log exists.
        recorder.awaitUpdate(0);
        validateReceivedLogs(expectedLogs, recorder.latestLogs());

        expectedLogs.add("test1");
        createLogInNamespace(uri, "test1");
        recorder.awaitUpdate(1);
        validateReceivedLogs(expectedLogs, recorder.latestLogs());

        // A dot-prefixed name still triggers a notification but is not added
        // to the expected set (presumably it is a reserved name - confirm
        // against ZKNamespaceWatcher).
        createLogInNamespace(uri, ".test1");
        recorder.awaitUpdate(2);
        validateReceivedLogs(expectedLogs, recorder.latestLogs());

        expectedLogs.add("test2");
        createLogInNamespace(uri, "test2");
        recorder.awaitUpdate(3);
        validateReceivedLogs(expectedLogs, recorder.latestLogs());

        expectedLogs.remove("test1");
        deleteLogInNamespace(uri, "test1");
        recorder.awaitUpdate(4);
        validateReceivedLogs(expectedLogs, recorder.latestLogs());
    }

    /**
     * Asserts that every expected log name is present in the received set.
     *
     * <p>This is a subset check only: additional names in {@code receivedLogs}
     * do not fail the assertion.
     *
     * @param expectedLogs log names that must have been reported
     * @param receivedLogs log names carried by the latest notification
     */
    private void validateReceivedLogs(Set<String> expectedLogs, Set<String> receivedLogs) {
        Set<String> missingLogs = Sets.difference(expectedLogs, receivedLogs);
        Assert.assertTrue("Expected logs missing from notification: " + missingLogs,
                missingLogs.isEmpty());
    }

    /**
     * Checks that the listener keeps receiving notifications across a call to
     * {@link ZooKeeperClientUtils#expireSession}: one notification with the
     * unchanged two logs follows the expiry call, and a log created afterwards
     * is reported as well.
     */
    @Test(timeout = 60000)
    public void testSessionExpired() throws Exception {
        URI uri = createTestNamespace();
        ZKNamespaceWatcher watcher = newWatcher(uri);
        UpdateRecorder recorder = new UpdateRecorder(MAX_TRACKED_UPDATES);
        watcher.registerListener(recorder.listener);

        recorder.awaitUpdate(0);
        createLogInNamespace(uri, "test1");
        recorder.awaitUpdate(1);
        createLogInNamespace(uri, "test2");
        recorder.awaitUpdate(2);
        Assert.assertEquals(2L, recorder.latestLogs().size());

        ZooKeeperClientUtils.expireSession(this.zkc,
                BKNamespaceDriver.getZKServersFromDLUri(uri), ZK_SESSION_TIMEOUT_MS);
        recorder.awaitUpdate(3);
        Assert.assertEquals(2L, recorder.latestLogs().size());

        createLogInNamespace(uri, "test3");
        recorder.awaitUpdate(4);
        Assert.assertEquals(3L, recorder.latestLogs().size());
    }
}
