package org.apache.distributedlog.lock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.commons.cli.HelpFormatter;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/lock/TestDistributedLock.class */
public class TestDistributedLock extends TestDistributedLogBase {
    /** Logger used by this test class, including the helper threads it spawns. */
    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLock.class);

    /** JUnit rule exposing the name of the running test method; some tests pass it to TestLockFactory. */
    @Rule
    public TestName runtime = new TestName();
    // NOTE(review): this constant is not referenced by name anywhere in this file; setup() and the
    // session-expiry calls pass the literal 2000 instead.
    private static final int sessionTimeoutMs = 2000;
    /** ZooKeeper client named "zkc"; built in setup() and closed in teardown(). */
    private ZooKeeperClient zkc;
    /** Second ZooKeeper client named "zkc0"; the tests that expire a session do so on this client. */
    private ZooKeeperClient zkc0;
    /** Scheduler handed to every lock and lock factory created by the tests. */
    private OrderedScheduler lockStateExecutor;

    /**
     * Fail point action driven by two countdown counters.
     *
     * <p>Each call to {@link #checkFailPoint()} decrements the success counter. While that
     * counter was still positive the call simply passes. Once it was already exhausted, the
     * failure counter is decremented instead and an {@link IOException} is thrown for as long
     * as the failure counter was still positive. After both are exhausted every call passes.
     */
    static class CountDownThrowFailPointAction extends FailpointUtils.AbstractFailPointAction {
        final AtomicInteger successCounter;
        final AtomicInteger failureCounter;

        /**
         * @param successCount number of initial checks that pass
         * @param failureCount number of subsequent checks that throw
         */
        CountDownThrowFailPointAction(int successCount, int failureCount) {
            this.successCounter = new AtomicInteger(successCount);
            this.failureCounter = new AtomicInteger(failureCount);
        }

        /**
         * @return always {@code true} when no exception is thrown
         * @throws IOException while the success budget is used up and the failure budget is not
         */
        @Override
        public boolean checkFailPoint() throws IOException {
            // The failure counter is only touched once the success budget is exhausted.
            if (successCounter.getAndDecrement() > 0) {
                return true;
            }
            final int remainingFailures = failureCounter.getAndDecrement();
            if (remainingFailures > 0) {
                throw new IOException("counter = " + remainingFailures);
            }
            return true;
        }
    }

    /**
     * Helper that creates a fresh lock path in ZooKeeper and hands out
     * {@link ZKDistributedLock} instances bound to that path.
     */
    static class TestLockFactory {
        final String lockPath;
        final String clientId;
        final OrderedScheduler lockStateExecutor;

        /**
         * Creates the lock znode {@code "/" + name + currentTimeMillis} using the given client.
         *
         * @param name      base name of the lock path, also used as client id prefix
         * @param zkClient  client used to create the lock path
         * @param scheduler scheduler passed to every lock created by this factory
         * @throws Exception if the lock path cannot be created
         */
        public TestLockFactory(String name, ZooKeeperClient zkClient, OrderedScheduler scheduler) throws Exception {
            this.clientId = name;
            this.lockStateExecutor = scheduler;
            this.lockPath = "/" + name + System.currentTimeMillis();
            TestDistributedLock.createLockPath(zkClient.get(), this.lockPath);
        }

        /**
         * Builds a new, not yet acquired lock on this factory's path.
         *
         * @param id       suffix appended to the client id so each lock gets its own id
         * @param zkClient ZooKeeper client the lock's session lock factory should use
         * @return the new lock
         */
        public ZKDistributedLock createLock(int id, ZooKeeperClient zkClient) throws Exception {
            final String lockClientId = this.clientId + id;
            final ZKSessionLockFactory sessionLockFactory = new ZKSessionLockFactory(
                    zkClient,
                    lockClientId,
                    this.lockStateExecutor,
                    0,
                    DistributedLogConstants.MAX_TXID,
                    2000L,
                    NullStatsLogger.INSTANCE);
            return new ZKDistributedLock(
                    this.lockStateExecutor,
                    sessionLockFactory,
                    this.lockPath,
                    DistributedLogConstants.MAX_TXID,
                    NullStatsLogger.INSTANCE);
        }

        /** @return the ZooKeeper path all locks of this factory contend on */
        public String getLockPath() {
            return lockPath;
        }
    }

    /**
     * Builds the two ZooKeeper clients ("zkc" and "zkc0") and the single-threaded scheduler
     * used by the tests.
     */
    @Override
    @Before
    public void setup() throws Exception {
        zkc = ZooKeeperClientBuilder.newBuilder()
                .name("zkc")
                .uri(createDLMURI("/"))
                .sessionTimeoutMs(2000)
                .zkAclId(null)
                .build();
        zkc0 = ZooKeeperClientBuilder.newBuilder()
                .name("zkc0")
                .uri(createDLMURI("/"))
                .sessionTimeoutMs(2000)
                .zkAclId(null)
                .build();
        lockStateExecutor = OrderedScheduler.newSchedulerBuilder()
                .name("test-scheduer")
                .numThreads(1)
                .build();
    }

    /**
     * Releases the resources created by {@link #setup()}.
     *
     * <p>Each resource is released in its own {@code finally} step so that a failure while
     * closing one of them does not leak the remaining ones, and each is null-checked because
     * JUnit runs {@code @After} methods even when {@code @Before} failed before assigning them.
     *
     * @throws Exception if closing one of the ZooKeeper clients fails
     */
    @Override
    @After
    public void teardown() throws Exception {
        try {
            if (null != this.zkc) {
                this.zkc.close();
            }
        } finally {
            try {
                if (null != this.zkc0) {
                    this.zkc0.close();
                }
            } finally {
                if (null != this.lockStateExecutor) {
                    this.lockStateExecutor.shutdown();
                }
            }
        }
    }

    /**
     * Creates a persistent znode with empty data and an open ACL at the given path.
     *
     * @param zk       ZooKeeper handle used to create the node
     * @param lockPath path of the znode to create
     * @throws Exception if the create call fails
     */
    public static void createLockPath(ZooKeeper zk, String lockPath) throws Exception {
        final byte[] emptyData = new byte[0];
        zk.create(lockPath, emptyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * Lists the children of the lock path, ordered by {@code ZKSessionLock.MEMBER_COMPARATOR}.
     *
     * @param zkClient client used to read the children (no watch is set)
     * @param lockPath path of the lock znode
     * @return the sorted child names
     */
    private static List<String> getLockWaiters(ZooKeeperClient zkClient, String lockPath) throws Exception {
        final List<String> waiters = zkClient.get().getChildren(lockPath, false);
        waiters.sort(ZKSessionLock.MEMBER_COMPARATOR);
        return waiters;
    }

    /**
     * Convenience overload: builds a session lock factory with
     * {@code DistributedLogConstants.MAX_TXID} as the long argument and {@code 0} as the int
     * argument of the four-argument variant.
     *
     * @param clientId client id given to the factory
     * @param zkClient ZooKeeper client the factory should use
     */
    private SessionLockFactory createLockFactory(String clientId, ZooKeeperClient zkClient) {
        final long timeout = DistributedLogConstants.MAX_TXID;
        final int retries = 0;
        return createLockFactory(clientId, zkClient, timeout, retries);
    }

    /**
     * Builds a {@link ZKSessionLockFactory} backed by this test's scheduler.
     *
     * @param clientId client id given to the factory
     * @param zkClient ZooKeeper client the factory should use
     * @param timeout  forwarded as the factory's long argument (presumably the lock operation
     *                 timeout; confirm against ZKSessionLockFactory)
     * @param retries  forwarded as the factory's int argument (the connection-loss test treats
     *                 it as the number of lock creation retries)
     */
    private SessionLockFactory createLockFactory(String clientId, ZooKeeperClient zkClient, long timeout, int retries) {
        return new ZKSessionLockFactory(
                zkClient,
                clientId,
                this.lockStateExecutor,
                retries,
                timeout,
                2000L,
                NullStatsLogger.INSTANCE);
    }

    /**
     * Runs the lock's ownership check and optionally waits for a pending re-acquisition.
     *
     * @param lock             lock to check
     * @param waitForReacquire when {@code true} and the lock exposes a re-acquire future, block
     *                         until that future completes (rethrowing its failure)
     * @throws Exception if the ownership check or the awaited re-acquisition fails
     */
    private static void checkLockAndReacquire(ZKDistributedLock lock, boolean waitForReacquire) throws Exception {
        lock.checkOwnershipAndReacquire();
        final CompletableFuture<ZKDistributedLock> reacquireFuture = lock.getLockReacquireFuture();
        if (waitForReacquire && null != reacquireFuture) {
            Utils.ioResult(reacquireFuture);
        }
    }

    /**
     * Exercises lock creation while the {@code FP_ZooKeeperConnectionLoss} fail point is active.
     *
     * <ol>
     *   <li>fail point always throws, factory created with 0 retries: acquire must fail;</li>
     *   <li>fail point always throws, factory created with 3 retries: acquire must fail;</li>
     *   <li>fail point throws 3 times, factory created with 5 retries: acquire must succeed.</li>
     * </ol>
     *
     * <p>The fail point is process-wide state, so every phase removes it in a {@code finally}
     * block; otherwise an unexpected exception or a failed assertion would leave it installed
     * for the tests that run afterwards.
     */
    @Test(timeout = 60000)
    public void testZooKeeperConnectionLossOnLockCreation() throws Exception {
        final String lockPath = "/test-zookeeper-connection-loss-on-lock-creation-" + System.currentTimeMillis();
        final String clientId = "zookeeper-connection-loss";
        createLockPath(this.zkc.get(), lockPath);

        // Phase 1: every ZooKeeper access fails and the factory does not retry.
        FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss,
                new CountDownThrowFailPointAction(0, Integer.MAX_VALUE));
        try {
            ZKDistributedLock lock = new ZKDistributedLock(
                    this.lockStateExecutor,
                    createLockFactory(clientId, this.zkc, DistributedLogConstants.MAX_TXID, 0),
                    lockPath,
                    DistributedLogConstants.MAX_TXID,
                    NullStatsLogger.INSTANCE);
            Utils.ioResult(lock.asyncAcquire());
            Assert.fail("Should fail on creating lock if couldn't establishing connections to zookeeper");
        } catch (IOException expected) {
            // expected: the fail point rejects every attempt
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
        }

        // Phase 2: every ZooKeeper access fails; three retries are still not enough.
        FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss,
                new CountDownThrowFailPointAction(0, Integer.MAX_VALUE));
        try {
            ZKDistributedLock lock = new ZKDistributedLock(
                    this.lockStateExecutor,
                    createLockFactory(clientId, this.zkc, DistributedLogConstants.MAX_TXID, 3),
                    lockPath,
                    DistributedLogConstants.MAX_TXID,
                    NullStatsLogger.INSTANCE);
            Utils.ioResult(lock.asyncAcquire());
            Assert.fail("Should fail on creating lock if couldn't establishing connections to zookeeper after 3 retries");
        } catch (IOException expected) {
            // expected: the fail point rejects every attempt
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
        }

        // Phase 3: only the first three accesses fail, so five retries let the acquire succeed.
        FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss,
                new CountDownThrowFailPointAction(0, 3));
        try {
            ZKDistributedLock lock = new ZKDistributedLock(
                    this.lockStateExecutor,
                    createLockFactory(clientId, this.zkc, DistributedLogConstants.MAX_TXID, 5),
                    lockPath,
                    DistributedLogConstants.MAX_TXID,
                    NullStatsLogger.INSTANCE);
            Utils.ioResult(lock.asyncAcquire());
            Pair<String, Long> lockId = ((ZKSessionLock) lock.getInternalLock()).getLockId();
            List<String> waiters = getLockWaiters(this.zkc, lockPath);
            Assert.assertEquals(1L, waiters.size());
            Assert.assertTrue(lock.haveLock());
            Assert.assertEquals(lockId,
                    Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), lockPath, waiters.get(0))));
            // The close is started but, as before, its result is not awaited.
            lock.asyncClose();
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
        }
    }

    /**
     * Acquires and closes two locks one after the other on the same path and with the same
     * factory, checking the waiter list and ownership after each step, then verifies that a
     * closed lock cannot be acquired again.
     */
    @Test(timeout = 60000)
    public void testBasicAcquireRelease() throws Exception {
        final String lockPath = "/test-basic-acquire-release-" + System.currentTimeMillis();
        createLockPath(zkc.get(), lockPath);
        final SessionLockFactory lockFactory = createLockFactory("basic-acquire-release", zkc);

        // First lock: acquire, verify the single waiter is ours, then close.
        final ZKDistributedLock firstLock = new ZKDistributedLock(
                lockStateExecutor, lockFactory, lockPath, DistributedLogConstants.MAX_TXID, NullStatsLogger.INSTANCE);
        Utils.ioResult(firstLock.asyncAcquire());
        final Pair<String, Long> firstLockId = ((ZKSessionLock) firstLock.getInternalLock()).getLockId();
        final List<String> waitersAfterFirstAcquire = getLockWaiters(zkc, lockPath);
        Assert.assertEquals(1L, waitersAfterFirstAcquire.size());
        Assert.assertTrue(firstLock.haveLock());
        Assert.assertEquals(
                firstLockId,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(zkc0.get(), lockPath, waitersAfterFirstAcquire.get(0))));
        Utils.ioResult(firstLock.asyncClose());
        Assert.assertEquals(0L, getLockWaiters(zkc, lockPath).size());
        Assert.assertFalse(firstLock.haveLock());

        // Second lock from the same factory: same checks, and the lock id must match the first.
        final ZKDistributedLock secondLock = new ZKDistributedLock(
                lockStateExecutor, lockFactory, lockPath, DistributedLogConstants.MAX_TXID, NullStatsLogger.INSTANCE);
        Utils.ioResult(secondLock.asyncAcquire());
        final Pair<String, Long> secondLockId = ((ZKSessionLock) secondLock.getInternalLock()).getLockId();
        final List<String> waitersAfterSecondAcquire = getLockWaiters(zkc, lockPath);
        Assert.assertEquals(1L, waitersAfterSecondAcquire.size());
        Assert.assertTrue(secondLock.haveLock());
        Assert.assertEquals(
                secondLockId,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(zkc0.get(), lockPath, waitersAfterSecondAcquire.get(0))));
        Assert.assertEquals(firstLockId, secondLockId);
        Utils.ioResult(secondLock.asyncClose());
        Assert.assertEquals(0L, getLockWaiters(zkc, lockPath).size());
        Assert.assertFalse(secondLock.haveLock());

        // Re-acquiring the closed lock must be rejected and must not create a waiter.
        try {
            Utils.ioResult(secondLock.asyncAcquire());
            Assert.fail("Should fail on acquiring a closed lock");
        } catch (UnexpectedException expected) {
            // expected
        }
        Assert.assertEquals(0L, getLockWaiters(zkc, lockPath).size());
        Assert.assertFalse(secondLock.haveLock());
    }

    /**
     * Verifies that the ownership check fails once another lock instance has taken over.
     *
     * <p>The first lock (on {@code zkc0}) survives one session expiry by re-acquiring. A second
     * lock (on {@code zkc}) then queues up behind it; after {@code zkc0}'s session is expired
     * again the second lock obtains ownership, and both ownership checks on the first lock are
     * expected to throw {@link LockingException}.
     */
    @Test(timeout = 60000)
    public void testCheckWriteLockFailureWhenLockIsAcquiredByOthers() throws Exception {
        final String lockPath =
                "/test-check-write-lock-failure-when-lock-is-acquired-by-others-" + System.currentTimeMillis();
        createLockPath(zkc.get(), lockPath);

        final ZKDistributedLock firstLock = new ZKDistributedLock(
                lockStateExecutor,
                createLockFactory("test-check-write-lock-failure", zkc0),
                lockPath,
                DistributedLogConstants.MAX_TXID,
                NullStatsLogger.INSTANCE);
        Utils.ioResult(firstLock.asyncAcquire());
        final Pair<String, Long> idBeforeExpiry = ((ZKSessionLock) firstLock.getInternalLock()).getLockId();
        final List<String> waitersBeforeExpiry = getLockWaiters(zkc, lockPath);
        Assert.assertEquals(1L, waitersBeforeExpiry.size());
        Assert.assertTrue(firstLock.haveLock());
        Assert.assertEquals(
                idBeforeExpiry,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(zkc0.get(), lockPath, waitersBeforeExpiry.get(0))));

        // Expire the owner's session; the ownership check should re-acquire under a new session.
        ZooKeeperClientUtils.expireSession(zkc0, zkServers, 2000);
        checkLockAndReacquire(firstLock, true);
        final Pair<String, Long> idAfterExpiry = ((ZKSessionLock) firstLock.getInternalLock()).getLockId();
        Assert.assertFalse("New lock should be created under different session", idBeforeExpiry.equals(idAfterExpiry));
        final List<String> waitersAfterExpiry = getLockWaiters(zkc, lockPath);
        Assert.assertEquals(1L, waitersAfterExpiry.size());
        Assert.assertTrue(firstLock.haveLock());
        Assert.assertEquals(
                idAfterExpiry,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(zkc0.get(), lockPath, waitersAfterExpiry.get(0))));

        // A competing lock on the other client blocks in a background thread until it owns the lock.
        final ZKDistributedLock secondLock = new ZKDistributedLock(
                lockStateExecutor,
                createLockFactory("test-check-write-lock-failure", zkc),
                lockPath,
                DistributedLogConstants.MAX_TXID,
                NullStatsLogger.INSTANCE);
        final CountDownLatch secondLockAcquired = new CountDownLatch(1);
        final Thread lockThread = new Thread(() -> {
            try {
                Utils.ioResult(secondLock.asyncAcquire());
                secondLockAcquired.countDown();
            } catch (Exception e) {
                TestDistributedLock.logger.error("Failed on locking lock1 : ", e);
            }
        }, "lock-thread");
        lockThread.start();

        // Wait until the competing lock shows up as a second waiter.
        int waiterCount;
        do {
            Thread.sleep(1L);
            waiterCount = getLockWaiters(zkc, lockPath).size();
        } while (waiterCount < 2);

        // Expiring the owner's session again lets the competing lock take over.
        ZooKeeperClientUtils.expireSession(zkc0, zkServers, 2000);
        secondLockAcquired.await();
        lockThread.join();

        try {
            checkLockAndReacquire(firstLock, true);
            Assert.fail("Should fail on checking write lock since lock is acquired by lock1");
        } catch (LockingException expected) {
            // expected
        }
        try {
            checkLockAndReacquire(firstLock, false);
            Assert.fail("Should fail on checking write lock since lock is acquired by lock1");
        } catch (LockingException expected) {
            // expected
        }
    }

    /**
     * Runs the re-acquire success scenario with the flag set to {@code true}, which makes the
     * helper call checkLockAndReacquire explicitly after expiring the session.
     */
    @Test(timeout = 60000)
    public void testLockReacquireSuccessAfterCheckWriteLock() throws Exception {
        testLockReacquireSuccess(true);
    }

    /**
     * Runs the re-acquire success scenario with the flag set to {@code false}, so the helper
     * polls for the lock's re-acquire future instead of checking ownership first.
     */
    @Test(timeout = 60000)
    public void testLockReacquireSuccessWithoutCheckWriteLock() throws Exception {
        testLockReacquireSuccess(false);
    }

    /**
     * Shared scenario: a lock held through {@code zkc0} must be re-acquired under a new session
     * after {@code zkc0}'s session expires.
     *
     * <p>The two variants differ only in how the re-acquisition is awaited; the verification
     * and cleanup afterwards are shared. The branches are mutually exclusive: the
     * explicit-check variant must not fall through into the polling path, because by then the
     * lock would already have been verified and closed.
     *
     * @param checkOwnershipAndReacquire {@code true} to trigger and await the re-acquisition via
     *        checkLockAndReacquire; {@code false} to poll until the lock exposes a re-acquire
     *        future (or reports at least one re-acquisition) and await that
     */
    private void testLockReacquireSuccess(boolean checkOwnershipAndReacquire) throws Exception {
        final String lockPath = "/test-lock-re-acquire-success-" + checkOwnershipAndReacquire
                + "-" + System.currentTimeMillis();
        final String clientId = "test-lock-re-acquire";
        createLockPath(this.zkc.get(), lockPath);

        ZKDistributedLock lock = new ZKDistributedLock(
                this.lockStateExecutor,
                createLockFactory(clientId, this.zkc0),
                lockPath,
                DistributedLogConstants.MAX_TXID,
                NullStatsLogger.INSTANCE);
        Utils.ioResult(lock.asyncAcquire());
        Pair<String, Long> originalLockId = ((ZKSessionLock) lock.getInternalLock()).getLockId();
        List<String> waiters = getLockWaiters(this.zkc, lockPath);
        Assert.assertEquals(1L, waiters.size());
        Assert.assertTrue(lock.haveLock());
        Assert.assertEquals(originalLockId,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc0.get(), lockPath, waiters.get(0))));

        ZooKeeperClientUtils.expireSession(this.zkc0, zkServers, 2000);

        if (checkOwnershipAndReacquire) {
            checkLockAndReacquire(lock, true);
            checkLockAndReacquire(lock, false);
        } else {
            // No explicit check: wait for the re-acquisition to show up on the lock itself.
            CompletableFuture<ZKDistributedLock> reacquireFuture;
            do {
                Thread.sleep(1L);
                reacquireFuture = lock.getLockReacquireFuture();
            } while (null == reacquireFuture && lock.getReacquireCount() < 1);
            if (null != reacquireFuture) {
                Utils.ioResult(reacquireFuture);
            }
            checkLockAndReacquire(lock, false);
        }

        // Shared verification: exactly one waiter, owned by this lock under a new lock id.
        waiters = getLockWaiters(this.zkc, lockPath);
        Assert.assertEquals(1L, waiters.size());
        Assert.assertTrue(lock.haveLock());
        Pair<String, Long> reacquiredLockId = ((ZKSessionLock) lock.getInternalLock()).getLockId();
        Assert.assertEquals(reacquiredLockId,
                Utils.ioResult(ZKSessionLock.asyncParseClientID(this.zkc.get(), lockPath, waiters.get(0))));
        Assert.assertEquals(clientId, reacquiredLockId.getLeft());
        Assert.assertFalse(originalLockId.equals(reacquiredLockId));

        Utils.ioResult(lock.asyncClose());
        Assert.assertEquals(0L, getLockWaiters(this.zkc, lockPath).size());
    }

    /**
     * Runs the re-acquire failure scenario with the flag set to {@code true}.
     *
     * <p>NOTE(review): the helper it delegates to is a decompiler placeholder that always throws
     * UnsupportedOperationException, so this test cannot pass until that helper is restored.
     */
    @Test(timeout = 60000)
    public void testLockReacquireFailureAfterCheckWriteLock() throws Exception {
        testLockReacquireFailure(true);
    }

    /**
     * Runs the re-acquire failure scenario with the flag set to {@code false}.
     *
     * <p>NOTE(review): the helper it delegates to is a decompiler placeholder that always throws
     * UnsupportedOperationException, so this test cannot pass until that helper is restored.
     */
    @Test(timeout = 60000)
    public void testLockReacquireFailureWithoutCheckWriteLock() throws Exception {
        testLockReacquireFailure(false);
    }

    /* JADX WARN: Code restructure failed: missing block: B:21:0x01df, code lost:
    
        r21 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x01e1, code lost:
    
        org.junit.Assert.assertEquals(((org.apache.distributedlog.lock.ZKSessionLock) r0.getInternalLock()).getLockId().getLeft(), r21.getCurrentOwner());
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the shared "lock re-acquire failure" scenario.
     *
     * <p>The decompiler could not reconstruct this method, so the body below unconditionally
     * throws {@link UnsupportedOperationException}; every caller therefore fails. The only
     * surviving fragment (see the JADX notes above) shows a caught exception whose
     * {@code getCurrentOwner()} was compared with the left element of a lock's id.
     * TODO(review): restore the real body from the original sources.
     *
     * @param r10 scenario flag passed by the two {@code @Test} wrappers; unused by the placeholder
     * @throws UnsupportedOperationException always
     */
    private void testLockReacquireFailure(boolean r10) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 660
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.distributedlog.lock.TestDistributedLock.testLockReacquireFailure(boolean):void");
    }

    /**
     * Unlocks the internal session lock behind the distributed lock's back and verifies that
     * the ownership check re-acquires it, while a second lock on the same path fails with an
     * {@link OwnershipAcquireFailedException} that names "lockHolder" as the current owner.
     */
    @Test(timeout = 60000)
    public void testLockReacquire() throws Exception {
        final String lockPath = "/reacquirePath";
        Utils.zkCreateFullPathOptimistic(
                zkc, lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        final SessionLockFactory holderFactory =
                createLockFactory("lockHolder", zkc, conf.getLockTimeoutMilliSeconds(), 0);
        final ZKDistributedLock holder = new ZKDistributedLock(
                lockStateExecutor, holderFactory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
        Utils.ioResult(holder.asyncAcquire());

        // Drop the internal lock directly, then let the ownership check bring it back.
        holder.getInternalLock().unlock();
        checkLockAndReacquire(holder, true);
        Assert.assertEquals(true, holder.haveLock());
        Assert.assertEquals(true, holder.getInternalLock().isLockHeld());

        final SessionLockFactory contenderFactory =
                createLockFactory("lockHolder_2", zkc, conf.getLockTimeoutMilliSeconds(), 0);
        final ZKDistributedLock contender = new ZKDistributedLock(
                lockStateExecutor, contenderFactory, lockPath, 0L, NullStatsLogger.INSTANCE);
        boolean ownershipRejected = false;
        try {
            Utils.ioResult(contender.asyncAcquire());
        } catch (OwnershipAcquireFailedException oafe) {
            Assert.assertEquals("lockHolder", oafe.getCurrentOwner());
            ownershipRejected = true;
        }
        Assert.assertTrue(ownershipRejected);

        Utils.ioResult(holder.asyncClose());
        Utils.ioResult(contender.asyncClose());
    }

    /**
     * Like the single re-acquire test, but additionally checks that after the holder and the
     * rejected contender are both closed, a third lock can acquire the same path.
     */
    @Test(timeout = 60000)
    public void testLockReacquireMultiple() throws Exception {
        final String lockPath = "/reacquirePathMultiple";
        Utils.zkCreateFullPathOptimistic(
                zkc, lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        final SessionLockFactory holderFactory =
                createLockFactory("lockHolder", zkc, conf.getLockTimeoutMilliSeconds(), 0);
        final ZKDistributedLock holder = new ZKDistributedLock(
                lockStateExecutor, holderFactory, lockPath, conf.getLockTimeoutMilliSeconds(), NullStatsLogger.INSTANCE);
        Utils.ioResult(holder.asyncAcquire());

        // Drop the internal lock directly, then let the ownership check bring it back.
        holder.getInternalLock().unlock();
        checkLockAndReacquire(holder, true);
        Assert.assertEquals(true, holder.haveLock());
        Assert.assertEquals(true, holder.getInternalLock().isLockHeld());

        // A contender must be rejected while the holder owns the lock.
        final ZKDistributedLock contender = new ZKDistributedLock(
                lockStateExecutor,
                createLockFactory("lockHolder_2", zkc, 0L, 0),
                lockPath,
                0L,
                NullStatsLogger.INSTANCE);
        boolean ownershipRejected = false;
        try {
            Utils.ioResult(contender.asyncAcquire());
        } catch (OwnershipAcquireFailedException oafe) {
            Assert.assertEquals("lockHolder", oafe.getCurrentOwner());
            ownershipRejected = true;
        }
        Assert.assertTrue(ownershipRejected);

        Utils.ioResult(contender.asyncClose());
        Utils.ioResult(holder.asyncClose());
        Assert.assertEquals(false, holder.haveLock());
        Assert.assertEquals(false, holder.getInternalLock().isLockHeld());

        // With everything released, a fresh lock can take the path.
        final ZKDistributedLock successor = new ZKDistributedLock(
                lockStateExecutor,
                createLockFactory("lockHolder_3", zkc, 0L, 0),
                lockPath,
                0L,
                NullStatsLogger.INSTANCE);
        Utils.ioResult(successor.asyncAcquire());
        Assert.assertEquals(true, successor.haveLock());
        Assert.assertEquals(true, successor.getInternalLock().isLockHeld());
        Utils.ioResult(successor.asyncClose());
    }

    /**
     * Asserts that exactly the first {@code endIndex} latches have been counted down.
     *
     * @param latches  latches to inspect, each expected to start with a count of 1
     * @param endIndex number of leading latches that must have reached 0; all remaining ones
     *                 must still be at 1
     */
    void assertLatchesSet(CountDownLatch[] latches, int endIndex) {
        int index = 0;
        while (index < endIndex) {
            final String message = "latch " + index + " should have been set";
            Assert.assertEquals(message, 0L, latches[index].getCount());
            index++;
        }
        index = endIndex;
        while (index < latches.length) {
            final String message = "latch " + index + " should not have been set";
            Assert.assertEquals(message, 1L, latches[index].getCount());
            index++;
        }
    }

    /**
     * Asserts the ownership state of two locks and the number of waiters on the lock path.
     *
     * @param lock0           first lock
     * @param owned0          expected result of {@code lock0.haveLock()}
     * @param held0           whether lock0's internal lock is expected to exist and be held
     * @param lock1           second lock
     * @param owned1          expected result of {@code lock1.haveLock()}
     * @param held1           whether lock1's internal lock is expected to exist and be held
     * @param expectedWaiters expected number of children under {@code lockPath}
     * @param lockPath        ZooKeeper path of the lock
     */
    void assertLockState(ZKDistributedLock lock0, boolean owned0, boolean held0, ZKDistributedLock lock1, boolean owned1, boolean held1, int expectedWaiters, String lockPath) throws Exception {
        Assert.assertEquals(owned0, lock0.haveLock());
        // A lock without an internal session lock counts as "not held".
        final boolean internalHeld0 = lock0.getInternalLock() != null && lock0.getInternalLock().isLockHeld();
        Assert.assertEquals(held0, internalHeld0);

        Assert.assertEquals(owned1, lock1.haveLock());
        final boolean internalHeld1 = lock1.getInternalLock() != null && lock1.getInternalLock().isLockHeld();
        Assert.assertEquals(held1, internalHeld1);

        final int actualWaiters = getLockWaiters(this.zkc, lockPath).size();
        Assert.assertEquals(expectedWaiters, actualWaiters);
    }

    /**
     * Starts three asynchronous acquires on the same path and verifies that they complete one
     * at a time, in creation order, as each previous owner is closed.
     *
     * <p>The completion callback releases the per-lock latch on failure as well as on success.
     * An assertion thrown from inside a {@code whenComplete} action is discarded when the
     * stage already failed, so failing there would only leave the main thread blocked on the
     * latch until the test timeout. Releasing the latch lets the later
     * {@code Utils.ioResult} call on the stored future report the real failure.
     */
    @Test(timeout = 60000)
    public void testAsyncAcquireBasics() throws Exception {
        final int lockCount = 3;
        TestLockFactory lockFactory = new TestLockFactory(this.runtime.getMethodName(), this.zkc, this.lockStateExecutor);
        List<CompletableFuture<ZKDistributedLock>> acquireFutures = new ArrayList<>(lockCount);
        ZKDistributedLock[] locks = new ZKDistributedLock[lockCount];
        final CountDownLatch[] latches = new CountDownLatch[lockCount];

        for (int i = 0; i < lockCount; i++) {
            latches[i] = new CountDownLatch(1);
            locks[i] = lockFactory.createLock(i, this.zkc);
            final int index = i;
            acquireFutures.add(locks[i].asyncAcquire().whenComplete(new FutureEventListener<ZKDistributedLock>() {
                @Override
                public void onSuccess(ZKDistributedLock lock) {
                    latches[index].countDown();
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Wake the waiting test thread; the failure itself is reported when the
                    // stored future is awaited below.
                    latches[index].countDown();
                }
            }));
        }

        for (int i = 0; i < lockCount; i++) {
            latches[i].await();
            assertLatchesSet(latches, i + 1);
            Utils.ioResult(acquireFutures.get(i));
            Utils.ioResult(locks[i].asyncClose());
        }
    }

    /**
     * Holds a lock on one client while a second lock on the other client blocks in a
     * background thread, then checks that closing the first lock hands ownership to the second.
     */
    @Test(timeout = 60000)
    public void testAsyncAcquireSyncThenAsyncOnSameLock() throws Exception {
        final TestLockFactory lockFactory =
                new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor);
        final String lockPath = lockFactory.getLockPath();
        final ZKDistributedLock lock0 = lockFactory.createLock(0, zkc);
        final ZKDistributedLock lock1 = lockFactory.createLock(1, zkc0);

        Utils.ioResult(lock0.asyncAcquire());
        assertLockState(lock0, true, true, lock1, false, false, 1, lockPath);

        // The second acquire blocks until lock0 is released, so run it off the test thread.
        final Thread lock1Thread = new Thread(() -> {
            try {
                Utils.ioResult(lock1.asyncAcquire());
            } catch (Exception e) {
                Assert.fail("shouldn't fail to acquire");
            }
        }, "lock1-thread");
        lock1Thread.start();

        // Wait until lock1 is registered as a waiter.
        while (getLockWaiters(zkc, lockPath).size() < 2) {
            Thread.sleep(1L);
        }
        assertLockState(lock0, true, true, lock1, false, false, 2, lockPath);

        Utils.ioResult(lock0.asyncClose());
        Utils.ioResult(lock1.getLockAcquireFuture());
        assertLockState(lock0, false, false, lock1, true, true, 1, lockPath);

        Utils.ioResult(lock1.asyncClose());
        assertLockState(lock0, false, false, lock1, false, false, 0, lockPath);
    }

    /**
     * Expires the session of a lock that is still waiting for ownership and expects its
     * pending acquire future to fail with {@link OwnershipAcquireFailedException}, while the
     * current owner keeps the lock.
     */
    @Test(timeout = 60000)
    public void testAsyncAcquireExpireDuringWait() throws Exception {
        final TestLockFactory lockFactory =
                new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor);
        final ZKDistributedLock owner = lockFactory.createLock(0, zkc);
        final ZKDistributedLock waiter = lockFactory.createLock(1, zkc0);

        Utils.ioResult(owner.asyncAcquire());
        final CompletableFuture<ZKDistributedLock> pendingAcquire = waiter.asyncAcquire();

        // Only expire the session once the waiter is actually registered.
        while (waiter.getLockWaiter() == null) {
            TimeUnit.MILLISECONDS.sleep(20L);
        }
        ZooKeeperClientUtils.expireSession(zkc0, zkServers, 2000);

        try {
            Utils.ioResult(pendingAcquire);
            Assert.fail("future should have been failed");
        } catch (OwnershipAcquireFailedException expected) {
            // expected
        }

        assertLockState(owner, true, true, waiter, false, false, 1, lockFactory.getLockPath());
        owner.asyncClose();
        waiter.asyncClose();
    }

    /**
     * Closes a lock whose acquire is still pending and expects the pending future to fail with
     * {@link LockClosedException}, while the current owner keeps the lock.
     */
    @Test(timeout = 60000)
    public void testAsyncAcquireCloseDuringWait() throws Exception {
        final TestLockFactory lockFactory =
                new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor);
        final ZKDistributedLock owner = lockFactory.createLock(0, zkc);
        final ZKDistributedLock waiter = lockFactory.createLock(1, zkc0);

        Utils.ioResult(owner.asyncAcquire());
        final CompletableFuture<ZKDistributedLock> pendingAcquire = waiter.asyncAcquire();
        Utils.ioResult(waiter.asyncClose());

        try {
            Utils.ioResult(pendingAcquire);
            Assert.fail("future should have been failed");
        } catch (LockClosedException expected) {
            // expected
        }

        assertLockState(owner, true, true, waiter, false, false, 1, lockFactory.getLockPath());
        owner.asyncClose();
    }

    /**
     * Acquires and then closes a lock, and checks that the already completed acquire future
     * can still be awaited while the lock itself no longer reports ownership.
     */
    @Test(timeout = 60000)
    public void testAsyncAcquireCloseAfterAcquire() throws Exception {
        final TestLockFactory lockFactory =
                new TestLockFactory(runtime.getMethodName(), zkc, lockStateExecutor);
        final ZKDistributedLock lock = lockFactory.createLock(0, zkc);

        final CompletableFuture<ZKDistributedLock> acquireFuture = lock.asyncAcquire();
        Utils.ioResult(acquireFuture);
        Utils.ioResult(lock.asyncClose());

        // Awaiting the completed future again must still succeed after the close.
        Utils.ioResult(acquireFuture);
        Assert.assertEquals(false, lock.haveLock());
        Assert.assertEquals(false, lock.getInternalLock().isLockHeld());
    }
}
