package org.apache.pulsar.metadata;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/pulsar/metadata/ZKSessionTest.class */
public class ZKSessionTest extends BaseMetadataStoreTest {
    @Test
    public void testDisconnection() throws Exception {
        MetadataStoreExtended create = MetadataStoreExtended.create(this.zks.getConnectionString(), MetadataStoreConfig.builder().sessionTimeoutMillis(300000).build());
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            Objects.requireNonNull(linkedBlockingQueue);
            create.registerSessionListener((v1) -> {
                r1.add(v1);
            });
            this.zks.stop();
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS), SessionEvent.ConnectionLost);
            this.zks.start();
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(20L, TimeUnit.SECONDS), SessionEvent.Reconnected);
            Assert.assertNull((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test
    public void testSessionLost() throws Exception {
        MetadataStoreExtended create = MetadataStoreExtended.create(this.zks.getConnectionString(), MetadataStoreConfig.builder().sessionTimeoutMillis(5000).build());
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            Objects.requireNonNull(linkedBlockingQueue);
            create.registerSessionListener((v1) -> {
                r1.add(v1);
            });
            this.zks.stop();
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS), SessionEvent.ConnectionLost);
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionLost);
            this.zks.start();
            TestZKServer testZKServer = this.zks;
            Assert.assertTrue(TestZKServer.waitForServerUp(this.zks.getConnectionString(), 30000L));
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.Reconnected);
            Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionReestablished);
            Assert.assertNull((SessionEvent) linkedBlockingQueue.poll(1L, TimeUnit.SECONDS));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testReacquireLocksAfterSessionLost() throws Exception {
        ZKMetadataStore create = MetadataStoreExtended.create(this.zks.getConnectionString(), MetadataStoreConfig.builder().sessionTimeoutMillis(2000).build());
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            Objects.requireNonNull(linkedBlockingQueue);
            create.registerSessionListener((v1) -> {
                r1.add(v1);
            });
            CoordinationServiceImpl coordinationServiceImpl = new CoordinationServiceImpl(create);
            try {
                LockManager lockManager = coordinationServiceImpl.getLockManager(String.class);
                try {
                    String newKey = newKey();
                    ResourceLock resourceLock = (ResourceLock) lockManager.acquireLock(newKey, "value-1").join();
                    this.zks.expireSession(create.getZkSessionId());
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS), SessionEvent.ConnectionLost);
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionLost);
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.Reconnected);
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionReestablished);
                    Awaitility.await().untilAsserted(() -> {
                        Assert.assertTrue(((Optional) create.get(newKey).join()).isPresent());
                    });
                    Assert.assertFalse(resourceLock.getLockExpiredFuture().isDone());
                    if (Collections.singletonList(lockManager).get(0) != null) {
                        lockManager.close();
                    }
                    if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                        coordinationServiceImpl.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(lockManager).get(0) != null) {
                        lockManager.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                    coordinationServiceImpl.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testReacquireLeadershipAfterSessionLost() throws Exception {
        ZKMetadataStore create = MetadataStoreExtended.create(this.zks.getConnectionString(), MetadataStoreConfig.builder().sessionTimeoutMillis(2000).build());
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            Objects.requireNonNull(linkedBlockingQueue);
            create.registerSessionListener((v1) -> {
                r1.add(v1);
            });
            LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
            String newKey = newKey();
            CoordinationServiceImpl coordinationServiceImpl = new CoordinationServiceImpl(create);
            try {
                Objects.requireNonNull(linkedBlockingQueue2);
                LeaderElection leaderElection = coordinationServiceImpl.getLeaderElection(String.class, newKey, (v1) -> {
                    r3.add(v1);
                });
                try {
                    leaderElection.elect("value-1").join();
                    Assert.assertEquals(leaderElection.getState(), LeaderElectionState.Leading);
                    Assert.assertEquals((LeaderElectionState) linkedBlockingQueue2.poll(5L, TimeUnit.SECONDS), LeaderElectionState.Leading);
                    this.zks.expireSession(create.getZkSessionId());
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS), SessionEvent.ConnectionLost);
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionLost);
                    Awaitility.await().atMost(Duration.ofSeconds(15L)).untilAsserted(() -> {
                        Assert.assertEquals(leaderElection.getState(), LeaderElectionState.Leading);
                    });
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.Reconnected);
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.SessionReestablished);
                    Awaitility.await().atMost(Duration.ofSeconds(15L)).untilAsserted(() -> {
                        Assert.assertEquals(leaderElection.getState(), LeaderElectionState.Leading);
                    });
                    Assert.assertTrue(((Optional) create.get(newKey).join()).isPresent());
                    if (Collections.singletonList(leaderElection).get(0) != null) {
                        leaderElection.close();
                    }
                    if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                        coordinationServiceImpl.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(leaderElection).get(0) != null) {
                        leaderElection.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                    coordinationServiceImpl.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testElectAfterReconnected() throws Exception {
        MetadataStoreExtended create = MetadataStoreExtended.create(this.zks.getConnectionString(), MetadataStoreConfig.builder().sessionTimeoutMillis(2000).build());
        try {
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            Objects.requireNonNull(linkedBlockingQueue);
            create.registerSessionListener((v1) -> {
                r1.add(v1);
            });
            LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
            String newKey = newKey();
            CoordinationServiceImpl coordinationServiceImpl = new CoordinationServiceImpl(create);
            try {
                Objects.requireNonNull(linkedBlockingQueue2);
                LeaderElection leaderElection = coordinationServiceImpl.getLeaderElection(String.class, newKey, (v1) -> {
                    r3.add(v1);
                });
                try {
                    leaderElection.elect("value-1").join();
                    Assert.assertEquals(leaderElection.getState(), LeaderElectionState.Leading);
                    Assert.assertEquals((LeaderElectionState) linkedBlockingQueue2.poll(5L, TimeUnit.SECONDS), LeaderElectionState.Leading);
                    FieldUtils.writeDeclaredField(leaderElection, "leaderElectionState", LeaderElectionState.NoLeader, true);
                    this.zks.stop();
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(5L, TimeUnit.SECONDS), SessionEvent.ConnectionLost);
                    this.zks.start();
                    Assert.assertEquals((SessionEvent) linkedBlockingQueue.poll(10L, TimeUnit.SECONDS), SessionEvent.Reconnected);
                    Awaitility.await().atMost(Duration.ofSeconds(15L)).untilAsserted(() -> {
                        Assert.assertEquals(leaderElection.getState(), LeaderElectionState.Leading);
                    });
                    Assert.assertTrue(((Optional) create.get(newKey).join()).isPresent());
                    if (Collections.singletonList(leaderElection).get(0) != null) {
                        leaderElection.close();
                    }
                    if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                        coordinationServiceImpl.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(leaderElection).get(0) != null) {
                        leaderElection.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(coordinationServiceImpl).get(0) != null) {
                    coordinationServiceImpl.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
