package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/helix/TestHelixReadOnlyStorageEngineRepository.class */
public class TestHelixReadOnlyStorageEngineRepository {
    private static final Logger LOGGER = LogManager.getLogger(TestHelixReadOnlyStorageEngineRepository.class);
    private String zkAddress;
    private ZkClient zkClient;
    private ZkServerWrapper zkServerWrapper;
    private HelixReadOnlyStoreRepository repo;
    private HelixReadWriteStoreRepository writeRepo;
    private String cluster = "test-metadata-cluster";
    private String clusterPath = "/test-metadata-cluster";
    private String storesPath = "/stores";
    private HelixAdapterSerializer adapter = new HelixAdapterSerializer();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/helix/TestHelixReadOnlyStorageEngineRepository$TestListener.class */
    public static class TestListener implements StoreDataChangedListener {
        AtomicInteger creationCount = new AtomicInteger(0);
        AtomicInteger changeCount = new AtomicInteger(0);
        AtomicInteger deletionCount = new AtomicInteger(0);

        TestListener() {
        }

        public void handleStoreCreated(Store store) {
            this.creationCount.incrementAndGet();
        }

        public void handleStoreDeleted(String str) {
            this.deletionCount.incrementAndGet();
        }

        public void handleStoreChanged(Store store) {
            TestHelixReadOnlyStorageEngineRepository.LOGGER.info("Received handleStoreChanged: {}", store);
            this.changeCount.incrementAndGet();
        }

        public int getCreationCount() {
            return this.creationCount.get();
        }

        public int getChangeCount() {
            return this.changeCount.get();
        }

        public int getDeletionCount() {
            return this.deletionCount.get();
        }
    }

    @BeforeMethod
    public void zkSetup() {
        this.zkServerWrapper = ServiceFactory.getZkServer();
        this.zkAddress = this.zkServerWrapper.getAddress();
        this.zkClient = ZkClientFactory.newZkClient(this.zkAddress);
        this.zkClient.setZkSerializer(this.adapter);
        this.zkClient.create(this.clusterPath, (Object) null, CreateMode.PERSISTENT);
        this.zkClient.create(this.clusterPath + this.storesPath, (Object) null, CreateMode.PERSISTENT);
        this.repo = new HelixReadOnlyStoreRepository(this.zkClient, this.adapter, this.cluster, 1, 1000L);
        this.writeRepo = new HelixReadWriteStoreRepository(this.zkClient, this.adapter, this.cluster, Optional.empty(), new ClusterLockManager(this.cluster));
        this.repo.refresh();
        this.writeRepo.refresh();
    }

    @AfterMethod
    public void zkCleanup() {
        this.repo.clear();
        this.writeRepo.clear();
        this.zkClient.deleteRecursively(this.clusterPath);
        this.zkClient.close();
        this.zkServerWrapper.close();
    }

    @Test
    public void testGetStore() throws InterruptedException {
        Store createTestStore = TestUtils.createTestStore("s1", "owner", System.currentTimeMillis());
        createTestStore.addVersion(new VersionImpl("s1", 1, "pushJobId"));
        createTestStore.setReadQuotaInCU(100L);
        this.writeRepo.addStore(createTestStore);
        Thread.sleep(1000L);
        Assert.assertEquals(this.repo.getStore(createTestStore.getName()).cloneStore(), createTestStore, "Can not get store from ZK notification successfully");
        Store store = this.writeRepo.getStore(createTestStore.getName());
        store.addVersion(new VersionImpl(store.getName(), store.getLargestUsedVersionNumber() + 1, "pushJobId2"));
        this.writeRepo.updateStore(store);
        Thread.sleep(1000L);
        Assert.assertEquals(this.repo.getStore(createTestStore.getName()).cloneStore(), store, "Can not get store from ZK notification successfully");
        this.writeRepo.deleteStore(createTestStore.getName());
        Thread.sleep(1000L);
        Assert.assertNull(this.repo.getStore(createTestStore.getName()), "Can not get store from ZK notification successfully");
    }

    @Test
    public void testLoadFromZK() throws InterruptedException {
        Store[] storeArr = new Store[10];
        for (int i = 0; i < 10; i++) {
            Store createTestStore = TestUtils.createTestStore("s" + i, "owner", System.currentTimeMillis());
            createTestStore.addVersion(new VersionImpl(createTestStore.getName(), createTestStore.getLargestUsedVersionNumber() + 1, "pushJobId"));
            createTestStore.setReadQuotaInCU(i + 1);
            this.writeRepo.addStore(createTestStore);
            storeArr[i] = createTestStore;
        }
        Thread.sleep(1000L);
        for (Store store : storeArr) {
            Assert.assertEquals(this.repo.getStore(store.getName()).cloneStore(), store, "Can not get store from ZK notification successfully");
        }
        this.repo.refresh();
        for (Store store2 : storeArr) {
            Assert.assertEquals(this.repo.getStore(store2.getName()).cloneStore(), store2, "Can not get store from ZK after refreshing successfully");
        }
    }

    @Test
    public void testNotifiers() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicInteger atomicInteger3 = new AtomicInteger(0);
        TestListener testListener = new TestListener();
        this.repo.registerStoreDataChangedListener(testListener);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.get(), atomicInteger3.get(), "initialization");
        this.writeRepo.addStore(TestUtils.getRandomStore());
        this.writeRepo.addStore(TestUtils.getRandomStore());
        this.writeRepo.addStore(TestUtils.getRandomStore());
        assertListenerCounts(testListener, atomicInteger.addAndGet(3), atomicInteger2.get(), atomicInteger3.get(), "store creations");
        Store randomStore = TestUtils.getRandomStore();
        this.writeRepo.addStore(randomStore);
        try {
            this.writeRepo.addStore(randomStore);
        } catch (VeniceException e) {
        }
        assertListenerCounts(testListener, atomicInteger.addAndGet(1), atomicInteger2.get(), atomicInteger3.get(), "creation of a duplicate store");
        randomStore.setCurrentVersion(10);
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "change of a store-version");
        randomStore.setOwner(Utils.getUniqueString("NewRandomOwner"));
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "change of owner");
        randomStore.setPartitionCount(10);
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "change of partition count");
        randomStore.setLargestUsedVersionNumber(100);
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "change of largest used version number");
        randomStore.setEnableWrites(false);
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "disabling writes");
        randomStore.setEnableReads(false);
        this.writeRepo.updateStore(randomStore);
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.addAndGet(1), atomicInteger3.get(), "disabling reads");
        this.writeRepo.deleteStore(randomStore.getName());
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.get(), atomicInteger3.addAndGet(1), "store deletion");
        this.repo.refresh();
        assertListenerCounts(testListener, atomicInteger.get(), atomicInteger2.get(), atomicInteger3.get(), "ReadOnly Repo refresh()");
    }

    private void assertListenerCounts(TestListener testListener, int i, int i2, int i3, String str) {
        TestUtils.waitForNonDeterministicAssertion(3L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(testListener.getCreationCount(), i, "Listener's creation count should be " + i + " following: " + str);
            Assert.assertEquals(testListener.getChangeCount(), i2, "Listener's change count should be " + i2 + " following: " + str);
            Assert.assertEquals(testListener.getDeletionCount(), i3, "Listener's deletion count should be " + i3 + " following: " + str);
            LOGGER.info("Successfully asserted that notifications work after {}", str);
        });
    }
}
