package com.linkedin.venice.helix;

import com.linkedin.venice.VeniceResource;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.utils.TestUtils;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helix/CachedResourceZKStateListenerTest.class */
public class CachedResourceZKStateListenerTest {
    private ZkServerWrapper zkServer;
    private ZkClient zkClient;
    private static final int WAIT_TIME = 10000;

    /* loaded from: input_file:com/linkedin/venice/helix/CachedResourceZKStateListenerTest$MockVeniceResource.class */
    private static class MockVeniceResource implements VeniceResource {
        protected volatile boolean isRefreshed;
        protected int refreshCount;

        private MockVeniceResource() {
            this.refreshCount = 0;
        }

        public void refresh() {
            this.isRefreshed = true;
            this.refreshCount++;
        }

        public void clear() {
            this.isRefreshed = false;
        }
    }

    /* loaded from: input_file:com/linkedin/venice/helix/CachedResourceZKStateListenerTest$MockVeniceResourceWillThrowExceptionWhileRefreshing.class */
    private static class MockVeniceResourceWillThrowExceptionWhileRefreshing extends MockVeniceResource {
        private MockVeniceResourceWillThrowExceptionWhileRefreshing() {
            super();
        }

        @Override // com.linkedin.venice.helix.CachedResourceZKStateListenerTest.MockVeniceResource
        public void refresh() {
            super.refresh();
            this.isRefreshed = false;
            throw new VeniceException();
        }
    }

    @BeforeMethod
    public void setUp() {
        this.zkServer = ServiceFactory.getZkServer();
        this.zkClient = ZkClientFactory.newZkClient(this.zkServer.getAddress());
    }

    @AfterMethod
    public void cleanUp() {
        this.zkClient.close();
        this.zkServer.close();
    }

    @Test
    public void testReconnectWithRefreshFailed() throws Exception {
        final MockVeniceResourceWillThrowExceptionWhileRefreshing mockVeniceResourceWillThrowExceptionWhileRefreshing = new MockVeniceResourceWillThrowExceptionWhileRefreshing();
        final int i = 3;
        CachedResourceZkStateListener cachedResourceZkStateListener = new CachedResourceZkStateListener(mockVeniceResourceWillThrowExceptionWhileRefreshing, 3, 100L);
        this.zkClient.subscribeStateChanges(cachedResourceZkStateListener);
        WatchedEvent watchedEvent = new WatchedEvent((Watcher.Event.EventType) null, Watcher.Event.KeeperState.Disconnected, (String) null);
        WatchedEvent watchedEvent2 = new WatchedEvent((Watcher.Event.EventType) null, Watcher.Event.KeeperState.SyncConnected, (String) null);
        this.zkClient.process(watchedEvent);
        this.zkClient.process(watchedEvent2);
        TestUtils.waitForNonDeterministicCompletion(10000L, TimeUnit.MILLISECONDS, new BooleanSupplier() { // from class: com.linkedin.venice.helix.CachedResourceZKStateListenerTest.1
            @Override // java.util.function.BooleanSupplier
            public boolean getAsBoolean() {
                return mockVeniceResourceWillThrowExceptionWhileRefreshing.refreshCount == i;
            }
        });
        Assert.assertFalse(cachedResourceZkStateListener.isDisconnected(), "Client should be reconnected");
        Assert.assertFalse(mockVeniceResourceWillThrowExceptionWhileRefreshing.isRefreshed, "Reconnected, resource should not be refreshed correctly.");
        Assert.assertEquals(mockVeniceResourceWillThrowExceptionWhileRefreshing.refreshCount, 3, "Should retry +3 to avoid non-deterministic issue that zk could return partial result after getting reconnected. ");
    }

    @Test
    public void testReconnectWithRefresh() {
        final MockVeniceResource mockVeniceResource = new MockVeniceResource();
        CachedResourceZkStateListener cachedResourceZkStateListener = new CachedResourceZkStateListener(mockVeniceResource, 3, 100L);
        this.zkClient.subscribeStateChanges(cachedResourceZkStateListener);
        WatchedEvent watchedEvent = new WatchedEvent((Watcher.Event.EventType) null, Watcher.Event.KeeperState.Disconnected, (String) null);
        WatchedEvent watchedEvent2 = new WatchedEvent((Watcher.Event.EventType) null, Watcher.Event.KeeperState.SyncConnected, (String) null);
        this.zkClient.process(watchedEvent);
        this.zkClient.process(watchedEvent2);
        TestUtils.waitForNonDeterministicCompletion(10000L, TimeUnit.MILLISECONDS, new BooleanSupplier() { // from class: com.linkedin.venice.helix.CachedResourceZKStateListenerTest.2
            @Override // java.util.function.BooleanSupplier
            public boolean getAsBoolean() {
                return mockVeniceResource.isRefreshed && mockVeniceResource.refreshCount == 1;
            }
        });
        Assert.assertFalse(cachedResourceZkStateListener.isDisconnected(), "Client should be reconnected");
        Assert.assertTrue(mockVeniceResource.isRefreshed, "Reconnected, resource should be refreshed.");
        Assert.assertEquals(mockVeniceResource.refreshCount, 1, "Should only refresh once. Because is refresh succeed, we should not keep retrying. ");
    }

    @Test
    public void testHandleStateChanged() throws Exception {
        CachedResourceZkStateListener cachedResourceZkStateListener = new CachedResourceZkStateListener(new MockVeniceResourceWillThrowExceptionWhileRefreshing(), 1, 100L);
        cachedResourceZkStateListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
        Assert.assertFalse(cachedResourceZkStateListener.isDisconnected(), "It's the first to connect, but not the reconnecting.");
        cachedResourceZkStateListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
        Assert.assertTrue(cachedResourceZkStateListener.isDisconnected(), "Connection is disconnected.");
        cachedResourceZkStateListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
        Assert.assertFalse(cachedResourceZkStateListener.isDisconnected(), "After reconnecting, client is connected again.");
    }
}
