package com.linkedin.venice.controller.server;

import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.MultiStoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/controller/server/TestAdminSparkServerWithMultiServers.class */
public class TestAdminSparkServerWithMultiServers {
    private static final int TEST_TIMEOUT = 20000;
    private static final int STORAGE_NODE_COUNT = 3;
    private VeniceClusterWrapper cluster;
    private ControllerClient controllerClient;

    /**
     * Creates the Venice cluster shared by every test in this class and builds a controller
     * client from the cluster's name and its controller URLs.
     */
    @BeforeClass
    public void setUp() {
        // NOTE(review): the second argument is presumably the number of storage nodes,
        // which is why the class constant (value 3) is used here -- confirm against ServiceFactory.
        VeniceClusterWrapper startedCluster = ServiceFactory.getVeniceCluster(1, STORAGE_NODE_COUNT, 0);
        this.cluster = startedCluster;
        this.controllerClient = ControllerClient.constructClusterControllerClient(
                startedCluster.getClusterName(),
                startedCluster.getAllControllersURLs());
    }

    /**
     * Releases the resources created in {@link #setUp()}: the controller client first,
     * then the cluster, each through {@code Utils.closeQuietlyWithErrorLogged}.
     */
    @AfterClass
    public void cleanUp() {
        Closeable[] clientOnly = {this.controllerClient};
        Utils.closeQuietlyWithErrorLogged(clientOnly);

        Closeable[] clusterOnly = {this.cluster};
        Utils.closeQuietlyWithErrorLogged(clusterOnly);
    }

    /**
     * Creates ten stores through the cluster wrapper and verifies that every one of them
     * shows up in the controller client's store listing.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void controllerClientShouldListStores() {
        final int storeCount = 10;
        List<String> createdStoreNames = new ArrayList<>(storeCount);
        for (int i = 0; i < storeCount; i++) {
            createdStoreNames.add(this.cluster.getNewStore(Utils.getUniqueString("venice-store")).getName());
        }

        MultiStoreResponse listResponse = this.controllerClient.queryStoreList();
        Assert.assertFalse(
                listResponse.isError(),
                "Received error response when listing stores: " + listResponse.getError());

        List<String> listedStoreNames = Arrays.asList(listResponse.getStores());
        for (String storeName : createdStoreNames) {
            Assert.assertTrue(listedStoreNames.contains(storeName), "Query store list should include " + storeName);
        }
    }

    /**
     * Verifies that {@code queryStoreList} honours the config-name / config-value filter for
     * {@code nativeReplicationEnabled}, {@code incrementalPushEnabled}, {@code hybridConfig}
     * and {@code dataReplicationPolicy}.
     *
     * <p>This method carries no method-level annotation; it runs as a test because the class
     * itself is annotated with {@code @Test}.
     */
    public void testListStoreWithConfigFilter() {
        String nativeReplicationStore = createStoreWithConfig(
                "native-replication-store",
                new UpdateStoreQueryParams().setNativeReplicationEnabled(true));
        String incrementalPushStore = createStoreWithConfig(
                "incremental-push-store",
                new UpdateStoreQueryParams().setIncrementalPushEnabled(true).setHybridOffsetLagThreshold(10L).setHybridRewindSeconds(1L));

        // Each boolean filter is expected to match exactly the one store configured for it.
        List<String> nativeReplicationStores = queryStoreNamesByConfig("nativeReplicationEnabled", "true");
        Assert.assertEquals(nativeReplicationStores.size(), 1);
        Assert.assertEquals(nativeReplicationStores.get(0), nativeReplicationStore);

        List<String> incrementalPushStores = queryStoreNamesByConfig("incrementalPushEnabled", "true");
        Assert.assertEquals(incrementalPushStores.size(), 1);
        Assert.assertEquals(incrementalPushStores.get(0), incrementalPushStore);

        String hybridNonAggregateStore = createStoreWithConfig(
                "hybrid-non-aggregate",
                new UpdateStoreQueryParams().setHybridRewindSeconds(100L).setHybridOffsetLagThreshold(10L).setHybridDataReplicationPolicy(DataReplicationPolicy.NON_AGGREGATE));
        String hybridAggregateStore = createStoreWithConfig(
                "hybrid-aggregate",
                new UpdateStoreQueryParams().setHybridRewindSeconds(100L).setHybridOffsetLagThreshold(10L).setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE));

        // The incremental-push store was given hybrid settings above, so it is expected
        // in the hybrid listing; the native-replication store was not.
        List<String> hybridStores = queryStoreNamesByConfig("hybridConfig", "true");
        Assert.assertTrue(hybridStores.contains(hybridAggregateStore));
        Assert.assertTrue(hybridStores.contains(hybridNonAggregateStore));
        Assert.assertFalse(hybridStores.contains(nativeReplicationStore));
        Assert.assertTrue(hybridStores.contains(incrementalPushStore));

        // NOTE(review): the incremental-push store never sets a replication policy explicitly;
        // the assertion below implies NON_AGGREGATE is the default -- confirm.
        List<String> nonAggregateStores = queryStoreNamesByConfig("dataReplicationPolicy", "NON_AGGREGATE");
        Assert.assertFalse(nonAggregateStores.contains(hybridAggregateStore));
        Assert.assertTrue(nonAggregateStores.contains(hybridNonAggregateStore));
        Assert.assertFalse(nonAggregateStores.contains(nativeReplicationStore));
        Assert.assertTrue(nonAggregateStores.contains(incrementalPushStore));

        List<String> aggregateStores = queryStoreNamesByConfig("dataReplicationPolicy", "AGGREGATE");
        Assert.assertTrue(aggregateStores.contains(hybridAggregateStore));
        Assert.assertFalse(aggregateStores.contains(hybridNonAggregateStore));
        Assert.assertFalse(aggregateStores.contains(nativeReplicationStore));
        Assert.assertFalse(aggregateStores.contains(incrementalPushStore));
    }

    /**
     * Creates a store with a unique name derived from {@code namePrefix}, applies
     * {@code params} to it, and asserts that neither controller call returned an error.
     *
     * @param namePrefix prefix passed to {@code Utils.getUniqueString}
     * @param params     store settings applied through {@code updateStore}
     * @return the generated store name
     */
    private String createStoreWithConfig(String namePrefix, UpdateStoreQueryParams params) {
        String storeName = Utils.getUniqueString(namePrefix);
        Assert.assertFalse(
                this.controllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\"").isError(),
                "Failed to create store " + storeName);
        Assert.assertFalse(
                this.controllerClient.updateStore(storeName, params).isError(),
                "Failed to update store " + storeName);
        return storeName;
    }

    /**
     * Lists stores filtered by a single config name/value pair and asserts the controller
     * did not report an error.
     *
     * @param configName  name of the store config to filter on
     * @param configValue value the config must have
     * @return the store names returned by the controller, in response order
     */
    private List<String> queryStoreNamesByConfig(String configName, String configValue) {
        MultiStoreResponse response =
                this.controllerClient.queryStoreList(false, Optional.of(configName), Optional.of(configValue));
        Assert.assertFalse(
                response.isError(),
                "Received error response when listing stores with " + configName + "=" + configValue);
        return Arrays.asList(response.getStores());
    }

    /**
     * Sends an empty push to a freshly created store via {@code sendEmptyPushAndWait} and
     * expects a non-error response. Any exception thrown along the way fails the test.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void controllerClientShouldSendEmptyPushAndWait() {
        try {
            String storeName = this.cluster.getNewStore(Utils.getUniqueString("venice-store")).getName();
            String pushId = Utils.getUniqueString("emptyPushId");
            ControllerResponse pushResponse =
                    this.controllerClient.sendEmptyPushAndWait(storeName, pushId, 10000L, 20000L);
            Assert.assertFalse(
                    pushResponse.isError(),
                    "Received error response on empty push:" + pushResponse.getError());
        } catch (Exception e) {
            Assert.fail("Could not send empty push!!", e);
        }
    }

    /**
     * Creates a store together with an initial set of parameters in a single call and checks
     * that the owner and the hybrid rewind time read back from the controller match what was sent.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void controllerClientShouldCreateStoreWithParameters() {
        String storeName = Utils.getUniqueString("venice-store");
        try {
            UpdateStoreQueryParams initialParams = new UpdateStoreQueryParams()
                    .setHybridRewindSeconds(1000L)
                    .setHybridOffsetLagThreshold(1000L)
                    .setHybridStoreOverheadBypass(true)
                    .setEnableWrites(true)
                    .setOwner("Napolean");
            ControllerResponse creationResponse = this.controllerClient.createNewStoreWithParameters(
                    storeName, "The_Doge", "\"string\"", "\"string\"", initialParams);
            Assert.assertFalse(
                    creationResponse.isError(),
                    "Received error response on store creation:" + creationResponse.getError());

            StoreInfo createdStore = this.controllerClient.getStore(storeName).getStore();
            Assert.assertEquals(createdStore.getOwner(), "Napolean");
            Assert.assertEquals(createdStore.getHybridStoreConfig().getRewindTimeInSeconds(), 1000L);
        } catch (Exception e) {
            Assert.fail("Could not create new Store with Exception!!", e);
        }
    }

    /**
     * Creates a store with parameters, then repeats the identical creation request and
     * expects it to be rejected. After the rejected request the original store must still be
     * readable with its owner and hybrid rewind time unchanged.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void controllerClientShouldCreateStoreWithParametersAndNotDeleteItIfItExists() {
        String storeName = Utils.getUniqueString("venice-store");
        try {
            UpdateStoreQueryParams firstRequestParams = new UpdateStoreQueryParams()
                    .setHybridRewindSeconds(1000L)
                    .setHybridOffsetLagThreshold(1000L)
                    .setHybridStoreOverheadBypass(true)
                    .setEnableWrites(true)
                    .setOwner("Napolean");
            ControllerResponse firstResponse = this.controllerClient.createNewStoreWithParameters(
                    storeName, "The_Doge", "\"string\"", "\"string\"", firstRequestParams);
            Assert.assertFalse(
                    firstResponse.isError(),
                    "Received error response on store creation:" + firstResponse.getError());

            StoreInfo storeAfterCreation = this.controllerClient.getStore(storeName).getStore();
            Assert.assertEquals(storeAfterCreation.getOwner(), "Napolean");
            Assert.assertEquals(storeAfterCreation.getHybridStoreConfig().getRewindTimeInSeconds(), 1000L);

            // A fresh parameter object is built for the duplicate request, as in the first call.
            UpdateStoreQueryParams duplicateRequestParams = new UpdateStoreQueryParams()
                    .setHybridRewindSeconds(1000L)
                    .setHybridOffsetLagThreshold(1000L)
                    .setHybridStoreOverheadBypass(true)
                    .setEnableWrites(true)
                    .setOwner("Napolean");
            boolean duplicateRejected = this.controllerClient
                    .createNewStoreWithParameters(storeName, "The_Doge", "\"string\"", "\"string\"", duplicateRequestParams)
                    .isError();
            Assert.assertTrue(duplicateRejected, "No Error Received!!!!!");

            StoreInfo storeAfterDuplicate = this.controllerClient.getStore(storeName).getStore();
            Assert.assertNotNull(storeAfterDuplicate, "Store unreadable!!  It may no longer exist!");
            Assert.assertEquals(storeAfterDuplicate.getOwner(), "Napolean");
            Assert.assertEquals(storeAfterDuplicate.getHybridStoreConfig().getRewindTimeInSeconds(), 1000L);
        } catch (Exception e) {
            Assert.fail("Could not create new Store with Exception!!", e);
        }
    }

    /**
     * For one batch store and one stream store, requests a topic twice with the same push id
     * and verifies that both requests resolve to the same Kafka topic. A third request with a
     * different push id is then expected to come back without an error.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void requestTopicIsIdempotent() {
        Map<String, Version.PushType> pushTypeByStore = new HashMap<>(2);
        pushTypeByStore.put(Utils.getUniqueString("BatchStore"), Version.PushType.BATCH);
        pushTypeByStore.put(Utils.getUniqueString("StreamStore"), Version.PushType.STREAM);
        String firstPushId = Utils.getUniqueString("pushId");
        String secondPushId = Utils.getUniqueString("pushId");

        for (Map.Entry<String, Version.PushType> entry : pushTypeByStore.entrySet()) {
            String storeName = entry.getKey();
            Version.PushType pushType = entry.getValue();
            this.cluster.getNewStore(storeName);

            if (pushType == Version.PushType.STREAM) {
                // The stream store is given hybrid settings and an initial empty push before
                // a topic is requested; the responses of these two calls are not inspected.
                this.controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setHybridRewindSeconds(1000L).setHybridOffsetLagThreshold(1000L));
                this.controllerClient.emptyPush(storeName, Utils.getUniqueString("emptyPushId"), 10000L);
            }

            VersionCreationResponse firstResponse = this.controllerClient.requestTopicForWrites(storeName, 1L, pushType, firstPushId, true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
            if (firstResponse.isError()) {
                Assert.fail(firstResponse.getError());
            }

            VersionCreationResponse repeatedResponse = this.controllerClient.requestTopicForWrites(storeName, 1L, pushType, firstPushId, true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
            if (repeatedResponse.isError()) {
                // Report the error of the response that actually failed.
                Assert.fail(repeatedResponse.getError());
            }

            Assert.assertEquals(firstResponse.getKafkaTopic(), repeatedResponse.getKafkaTopic(), "Multiple requests for topics with the same pushId must return the same kafka topic");

            // NOTE(review): the previous failure message ("Controller should not allow concurrent
            // push") contradicted this assertion, which requires the request to succeed. The message
            // now describes what is asserted; confirm the intended controller behaviour.
            VersionCreationResponse differentPushIdResponse = this.controllerClient.requestTopicForWrites(storeName, 1L, pushType, secondPushId, true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
            Assert.assertFalse(
                    differentPushIdResponse.isError(),
                    "Topic request with a different pushId was expected to succeed but returned error: "
                            + differentPushIdResponse.getError());
        }
    }

    /**
     * Runs five rounds in which three threads concurrently request a topic for the same store
     * and push id, and verifies that all three requests in a round resolve to the same Kafka topic.
     * After each round end-of-push is written and the test waits for the new version to become
     * current before starting the next round.
     *
     * <p>Any exception raised during a round fails the test, and the worker threads are shut
     * down whether the round succeeded or not.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void requestTopicIsIdempotentWithConcurrency() {
        final int attempts = 5;
        final int threadCount = 3;
        String storeName = Utils.getUniqueString("store");
        this.cluster.getNewStore(storeName);
        AtomicReference<String> latestError = new AtomicReference<>();

        for (int attempt = 0; attempt < attempts; attempt++) {
            String pushId = Utils.getUniqueString("pushId");
            // Worker threads append concurrently, so the list must be thread-safe.
            List<VersionCreationResponse> responses = Collections.synchronizedList(new ArrayList<>());
            CountDownLatch finished = new CountDownLatch(threadCount);
            List<Thread> workers = new ArrayList<>(threadCount);
            try {
                try {
                    for (int i = 0; i < threadCount; i++) {
                        Thread worker = requestTopicThread(pushId, storeName, responses, finished, latestError);
                        worker.setUncaughtExceptionHandler((thread, error) -> error.printStackTrace());
                        workers.add(worker);
                    }
                    for (Thread worker : workers) {
                        worker.start();
                    }

                    Assert.assertTrue(
                            finished.await(10L, TimeUnit.SECONDS),
                            "Topic request threads did not finish in time on attempt " + attempt);
                    Assert.assertEquals(responses.size(), threadCount);

                    for (int i = 0; i < threadCount; i++) {
                        if (responses.get(i).isError()) {
                            Assert.fail(responses.get(i).getError());
                        }
                    }
                    for (int i = 1; i < threadCount; i++) {
                        Assert.assertEquals(responses.get(0).getKafkaTopic(), responses.get(i).getKafkaTopic(), "Idempotent topic requests failed under concurrency on attempt " + attempt + ".  If this test ever fails, investigate! Don't just run it again and hope it passes");
                    }

                    int requestedVersion = responses.get(0).getVersion();
                    this.controllerClient.writeEndOfPush(storeName, requestedVersion);
                    // Poll until the requested version becomes current, or until Utils.sleep returns false.
                    while (this.controllerClient.getStore(storeName).getStore().getCurrentVersion() < requestedVersion
                            && Utils.sleep(200L)) {
                        // keep polling
                    }
                } finally {
                    for (Thread worker : workers) {
                        TestUtils.shutdownThread(worker);
                    }
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for whoever runs this test.
                    Thread.currentThread().interrupt();
                }
                // Fail the test instead of printing the exception and returning.
                Assert.fail(
                        "Concurrent topic requests failed with an exception on attempt " + attempt
                                + ". Captured message: " + latestError.get(),
                        e);
            }
        }
    }

    /**
     * Builds (but does not start) a thread that issues a single batch topic request and records
     * the outcome.
     *
     * <p>Exactly one {@link VersionCreationResponse} is appended to {@code list} and
     * {@code countDownLatch} is counted down exactly once, whether the request succeeds, returns
     * an error response, or throws.
     *
     * @param str             push id sent with the topic request
     * @param str2            name of the store the topic is requested for
     * @param list            shared collector for the per-thread result; appends are synchronized on it
     * @param countDownLatch  latch counted down once the result has been recorded
     * @param atomicReference receives the most recent error message, if any
     * @return the unstarted thread
     */
    private Thread requestTopicThread(String str, String str2, List<VersionCreationResponse> list, CountDownLatch countDownLatch, AtomicReference<String> atomicReference) {
        return new Thread(() -> {
            VersionCreationResponse result = new VersionCreationResponse();
            try {
                VersionCreationResponse response = this.controllerClient.requestTopicForWrites(str2, 1L, Version.PushType.BATCH, str, true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
                if (response.isError()) {
                    // Carry the controller's error over so the caller's isError() check sees it.
                    atomicReference.set(response.getError());
                    result.setError(response.getError());
                } else {
                    result.setKafkaTopic(response.getKafkaTopic());
                    result.setVersion(response.getVersion());
                }
            } catch (Throwable t) {
                // Some throwables carry no message; fall back to toString() so the error is never null.
                String message = t.getMessage() != null ? t.getMessage() : t.toString();
                atomicReference.set(message);
                result.setError(message);
            } finally {
                // Several threads share the list; guard the append regardless of the list implementation.
                synchronized (list) {
                    list.add(result);
                }
                countDownLatch.countDown();
            }
        });
    }

    /**
     * Requests a new version for a fresh store, writes end-of-push for it, and waits until the
     * store's current version equals the newly requested version.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void endOfPushEndpointTriggersVersionSwap() {
        String storeName = Utils.getUniqueString("store");
        String pushId = Utils.getUniqueString("pushId");
        this.cluster.getNewStore(storeName);
        int versionBeforePush = this.controllerClient.getStore(storeName).getStore().getCurrentVersion();

        VersionCreationResponse topicResponse = this.controllerClient.requestTopicForWrites(storeName, 1L, Version.PushType.BATCH, pushId, true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
        // Fail fast with the controller's error instead of continuing with a meaningless version number.
        Assert.assertFalse(
                topicResponse.isError(),
                "Received error response when requesting a topic: " + topicResponse.getError());
        int newVersion = topicResponse.getVersion();
        Assert.assertNotEquals(Integer.valueOf(newVersion), Integer.valueOf(versionBeforePush), "Requesting a new version must not return the current version number");

        this.controllerClient.writeEndOfPush(storeName, newVersion);
        // TEST_TIMEOUT is expressed in milliseconds (it is also the TestNG timeOut); the wait
        // previously used the same number with TimeUnit.SECONDS.
        TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(this.controllerClient.getStore(storeName).getStore().getCurrentVersion(), newVersion, "Writing end of push must flip the version to current");
        });
    }

    /**
     * Checks node removal through the controller client: removing the first server while it is
     * still running must return an error, and removing it after the server has been stopped
     * must succeed and drop the node from the admin's storage-node list.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void controllerClientCanRemoveNodeFromCluster() {
        Admin admin = this.cluster.getLeaderVeniceController().getVeniceAdmin();
        VeniceServerWrapper firstServer = this.cluster.getVeniceServers().get(0);
        String nodeId = Utils.getHelixNodeIdentifier(Utils.getHostName(), firstServer.getPort());

        // First attempt: the server has not been stopped yet.
        boolean rejectedWhileRunning = this.controllerClient.removeNodeFromCluster(nodeId).isError();
        Assert.assertTrue(rejectedWhileRunning, "Node is still connected to cluster, could not be removed.");

        this.cluster.stopVeniceServer(firstServer.getPort());

        // Second attempt: the server has been stopped.
        boolean rejectedAfterStop = this.controllerClient.removeNodeFromCluster(nodeId).isError();
        Assert.assertFalse(rejectedAfterStop, "Node is already disconnected, could be removed.");

        boolean stillListed = admin.getStorageNodes(this.cluster.getClusterName()).contains(nodeId);
        Assert.assertFalse(stillListed, "Node should be removed from the cluster.");
    }
}
