package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestEmptyPush.class */
public class TestEmptyPush {
    private VeniceClusterWrapper venice;

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        this.venice = ServiceFactory.getVeniceCluster(new VeniceClusterCreateOptions.Builder().numberOfControllers(1).numberOfRouters(1).numberOfServers(2).sslToKafka(false).sslToStorageNodes(false).build());
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.venice});
    }

    private ByteBuffer getDictFromTopic(String str, Properties properties) {
        String address = this.venice.getKafka().getAddress();
        Properties properties2 = (Properties) properties.clone();
        properties2.setProperty("kafka.bootstrap.servers", address);
        return DictionaryUtils.readDictionaryFromKafka(str, new VeniceProperties(properties2));
    }

    @Test(timeOut = 120000)
    public void testEmptyPushByChangingCompressionStrategy() throws IOException {
        String uniqueString = Utils.getUniqueString("test_empty_push_store");
        ControllerClient controllerClient = new ControllerClient(this.venice.getClusterName(), this.venice.getAllControllersURLs());
        try {
            TopicManager topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, this.venice.getKafka().getAddress(), this.venice.getPubSubTopicRepository()).getTopicManager();
            try {
                controllerClient.createNewStore(uniqueString, "owner", "\"string\"", "\"string\"");
                controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setHybridRewindSeconds(60L).setHybridOffsetLagThreshold(10L).setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
                File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
                String str = "file://" + tempDataDirectory.getAbsolutePath();
                TestWriteUtils.writeEmptyAvroFileWithUserSchema(tempDataDirectory);
                Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.venice, str, uniqueString);
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 1, controllerClient);
                ByteBuffer dictFromTopic = getDictFromTopic(Version.composeKafkaTopic(uniqueString, 1), defaultVPJProps);
                Assert.assertNotNull(dictFromTopic, "Dict shouldn't be null for the empty push to a hybrid store without any records in RT");
                Assert.assertTrue(topicManager.containsTopicAndAllPartitionsAreOnline(this.venice.getPubSubTopicRepository().getTopic(Version.composeRealTimeTopic(uniqueString))));
                SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(this.venice, uniqueString, Version.PushType.STREAM, new Pair[0]);
                String str2 = "test_key_";
                String str3 = "test_value_";
                for (int i = 1; i <= 1000; i++) {
                    IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, "test_key_" + i, "test_value_" + i);
                }
                Runnable runnable = () -> {
                    AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.venice.getRandomRouterURL()));
                    try {
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                            for (int i2 = 1; i2 <= 1000; i2++) {
                                try {
                                    String str4 = str2 + i2;
                                    Object obj = andStartGenericAvroClient.get(str4).get();
                                    Assert.assertNotNull(obj, "value for key: " + str4 + " shouldn't be null");
                                    Assert.assertEquals(obj.toString(), str3 + i2);
                                } catch (Exception e) {
                                    throw new VeniceException(e);
                                }
                            }
                        });
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                    } catch (Throwable th) {
                        if (andStartGenericAvroClient != null) {
                            try {
                                andStartGenericAvroClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                };
                runnable.run();
                controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.GZIP));
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 2, controllerClient);
                Assert.assertNull(getDictFromTopic(Version.composeKafkaTopic(uniqueString, 2), defaultVPJProps), "Dict should be null since `gzip` doesn't require any dict");
                runnable.run();
                controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 3, controllerClient);
                ByteBuffer dictFromTopic2 = getDictFromTopic(Version.composeKafkaTopic(uniqueString, 3), defaultVPJProps);
                Assert.assertNotNull(dictFromTopic2, "Dict shouldn't be null for empty push to a hybrid store");
                Assert.assertNotEquals(dictFromTopic2, dictFromTopic, "Dict built with a current version will be different from the dummy dict built in the very first empty push");
                runnable.run();
                IntegrationTestPushUtils.runVPJ(defaultVPJProps, 4, controllerClient);
                ByteBuffer dictFromTopic3 = getDictFromTopic(Version.composeKafkaTopic(uniqueString, 4), defaultVPJProps);
                Assert.assertNotNull(dictFromTopic3, "Dict shouldn't be null for empty push to a hybrid store");
                Assert.assertEquals(dictFromTopic3, dictFromTopic2, "Venice Push Job should build a same dict for the same input");
                runnable.run();
                if (topicManager != null) {
                    topicManager.close();
                }
                controllerClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                controllerClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
