package com.linkedin.venice.restart;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/restart/TestRestartServerDuringIngestion.class */
public abstract class TestRestartServerDuringIngestion {
    private VeniceClusterWrapper cluster;
    private VeniceServerWrapper serverWrapper;

    protected abstract PersistenceType getPersistenceType();

    protected abstract Properties getExtraProperties();

    private Properties getVeniceServerProperties() {
        Properties properties = new Properties();
        properties.put("persistence.type", getPersistenceType());
        properties.put("server.database.sync.bytes.interval.for.deferred.write.mode", 100);
        properties.put("server.database.sync.bytes.interval.for.transactional.mode", 100);
        properties.putAll(getExtraProperties());
        return properties;
    }

    @BeforeClass(alwaysRun = true)
    public void setUp() {
        this.cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 1, 1000, false, false);
        this.serverWrapper = this.cluster.addVeniceServer(new Properties(), getVeniceServerProperties());
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        this.cluster.close();
    }

    @Test(timeOut = 90000)
    public void testIngestionRecovery() throws ExecutionException, InterruptedException {
        AvroSerializer avroSerializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{"\"string\""}));
        AvroGenericDeserializer avroGenericDeserializer = new AvroGenericDeserializer(Schema.parse("\"string\""), Schema.parse("\"string\""));
        String uniqueString = Utils.getUniqueString("test_store");
        String controllerUrl = this.cluster.getLeaderVeniceController().getControllerUrl();
        Properties properties = new Properties();
        properties.put("venice.discover.urls", controllerUrl);
        properties.put("venice.store.name", uniqueString);
        properties.put("multi.region", false);
        IntegrationTestPushUtils.createStoreForJob(this.cluster, "\"string\"", "\"string\"", properties).close();
        IntegrationTestPushUtils.makeStoreHybrid(this.cluster, uniqueString, 3600L, 10L);
        ControllerClient controllerClient = new ControllerClient(this.cluster.getClusterName(), controllerUrl);
        try {
            VersionCreationResponse assertCommand = TestUtils.assertCommand(controllerClient.requestTopicForWrites(uniqueString, 1048576L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
            controllerClient.close();
            String kafkaTopic = assertCommand.getKafkaTopic();
            VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(assertCommand.getKafkaBootstrapServers());
            VeniceWriter createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).build());
            try {
                createVeniceWriter.broadcastStartOfPush(true, Collections.emptyMap());
                Map generateInput = TestUtils.generateInput(1000, true, 0, avroSerializer);
                HashSet hashSet = new HashSet();
                hashSet.add(134);
                hashSet.add(346);
                hashSet.add(678);
                hashSet.add(831);
                int i = 0;
                for (Map.Entry entry : generateInput.entrySet()) {
                    i++;
                    if (hashSet.contains(Integer.valueOf(i))) {
                        this.cluster.stopVeniceServer(this.serverWrapper.getPort());
                        TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                            Assert.assertFalse(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().containsKafkaTopic(kafkaTopic));
                        });
                        this.cluster.restartVeniceServer(this.serverWrapper.getPort());
                    }
                    createVeniceWriter.put((byte[]) entry.getKey(), (byte[]) entry.getValue(), 1, (PubSubProducerCallback) null);
                }
                createVeniceWriter.broadcastEndOfPush(Collections.emptyMap());
                TestUtils.waitForNonDeterministicCompletion(20L, TimeUnit.SECONDS, () -> {
                    return this.cluster.getLeaderVeniceController().getVeniceAdmin().getOffLinePushStatus(this.cluster.getClusterName(), kafkaTopic).getExecutionStatus().equals(ExecutionStatus.COMPLETED);
                });
                restartAllRouters();
                AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.cluster.getRandomRouterURL()).setSslFactory(SslUtils.getVeniceLocalSslFactory()));
                try {
                    for (Map.Entry entry2 : generateInput.entrySet()) {
                        Assert.assertEquals((CharSequence) andStartGenericAvroClient.get(avroGenericDeserializer.deserialize((byte[]) entry2.getKey()).toString()).get(), (CharSequence) avroGenericDeserializer.deserialize((byte[]) entry2.getValue()));
                    }
                    Map generateInput2 = TestUtils.generateInput(1000, false, 5000, avroSerializer);
                    HashSet hashSet2 = new HashSet();
                    hashSet2.add(134);
                    hashSet2.add(346);
                    hashSet2.add(678);
                    hashSet2.add(831);
                    int i2 = 0;
                    createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(uniqueString)).build());
                    try {
                        for (Map.Entry entry3 : generateInput2.entrySet()) {
                            i2++;
                            if (hashSet2.contains(Integer.valueOf(i2))) {
                                this.cluster.stopVeniceServer(this.serverWrapper.getPort());
                                TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, true, true, () -> {
                                    Assert.assertFalse(this.cluster.getRandomVeniceRouter().getRoutingDataRepository().containsKafkaTopic(kafkaTopic));
                                });
                                this.cluster.restartVeniceServer(this.serverWrapper.getPort());
                            }
                            createVeniceWriter.put((byte[]) entry3.getKey(), (byte[]) entry3.getValue(), 1, (PubSubProducerCallback) null);
                        }
                        if (createVeniceWriter != null) {
                            createVeniceWriter.close();
                        }
                        TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                            HelixBaseRoutingRepository routingDataRepository = this.cluster.getRandomVeniceRouter().getRoutingDataRepository();
                            Assert.assertTrue(routingDataRepository.containsKafkaTopic(kafkaTopic));
                            int size = routingDataRepository.getPartitionAssignments(kafkaTopic).getAllPartitions().size();
                            for (int i3 = 0; i3 < size; i3++) {
                                Assert.assertTrue(routingDataRepository.getReadyToServeInstances(kafkaTopic, i3).size() > 0);
                            }
                        });
                        restartAllRouters();
                        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                            for (Map.Entry entry4 : generateInput2.entrySet()) {
                                String obj = avroGenericDeserializer.deserialize((byte[]) entry4.getKey()).toString();
                                CharSequence charSequence = (CharSequence) avroGenericDeserializer.deserialize((byte[]) entry4.getValue());
                                CharSequence charSequence2 = (CharSequence) andStartGenericAvroClient.get(obj).get();
                                Assert.assertNotNull(charSequence2);
                                Assert.assertEquals(charSequence2, charSequence);
                            }
                        });
                        if (andStartGenericAvroClient != null) {
                            andStartGenericAvroClient.close();
                        }
                        if (createVeniceWriter != null) {
                            createVeniceWriter.close();
                        }
                    } finally {
                        if (createVeniceWriter != null) {
                            try {
                                createVeniceWriter.close();
                            } catch (Throwable th) {
                                th.addSuppressed(th);
                            }
                        }
                    }
                } finally {
                }
            } catch (Throwable th2) {
                throw th2;
            }
        } catch (Throwable th3) {
            try {
                controllerClient.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    @Test(timeOut = 120000)
    public void testIngestionDrainer() {
        AvroSerializer avroSerializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{"\"string\""}));
        String uniqueString = Utils.getUniqueString("test_store");
        String controllerUrl = this.cluster.getLeaderVeniceController().getControllerUrl();
        Properties properties = new Properties();
        properties.put("venice.discover.urls", controllerUrl);
        properties.put("venice.store.name", uniqueString);
        properties.put("multi.region", false);
        IntegrationTestPushUtils.createStoreForJob(this.cluster, "\"string\"", "\"string\"", properties).close();
        IntegrationTestPushUtils.makeStoreHybrid(this.cluster, uniqueString, 3600L, 10L);
        ControllerClient controllerClient = new ControllerClient(this.cluster.getClusterName(), controllerUrl);
        try {
            VersionCreationResponse assertCommand = TestUtils.assertCommand(controllerClient.requestTopicForWrites(uniqueString, 1048576L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), false, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
            controllerClient.close();
            String kafkaTopic = assertCommand.getKafkaTopic();
            VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(assertCommand.getKafkaBootstrapServers()).createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).build());
            try {
                createVeniceWriter.broadcastStartOfPush(false, Collections.emptyMap());
                TestUtils.generateInput(1000, false, 0, avroSerializer).forEach((bArr, bArr2) -> {
                    createVeniceWriter.put(bArr, bArr2, 1, (PubSubProducerCallback) null);
                });
                createVeniceWriter.broadcastEndOfPush(Collections.emptyMap());
                TestUtils.waitForNonDeterministicCompletion(20L, TimeUnit.SECONDS, () -> {
                    return this.cluster.getLeaderVeniceController().getVeniceAdmin().getOffLinePushStatus(this.cluster.getClusterName(), kafkaTopic).getExecutionStatus().equals(ExecutionStatus.COMPLETED);
                });
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
            } catch (Throwable th) {
                if (createVeniceWriter != null) {
                    try {
                        createVeniceWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            try {
                controllerClient.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    private void restartAllRouters() {
        for (VeniceRouterWrapper veniceRouterWrapper : this.cluster.getVeniceRouters()) {
            this.cluster.stopVeniceRouter(veniceRouterWrapper.getPort());
            this.cluster.restartVeniceRouter(veniceRouterWrapper.getPort());
        }
    }
}
