package com.linkedin.venice.restart;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.store.rocksdb.ReplicationMetadataRocksDBStoragePartition;
import com.linkedin.davinci.store.rocksdb.RocksDBStorageEngine;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.view.TestView;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.util.BytewiseComparator;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.class */
/**
 * Integration test that pushes a batch version into an active-active, incremental-push enabled store,
 * deletes the temporary SST files of the ingesting partition while the push is still open, bounces the
 * server that hosts the partition and then verifies that:
 * <ul>
 *   <li>the push still completes and the new version becomes current,</li>
 *   <li>all keys of the new batch version are readable through a store client,</li>
 *   <li>keys written by previous batch versions (and not touched by an incremental push) are gone,</li>
 *   <li>a subsequent incremental push overrides the tail of the batch key range.</li>
 * </ul>
 *
 * <p>The test instance is stateful: {@link #startKey}, {@link #newVersion}, {@link #allIncPushKeys} and
 * {@link #allNonIncPushKeysUntilLastVersion} carry over between data-provider invocations of the test
 * method, so the invocations are expected to run sequentially on the same instance.
 */
public class TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion {
    private static final Logger LOGGER = LogManager.getLogger(TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.class);
    private static final int TEST_TIMEOUT = 120000;
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;
    private VeniceClusterWrapper clusterWrapper;
    /** Server that was last selected by {@link #getPartitionForTopic}; this is the server that gets bounced. */
    private VeniceServerWrapper serverWrapper;
    private ControllerClient parentControllerClient;
    private AvroSerializer<String> serializer;
    /** Number of keys written by every batch push. */
    private final int numKeys = 100;
    /** First key index of the most recent batch push; advanced by {@link #numKeys} on every test invocation. */
    private int startKey = 0;
    /** Version number produced by the most recent batch push. */
    private int newVersion = 0;
    private final String KEY_PREFIX = "key";
    private final String VALUE_PREFIX = "value";
    private final String VALUE_PREFIX_INC_PUSH = "value-inc";
    private final String METADATA_PREFIX = "metadata";
    private final String storeName = Utils.getUniqueString("store");
    /** Number of servers that {@link #getPartitionForTopic} scans when looking for the hosting server. */
    private final int numServers = 5;
    /** Every key index ever written by an incremental push, across all test invocations. */
    final List<Integer> allIncPushKeys = new ArrayList<>();
    /** Batch-only key indexes of all previous versions; these must not be readable from the current version. */
    final List<Integer> allNonIncPushKeysUntilLastVersion = new ArrayList<>();

    /**
     * Starts the multi-region wrapper, enables active-active replication for incremental-push stores on
     * the cluster and creates the hybrid, active-active, incremental-push enabled store used by the test.
     */
    @BeforeClass
    public void setUp() throws Exception {
        serializer = new AvroSerializer<>(AvroCompatibilityHelper.parse(new String[]{"\"string\""}));

        Properties serverProperties = new Properties();
        serverProperties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        serverProperties.put("rocksdb.plain.table.format.enabled", false);
        serverProperties.put("persistence.type", PersistenceType.ROCKS_DB);
        serverProperties.setProperty("server.database.sync.bytes.interval.for.deferred.write.mode", "300");
        serverProperties.put("child.data.center.kafka.url.dc-parent-0", "localhost:" + Utils.getFreePort());

        // NOTE(review): the fifth argument is assumed to be the server count (it was the literal 5,
        // matching numServers) - confirm against ServiceFactory.
        multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
                1, 1, 1, 1, numServers, 1, 1,
                Optional.empty(), Optional.empty(), Optional.of(new VeniceProperties(serverProperties)), false);

        List<VeniceMultiClusterWrapper> childRegions = multiRegionMultiClusterWrapper.getChildRegions();
        List<VeniceControllerWrapper> parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
        clusterWrapper = childRegions.get(0).getClusters().get("venice-cluster0");

        String parentControllerUrls = parentControllers.stream()
                .map(VeniceControllerWrapper::getControllerUrl)
                .collect(Collectors.joining(","));
        parentControllerClient = new ControllerClient("venice-cluster0", parentControllerUrls);
        TestUtils.assertCommand(parentControllerClient.configureActiveActiveReplicationForCluster(
                true, VeniceUserStoreType.INCREMENTAL_PUSH.toString(), Optional.empty()));

        File inputDir = TestWriteUtils.getTempDataDirectory();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        String keySchemaStr = recordSchema.getField("key").schema().toString();
        String valueSchemaStr = recordSchema.getField("value").schema().toString();
        Properties vpjProperties = IntegrationTestPushUtils.defaultVPJProps(
                multiRegionMultiClusterWrapper, "file:" + inputDir.getAbsolutePath(), storeName);
        UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                .setActiveActiveReplicationEnabled(true)
                .setHybridRewindSeconds(500L)
                .setHybridOffsetLagThreshold(2L)
                .setNativeReplicationEnabled(true)
                .setBackupVersionRetentionMs(1L)
                .setIncrementalPushEnabled(true);
        // The object returned by createStoreForJob is only needed for store creation; close it right away.
        IntegrationTestPushUtils.createStoreForJob("venice-cluster0", keySchemaStr, valueSchemaStr, vpjProperties, storeParams).close();
    }

    /**
     * Deletes the test store and tears down the cluster. Every step is guarded so that a failure in
     * {@link #setUp()} (leaving fields {@code null}) or in store deletion does not prevent the remaining
     * resources from being released.
     */
    @AfterClass
    public void cleanUp() {
        try {
            if (parentControllerClient != null) {
                parentControllerClient.disableAndDeleteStore(storeName);
            }
        } finally {
            try {
                closeParentControllerClient();
            } finally {
                try {
                    if (multiRegionMultiClusterWrapper != null) {
                        multiRegionMultiClusterWrapper.close();
                    }
                } finally {
                    TestView.resetCounters();
                }
            }
        }
    }

    /** Best-effort close of the parent controller client; failures are logged, not propagated. */
    private void closeParentControllerClient() {
        if (parentControllerClient == null) {
            return;
        }
        try {
            parentControllerClient.close();
        } catch (Exception e) {
            // Deliberately not rethrown: teardown must continue with the remaining resources.
            LOGGER.error("Failed to close the parent controller client", e);
        }
    }

    /**
     * Builds serialized test records for the key indexes {@code [startIndex, endIndex)}.
     *
     * @param startIndex first key index (inclusive)
     * @param endIndex last key index (exclusive)
     * @param sorted when {@code true} the returned map iterates in RocksDB bytewise key order,
     *               otherwise a plain {@link HashMap} is returned
     * @param isIncPush when {@code true} values use the incremental-push value prefix
     * @param avroSerializer serializer applied to keys, values and metadata strings
     * @return serialized key mapped to a (serialized value, serialized metadata) pair
     */
    private Map<byte[], Pair<byte[], byte[]>> generateInputWithMetadata(int startIndex, int endIndex, boolean sorted, boolean isIncPush, AvroSerializer<String> avroSerializer) {
        Map<byte[], Pair<byte[], byte[]>> records;
        if (sorted) {
            BytewiseComparator bytewiseComparator = new BytewiseComparator(new ComparatorOptions());
            records = new TreeMap<byte[], Pair<byte[], byte[]>>(
                    (left, right) -> bytewiseComparator.compare(ByteBuffer.wrap(left), ByteBuffer.wrap(right)));
        } else {
            records = new HashMap<>();
        }
        String valuePrefix = isIncPush ? VALUE_PREFIX_INC_PUSH : VALUE_PREFIX;
        for (int index = startIndex; index < endIndex; index++) {
            byte[] key = avroSerializer.serialize(KEY_PREFIX + index);
            byte[] value = avroSerializer.serialize(valuePrefix + index);
            byte[] metadata = avroSerializer.serialize(METADATA_PREFIX + index);
            records.put(key, Pair.create(value, metadata));
        }
        return records;
    }

    /**
     * Returns {@code replicationMetadata} with {@code valueSchemaId} prepended as a 4-byte int header.
     */
    private byte[] getReplicationMetadataWithValueSchemaId(byte[] replicationMetadata, int valueSchemaId) {
        ByteBuffer withHeader = ByteUtils.prependIntHeaderToByteBuffer(ByteBuffer.wrap(replicationMetadata), valueSchemaId, false);
        // Rewind by the header size so that the extracted array includes the prepended int.
        withHeader.position(withHeader.position() - 4);
        return ByteUtils.extractByteArray(withHeader);
    }

    /**
     * Finds the server that hosts a local storage engine for {@code topic}, remembers it in
     * {@link #serverWrapper} and replaces the content of {@code partitions} with partition 0 of that
     * engine.
     *
     * @param topic version topic whose storage engine is looked up
     * @param partitions output list; cleared and filled with exactly one partition
     */
    private void getPartitionForTopic(String topic, List<ReplicationMetadataRocksDBStoragePartition> partitions) {
        TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
            boolean found = false;
            for (int serverIndex = 0; serverIndex < numServers; serverIndex++) {
                serverWrapper = clusterWrapper.getVeniceServers().get(serverIndex);
                if (serverWrapper.getVeniceServer().getStorageService().getStorageEngineRepository().getLocalStorageEngine(topic) != null) {
                    LOGGER.info("selected server is: {}", serverIndex);
                    found = true;
                    break;
                }
            }
            Assert.assertTrue(found);
        });
        // Explicit cast: harmless if the repository already returns RocksDBStorageEngine.
        RocksDBStorageEngine storageEngine = (RocksDBStorageEngine) serverWrapper.getVeniceServer().getStorageService().getStorageEngineRepository().getLocalStorageEngine(topic);
        Assert.assertNotNull(storageEngine);
        Assert.assertEquals(storageEngine.getNumberOfPartitions(), 1L);
        partitions.clear();
        partitions.add((ReplicationMetadataRocksDBStoragePartition) storageEngine.getPartitionOrThrow(0));
    }

    /**
     * Creates and starts a generic Avro store client for {@link #storeName}.
     *
     * <p>NOTE(review): the D2 client started here is never shut down explicitly (same as before this
     * helper was extracted); confirm whether closing the store client releases it.
     */
    private AvroGenericStoreClient<String, Object> createStoreClient() throws Exception {
        D2Client d2Client = D2TestUtils.getD2Client(clusterWrapper.getZk().getAddress(), false);
        D2ClientUtils.startClient(d2Client);
        return ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                        .setForceClusterDiscoveryAtStartTime(true)
                        .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
                        .setD2Client(d2Client)
                        .setVeniceURL(clusterWrapper.getRandomRouterURL())
                        .setSslFactory(SslUtils.getVeniceLocalSslFactory())
                        .setRetryOnAllErrors(true));
    }

    /** Writes one record together with its replication metadata and blocks until the produce call returns. */
    private void putWithReplicationMetadata(VeniceWriter<byte[], byte[], byte[]> writer, Map.Entry<byte[], Pair<byte[], byte[]>> record) throws Exception {
        byte[] replicationMetadata = getReplicationMetadataWithValueSchemaId(record.getValue().getSecond(), 1);
        writer.put(record.getKey(), record.getValue().getFirst(), 1, (PubSubProducerCallback) null, new PutMetadata(1, ByteBuffer.wrap(replicationMetadata))).get();
    }

    /**
     * Runs one batch push with an SST-file deletion and server restart in the middle, followed by an
     * incremental push, and verifies the readable data after each step.
     *
     * @param deleteValueSstFiles delete the files under the partition's value temp SST directory
     * @param deleteRmdSstFiles delete the files under the partition's other temp SST directory
     *                          (presumably the replication-metadata one - TODO confirm)
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteValueSstFiles, boolean deleteRmdSstFiles) throws Exception {
        VersionCreationResponse versionCreationResponse = TestUtils.assertCommand(parentControllerClient.requestTopicForWrites(storeName, 1048576L, Version.PushType.BATCH, System.currentTimeMillis() + "_test_server_restart_push", true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
        int versionToBePushed = versionCreationResponse.getVersion();
        Assert.assertEquals(versionToBePushed, newVersion + 1);
        newVersion = versionToBePushed;

        final String topic = versionCreationResponse.getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(versionCreationResponse.getKafkaBootstrapServers());

        // Every invocation writes a fresh, non-overlapping key range.
        startKey += numKeys;
        final int endKey = startKey + numKeys;
        // Key indexes of this version that are NOT overridden by the incremental push further below.
        List<Integer> batchOnlyKeys = new ArrayList<>();

        try (VeniceWriter<byte[], byte[], byte[]> batchWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) {
            batchWriter.broadcastStartOfPush(true, Collections.emptyMap());

            Map<byte[], Pair<byte[], byte[]>> sortedInput = generateInputWithMetadata(startKey, endKey, true, false, serializer);
            int nextKey = startKey;
            for (Map.Entry<byte[], Pair<byte[], byte[]>> record : sortedInput.entrySet()) {
                batchOnlyKeys.add(nextKey++);
                putWithReplicationMetadata(batchWriter, record);
            }

            // Wait until the hosting partition has created both SST file writers.
            List<ReplicationMetadataRocksDBStoragePartition> partitions = new ArrayList<>();
            TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
                getPartitionForTopic(topic, partitions);
                Assert.assertNotNull(partitions.get(0).getValueRocksDBSstFileWriter());
                Assert.assertNotNull(partitions.get(0).getRocksDBSstFileWriter());
            });

            // Wait until every record has reached the SST files of both writers.
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                int valueRecordCount = 0;
                int otherRecordCount = 0;
                for (ReplicationMetadataRocksDBStoragePartition partition : partitions) {
                    valueRecordCount += (int) partition.getValueRocksDBSstFileWriter().getRecordNumInAllSSTFiles();
                    otherRecordCount += (int) partition.getRocksDBSstFileWriter().getRecordNumInAllSSTFiles();
                }
                Assert.assertEquals(valueRecordCount, numKeys);
                Assert.assertEquals(otherRecordCount, numKeys);
            });

            LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
            for (ReplicationMetadataRocksDBStoragePartition partition : partitions) {
                if (deleteValueSstFiles) {
                    partition.deleteFilesInDirectory(partition.getValueFullPathForTempSSTFileDir());
                }
                if (deleteRmdSstFiles) {
                    partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
                }
            }

            // Bounce the server that hosts the partition while the push is still open.
            clusterWrapper.stopVeniceServer(serverWrapper.getPort());
            clusterWrapper.restartVeniceServer(serverWrapper.getPort());

            batchWriter.broadcastEndOfPush(Collections.emptyMap());
        }

        TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(clusterWrapper.getLeaderVeniceController().getVeniceAdmin().getOffLinePushStatus(clusterWrapper.getClusterName(), topic).getExecutionStatus(), ExecutionStatus.COMPLETED);
        });

        TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
            int currentVersion = ControllerClient.getStore(clusterWrapper.getLeaderVeniceController().getControllerUrl(), clusterWrapper.getClusterName(), storeName).getStore().getCurrentVersion();
            LOGGER.info("currentVersion {}, pushVersion {}", currentVersion, newVersion);
            return currentVersion == newVersion;
        });

        // Validate the batch data: old batch-only keys are gone, the new key range is fully readable.
        try (AvroGenericStoreClient<String, Object> storeClient = createStoreClient()) {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                for (int oldKey : allNonIncPushKeysUntilLastVersion) {
                    Assert.assertNull(storeClient.get(KEY_PREFIX + oldKey).get());
                }
            });
            for (int key = startKey; key < endKey; key++) {
                Assert.assertEquals(storeClient.get(KEY_PREFIX + key).get().toString(), VALUE_PREFIX + key);
            }
        }

        // Incremental push that overrides the last 10 keys of the batch key range.
        String incPushVersion = System.currentTimeMillis() + "_test_inc_push";
        VersionCreationResponse incPushResponse = parentControllerClient.requestTopicForWrites(storeName, 1048576L, Version.PushType.INCREMENTAL, incPushVersion, true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L);
        Assert.assertFalse(incPushResponse.isError());
        String incPushTopic = incPushResponse.getKafkaTopic();
        Assert.assertNotNull(incPushTopic);

        final int incPushStartKey = startKey + 90;
        try (VeniceWriter<byte[], byte[], byte[]> incPushWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(incPushTopic).build())) {
            incPushWriter.broadcastStartOfIncrementalPush(incPushVersion, new HashMap<>());
            int nextKey = incPushStartKey;
            for (Map.Entry<byte[], Pair<byte[], byte[]>> record : generateInputWithMetadata(incPushStartKey, endKey, false, true, serializer).entrySet()) {
                allIncPushKeys.add(nextKey++);
                // One trailing batch key is overridden per incremental record, so drop it from the batch-only list.
                batchOnlyKeys.remove(batchOnlyKeys.size() - 1);
                putWithReplicationMetadata(incPushWriter, record);
            }
            incPushWriter.broadcastEndOfIncrementalPush(incPushVersion, Collections.emptyMap());
        }

        // Validate the data after the incremental push.
        try (AvroGenericStoreClient<String, Object> storeClient = createStoreClient()) {
            for (int key = startKey; key < incPushStartKey; key++) {
                Assert.assertEquals(storeClient.get(KEY_PREFIX + key).get().toString(), VALUE_PREFIX + key);
            }
            for (int key = incPushStartKey; key < endKey; key++) {
                final int incKey = key;
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertEquals(storeClient.get(KEY_PREFIX + incKey).get().toString(), VALUE_PREFIX_INC_PUSH + incKey);
                });
            }
            // Incremental-push keys of earlier versions must survive into the current version as well.
            for (int incKey : allIncPushKeys) {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                    Assert.assertNotNull(storeClient.get(KEY_PREFIX + incKey).get());
                    Assert.assertEquals(storeClient.get(KEY_PREFIX + incKey).get().toString(), VALUE_PREFIX_INC_PUSH + incKey);
                });
            }
        }

        // Remember this version's batch-only keys so the next invocation can assert they disappeared.
        allNonIncPushKeysUntilLastVersion.addAll(batchOnlyKeys);
    }
}
