package com.linkedin.venice.endToEnd;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.consumer.ChangeEvent;
import com.linkedin.davinci.consumer.ChangelogClientConfig;
import com.linkedin.davinci.consumer.VeniceChangelogConsumer;
import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiStoreTopicsResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.MockCircularTime;
import com.linkedin.venice.utils.TestMockTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.view.TestView;
import com.linkedin.venice.views.ChangeCaptureView;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/endToEnd/TestActiveActiveIngestion.class */
public class TestActiveActiveIngestion {
    /** Timeout in milliseconds; the {@code @Test} annotations in this class use the same 360000 figure. */
    private static final int TEST_TIMEOUT = 360000;

    /** Names of the Venice clusters under test: a single entry, {@code "venice-cluster0"}. */
    private static final String[] CLUSTER_NAMES =
        IntStream.range(0, 1).mapToObj(index -> "venice-cluster" + index).toArray(String[]::new);

    /** Child regions of the multi-region wrapper; populated in {@link #setUp()}. */
    private List<VeniceMultiClusterWrapper> childDatacenters;

    /** Parent controllers of the multi-region wrapper; populated in {@link #setUp()}. */
    private List<VeniceControllerWrapper> parentControllers;

    /** Owning handle for the whole test environment; created in {@link #setUp()}, closed in {@link #cleanUp()}. */
    private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;

    /** First entry of {@link #CLUSTER_NAMES}; the cluster every test in this class targets. */
    private String clusterName;

    /** Wrapper of {@link #clusterName} inside the first child region. */
    private VeniceClusterWrapper clusterWrapper;

    /** Server that {@code testActiveActiveStoreRestart} stops and restarts; assigned by that test. */
    private VeniceServerWrapper serverWrapper;

    /** Avro serializer built in {@link #setUp()} from the {@code "string"} schema. */
    private AvroSerializer serializer;

    /**
     * Creates the multi-region test environment once for the whole class and caches the handles
     * the tests use: the child regions, the parent controllers, the name of the cluster under
     * test, that cluster's wrapper in the first child region, and a string-schema Avro serializer.
     */
    @BeforeClass
    public void setUp() {
        // Property overrides handed to the environment factory below.
        Properties overrides = new Properties();
        overrides.setProperty("server.promotion.to.leader.replica.delay.seconds", String.valueOf(1L));
        overrides.put("rocksdb.plain.table.format.enabled", false);
        overrides.put("child.data.center.kafka.url.dc-parent-0", "localhost:" + Utils.getFreePort());
        VeniceProperties veniceProperties = new VeniceProperties(overrides);

        multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
            1, 1, 1, 1, 1, 1, 1,
            Optional.empty(),
            Optional.empty(),
            Optional.of(veniceProperties),
            false);

        childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
        parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
        clusterName = CLUSTER_NAMES[0];
        clusterWrapper = childDatacenters.get(0).getClusters().get(clusterName);

        serializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{"\"string\""}));
    }

    /**
     * Tears down the test environment and resets the {@link TestView} counters.
     *
     * <p>{@code alwaysRun = true} makes TestNG invoke this method even when {@link #setUp()}
     * failed, so the wrapper may still be {@code null} here and is guarded accordingly. The
     * counters are reset in a {@code finally} block so that a failure while closing the wrapper
     * cannot leave stale counter state behind.
     */
    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        try {
            if (multiRegionMultiClusterWrapper != null) {
                multiRegionMultiClusterWrapper.close();
            }
        } finally {
            TestView.resetCounters();
        }
    }

    /**
     * Polls the changelog consumer once (100 ms timeout argument) and stores every returned
     * change event in {@code map}, keyed by the string form of the message key. A later event
     * for the same key overwrites the earlier one.
     *
     * @param map destination for the polled events; mutated in place
     * @param veniceChangelogConsumer consumer to poll; every message key must be a {@link Utf8}
     *        and every message value a {@link ChangeEvent}, otherwise a
     *        {@link ClassCastException} is thrown
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the consumer is handed in as a raw type
    private void pollChangeEventsFromChangeCaptureConsumer(Map<String, ChangeEvent<Utf8>> map, VeniceChangelogConsumer veniceChangelogConsumer) {
        // Elements of a raw collection are typed Object, so iterate as Object and cast explicitly.
        for (Object polled : veniceChangelogConsumer.poll(100L)) {
            PubSubMessage message = (PubSubMessage) polled;
            String key = ((Utf8) message.getKey()).toString();
            map.put(key, (ChangeEvent<Utf8>) message.getValue());
        }
    }

    /**
     * Polls the changelog consumer once (100 ms timeout argument) and stores the current value of
     * every returned change event in {@code map}, keyed by the string form of the message key.
     *
     * @param map destination for the polled current values; mutated in place
     * @param veniceChangelogConsumer consumer to poll; every message key must be a {@link Utf8},
     *        every message value a {@link ChangeEvent} whose current value is a {@link Utf8}
     *        (or {@code null}), otherwise a {@link ClassCastException} is thrown
     * @return the number of messages returned by this poll, counting repeated keys individually
     */
    @SuppressWarnings("rawtypes") // the consumer is handed in as a raw type
    private int pollAfterImageEventsFromChangeCaptureConsumer(Map<String, Utf8> map, VeniceChangelogConsumer veniceChangelogConsumer) {
        int polledCount = 0;
        // Elements of a raw collection are typed Object, so iterate as Object and cast explicitly.
        for (Object polled : veniceChangelogConsumer.poll(100L)) {
            PubSubMessage message = (PubSubMessage) polled;
            ChangeEvent event = (ChangeEvent) message.getValue();
            map.put(((Utf8) message.getKey()).toString(), (Utf8) event.getCurrentValue());
            polledCount++;
        }
        return polledCount;
    }

    /**
     * Verifies that an empty push on an active-active hybrid store completes, and that keys 20-29
     * remain readable, after two stream jobs wrote to that key range with mocked timestamps.
     *
     * <p>Flow: enable active-active replication for the cluster, create a chunked hybrid store and
     * run a batch push, run two stream jobs through one producer (the second using timestamps
     * 10 seconds older than the first), send an empty push through the parent controller, wait
     * for the child region to serve version 2, then read keys 20-29 and expect all ten to be
     * present.
     *
     * <p>NOTE(review): the meaning of the three integer arguments to {@code runSamzaStreamJob} is
     * not visible from this method; the description above relies only on the values passed.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class)
    public void testLeaderLagWithIgnoredData() throws Exception {
        String parentControllerUrls =
            parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));

        // Both controller clients are closed when the test ends, whether it passes or fails.
        try (ControllerClient childControllerClient =
                new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString());
            ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerUrls)) {
            TestUtils.assertCommand(
                parentControllerClient.configureActiveActiveReplicationForCluster(
                    true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));

            // Create the store and run the initial batch push.
            File inputDir = TestWriteUtils.getTempDataDirectory();
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
            String inputDirPath = "file:" + inputDir.getAbsolutePath();
            String storeName = Utils.getUniqueString("store");
            Properties vpjProps =
                IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
            String keySchemaStr = recordSchema.getField("key").schema().toString();
            String valueSchemaStr = recordSchema.getField("value").schema().toString();
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                .setActiveActiveReplicationEnabled(true)
                .setHybridRewindSeconds(360L)
                .setHybridOffsetLagThreshold(8L)
                .setChunkingEnabled(true)
                .setNativeReplicationEnabled(true)
                .setPartitionCount(1);
            IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, vpjProps, storeParams)
                .close();
            TestWriteUtils.runPushJob("Run push job", vpjProps);

            Map<String, String> samzaConfig = getSamzaConfig(storeName);
            VeniceSystemFactory factory = new VeniceSystemFactory();

            // Two mocked clocks: "now" and "one hour ago", and the same pair shifted 10 seconds back.
            Instant now = Instant.now();
            Instant oneHourAgo = now.minus(1L, ChronoUnit.HOURS);
            List<Long> firstTimestamps = new LinkedList<>();
            List<Long> olderTimestamps = new LinkedList<>();
            firstTimestamps.add(now.toEpochMilli());
            olderTimestamps.add(now.toEpochMilli() - 10000);
            firstTimestamps.add(oneHourAgo.toEpochMilli());
            olderTimestamps.add(oneHourAgo.toEpochMilli() - 10000);
            MockCircularTime firstMockTime = new MockCircularTime(firstTimestamps);
            MockCircularTime olderMockTime = new MockCircularTime(olderTimestamps);

            try (VeniceSystemProducer producer =
                factory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null)) {
                producer.start();
                runSamzaStreamJob(producer, storeName, firstMockTime, 10, 0, 20);
                runSamzaStreamJob(producer, storeName, olderMockTime, 0, 10, 20);
            }

            parentControllerClient.sendEmptyPushAndWait(storeName, "Run empty push job", 1000L, 30000L);
            TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(childControllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
            });

            try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(clusterWrapper.getRandomRouterURL()))) {
                int presentCount = 0;
                for (int keyIndex = 20; keyIndex < 30; keyIndex++) {
                    if (client.get(Integer.toString(keyIndex)).get() != null) {
                        presentCount++;
                    }
                }
                Assert.assertEquals(presentCount, 10);
            }
        }
    }

    /**
     * Exercises Kafka-input repush, first without and then with TTL, on an active-active hybrid
     * store.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Enable active-active replication for the cluster, create the store and run a batch push.</li>
     *   <li>Run a stream job and produce a record with logical timestamp 1000 for key 1000;
     *       expect key 1000 to read as {@code null}.</li>
     *   <li>Run a repush sourced from Kafka; expect version 2 with keys 0-9 holding
     *       {@code stream_<key>}, keys 10-19 absent and keys 20-99 holding {@code test_name_<key>}.</li>
     *   <li>Produce records with logical timestamps 2 (key 1000) and 1 (key 1001); expect key 1001
     *       present and key 1000 still absent.</li>
     *   <li>Send an empty push, enable repush TTL, run a stream job with mocked "now" and
     *       "one hour ago" timestamps, and run the TTL repush; expect version 4, with exactly five
     *       of the keys 20-29 present and keys 30-99 absent.</li>
     * </ol>
     *
     * <p>NOTE(review): the argument meanings of {@code runSamzaStreamJob} and
     * {@code produceRecordWithLogicalTimestamp} are not visible from this method; the description
     * above relies only on the values passed and on the assertions made.
     *
     * @param z whether chunking is enabled on the store under test
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKIFRepushActiveActiveStore(boolean z) throws Exception {
        String parentControllerUrls =
            parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));

        // Both controller clients are closed when the test ends, whether it passes or fails.
        try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerUrls);
            ControllerClient childControllerClient =
                new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString())) {
            TestUtils.assertCommand(
                parentControllerClient.configureActiveActiveReplicationForCluster(
                    true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));

            // Step 1: create the store and run the initial batch push.
            File inputDir = TestWriteUtils.getTempDataDirectory();
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
            String inputDirPath = "file:" + inputDir.getAbsolutePath();
            String storeName = Utils.getUniqueString("store");
            Properties vpjProps =
                IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
            String keySchemaStr = recordSchema.getField("key").schema().toString();
            String valueSchemaStr = recordSchema.getField("value").schema().toString();
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                .setActiveActiveReplicationEnabled(true)
                .setHybridRewindSeconds(360L)
                .setHybridOffsetLagThreshold(8L)
                .setChunkingEnabled(z)
                .setNativeReplicationEnabled(true)
                .setPartitionCount(1);
            MetricsRepository metricsRepository = new MetricsRepository();
            IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, vpjProps, storeParams)
                .close();
            TestWriteUtils.runPushJob("Run push job", vpjProps);

            Map<String, String> samzaConfig = getSamzaConfig(storeName);
            VeniceSystemFactory factory = new VeniceSystemFactory();
            final int logicalTimestampKeyIndex = 1000;

            // Step 2: stream writes plus one record carrying an explicit logical timestamp.
            try (VeniceSystemProducer producer =
                factory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null)) {
                producer.start();
                runSamzaStreamJob(producer, storeName, null, 10, 10, 0);
                produceRecordWithLogicalTimestamp(producer, storeName, logicalTimestampKeyIndex, 1000L, true);
            }

            try (AvroGenericStoreClient<String, Utf8> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                    .setVeniceURL(clusterWrapper.getRandomRouterURL())
                    .setMetricsRepository(metricsRepository))) {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertNull(client.get(Integer.toString(logicalTimestampKeyIndex)).get());
                });
            }

            // Step 3: repush from Kafka and verify the contents of the new version.
            vpjProps.setProperty("source.kafka", "true");
            vpjProps.setProperty("kafka.input.broker.url", clusterWrapper.getKafka().getAddress());
            vpjProps.setProperty("kafka.input.max.records.per.mapper", "5");
            vpjProps.put("rewind.time.in.seconds.override", 0);
            TestWriteUtils.runPushJob("Run repush job", vpjProps);
            TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(childControllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
            });
            clusterWrapper.refreshAllRouterMetaData();

            try (AvroGenericStoreClient<String, Utf8> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                    .setVeniceURL(clusterWrapper.getRandomRouterURL())
                    .setMetricsRepository(metricsRepository))) {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    for (int streamed = 0; streamed < 10; streamed++) {
                        Utf8 value = client.get(Integer.toString(streamed)).get();
                        Assert.assertNotNull(value);
                        Assert.assertEquals(value.toString(), "stream_" + streamed);
                    }
                    for (int absent = 10; absent < 20; absent++) {
                        Assert.assertNull(client.get(Integer.toString(absent)).get());
                    }
                    for (int batch = 20; batch < 100; batch++) {
                        Utf8 value = client.get(Integer.toString(batch)).get();
                        Assert.assertNotNull(value);
                        Assert.assertEquals(value.toString(), "test_name_" + batch);
                    }
                });
            }

            // Step 4: two more records with small logical timestamps.
            try (VeniceSystemProducer producer =
                factory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null)) {
                producer.start();
                produceRecordWithLogicalTimestamp(producer, storeName, logicalTimestampKeyIndex, 2L, false);
                produceRecordWithLogicalTimestamp(producer, storeName, logicalTimestampKeyIndex + 1, 1L, false);
            }

            try (AvroGenericStoreClient<String, Utf8> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                    .setVeniceURL(clusterWrapper.getRandomRouterURL())
                    .setMetricsRepository(metricsRepository))) {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertNotNull(client.get(Integer.toString(logicalTimestampKeyIndex + 1)).get());
                });
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertNull(client.get(Integer.toString(logicalTimestampKeyIndex)).get());
                });
            }

            // Step 5: empty push, then a TTL repush after stream writes with mocked timestamps.
            parentControllerClient.sendEmptyPushAndWait(storeName, "Run empty push job", 1000L, 30000L);
            vpjProps.setProperty("repush.ttl.enable", "true");

            Instant now = Instant.now();
            List<Long> mockTimestamps = new LinkedList<>();
            mockTimestamps.add(now.toEpochMilli());
            mockTimestamps.add(now.minus(1L, ChronoUnit.HOURS).toEpochMilli());
            MockCircularTime mockTime = new MockCircularTime(mockTimestamps);

            try (VeniceSystemProducer producer =
                factory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null)) {
                producer.start();
                runSamzaStreamJob(producer, storeName, mockTime, 10, 10, 20);
            }

            TestWriteUtils.runPushJob("Run repush job with TTL", vpjProps);
            TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(childControllerClient.getStore(storeName).getStore().getCurrentVersion(), 4);
            });
            clusterWrapper.refreshAllRouterMetaData();

            try (AvroGenericStoreClient<String, Utf8> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                    .setVeniceURL(clusterWrapper.getRandomRouterURL())
                    .setMetricsRepository(metricsRepository))) {
                int presentCount = 0;
                int absentCount = 0;
                for (int keyIndex = 20; keyIndex < 30; keyIndex++) {
                    if (client.get(Integer.toString(keyIndex)).get() == null) {
                        absentCount++;
                    } else {
                        presentCount++;
                    }
                }
                Assert.assertEquals(presentCount, 5);
                Assert.assertEquals(absentCount, 5);

                for (int keyIndex = 30; keyIndex < 40; keyIndex++) {
                    Assert.assertNull(client.get(Integer.toString(keyIndex)).get());
                }
                for (int keyIndex = 40; keyIndex < 100; keyIndex++) {
                    Assert.assertNull(client.get(Integer.toString(keyIndex)).get());
                }
            }
        }
    }

    /**
     * Verifies that a batch push into an active-active hybrid store completes even though the
     * first Venice server of the cluster is stopped and restarted twice while records are being
     * written.
     *
     * <p>The test requests a write topic from the parent controller, writes 1000 generated
     * records directly through a {@link VeniceWriter} between start-of-push and end-of-push
     * markers, bounces the server just before the 346th and the 543rd record, and finally waits
     * up to 20 seconds for the child cluster's leader controller to report the push as
     * {@link ExecutionStatus#COMPLETED}.
     *
     * @throws Exception if any step of the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testActiveActiveStoreRestart() throws Exception {
        String parentControllerUrls =
            parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));

        // The controller client is closed when the test ends, whether it passes or fails.
        try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerUrls)) {
            TestUtils.assertCommand(
                parentControllerClient.configureActiveActiveReplicationForCluster(
                    true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));

            File inputDir = TestWriteUtils.getTempDataDirectory();
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
            String inputDirPath = "file:" + inputDir.getAbsolutePath();
            String storeName = Utils.getUniqueString("store");
            IntegrationTestPushUtils.createStoreForJob(
                clusterName,
                recordSchema.getField("key").schema().toString(),
                recordSchema.getField("value").schema().toString(),
                IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName),
                new UpdateStoreQueryParams()
                    .setActiveActiveReplicationEnabled(true)
                    .setHybridRewindSeconds(5L)
                    .setHybridOffsetLagThreshold(2L)
                    .setNativeReplicationEnabled(true))
                .close();

            VersionCreationResponse versionCreationResponse = TestUtils.assertCommand(
                parentControllerClient.requestTopicForWrites(
                    storeName, 1048576L, Version.PushType.BATCH, Version.guidBasedDummyPushId(),
                    true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
            String versionTopic = versionCreationResponse.getKafkaTopic();

            try (VeniceWriter<byte[], byte[], byte[]> writer =
                TestUtils.getVeniceWriterFactory(versionCreationResponse.getKafkaBootstrapServers())
                    .createVeniceWriter(new VeniceWriterOptions.Builder(versionTopic).build())) {
                writer.broadcastStartOfPush(true, Collections.emptyMap());
                Map<byte[], byte[]> records = TestUtils.generateInput(1000, true, 0, serializer);

                // 1-based record positions immediately before which the server is bounced.
                HashSet<Integer> restartPoints = new HashSet<>();
                restartPoints.add(346);
                restartPoints.add(543);

                serverWrapper = clusterWrapper.getVeniceServers().get(0);
                int position = 0;
                for (Map.Entry<byte[], byte[]> record : records.entrySet()) {
                    position++;
                    if (restartPoints.contains(position)) {
                        clusterWrapper.stopVeniceServer(serverWrapper.getPort());
                        clusterWrapper.restartVeniceServer(serverWrapper.getPort());
                    }
                    writer.put(record.getKey(), record.getValue(), 1, (PubSubProducerCallback) null);
                }
                writer.broadcastEndOfPush(Collections.emptyMap());
            }

            // The writer is already closed here; only the push status is polled from now on.
            TestUtils.waitForNonDeterministicCompletion(20L, TimeUnit.SECONDS, () -> {
                try {
                    return clusterWrapper.getLeaderVeniceController()
                        .getVeniceAdmin()
                        .getOffLinePushStatus(clusterWrapper.getClusterName(), versionTopic)
                        .getExecutionStatus()
                        .equals(ExecutionStatus.COMPLETED);
                } catch (Exception ignored) {
                    // Any failure while querying the status counts as "not completed yet"; keep polling.
                    return false;
                }
            });
        }
    }

    @Test(timeOut = 360000, priority = 3)
    public void testAAIngestionWithStoreView() throws Exception {
        ControllerClient controllerClient = new ControllerClient(this.clusterName, this.childDatacenters.get(0).getControllerConnectString());
        ControllerClient controllerClient2 = new ControllerClient(this.clusterName, (String) this.parentControllers.stream().map((v0) -> {
            return v0.getControllerUrl();
        }).collect(Collectors.joining(",")));
        TestUtils.assertCommand(controllerClient2.configureActiveActiveReplicationForCluster(true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
        String str = "file:" + tempDataDirectory.getAbsolutePath();
        String uniqueString = Utils.getUniqueString("store");
        Properties defaultVPJProps = TestWriteUtils.defaultVPJProps(this.parentControllers.get(0).getControllerUrl(), str, uniqueString);
        String schema = writeSimpleAvroFileWithUserSchema.getField("key").schema().toString();
        String schema2 = writeSimpleAvroFileWithUserSchema.getField("value").schema().toString();
        HashMap hashMap = new HashMap();
        hashMap.put("testView", "{\"viewClassName\" : \"" + TestView.class.getCanonicalName() + "\", \"viewParameters\" : {}}");
        hashMap.put("changeCaptureView", "{\"viewClassName\" : \"" + ChangeCaptureView.class.getCanonicalName() + "\", \"viewParameters\" : {}}");
        UpdateStoreQueryParams partitionCount = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true).setHybridRewindSeconds(500L).setHybridOffsetLagThreshold(8L).setChunkingEnabled(true).setNativeReplicationEnabled(true).setPartitionCount(1);
        MetricsRepository metricsRepository = new MetricsRepository();
        ControllerClient createStoreForJob = IntegrationTestPushUtils.createStoreForJob(this.clusterName, schema, schema2, defaultVPJProps, partitionCount);
        partitionCount.setStoreViews(hashMap);
        createStoreForJob.retryableRequest(5, controllerClient3 -> {
            return createStoreForJob.updateStore(uniqueString, partitionCount);
        });
        TestWriteUtils.runPushJob("Run push job", defaultVPJProps);
        Map<String, String> samzaConfig = getSamzaConfig(uniqueString);
        VeniceSystemFactory veniceSystemFactory = new VeniceSystemFactory();
        int i = 1000;
        TestMockTime testMockTime = new TestMockTime();
        ZkServerWrapper zkServerWrapper = this.multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
        PubSubBrokerWrapper pubSubBroker = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setZkWrapper(zkServerWrapper).setMockTime(testMockTime).build());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", pubSubBroker.getAddress());
        properties.put("key.deserializer", KafkaKeySerializer.class);
        properties.put("value.deserializer", KafkaValueSerializer.class);
        properties.put("receive.buffer.bytes", 1048576);
        ChangelogClientConfig controllerRequestRetryCount = new ChangelogClientConfig().setViewName("changeCaptureView").setConsumerProperties(properties).setControllerD2ServiceName(VeniceControllerWrapper.D2_SERVICE_NAME).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME).setLocalD2ZkHosts(zkServerWrapper.getAddress()).setControllerRequestRetryCount(3);
        VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory(controllerRequestRetryCount, metricsRepository);
        VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory2 = new VeniceChangelogConsumerClientFactory(ChangelogClientConfig.cloneConfig(controllerRequestRetryCount).setViewName(""), metricsRepository);
        VeniceChangelogConsumer changelogConsumer = veniceChangelogConsumerClientFactory.getChangelogConsumer(uniqueString);
        changelogConsumer.subscribeAll().get();
        VeniceSystemProducer closableProducer = veniceSystemFactory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null);
        try {
            closableProducer.start();
            runSamzaStreamJob(closableProducer, uniqueString, null, 10, 10, 0);
            produceRecordWithLogicalTimestamp(closableProducer, uniqueString, 1000, 1000L, true);
            if (closableProducer != null) {
                closableProducer.close();
            }
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.clusterWrapper.getRandomRouterURL()).setMetricsRepository(metricsRepository));
            try {
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(i)).get());
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                HashMap hashMap2 = new HashMap();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                    Assert.assertEquals(hashMap2.size(), 100);
                });
                hashMap2.clear();
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                    Assert.assertEquals(hashMap2.size(), 21);
                    for (int i2 = 0; i2 < 10; i2++) {
                        String num = Integer.toString(i2);
                        ChangeEvent changeEvent = (ChangeEvent) hashMap2.get(num);
                        Assert.assertNotNull(changeEvent);
                        if (i2 == 0) {
                            Assert.assertNull(changeEvent.getPreviousValue());
                        } else {
                            Assert.assertTrue(((Utf8) changeEvent.getPreviousValue()).toString().contains(num));
                        }
                        Assert.assertEquals(((Utf8) changeEvent.getCurrentValue()).toString(), "stream_" + i2);
                    }
                    for (int i3 = 10; i3 < 20; i3++) {
                        ChangeEvent changeEvent2 = (ChangeEvent) hashMap2.get(Integer.toString(i3));
                        Assert.assertNotNull(changeEvent2);
                        Assert.assertNull(changeEvent2.getPreviousValue());
                        Assert.assertNull(changeEvent2.getCurrentValue());
                    }
                });
                defaultVPJProps.setProperty("source.kafka", "true");
                defaultVPJProps.setProperty("kafka.input.broker.url", this.clusterWrapper.getKafka().getAddress());
                defaultVPJProps.setProperty("kafka.input.max.records.per.mapper", "5");
                defaultVPJProps.put("rewind.time.in.seconds.override", 0);
                TestWriteUtils.runPushJob("Run repush job", defaultVPJProps);
                ControllerClient controllerClient4 = new ControllerClient(this.clusterName, this.childDatacenters.get(0).getControllerConnectString());
                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                    Assert.assertEquals(controllerClient4.getStore(uniqueString).getStore().getCurrentVersion(), 2);
                });
                this.clusterWrapper.refreshAllRouterMetaData();
                andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.clusterWrapper.getRandomRouterURL()).setMetricsRepository(metricsRepository));
                try {
                    TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                        for (int i2 = 0; i2 < 10; i2++) {
                            Utf8 utf8 = (Utf8) andStartGenericAvroClient.get(Integer.toString(i2)).get();
                            Assert.assertNotNull(utf8);
                            Assert.assertEquals(utf8.toString(), "stream_" + i2);
                        }
                        for (int i3 = 10; i3 < 20; i3++) {
                            Assert.assertNull((Utf8) andStartGenericAvroClient.get(Integer.toString(i3)).get());
                        }
                        for (int i4 = 20; i4 < 100; i4++) {
                            Utf8 utf82 = (Utf8) andStartGenericAvroClient.get(Integer.toString(i4)).get();
                            Assert.assertNotNull(utf82);
                            Assert.assertTrue(utf82.toString().contains(String.valueOf(i4).substring(0, 0)));
                        }
                    });
                    if (andStartGenericAvroClient != null) {
                        andStartGenericAvroClient.close();
                    }
                    closableProducer = veniceSystemFactory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null);
                    try {
                        closableProducer.start();
                        produceRecordWithLogicalTimestamp(closableProducer, uniqueString, 1000, 2L, false);
                        produceRecordWithLogicalTimestamp(closableProducer, uniqueString, 1000 + 1, 1L, false);
                        if (closableProducer != null) {
                            closableProducer.close();
                        }
                        AvroGenericStoreClient andStartGenericAvroClient2 = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.clusterWrapper.getRandomRouterURL()).setMetricsRepository(metricsRepository));
                        try {
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                                Assert.assertNotNull(andStartGenericAvroClient2.get(Integer.toString(i + 1)).get());
                            });
                            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                                Assert.assertNull(andStartGenericAvroClient2.get(Integer.toString(i)).get());
                            });
                            if (andStartGenericAvroClient2 != null) {
                                andStartGenericAvroClient2.close();
                            }
                            VeniceChangelogConsumer changelogConsumer2 = veniceChangelogConsumerClientFactory2.getChangelogConsumer(uniqueString);
                            changelogConsumer2.subscribeAll().get();
                            hashMap2.clear();
                            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                                pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                Object num = Integer.toString(i);
                                String num2 = Integer.toString(i + 1);
                                Assert.assertNull(hashMap2.get(num));
                                Assert.assertNotNull(hashMap2.get(num2));
                                Assert.assertEquals(((Utf8) ((ChangeEvent) hashMap2.get(num2)).getCurrentValue()).toString(), "stream_" + num2);
                            });
                            controllerClient2.sendEmptyPushAndWait(uniqueString, "Run empty push job", 1000L, 30000L);
                            LinkedList linkedList = new LinkedList();
                            Instant now = Instant.now();
                            linkedList.add(Long.valueOf(now.toEpochMilli()));
                            linkedList.add(Long.valueOf(now.minus(1L, (TemporalUnit) ChronoUnit.HOURS).toEpochMilli()));
                            MockCircularTime mockCircularTime = new MockCircularTime(linkedList);
                            closableProducer = veniceSystemFactory.getClosableProducer("venice", new MapConfig(samzaConfig), (MetricsRegistry) null);
                            try {
                                closableProducer.start();
                                runSamzaStreamJob(closableProducer, uniqueString, mockCircularTime, 10, 10, 20);
                                if (closableProducer != null) {
                                    closableProducer.close();
                                }
                                hashMap2.clear();
                                AtomicInteger atomicInteger = new AtomicInteger();
                                HashMap hashMap3 = new HashMap();
                                HashMap hashMap4 = new HashMap();
                                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
                                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                    Assert.assertEquals(hashMap2.size(), 20);
                                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                                        atomicInteger.addAndGet(pollAfterImageEventsFromChangeCaptureConsumer(hashMap3, changelogConsumer2));
                                        Assert.assertEquals(hashMap3.size(), 91);
                                        hashMap4.putAll(hashMap3);
                                        hashMap3.clear();
                                    });
                                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                                        atomicInteger.addAndGet(pollAfterImageEventsFromChangeCaptureConsumer(hashMap3, changelogConsumer2));
                                        Assert.assertEquals(hashMap3.size(), 1);
                                        hashMap4.putAll(hashMap3);
                                        hashMap3.clear();
                                    });
                                    TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                                        atomicInteger.addAndGet(pollAfterImageEventsFromChangeCaptureConsumer(hashMap3, changelogConsumer2));
                                        Assert.assertEquals(hashMap3.size(), 20);
                                        hashMap4.putAll(hashMap3);
                                        hashMap3.clear();
                                    });
                                    atomicInteger.addAndGet(pollAfterImageEventsFromChangeCaptureConsumer(hashMap3, changelogConsumer2));
                                    Assert.assertEquals(hashMap3.size(), 0);
                                    Assert.assertEquals(atomicInteger.get(), 112);
                                    for (int i2 = 20; i2 < 40; i2++) {
                                        ChangeEvent changeEvent = (ChangeEvent) hashMap2.get(Integer.toString(i2));
                                        Assert.assertNotNull(changeEvent);
                                        Assert.assertNull(changeEvent.getPreviousValue());
                                        if (i2 < 20 || i2 >= 30) {
                                            Assert.assertNull(changeEvent.getCurrentValue());
                                        } else {
                                            Assert.assertEquals(((Utf8) changeEvent.getCurrentValue()).toString(), "stream_" + i2);
                                        }
                                    }
                                    for (int i3 = 0; i3 < 100; i3++) {
                                        Utf8 utf8 = (Utf8) hashMap4.get(Integer.toString(i3));
                                        if (i3 < 10 || (i3 >= 20 && i3 < 30)) {
                                            Assert.assertNotNull(utf8);
                                            Assert.assertEquals(utf8.toString(), "stream_" + i3);
                                        } else if (i3 < 40) {
                                            Assert.assertNull(utf8);
                                        } else {
                                            Assert.assertTrue(utf8.toString().contains(String.valueOf(i3).substring(0, 0)));
                                        }
                                    }
                                });
                                defaultVPJProps.setProperty("repush.ttl.enable", "true");
                                TestWriteUtils.runPushJob("Run repush job with TTL", defaultVPJProps);
                                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                                    Assert.assertEquals(controllerClient4.getStore(uniqueString).getStore().getCurrentVersion(), 4);
                                });
                                this.clusterWrapper.refreshAllRouterMetaData();
                                andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.clusterWrapper.getRandomRouterURL()).setMetricsRepository(metricsRepository));
                                int i2 = 0;
                                int i3 = 0;
                                for (int i4 = 20; i4 < 30; i4++) {
                                    try {
                                        if (andStartGenericAvroClient.get(Integer.toString(i4)).get() == null) {
                                            i3++;
                                        } else {
                                            i2++;
                                        }
                                    } finally {
                                        if (andStartGenericAvroClient != null) {
                                            try {
                                                andStartGenericAvroClient.close();
                                            } catch (Throwable th) {
                                                th.addSuppressed(th);
                                            }
                                        }
                                    }
                                }
                                Assert.assertEquals(i2, 5);
                                Assert.assertEquals(i3, 5);
                                for (int i5 = 30; i5 < 40; i5++) {
                                    Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(i5)).get());
                                }
                                for (int i6 = 40; i6 < 100; i6++) {
                                    Assert.assertNull(andStartGenericAvroClient.get(Integer.toString(i6)).get());
                                }
                                if (andStartGenericAvroClient != null) {
                                    andStartGenericAvroClient.close();
                                }
                                hashMap2.clear();
                                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, true, () -> {
                                    pollChangeEventsFromChangeCaptureConsumer(hashMap2, changelogConsumer);
                                    Assert.assertEquals(hashMap2.size(), 0);
                                });
                                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                                    Assert.assertEquals(TestView.getInstance().getVersionSwapCountForStore(uniqueString), 3);
                                });
                                TestUtils.waitForNonDeterministicAssertion(5L, TimeUnit.SECONDS, () -> {
                                    Assert.assertEquals(TestView.getInstance().getRecordCountForStore(uniqueString), 85);
                                });
                                controllerClient2.disableAndDeleteStore(uniqueString);
                                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                                    MultiStoreTopicsResponse deletableStoreTopics = controllerClient.getDeletableStoreTopics();
                                    Assert.assertFalse(deletableStoreTopics.isError());
                                    Assert.assertEquals(deletableStoreTopics.getTopics().size(), 0);
                                });
                            } finally {
                                if (closableProducer != null) {
                                    try {
                                        closableProducer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                            }
                        } finally {
                            if (andStartGenericAvroClient2 != null) {
                                try {
                                    andStartGenericAvroClient2.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    /**
     * Streams a contiguous run of put records followed by a contiguous run of delete records.
     *
     * <p>Puts are sent for keys {@code i3} (inclusive) up to {@code i3 + i} (exclusive), each with the
     * value {@code "stream_" + key}. Deletes are then sent for keys {@code i3 + i} (inclusive) up to
     * {@code i3 + i + i2} (exclusive). Keys are the decimal string form of the integer.
     *
     * @param veniceSystemProducer producer handed to {@link IntegrationTestPushUtils} for every record
     * @param str store name handed to {@link IntegrationTestPushUtils} for every record
     * @param time source of the per-record logical timestamp; when {@code null}, a {@code null}
     *     timestamp is passed for every record
     * @param i number of put records to send
     * @param i2 number of delete records to send
     * @param i3 first key of the put range
     */
    private void runSamzaStreamJob(VeniceSystemProducer veniceSystemProducer, String str, Time time, int i, int i2, int i3) {
        final int putEndExclusive = i3 + i;
        final int deleteEndExclusive = putEndExclusive + i2;

        int keyIndex = i3;
        while (keyIndex < putEndExclusive) {
            String key = Integer.toString(keyIndex);
            // The clock is queried once per record (not hoisted) so each record gets its own reading.
            Long logicalTimestamp = null;
            if (time != null) {
                logicalTimestamp = Long.valueOf(time.getMilliseconds());
            }
            IntegrationTestPushUtils.sendStreamingRecord(veniceSystemProducer, str, key, "stream_" + keyIndex, logicalTimestamp);
            keyIndex++;
        }

        // keyIndex now equals putEndExclusive (or i3 if no puts were requested and i <= 0).
        keyIndex = putEndExclusive;
        while (keyIndex < deleteEndExclusive) {
            String key = Integer.toString(keyIndex);
            Long logicalTimestamp = null;
            if (time != null) {
                logicalTimestamp = Long.valueOf(time.getMilliseconds());
            }
            IntegrationTestPushUtils.sendStreamingDeleteRecord(veniceSystemProducer, str, key, logicalTimestamp);
            keyIndex++;
        }
    }

    /**
     * Sends a single streaming record for key {@code i} carrying the explicit logical timestamp {@code j}.
     *
     * @param veniceSystemProducer producer handed to {@link IntegrationTestPushUtils}
     * @param str store name handed to {@link IntegrationTestPushUtils}
     * @param i integer whose decimal string form is used as the record key
     * @param j logical timestamp attached to the record
     * @param z when {@code true} a delete record is sent; otherwise a put record with the value
     *     {@code "stream_" + i} is sent
     */
    private void produceRecordWithLogicalTimestamp(VeniceSystemProducer veniceSystemProducer, String str, int i, long j, boolean z) {
        final String key = Integer.toString(i);
        final Long logicalTimestamp = Long.valueOf(j);

        if (!z) {
            IntegrationTestPushUtils.sendStreamingRecord(veniceSystemProducer, str, key, "stream_" + i, logicalTimestamp);
            return;
        }
        IntegrationTestPushUtils.sendStreamingDeleteRecord(veniceSystemProducer, str, key, logicalTimestamp);
    }

    /**
     * Builds the Samza configuration map used to create a Venice system producer for the given store.
     *
     * <p>The map sets the push type to {@code STREAM}, marks the producer as non-aggregate, disables
     * SSL, points the child D2 ZooKeeper hosts at the ZooKeeper address of the first entry in
     * {@code childDatacenters}, and assigns a freshly generated unique deployment id.
     *
     * @param str name of the store, written under {@code systems.venice.store}
     * @return a new mutable map holding the configuration entries
     */
    private Map<String, String> getSamzaConfig(String str) {
        // Parameterized (rather than raw) so every put is type-checked and the return is not an unchecked conversion.
        Map<String, String> config = new HashMap<>();
        config.put("systems.venice.push.type", Version.PushType.STREAM.toString());
        config.put("systems.venice.store", str);
        config.put("systems.venice.aggregate", "false");
        config.put("venice.child.d2.zk.hosts", this.childDatacenters.get(0).getZkServerWrapper().getAddress());
        config.put("venice.child.controller.d2.service", VeniceControllerWrapper.D2_SERVICE_NAME);
        // NOTE(review): "dfd" looks like a placeholder rather than a real ZooKeeper address; presumably the
        // parent hosts are never contacted because aggregate mode is "false" -- confirm.
        config.put("venice.parent.d2.zk.hosts", "dfd");
        config.put("venice.parent.controller.d2.service", VeniceControllerWrapper.PARENT_D2_SERVICE_NAME);
        config.put("deployment.id", Utils.getUniqueString("venice-push-id"));
        config.put("ssl.enabled", "false");
        return config;
    }
}
