package com.linkedin.venice.controller;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.IOUtils;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration tests that delete a hybrid store (or all of its versions) and then push to the same
 * store name again.
 *
 * <p>Both tests share a single {@link VeniceClusterWrapper}, obtained in {@link #setUp()} and closed
 * in {@link #cleanUp()}.
 */
public class TestTopicRequestOnHybridDelete {
    /** Seconds to wait for a store's current version to reach the expected number after a push. */
    private static final long VERSION_SWAP_TIMEOUT_SECONDS = 5L;

    private VeniceClusterWrapper venice;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    /** Obtains the cluster shared by every test in this class. */
    @BeforeClass
    public void setUp() {
        this.venice = ServiceFactory.getVeniceCluster();
    }

    /** Closes the shared cluster, ignoring any failure raised while closing. */
    @AfterClass
    public void cleanUp() {
        IOUtils.closeQuietly(this.venice);
    }

    /**
     * Creates a hybrid store, streams records into it, pushes a second version, deletes the store,
     * and then recreates a store with the same name.
     *
     * <p>The test asserts that:
     * <ul>
     *   <li>creating the store a second time while it still exists is rejected;</li>
     *   <li>after deletion the store can no longer be fetched, and the resource-cleanup check
     *       eventually stops reporting an error;</li>
     *   <li>the recreated store starts with no versions, and its first push becomes version 3;</li>
     *   <li>records streamed into the recreated store are readable, and version 3 is ONLINE.</li>
     * </ul>
     */
    @Test(timeOut = 60000)
    public void serverRestartOnHybridStoreKeepsVersionOnline() {
        // Final so that lambdas below may capture it; closed in the finally block.
        final ControllerClient controllerClient = new ControllerClient(this.venice.getClusterName(), this.venice.getRandomRouterURL());
        final String storeName = Utils.getUniqueString("hybrid-store");
        AvroGenericStoreClient storeClient = null;
        SystemProducer producer = null;
        try {
            this.venice.getNewStore(storeName);
            IntegrationTestPushUtils.makeStoreHybrid(this.venice, storeName, 100L, 5L);
            controllerClient.emptyPush(storeName, Utils.getUniqueString("push-id"), 1L);

            producer = IntegrationTestPushUtils.getSamzaProducer(this.venice, storeName, Version.PushType.STREAM, new Pair[0]);
            for (int key = 1; key <= 10; key++) {
                IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key);
            }
            producer.stop();
            // Already stopped: clear the reference so the finally block does not stop it a second time.
            producer = null;

            storeClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(this.venice.getRandomRouterURL()));
            awaitStreamedValue(storeClient, "9", "stream_9", 10L);

            controllerClient.emptyPush(storeName, Utils.getUniqueString("push-id"), 1L);
            awaitCurrentVersion(controllerClient, storeName, 2);

            // Creating the same store again must be rejected. The rejection is tracked with a flag and
            // asserted after the try block: calling Assert.fail inside the try would itself raise an
            // AssertionError whose message contains "already exists", which the catch would accept.
            boolean duplicateRejected = false;
            try {
                this.venice.getNewStore(storeName);
            } catch (AssertionError e) {
                duplicateRejected = true;
                String message = e.getMessage();
                Assert.assertTrue(message != null && message.contains("already exists"), "Unexpected failure message: " + message);
            }
            Assert.assertTrue(duplicateRejected, "Must not be able to create a store that already exists");

            // Reads and writes are disabled before the store is deleted.
            controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setEnableReads(false).setEnableWrites(false));
            Assert.assertFalse(controllerClient.deleteStore(storeName).isError());
            try {
                this.venice.getLeaderVeniceController().getVeniceAdmin().getTopicManager().ensureTopicIsDeletedAndBlock(this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)));
            } catch (ExecutionException e) {
                // Pass the cause along so its stack trace is kept in the test report.
                Assert.fail("Exception during topic deletion " + e, e);
            }
            Assert.assertTrue(controllerClient.getStore(storeName).isError());
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                Assert.assertFalse(controllerClient.checkResourceCleanupForStoreCreation(storeName).isError());
            });

            // Recreate the store under the same name and push to it again.
            this.venice.getNewStore(storeName);
            Assert.assertEquals(controllerClient.getStore(storeName).getStore().getVersions().size(), 0);
            IntegrationTestPushUtils.makeStoreHybrid(this.venice, storeName, 100L, 5L);
            controllerClient.emptyPush(storeName, Utils.getUniqueString("push-id3"), 1L);
            awaitCurrentVersion(controllerClient, storeName, 3);

            // This producer stays open until the finally block, after the read check below.
            producer = IntegrationTestPushUtils.getSamzaProducer(this.venice, storeName, Version.PushType.STREAM, new Pair[0]);
            for (int key = 11; key <= 20; key++) {
                IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key);
            }
            this.venice.refreshAllRouterMetaData();
            awaitStreamedValue(storeClient, "19", "stream_19", 5L);

            boolean sawVersionThree = false;
            for (Version version : controllerClient.getStore(storeName).getStore().getVersions()) {
                if (version.getNumber() == 3) {
                    Assert.assertEquals(version.getStatus(), VersionStatus.ONLINE);
                    sawVersionThree = true;
                }
            }
            Assert.assertTrue(sawVersionThree, "Store's versions must contain the current version 3");
        } finally {
            IOUtils.closeQuietly(storeClient);
            IOUtils.closeQuietly(controllerClient);
            if (producer != null) {
                producer.stop();
            }
        }
    }

    /**
     * Requests a batch push topic for a hybrid store, deletes all of the store's versions while that
     * push is still reported as STARTED, and then verifies that a fresh empty push becomes current
     * version 2.
     */
    @Test(timeOut = 60000)
    public void deleteStoreAfterStartedPushAllowsNewPush() {
        final ControllerClient controllerClient = new ControllerClient(this.venice.getClusterName(), this.venice.getRandomRouterURL());
        try (TopicManagerRepository topicManagerRepo = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, this.venice.getKafka().getAddress(), this.pubSubTopicRepository)) {
            TopicManager topicManager = topicManagerRepo.getTopicManager();
            String storeName = Utils.getUniqueString("hybrid-store");
            this.venice.getNewStore(storeName);
            IntegrationTestPushUtils.makeStoreHybrid(this.venice, storeName, 100L, 5L);

            VersionCreationResponse topicResponse = controllerClient.requestTopicForWrites(storeName, 1L, Version.PushType.BATCH, Utils.getUniqueString("pushId"), true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
            Assert.assertFalse(topicResponse.isError(), "The call to controllerClient.requestTopicForWrites() returned an error: " + topicResponse.getError());
            Assert.assertEquals(controllerClient.queryJobStatus(topicResponse.getKafkaTopic()).getStatus(), ExecutionStatus.STARTED.toString());
            Assert.assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(this.pubSubTopicRepository.getTopic(topicResponse.getKafkaTopic())));

            // Disable the store, drop every version, then re-enable it before pushing again.
            controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setEnableReads(false).setEnableWrites(false));
            controllerClient.deleteAllVersions(storeName);
            controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setEnableReads(true).setEnableWrites(true));

            controllerClient.emptyPush(storeName, Utils.getUniqueString("push-id3"), 1L);
            awaitCurrentVersion(controllerClient, storeName, 2);
        } finally {
            // The controller client is not managed by the try-with-resources above, so close it here.
            IOUtils.closeQuietly(controllerClient);
        }
    }

    /**
     * Waits up to {@link #VERSION_SWAP_TIMEOUT_SECONDS} seconds for the store's current version to
     * equal {@code expectedVersion}.
     *
     * @param controllerClient client used to fetch the store
     * @param storeName name of the store to inspect
     * @param expectedVersion version number the store must report as current
     */
    private static void awaitCurrentVersion(ControllerClient controllerClient, String storeName, int expectedVersion) {
        TestUtils.waitForNonDeterministicAssertion(VERSION_SWAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), expectedVersion);
        });
    }

    /**
     * Waits until reading {@code key} through {@code storeClient} returns {@code expectedValue} as
     * a {@link Utf8}. Any exception thrown by the read is reported as an assertion failure.
     *
     * @param storeClient started client used to read from the store
     * @param key key to read
     * @param expectedValue value the key is expected to map to
     * @param timeoutSeconds number of seconds to keep waiting for the assertion to pass
     */
    private static void awaitStreamedValue(AvroGenericStoreClient storeClient, String key, String expectedValue, long timeoutSeconds) {
        TestUtils.waitForNonDeterministicAssertion(timeoutSeconds, TimeUnit.SECONDS, () -> {
            try {
                Assert.assertEquals(storeClient.get(key).get(), new Utf8(expectedValue));
            } catch (Exception e) {
                Assert.fail("Got an exception while querying Venice!", e);
            }
        });
    }
}
