package org.apache.distributedlog.impl;

import com.google.common.collect.Lists;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.callback.LogSegmentNamesListener;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/TestZKLogSegmentMetadataStore.class */
public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
    /** Logger used by the test methods and by the listeners they register. */
    private static final Logger logger = LoggerFactory.getLogger(TestZKLogSegmentMetadataStore.class);
    /** Session timeout (ms) passed to the ZooKeeper client builder in {@link #setup()}. */
    private static final int zkSessionTimeoutMs = 2000;

    /** Supplies the running test's method name; "/" + that name is used as the per-test znode path. */
    @Rule
    public TestName runtime = new TestName();
    /** Configuration that {@link #setup()} adds to the configuration handed to the store. */
    protected final DistributedLogConfiguration baseConf = new DistributedLogConfiguration();
    /** ZooKeeper client built in {@link #setup()} and closed in {@link #teardown()}. */
    protected ZooKeeperClient zkc;
    /** Store under test; constructed in {@link #setup()} from the configuration, {@link #zkc} and {@link #scheduler}. */
    protected ZKLogSegmentMetadataStore lsmStore;
    /** Single-threaded scheduler given to the store; shut down in {@link #teardown()}. */
    protected OrderedScheduler scheduler;
    /** Result of {@code createDLMURI("/" + methodName)} for the running test. */
    protected URI uri;
    /** "/" + test method name; the znode created in {@link #setup()}. */
    protected String rootZkPath;

    /**
     * Builds completed log segment metadata using a last entry id of 99.
     *
     * @param j forwarded unchanged as the first argument of {@link #createLogSegment(long, long)}
     * @return the metadata produced by the two-argument overload
     */
    private LogSegmentMetadata createLogSegment(long j) {
        final long defaultLastEntryId = 99L;
        return createLogSegment(j, defaultLastEntryId);
    }

    /**
     * Builds completed log segment metadata located under "/" + the current test method name.
     *
     * @param j  used for three of the arguments of {@code DLMTestUtil.completedLogSegment}
     *           (presumably ids / sequence number of the segment; confirm against DLMTestUtil)
     * @param j2 last entry id of the segment (the update test asserts on it via {@code getLastEntryId()})
     * @return metadata created by {@code DLMTestUtil.completedLogSegment}
     */
    private LogSegmentMetadata createLogSegment(long j, long j2) {
        final String parentPath = "/" + this.runtime.getMethodName();
        return DLMTestUtil.completedLogSegment(
                parentPath,
                j,
                j,
                1L,
                100,
                j,
                j2,
                0L,
                LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
    }

    /**
     * Prepares the per-test fixture: a ZooKeeper client, a single-threaded scheduler, the store
     * under test, and a persistent znode named after the running test method.
     *
     * @throws Exception if the client cannot be built or the znode cannot be created
     */
    @Override // org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        final String testPath = "/" + this.runtime.getMethodName();

        this.zkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(createDLMURI("/"))
                .sessionTimeoutMs(zkSessionTimeoutMs)
                .build();
        this.scheduler = OrderedScheduler.newSchedulerBuilder()
                .name("test-zk-logsegment-metadata-store")
                .numThreads(1)
                .build();

        final DistributedLogConfiguration storeConf = new DistributedLogConfiguration();
        storeConf.addConfiguration(this.baseConf);

        this.uri = createDLMURI(testPath);
        this.lsmStore = new ZKLogSegmentMetadataStore(storeConf, this.zkc, this.scheduler);

        // Each test works below its own znode so tests do not see each other's segments.
        this.zkc.get().create(testPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.rootZkPath = testPath;
    }

    /**
     * Releases the fixture created in {@link #setup()}.
     *
     * <p>The scheduler is shut down in a {@code finally} block so that a failure while closing
     * the ZooKeeper client cannot leak the scheduler's thread into later tests.
     *
     * @throws Exception if closing the ZooKeeper client fails
     */
    @Override // org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        try {
            if (null != this.zkc) {
                this.zkc.close();
            }
        } finally {
            if (null != this.scheduler) {
                this.scheduler.shutdown();
            }
        }
    }

    /**
     * Creating a log segment makes its znode appear; creating the same segment a second time
     * must fail with a {@link ZKException} carrying {@code NODEEXISTS}.
     *
     * @throws Exception on any unexpected failure (including a non-ZK failure of the second transaction)
     */
    @Test(timeout = 60000)
    public void testCreateLogSegment() throws Exception {
        LogSegmentMetadata segment = createLogSegment(1L);
        Transaction<Object> createTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(createTxn, segment, null);
        Utils.ioResult(createTxn.execute());
        Assert.assertNotNull("LogSegment " + segment + " should be created",
                this.zkc.get().exists(segment.getZkPath(), false));

        LogSegmentMetadata duplicate = createLogSegment(1L);
        Transaction<Object> duplicateTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(duplicateTxn, duplicate, null);
        try {
            Utils.ioResult(duplicateTxn.execute());
            Assert.fail("Should fail if log segment exists");
        } catch (ZKException zke) {
            // Only ZKException is caught so that the AssertionError raised by Assert.fail above
            // propagates with its own message instead of being reinterpreted here.
            Assert.assertEquals("Should throw NodeExistsException if log segment exists",
                    KeeperException.Code.NODEEXISTS, zke.getKeeperExceptionCode());
        }
    }

    /**
     * A segment created in one transaction can be removed by a later delete transaction,
     * after which its znode no longer exists.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testDeleteLogSegment() throws Exception {
        final LogSegmentMetadata segment = createLogSegment(1L);
        final String segmentPath = segment.getZkPath();

        // Step 1: create the segment and verify the znode is present.
        Transaction<Object> createTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(createTxn, segment, null);
        Utils.ioResult(createTxn.execute());
        Assert.assertNotNull("LogSegment " + segment + " should be created",
                this.zkc.get().exists(segmentPath, false));

        // Step 2: delete it and verify the znode is gone.
        Transaction<Object> deleteTxn = this.lsmStore.transaction();
        this.lsmStore.deleteLogSegment(deleteTxn, segment, null);
        Utils.ioResult(deleteTxn.execute());
        Assert.assertNull("LogSegment " + segment + " should be deleted",
                this.zkc.get().exists(segmentPath, false));
    }

    /**
     * Deleting a segment that was never created must fail with a {@link ZKException}
     * carrying {@code NONODE}.
     *
     * @throws Exception on any unexpected failure (including a non-ZK failure of the transaction)
     */
    @Test(timeout = 60000)
    public void testDeleteNonExistentLogSegment() throws Exception {
        LogSegmentMetadata segment = createLogSegment(1L);
        Transaction<Object> deleteTxn = this.lsmStore.transaction();
        this.lsmStore.deleteLogSegment(deleteTxn, segment, null);
        try {
            Utils.ioResult(deleteTxn.execute());
            Assert.fail("Should fail deletion if log segment doesn't exist");
        } catch (ZKException zke) {
            // Catching only ZKException lets the AssertionError from Assert.fail surface unchanged.
            Assert.assertEquals("Should throw NoNodeException if log segment doesn't exist",
                    KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
        }
    }

    /**
     * Updating a segment that was never created must fail with a {@link ZKException}
     * carrying {@code NONODE}.
     *
     * @throws Exception on any unexpected failure (including a non-ZK failure of the transaction)
     */
    @Test(timeout = 60000)
    public void testUpdateNonExistentLogSegment() throws Exception {
        LogSegmentMetadata segment = createLogSegment(1L);
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        this.lsmStore.updateLogSegment(updateTxn, segment);
        try {
            Utils.ioResult(updateTxn.execute());
            Assert.fail("Should fail update if log segment doesn't exist");
        } catch (ZKException zke) {
            // Catching only ZKException lets the AssertionError from Assert.fail surface unchanged.
            Assert.assertEquals("Should throw NoNodeException if log segment doesn't exist",
                    KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
        }
    }

    /**
     * Updating an existing segment overwrites its stored metadata: after the update the
     * metadata read back from ZooKeeper reports the new last entry id.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testUpdateLogSegment() throws Exception {
        final LogSegmentMetadata original = createLogSegment(1L, 99L);
        Transaction<Object> createTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(createTxn, original, null);
        Utils.ioResult(createTxn.execute());
        Assert.assertNotNull("LogSegment " + original + " should be created",
                this.zkc.get().exists(original.getZkPath(), false));

        // Same segment, different last entry id.
        final LogSegmentMetadata modified = createLogSegment(1L, 999L);
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        this.lsmStore.updateLogSegment(updateTxn, modified);
        Utils.ioResult(updateTxn.execute());

        LogSegmentMetadata stored =
                Utils.ioResult(LogSegmentMetadata.read(this.zkc, original.getZkPath(), true));
        Assert.assertEquals("Last entry id should be changed from 99L to 999L",
                999L, stored.getLastEntryId());
    }

    /**
     * A single transaction may both create one segment and delete another; when it commits,
     * the created segment exists and the deleted one does not.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testCreateDeleteLogSegmentSuccess() throws Exception {
        final LogSegmentMetadata first = createLogSegment(1L);
        final LogSegmentMetadata second = createLogSegment(2L);

        // Seed the store with the first segment.
        Transaction<Object> seedTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(seedTxn, first, null);
        Utils.ioResult(seedTxn.execute());
        Assert.assertNotNull("LogSegment " + first + " should be created",
                this.zkc.get().exists(first.getZkPath(), false));

        // Create the second segment and delete the first in one transaction.
        Transaction<Object> swapTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(swapTxn, second, null);
        this.lsmStore.deleteLogSegment(swapTxn, first, null);
        Utils.ioResult(swapTxn.execute());

        Assert.assertNull("LogSegment " + first + " should be deleted",
                this.zkc.get().exists(first.getZkPath(), false));
        Assert.assertNotNull("LogSegment " + second + " should be created",
                this.zkc.get().exists(second.getZkPath(), false));
    }

    /**
     * A transaction containing one failing operation (deleting a segment that does not exist)
     * must abort as a whole with {@code NONODE}: the existing segment stays in place and the
     * segment the transaction tried to create is not created.
     *
     * @throws Exception on any unexpected failure (including a non-ZK failure of the transaction)
     */
    @Test(timeout = 60000)
    public void testCreateDeleteLogSegmentFailure() throws Exception {
        LogSegmentMetadata segment1 = createLogSegment(1L);
        LogSegmentMetadata segment2 = createLogSegment(2L);
        LogSegmentMetadata segment3 = createLogSegment(3L);

        Transaction<Object> createTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(createTxn, segment1, null);
        Utils.ioResult(createTxn.execute());
        Assert.assertNotNull("LogSegment " + segment1 + " should be created",
                this.zkc.get().exists(segment1.getZkPath(), false));

        // segment2 was never created, so deleting it makes the whole transaction fail.
        Transaction<Object> failingTxn = this.lsmStore.transaction();
        this.lsmStore.deleteLogSegment(failingTxn, segment1, null);
        this.lsmStore.deleteLogSegment(failingTxn, segment2, null);
        this.lsmStore.createLogSegment(failingTxn, segment3, null);
        try {
            Utils.ioResult(failingTxn.execute());
            Assert.fail("Should fail transaction if one operation failed");
        } catch (ZKException zke) {
            // Catching only ZKException lets the AssertionError from Assert.fail surface unchanged.
            Assert.assertEquals("Transaction is aborted",
                    KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
        }

        Assert.assertNotNull("LogSegment " + segment1 + " should not be deleted",
                this.zkc.get().exists(segment1.getZkPath(), false));
        Assert.assertNull("LogSegment " + segment3 + " should not be created",
                this.zkc.get().exists(segment3.getZkPath(), false));
    }

    /**
     * A segment read back through the store equals the segment that was created.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testGetLogSegment() throws Exception {
        final LogSegmentMetadata expected = createLogSegment(1L, 99L);
        final String segmentPath = expected.getZkPath();

        Transaction<Object> createTxn = this.lsmStore.transaction();
        this.lsmStore.createLogSegment(createTxn, expected, null);
        Utils.ioResult(createTxn.execute());
        Assert.assertNotNull("LogSegment " + expected + " should be created",
                this.zkc.get().exists(segmentPath, false));

        LogSegmentMetadata actual = Utils.ioResult(this.lsmStore.getLogSegment(segmentPath));
        Assert.assertEquals("Log segment should match", expected, actual);
    }

    /**
     * After creating ten segments in one transaction, the names returned by the store match the
     * children ZooKeeper reports for the test znode, and fetching each name (in sorted order)
     * yields the segments in creation order.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testGetLogSegmentNames() throws Exception {
        final int numSegments = 10;
        Transaction<Object> createTxn = this.lsmStore.transaction();
        List<LogSegmentMetadata> createdSegments = Lists.newArrayListWithExpectedSize(numSegments);
        for (int i = 0; i < numSegments; i++) {
            LogSegmentMetadata segment = createLogSegment(i);
            createdSegments.add(segment);
            this.lsmStore.createLogSegment(createTxn, segment, null);
        }
        Utils.ioResult(createTxn.execute());

        final String rootPath = "/" + this.runtime.getMethodName();

        // What ZooKeeper itself reports.
        List<String> children = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(children);
        Assert.assertEquals("Should find 10 log segments", numSegments, children.size());

        // What the store reports.
        Versioned<List<String>> versionedNames =
                Utils.ioResult(this.lsmStore.getLogSegmentNames(rootPath, null));
        List<String> logSegmentNames = versionedNames.getValue();
        Collections.sort(logSegmentNames);
        Assert.assertEquals("Should find 10 log segments", numSegments, logSegmentNames.size());
        Assert.assertEquals(children, logSegmentNames);

        // Resolve every name to its metadata and compare against what was created.
        List<CompletableFuture<LogSegmentMetadata>> fetches =
                Lists.newArrayListWithExpectedSize(numSegments);
        for (String name : logSegmentNames) {
            fetches.add(this.lsmStore.getLogSegment(rootPath + "/" + name));
        }
        List<LogSegmentMetadata> fetchedSegments = Utils.ioResult(FutureUtils.collect(fetches));
        for (int i = 0; i < numSegments; i++) {
            Assert.assertEquals(createdSegments.get(i), fetchedSegments.get(i));
        }
    }

    /**
     * Requesting segment names with a listener after the store has been closed must not
     * leave a listener registered in the store.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testRegisterListenerAfterLSMStoreClosed() throws Exception {
        this.lsmStore.close();

        final LogSegmentMetadata segment = createLogSegment(1L);
        final LogSegmentNamesListener noopListener = new LogSegmentNamesListener() {
            @Override
            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                // no-op: only registration is under test
            }

            @Override
            public void onLogStreamDeleted() {
                // no-op: only registration is under test
            }
        };
        this.lsmStore.getLogSegmentNames(segment.getZkPath(), noopListener);

        Assert.assertTrue("No listener is registered", this.lsmStore.listeners.isEmpty());
    }

    /**
     * A registered listener receives the current segment list once after registration and a
     * second, updated list after more segments are created.
     *
     * <p>The polling loops are bounded only by the test timeout.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testLogSegmentNamesListener() throws Exception {
        final int numSegments = 3;

        Transaction<Object> createTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.createLogSegment(createTxn, createLogSegment(i), null);
        }
        Utils.ioResult(createTxn.execute());

        final String rootPath = "/" + this.runtime.getMethodName();
        List<String> children = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(children);

        final AtomicInteger numNotifications = new AtomicInteger(0);
        final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
        LogSegmentNamesListener listener = new LogSegmentNamesListener() {
            @Override
            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                TestZKLogSegmentMetadataStore.logger.info("Received segments : {}", segments);
                // Record the list before bumping the counter the test thread polls on.
                segmentLists.add(segments.getValue());
                numNotifications.incrementAndGet();
            }

            @Override
            public void onLogStreamDeleted() {
                // not relevant for this test
            }
        };
        this.lsmStore.getLogSegmentNames(rootPath, listener);

        Assert.assertEquals(1L, this.lsmStore.listeners.size());
        Assert.assertTrue("Should contain listener", this.lsmStore.listeners.containsKey(rootPath));
        Assert.assertTrue("Should contain listener",
                ((Map<?, ?>) this.lsmStore.listeners.get(rootPath)).containsKey(listener));

        // First notification: the initial list.
        while (numNotifications.get() < 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive one segment list update", 1L, numNotifications.get());
        List<String> firstList = segmentLists.get(0);
        Collections.sort(firstList);
        Assert.assertEquals("List of segments should be same", children, firstList);

        logger.info("Create another {} segments.", numSegments);
        Transaction<Object> moreTxn = this.lsmStore.transaction();
        for (int i = numSegments; i < 2 * numSegments; i++) {
            this.lsmStore.createLogSegment(moreTxn, createLogSegment(i), null);
        }
        Utils.ioResult(moreTxn.execute());
        List<String> newChildren = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(newChildren);
        logger.info("All log segments become {}", newChildren);

        // Second notification: the list after the additional segments were created.
        while (numNotifications.get() < 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive second segment list update", 2L, numNotifications.get());
        List<String> secondList = segmentLists.get(1);
        Collections.sort(secondList);
        Assert.assertEquals("List of segments should be updated", 2 * numSegments, secondList.size());
        Assert.assertEquals("List of segments should be updated", newChildren, secondList);
    }

    /**
     * A registered listener is notified with an empty list after all segments are deleted,
     * and the store drops its listeners once the root znode itself is deleted.
     *
     * <p>The polling loops are bounded only by the test timeout.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testLogSegmentNamesListenerOnDeletion() throws Exception {
        final int numSegments = 3;

        Transaction<Object> createTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.createLogSegment(createTxn, createLogSegment(i), null);
        }
        Utils.ioResult(createTxn.execute());

        final String rootPath = "/" + this.runtime.getMethodName();
        List<String> children = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(children);

        final AtomicInteger numNotifications = new AtomicInteger(0);
        final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
        LogSegmentNamesListener listener = new LogSegmentNamesListener() {
            @Override
            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                TestZKLogSegmentMetadataStore.logger.info("Received segments : {}", segments);
                // Record the list before bumping the counter the test thread polls on.
                segmentLists.add(segments.getValue());
                numNotifications.incrementAndGet();
            }

            @Override
            public void onLogStreamDeleted() {
                // not checked by this test
            }
        };
        this.lsmStore.getLogSegmentNames(rootPath, listener);

        Assert.assertEquals(1L, this.lsmStore.listeners.size());
        Assert.assertTrue("Should contain listener", this.lsmStore.listeners.containsKey(rootPath));
        Assert.assertTrue("Should contain listener",
                ((Map<?, ?>) this.lsmStore.listeners.get(rootPath)).containsKey(listener));

        // First notification: the initial list.
        while (numNotifications.get() < 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive one segment list update", 1L, numNotifications.get());
        List<String> firstList = segmentLists.get(0);
        Collections.sort(firstList);
        Assert.assertEquals("List of segments should be same", children, firstList);

        // Delete every segment in a single transaction.
        Transaction<Object> deleteTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.deleteLogSegment(deleteTxn, createLogSegment(i), null);
        }
        Utils.ioResult(deleteTxn.execute());
        List<String> newChildren = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(newChildren);

        // Second notification: the now-empty list.
        while (numNotifications.get() < 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive second segment list update", 2L, numNotifications.get());
        List<String> secondList = segmentLists.get(1);
        Collections.sort(secondList);
        Assert.assertEquals("List of segments should be updated", 0L, secondList.size());
        Assert.assertEquals("List of segments should be updated", newChildren, secondList);

        // Removing the root znode should eventually clear the store's listener map.
        this.zkc.get().delete(rootPath, -1);
        while (!this.lsmStore.listeners.isEmpty()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertTrue("listener should be removed after root path is deleted",
                this.lsmStore.listeners.isEmpty());
    }

    /**
     * A registered listener keeps working across a ZooKeeper session expiry: after the session
     * is expired and more segments are created, it receives a second, updated segment list.
     *
     * <p>The polling loops are bounded only by the test timeout.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testLogSegmentNamesListenerOnSessionExpired() throws Exception {
        final int numSegments = 3;

        Transaction<Object> createTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.createLogSegment(createTxn, createLogSegment(i), null);
        }
        Utils.ioResult(createTxn.execute());

        final String rootPath = "/" + this.runtime.getMethodName();
        List<String> children = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(children);

        final AtomicInteger numNotifications = new AtomicInteger(0);
        final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
        LogSegmentNamesListener listener = new LogSegmentNamesListener() {
            @Override
            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                TestZKLogSegmentMetadataStore.logger.info("Received segments : {}", segments);
                // Record the list before bumping the counter the test thread polls on.
                segmentLists.add(segments.getValue());
                numNotifications.incrementAndGet();
            }

            @Override
            public void onLogStreamDeleted() {
                // not relevant for this test
            }
        };
        this.lsmStore.getLogSegmentNames(rootPath, listener);

        Assert.assertEquals(1L, this.lsmStore.listeners.size());
        Assert.assertTrue("Should contain listener", this.lsmStore.listeners.containsKey(rootPath));
        Assert.assertTrue("Should contain listener",
                ((Map<?, ?>) this.lsmStore.listeners.get(rootPath)).containsKey(listener));

        // First notification: the initial list.
        while (numNotifications.get() < 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive one segment list update", 1L, numNotifications.get());
        List<String> firstList = segmentLists.get(0);
        Collections.sort(firstList);
        Assert.assertEquals("List of segments should be same", children, firstList);

        // Force a session expiry before changing the segment list.
        ZooKeeperClientUtils.expireSession(this.zkc, BKNamespaceDriver.getZKServersFromDLUri(this.uri), conf.getZKSessionTimeoutMilliseconds());

        logger.info("Create another {} segments.", numSegments);
        Transaction<Object> moreTxn = this.lsmStore.transaction();
        for (int i = numSegments; i < 2 * numSegments; i++) {
            this.lsmStore.createLogSegment(moreTxn, createLogSegment(i), null);
        }
        Utils.ioResult(moreTxn.execute());
        List<String> newChildren = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(newChildren);
        logger.info("All log segments become {}", newChildren);

        // Second notification: the list after the additional segments were created.
        while (numNotifications.get() < 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        // The message previously said "third" although the assertion expects exactly two updates.
        Assert.assertEquals("Should receive second segment list update", 2L, numNotifications.get());
        List<String> secondList = segmentLists.get(1);
        Collections.sort(secondList);
        Assert.assertEquals("List of segments should be updated", 2 * numSegments, secondList.size());
        Assert.assertEquals("List of segments should be updated", newChildren, secondList);
    }

    /**
     * When the root znode of the log stream is deleted, the store drops its listeners and the
     * listener's {@code onLogStreamDeleted} callback is invoked.
     *
     * <p>The polling loops and the final latch wait are bounded only by the test timeout.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testLogSegmentNamesListenerOnDeletingLogStream() throws Exception {
        final int numSegments = 3;

        Transaction<Object> createTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.createLogSegment(createTxn, createLogSegment(i), null);
        }
        Utils.ioResult(createTxn.execute());

        final String rootPath = "/" + this.runtime.getMethodName();
        List<String> children = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(children);

        final AtomicInteger numNotifications = new AtomicInteger(0);
        final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
        final CountDownLatch deleteLatch = new CountDownLatch(1);
        LogSegmentNamesListener listener = new LogSegmentNamesListener() {
            @Override
            public void onSegmentsUpdated(Versioned<List<String>> segments) {
                TestZKLogSegmentMetadataStore.logger.info("Received segments : {}", segments);
                // Record the list before bumping the counter the test thread polls on.
                segmentLists.add(segments.getValue());
                numNotifications.incrementAndGet();
            }

            @Override
            public void onLogStreamDeleted() {
                deleteLatch.countDown();
            }
        };
        this.lsmStore.getLogSegmentNames(rootPath, listener);

        Assert.assertEquals(1L, this.lsmStore.listeners.size());
        Assert.assertTrue("Should contain listener", this.lsmStore.listeners.containsKey(rootPath));
        Assert.assertTrue("Should contain listener",
                ((Map<?, ?>) this.lsmStore.listeners.get(rootPath)).containsKey(listener));

        // First notification: the initial list.
        while (numNotifications.get() < 1) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive one segment list update", 1L, numNotifications.get());
        List<String> firstList = segmentLists.get(0);
        Collections.sort(firstList);
        Assert.assertEquals("List of segments should be same", children, firstList);

        // Delete every segment in a single transaction.
        Transaction<Object> deleteTxn = this.lsmStore.transaction();
        for (int i = 0; i < numSegments; i++) {
            this.lsmStore.deleteLogSegment(deleteTxn, createLogSegment(i), null);
        }
        Utils.ioResult(deleteTxn.execute());
        List<String> newChildren = this.zkc.get().getChildren(rootPath, false);
        Collections.sort(newChildren);

        // Second notification: the now-empty list.
        while (numNotifications.get() < 2) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertEquals("Should receive second segment list update", 2L, numNotifications.get());
        List<String> secondList = segmentLists.get(1);
        Collections.sort(secondList);
        Assert.assertEquals("List of segments should be updated", 0L, secondList.size());
        Assert.assertEquals("List of segments should be updated", newChildren, secondList);

        // Delete the stream's root znode, then wait for the store to drop its listeners.
        this.zkc.get().delete(rootPath, -1);
        while (!this.lsmStore.listeners.isEmpty()) {
            TimeUnit.MILLISECONDS.sleep(10L);
        }
        Assert.assertTrue("listener should be removed after root path is deleted",
                this.lsmStore.listeners.isEmpty());

        // Finally the deletion callback must have fired (or fire shortly).
        deleteLatch.await();
    }

    /**
     * Storing the max log segment sequence number with the matching znode version (0) commits:
     * the listener is handed version 1, and the znode then holds 999 at version 1.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testStoreMaxLogSegmentSequenceNumber() throws Exception {
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(0L));
        final CompletableFuture<Version> result = new CompletableFuture<>();
        LogMetadata metadata = Mockito.mock(LogMetadata.class);
        Mockito.when(metadata.getLogSegmentsPath()).thenReturn(this.rootZkPath);
        this.lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version version) {
                result.complete(version);
            }

            @Override
            public void onAbort(Throwable th) {
                result.completeExceptionally(th);
            }
        });
        Utils.ioResult(updateTxn.execute());
        Assert.assertEquals(1L, ((LongVersion) Utils.ioResult(result)).getLongVersion());

        // Keep a reference to the Stat so the znode version filled in by getData can be asserted.
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(this.rootZkPath, false, stat);
        Assert.assertEquals(999L, DLUtils.deserializeLogSegmentSequenceNumber(data));
        Assert.assertEquals(1L, stat.getVersion());
    }

    /**
     * Storing the max log segment sequence number with a stale/incorrect version (10) must fail
     * with {@code BADVERSION} both from the transaction and through the listener, and must leave
     * the znode untouched (version 0, empty data).
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testStoreMaxLogSegmentSequenceNumberBadVersion() throws Exception {
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(10L));
        final CompletableFuture<Version> result = new CompletableFuture<>();
        LogMetadata metadata = Mockito.mock(LogMetadata.class);
        Mockito.when(metadata.getLogSegmentsPath()).thenReturn(this.rootZkPath);
        this.lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version version) {
                result.complete(version);
            }

            @Override
            public void onAbort(Throwable th) {
                result.completeExceptionally(th);
            }
        });
        try {
            Utils.ioResult(updateTxn.execute());
            Assert.fail("Should fail on storing log segment sequence number if providing bad version");
        } catch (ZKException e) {
            Assert.assertEquals(KeeperException.Code.BADVERSION, e.getKeeperExceptionCode());
        }
        try {
            Utils.ioResult(result);
            Assert.fail("Should fail on storing log segment sequence number if providing bad version");
        } catch (ZKException e2) {
            Assert.assertEquals(KeeperException.Code.BADVERSION, e2.getKeeperExceptionCode());
        }

        // Keep a reference to the Stat so the znode version filled in by getData can be asserted.
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(this.rootZkPath, false, stat);
        Assert.assertEquals(0L, stat.getVersion());
        Assert.assertEquals(0L, data.length);
    }

    /**
     * Storing the max log segment sequence number under a path that does not exist must fail
     * with {@code NONODE}, both from the transaction and through the listener.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testStoreMaxLogSegmentSequenceNumberOnNonExistentPath() throws Exception {
        final String missingPath = this.rootZkPath + "/non-existent";
        final CompletableFuture<Version> result = new CompletableFuture<>();

        LogMetadata metadata = Mockito.mock(LogMetadata.class);
        Mockito.when(metadata.getLogSegmentsPath()).thenReturn(missingPath);

        Transaction.OpListener<Version> listener = new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version version) {
                result.complete(version);
            }

            @Override
            public void onAbort(Throwable th) {
                result.completeExceptionally(th);
            }
        };

        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(10L));
        this.lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, listener);

        // The transaction itself reports the missing node ...
        try {
            Utils.ioResult(updateTxn.execute());
            Assert.fail("Should fail on storing log segment sequence number if path doesn't exist");
        } catch (ZKException txnFailure) {
            Assert.assertEquals(KeeperException.Code.NONODE, txnFailure.getKeeperExceptionCode());
        }
        // ... and so does the per-operation listener.
        try {
            Utils.ioResult(result);
            Assert.fail("Should fail on storing log segment sequence number if path doesn't exist");
        } catch (ZKException listenerFailure) {
            Assert.assertEquals(KeeperException.Code.NONODE, listenerFailure.getKeeperExceptionCode());
        }
    }

    /**
     * Storing the max transaction id with the matching znode version (0) commits: the listener
     * is handed version 1, and the znode then holds 999 at version 1.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(timeout = 60000)
    public void testStoreMaxTxnId() throws Exception {
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(0L));
        final CompletableFuture<Version> result = new CompletableFuture<>();
        LogMetadataForWriter metadata = Mockito.mock(LogMetadataForWriter.class);
        Mockito.when(metadata.getMaxTxIdPath()).thenReturn(this.rootZkPath);
        this.lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() {
            @Override
            public void onCommit(Version version) {
                result.complete(version);
            }

            @Override
            public void onAbort(Throwable th) {
                result.completeExceptionally(th);
            }
        });
        Utils.ioResult(updateTxn.execute());
        Assert.assertEquals(1L, ((LongVersion) Utils.ioResult(result)).getLongVersion());

        // Keep a reference to the Stat so the znode version filled in by getData can be asserted.
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(this.rootZkPath, false, stat);
        Assert.assertEquals(999L, DLUtils.deserializeTransactionId(data));
        Assert.assertEquals(1L, stat.getVersion());
    }

    /**
     * Attempts to store max transaction id 999 at {@code rootZkPath} with expected version 10
     * and asserts that both the transaction and the op listener fail with BADVERSION, and that
     * the znode is afterwards still at version 0 with zero-length data.
     *
     * @throws Exception if an unexpected (non-ZK) failure occurs
     */
    @Test(timeout = 60000)
    public void testStoreMaxTxnIdBadVersion() throws Exception {
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(10L));
        final CompletableFuture<Version> listenerResult = new CompletableFuture<>();
        LogMetadataForWriter metadata = Mockito.mock(LogMetadataForWriter.class);
        Mockito.when(metadata.getMaxTxIdPath()).thenReturn(this.rootZkPath);
        this.lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                new Transaction.OpListener<Version>() {
                    @Override
                    public void onCommit(Version version) {
                        listenerResult.complete(version);
                    }

                    @Override
                    public void onAbort(Throwable cause) {
                        listenerResult.completeExceptionally(cause);
                    }
                });

        // The transaction itself must be rejected.
        try {
            Utils.ioResult(updateTxn.execute());
            Assert.fail("Should fail on storing log record transaction id if providing bad version");
        } catch (ZKException txnFailure) {
            Assert.assertEquals(KeeperException.Code.BADVERSION, txnFailure.getKeeperExceptionCode());
        }

        // The listener must observe the same failure through onAbort.
        try {
            Utils.ioResult(listenerResult);
            Assert.fail("Should fail on storing log record transaction id if providing bad version");
        } catch (ZKException listenerFailure) {
            Assert.assertEquals(KeeperException.Code.BADVERSION, listenerFailure.getKeeperExceptionCode());
        }

        // Keep a handle on the Stat so the znode version filled in by getData can be asserted.
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(this.rootZkPath, false, stat);
        Assert.assertEquals(0L, stat.getVersion());
        Assert.assertEquals(0L, data.length);
    }

    /**
     * Attempts to store max transaction id 999 (expected version 10) at a max-txid path that
     * is never created, and asserts that both the transaction and the op listener fail with
     * NONODE.
     *
     * @throws Exception if an unexpected (non-ZK) failure occurs
     */
    @Test(timeout = 60000)
    public void testStoreMaxTxnIdOnNonExistentPath() throws Exception {
        Transaction<Object> updateTxn = this.lsmStore.transaction();
        Versioned<Long> value = new Versioned<>(999L, new LongVersion(10L));
        final CompletableFuture<Version> listenerResult = new CompletableFuture<>();
        String nonExistentPath = this.rootZkPath + "/non-existent";
        LogMetadataForWriter metadata = Mockito.mock(LogMetadataForWriter.class);
        Mockito.when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
        this.lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                new Transaction.OpListener<Version>() {
                    @Override
                    public void onCommit(Version version) {
                        listenerResult.complete(version);
                    }

                    @Override
                    public void onAbort(Throwable cause) {
                        listenerResult.completeExceptionally(cause);
                    }
                });

        // The transaction itself must be rejected.
        try {
            Utils.ioResult(updateTxn.execute());
            Assert.fail("Should fail on storing log record transaction id if path doesn't exist");
        } catch (ZKException txnFailure) {
            Assert.assertEquals(KeeperException.Code.NONODE, txnFailure.getKeeperExceptionCode());
        }

        // The listener must observe the same failure through onAbort.
        try {
            Utils.ioResult(listenerResult);
            Assert.fail("Should fail on storing log record transaction id if path doesn't exist");
        } catch (ZKException listenerFailure) {
            Assert.assertEquals(KeeperException.Code.NONODE, listenerFailure.getKeeperExceptionCode());
        }
    }
}
