package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerIngestionProgressNotifier;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueueTest;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.notifier.LogNotifier;
import com.linkedin.davinci.notifier.PartitionPushStatusNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStoragePartition;
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackendTest;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.venice.exceptions.UnsubscribedTopicPartitionException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceIngestionTaskKilledException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.exceptions.validation.CorruptDataException;
import com.linkedin.venice.exceptions.validation.FatalDataValidationException;
import com.linkedin.venice.exceptions.validation.MissingDataException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.kafka.validation.checksum.CheckSum;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.offsets.DeepCopyStorageMetadataService;
import com.linkedin.venice.offsets.InMemoryStorageMetadataService;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.UserPartitionAwarePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.SimplePartitioner;
import com.linkedin.venice.unit.kafka.consumer.MockInMemoryConsumer;
import com.linkedin.venice.unit.kafka.consumer.poll.ArbitraryOrderingPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.BlockingObserverPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.CompositePollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.DuplicatingPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.FilteringPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.PollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.PubSubTopicPartitionOffset;
import com.linkedin.venice.unit.kafka.consumer.poll.RandomPollStrategy;
import com.linkedin.venice.unit.kafka.producer.MockInMemoryProducerAdapter;
import com.linkedin.venice.unit.kafka.producer.TransformingProducerAdapter;
import com.linkedin.venice.unit.matchers.ExceptionClassMatcher;
import com.linkedin.venice.unit.matchers.LongEqualOrGreaterThanMatcher;
import com.linkedin.venice.unit.matchers.NonEmptyStringMatcher;
import com.linkedin.venice.utils.ByteArray;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.TestMockTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.class */
public abstract class StoreIngestionTaskTest {
    private static final long READ_CYCLE_DELAY_MS = 5;
    private static final long TEST_TIMEOUT_MS = 5000;
    private static final int RUN_TEST_FUNCTION_TIMEOUT_SECONDS = 10;
    private static final long EMPTY_POLL_SLEEP_MS = 0;
    private InMemoryKafkaBroker inMemoryLocalKafkaBroker;
    private InMemoryKafkaBroker inMemoryRemoteKafkaBroker;
    private MockInMemoryConsumer inMemoryLocalKafkaConsumer;
    private MockInMemoryConsumer inMemoryRemoteKafkaConsumer;
    private VeniceWriterFactory mockWriterFactory;
    private VeniceWriter localVeniceWriter;
    private StorageEngineRepository mockStorageEngineRepository;
    private VeniceNotifier mockLogNotifier;
    private VeniceNotifier mockPartitionStatusNotifier;
    private VeniceNotifier mockLeaderFollowerStateModelNotifier;
    private List<Object[]> mockNotifierProgress;
    private List<Object[]> mockNotifierEOPReceived;
    private List<Object[]> mockNotifierCompleted;
    private List<Object[]> mockNotifierError;
    private StorageMetadataService mockStorageMetadataService;
    private AbstractStorageEngine mockAbstractStorageEngine;
    private EventThrottler mockBandwidthThrottler;
    private EventThrottler mockRecordsThrottler;
    private Map<String, EventThrottler> kafkaUrlToRecordsThrottler;
    private KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler;
    private ReadOnlySchemaRepository mockSchemaRepo;
    private ReadOnlyStoreRepository mockMetadataRepo;
    private PubSubConsumerAdapter mockLocalKafkaConsumer;
    private PubSubConsumerAdapter mockRemoteKafkaConsumer;
    private TopicManager mockTopicManager;
    private TopicManagerRepository mockTopicManagerRepository;
    private AggHostLevelIngestionStats mockAggStoreIngestionStats;
    private HostLevelIngestionStats mockStoreIngestionStats;
    private AggVersionedDIVStats mockVersionedDIVStats;
    private AggVersionedIngestionStats mockVersionedStorageIngestionStats;
    private StoreIngestionTask storeIngestionTaskUnderTest;
    private ExecutorService taskPollingService;
    private StoreBufferService storeBufferService;
    private AggKafkaConsumerService aggKafkaConsumerService;
    private BooleanSupplier isCurrentVersion;
    private Optional<HybridStoreConfig> hybridStoreConfig;
    private VeniceServerConfig veniceServerConfig;
    private RocksDBServerConfig rocksDBServerConfig;
    private KafkaConsumerService localKafkaConsumerService;
    private KafkaConsumerService remoteKafkaConsumerService;
    private StorePartitionDataReceiver localConsumedDataReceiver;
    private StorePartitionDataReceiver remoteConsumedDataReceiver;
    private static String storeNameWithoutVersionInfo;
    private String topic;
    private PubSubTopic pubSubTopic;
    private PubSubTopicPartition fooTopicPartition;
    private PubSubTopicPartition barTopicPartition;
    private static final int PARTITION_COUNT = 10;
    private static final Set<Integer> ALL_PARTITIONS;
    private static final int PARTITION_FOO = 1;
    private static final int PARTITION_BAR = 2;
    private static final int SCHEMA_ID = 1;
    private static final int EXISTING_SCHEMA_ID = 1;
    private static final int NON_EXISTING_SCHEMA_ID = 2;
    private static final Schema STRING_SCHEMA;
    private static final byte[] putKeyFoo;
    private static final byte[] putKeyFoo2;
    private static final byte[] putKeyBar;
    private static final byte[] putValue;
    private static final byte[] putValueToCorrupt;
    private static final byte[] deleteKeyFoo;
    private static final int REPLICATION_METADATA_VERSION_ID = 1;
    private static final Schema REPLICATION_METADATA_SCHEMA;
    private static final RecordSerializer REPLICATION_METADATA_SERIALIZER;
    private static final long PUT_KEY_FOO_TIMESTAMP = 2;
    private static final long DELETE_KEY_FOO_TIMESTAMP = 2;
    private static final long PUT_KEY_FOO_OFFSET = 1;
    private static final long DELETE_KEY_FOO_OFFSET = 2;
    private static final byte[] putKeyFooReplicationMetadataWithValueSchemaIdBytesDefault;
    private static final byte[] putKeyFooReplicationMetadataWithValueSchemaIdBytes;
    private static final byte[] deleteKeyFooReplicationMetadataWithValueSchemaIdBytes;
    private static final Logger LOGGER = LogManager.getLogger(StoreIngestionTaskTest.class);
    private static final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private static final KafkaPubSubMessageDeserializer pubSubDeserializer = new KafkaPubSubMessageDeserializer(new OptimizedKafkaValueSerializer(), new LandFillObjectPool(KafkaMessageEnvelope::new), new LandFillObjectPool(KafkaMessageEnvelope::new));
    private long databaseSyncBytesIntervalForTransactionalMode = PUT_KEY_FOO_OFFSET;
    private long databaseSyncBytesIntervalForDeferredWriteMode = 2;
    private boolean databaseChecksumVerificationEnabled = false;
    private KafkaConsumerServiceStats kafkaConsumerServiceStats = (KafkaConsumerServiceStats) Mockito.mock(KafkaConsumerServiceStats.class);
    private PubSubConsumerAdapterFactory mockFactory = (PubSubConsumerAdapterFactory) Mockito.mock(PubSubConsumerAdapterFactory.class);
    private Supplier<StoreVersionState> storeVersionStateSupplier = () -> {
        return new StoreVersionState();
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest$CorruptedKafkaProducerAdapter.class */
    public static class CorruptedKafkaProducerAdapter extends TransformingProducerAdapter {
        public CorruptedKafkaProducerAdapter(PubSubProducerAdapter pubSubProducerAdapter, String str, byte[] bArr) {
            super(pubSubProducerAdapter, (str2, kafkaKey, kafkaMessageEnvelope, i) -> {
                if (MessageType.valueOf(kafkaMessageEnvelope) == MessageType.PUT) {
                    Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                    if (put.putValue.array() == bArr) {
                        put.putValue = ByteBuffer.wrap("CORRUPT_VALUE".getBytes());
                        kafkaMessageEnvelope.payloadUnion = put;
                    }
                }
                return new TransformingProducerAdapter.SendMessageParameters(str, kafkaKey, kafkaMessageEnvelope, i);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest$MockStoreVersionConfigs.class */
    public static class MockStoreVersionConfigs {
        Store store;
        Version version;
        VeniceStoreVersionConfig storeVersionConfig;

        private MockStoreVersionConfigs(Store store, Version version, VeniceStoreVersionConfig veniceStoreVersionConfig) {
            this.store = store;
            this.version = version;
            this.storeVersionConfig = veniceStoreVersionConfig;
        }
    }

    private static byte[] getRandomKey(Integer num) {
        String uniqueString = Utils.getUniqueString("KeyForPartition" + num);
        return ByteBuffer.allocate(uniqueString.length() + 1).put(num.byteValue()).put(uniqueString.getBytes()).array();
    }

    private static byte[] createReplicationMetadataWithValueSchemaId(long j, long j2, int i) {
        GenericData.Record record = new GenericData.Record(REPLICATION_METADATA_SCHEMA);
        record.put("timestamp", Long.valueOf(j));
        record.put("replication_checkpoint_vector", Collections.singletonList(Long.valueOf(j2)));
        ByteBuffer prependIntHeaderToByteBuffer = ByteUtils.prependIntHeaderToByteBuffer(ByteBuffer.wrap(REPLICATION_METADATA_SERIALIZER.serialize(record)), i, false);
        prependIntHeaderToByteBuffer.position(prependIntHeaderToByteBuffer.position() - 4);
        return ByteUtils.extractByteArray(prependIntHeaderToByteBuffer);
    }

    @BeforeClass(alwaysRun = true)
    public void suiteSetUp() throws Exception {
        this.taskPollingService = Executors.newFixedThreadPool(1);
        this.storeBufferService = new StoreBufferService(3, 10000L, 1000L, isStoreWriterBufferAfterLeaderLogicEnabled());
        this.storeBufferService.start();
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() throws Exception {
        TestUtils.shutdownExecutor(this.taskPollingService);
        this.storeBufferService.stop();
    }

    @AfterMethod(alwaysRun = true)
    public void methodCleanUp() throws Exception {
        if (this.localKafkaConsumerService != null) {
            this.localKafkaConsumerService.stopInner();
        }
        if (this.remoteKafkaConsumerService != null) {
            this.remoteKafkaConsumerService.stopInner();
        }
    }

    @BeforeMethod(alwaysRun = true)
    public void methodSetUp() throws Exception {
        this.aggKafkaConsumerService = (AggKafkaConsumerService) Mockito.mock(AggKafkaConsumerService.class);
        storeNameWithoutVersionInfo = Utils.getUniqueString("TestTopic");
        this.topic = Version.composeKafkaTopic(storeNameWithoutVersionInfo, 1);
        this.pubSubTopic = pubSubTopicRepository.getTopic(this.topic);
        this.fooTopicPartition = new PubSubTopicPartitionImpl(this.pubSubTopic, 1);
        this.barTopicPartition = new PubSubTopicPartitionImpl(this.pubSubTopic, 2);
        this.inMemoryLocalKafkaBroker = new InMemoryKafkaBroker("local");
        this.inMemoryLocalKafkaBroker.createTopic(this.topic, 10);
        this.inMemoryRemoteKafkaBroker = new InMemoryKafkaBroker("remote");
        this.inMemoryRemoteKafkaBroker.createTopic(this.topic, 10);
        this.localVeniceWriter = getVeniceWriter(new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker));
        this.mockStorageEngineRepository = (StorageEngineRepository) Mockito.mock(StorageEngineRepository.class);
        this.mockLogNotifier = (VeniceNotifier) Mockito.mock(LogNotifier.class);
        this.mockNotifierProgress = new ArrayList();
        ((VeniceNotifier) Mockito.doAnswer(invocationOnMock -> {
            this.mockNotifierProgress.add(invocationOnMock.getArguments());
            return null;
        }).when(this.mockLogNotifier)).progress(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
        this.mockNotifierEOPReceived = new ArrayList();
        ((VeniceNotifier) Mockito.doAnswer(invocationOnMock2 -> {
            this.mockNotifierEOPReceived.add(invocationOnMock2.getArguments());
            return null;
        }).when(this.mockLogNotifier)).endOfPushReceived(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
        this.mockNotifierCompleted = new ArrayList();
        ((VeniceNotifier) Mockito.doAnswer(invocationOnMock3 -> {
            this.mockNotifierCompleted.add(invocationOnMock3.getArguments());
            return null;
        }).when(this.mockLogNotifier)).completed(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyString());
        this.mockNotifierError = new ArrayList();
        ((VeniceNotifier) Mockito.doAnswer(invocationOnMock4 -> {
            this.mockNotifierError.add(invocationOnMock4.getArguments());
            return null;
        }).when(this.mockLogNotifier)).error(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString(), (Exception) Mockito.any());
        this.mockPartitionStatusNotifier = (VeniceNotifier) Mockito.mock(PartitionPushStatusNotifier.class);
        this.mockLeaderFollowerStateModelNotifier = (VeniceNotifier) Mockito.mock(LeaderFollowerIngestionProgressNotifier.class);
        this.mockStorageMetadataService = (StorageMetadataService) Mockito.mock(StorageMetadataService.class);
        this.mockBandwidthThrottler = (EventThrottler) Mockito.mock(EventThrottler.class);
        this.mockRecordsThrottler = (EventThrottler) Mockito.mock(EventThrottler.class);
        this.mockSchemaRepo = (ReadOnlySchemaRepository) Mockito.mock(ReadOnlySchemaRepository.class);
        this.mockMetadataRepo = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        this.mockLocalKafkaConsumer = (PubSubConsumerAdapter) Mockito.mock(PubSubConsumerAdapter.class);
        this.mockRemoteKafkaConsumer = (PubSubConsumerAdapter) Mockito.mock(PubSubConsumerAdapter.class);
        this.kafkaUrlToRecordsThrottler = new HashMap();
        this.kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(this.kafkaUrlToRecordsThrottler);
        this.mockTopicManager = (TopicManager) Mockito.mock(TopicManager.class);
        this.mockTopicManagerRepository = (TopicManagerRepository) Mockito.mock(TopicManagerRepository.class);
        ((TopicManagerRepository) Mockito.doReturn(this.mockTopicManager).when(this.mockTopicManagerRepository)).getTopicManager();
        this.mockAggStoreIngestionStats = (AggHostLevelIngestionStats) Mockito.mock(AggHostLevelIngestionStats.class);
        this.mockStoreIngestionStats = (HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class);
        ((AggHostLevelIngestionStats) Mockito.doReturn(this.mockStoreIngestionStats).when(this.mockAggStoreIngestionStats)).getStoreStats(Mockito.anyString());
        this.mockVersionedDIVStats = (AggVersionedDIVStats) Mockito.mock(AggVersionedDIVStats.class);
        this.mockVersionedStorageIngestionStats = (AggVersionedIngestionStats) Mockito.mock(AggVersionedIngestionStats.class);
        this.isCurrentVersion = () -> {
            return false;
        };
        this.hybridStoreConfig = Optional.empty();
        this.databaseChecksumVerificationEnabled = false;
        this.rocksDBServerConfig = (RocksDBServerConfig) Mockito.mock(RocksDBServerConfig.class);
        ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
        ((ReadOnlySchemaRepository) Mockito.doReturn(false).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 2);
        ((ReadOnlySchemaRepository) Mockito.doReturn(new RmdSchemaEntry(1, 1, REPLICATION_METADATA_SCHEMA)).when(this.mockSchemaRepo)).getReplicationMetadataSchema(storeNameWithoutVersionInfo, 1, 1);
        setDefaultStoreVersionStateSupplier();
    }

    private VeniceWriter getVeniceWriter(String str, PubSubProducerAdapter pubSubProducerAdapter, int i) {
        return new VeniceWriter(new VeniceWriterOptions.Builder(str).setKeySerializer(new DefaultSerializer()).setValueSerializer(new DefaultSerializer()).setWriteComputeSerializer(new DefaultSerializer()).setPartitioner(getVenicePartitioner(i)).setTime(SystemTime.INSTANCE).build(), VeniceProperties.empty(), pubSubProducerAdapter);
    }

    private VenicePartitioner getVenicePartitioner(int i) {
        VenicePartitioner simplePartitioner = new SimplePartitioner();
        return i == 1 ? simplePartitioner : new UserPartitionAwarePartitioner(simplePartitioner, i);
    }

    private VeniceWriter getVeniceWriter(PubSubProducerAdapter pubSubProducerAdapter) {
        return new VeniceWriter(new VeniceWriterOptions.Builder(this.topic).setKeySerializer(new DefaultSerializer()).setValueSerializer(new DefaultSerializer()).setWriteComputeSerializer(new DefaultSerializer()).setPartitioner(new SimplePartitioner()).setTime(SystemTime.INSTANCE).build(), VeniceProperties.empty(), pubSubProducerAdapter);
    }

    private VeniceWriter getCorruptedVeniceWriter(byte[] bArr, InMemoryKafkaBroker inMemoryKafkaBroker) {
        return getVeniceWriter(new CorruptedKafkaProducerAdapter(new MockInMemoryProducerAdapter(inMemoryKafkaBroker), this.topic, bArr));
    }

    private long getOffset(Future<PubSubProduceResult> future) throws ExecutionException, InterruptedException {
        return future.get().getOffset();
    }

    private void runTest(Set<Integer> set, Runnable runnable, boolean z) throws Exception {
        runTest(set, () -> {
        }, runnable, z);
    }

    private void runTest(Set<Integer> set, Runnable runnable, Runnable runnable2, boolean z) throws Exception {
        runTest(new RandomPollStrategy(), set, runnable, runnable2, this.hybridStoreConfig, false, Optional.empty(), z, 1, Collections.emptyMap(), veniceStoreVersionConfig -> {
        });
    }

    private void runTest(Set<Integer> set, Runnable runnable, Runnable runnable2, boolean z, Consumer<VeniceStoreVersionConfig> consumer) throws Exception {
        runTest(new RandomPollStrategy(), set, runnable, runnable2, this.hybridStoreConfig, false, Optional.empty(), z, 1, Collections.emptyMap(), consumer);
    }

    private void runTest(PollStrategy pollStrategy, Set<Integer> set, Runnable runnable, Runnable runnable2, boolean z) throws Exception {
        runTest(pollStrategy, set, runnable, runnable2, this.hybridStoreConfig, false, Optional.empty(), z, 1, Collections.emptyMap(), veniceStoreVersionConfig -> {
        });
    }

    private void runTest(PollStrategy pollStrategy, Set<Integer> set, Runnable runnable, Runnable runnable2, Optional<HybridStoreConfig> optional, boolean z, Optional<DiskUsage> optional2, boolean z2, int i, Map<String, Object> map) throws Exception {
        runTest(pollStrategy, set, runnable, runnable2, optional, z, optional2, z2, i, map, veniceStoreVersionConfig -> {
        });
    }

    private void runTest(PollStrategy pollStrategy, Set<Integer> set, Runnable runnable, Runnable runnable2, Optional<HybridStoreConfig> optional, boolean z, Optional<DiskUsage> optional2, boolean z2, int i, Map<String, Object> map, Consumer<VeniceStoreVersionConfig> consumer) throws Exception {
        VenicePartitioner venicePartitioner = getVenicePartitioner(1);
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(venicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(i);
        MockStoreVersionConfigs mockStoreVersionConfigs = setupStoreAndVersionMocks(10 / i, partitionerConfigImpl, optional, z, false, z2, consumer);
        Store store = mockStoreVersionConfigs.store;
        Version version = mockStoreVersionConfigs.version;
        VeniceStoreVersionConfig veniceStoreVersionConfig = mockStoreVersionConfigs.storeVersionConfig;
        StoreIngestionTaskFactory build = getIngestionTaskFactoryBuilder(pollStrategy, set, optional2, i, map, false).build();
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.storeIngestionTaskUnderTest = build.getNewIngestionTask(store, version, properties, this.isCurrentVersion, veniceStoreVersionConfig, PartitionUtils.getLeaderSubPartition(1, i), false, Optional.empty());
        Future<?> future = null;
        try {
            Iterator<Integer> it = set.iterator();
            while (it.hasNext()) {
                this.storeIngestionTaskUnderTest.subscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopic, it.next().intValue()), Optional.empty());
            }
            runnable.run();
            future = this.taskPollingService.submit((Runnable) this.storeIngestionTaskUnderTest);
            runnable2.run();
            this.storeIngestionTaskUnderTest.close();
            if (future != null) {
                future.get(10L, TimeUnit.SECONDS);
            }
        } catch (Throwable th) {
            this.storeIngestionTaskUnderTest.close();
            if (future != null) {
                future.get(10L, TimeUnit.SECONDS);
            }
            throw th;
        }
    }

    private MockStoreVersionConfigs setupStoreAndVersionMocks(int i, PartitionerConfig partitionerConfig, Optional<HybridStoreConfig> optional, boolean z, boolean z2, boolean z3) {
        return setupStoreAndVersionMocks(i, partitionerConfig, optional, z, z2, z3, veniceStoreVersionConfig -> {
        });
    }

    private MockStoreVersionConfigs setupStoreAndVersionMocks(int i, PartitionerConfig partitionerConfig, Optional<HybridStoreConfig> optional, boolean z, boolean z2, boolean z3, Consumer<VeniceStoreVersionConfig> consumer) {
        boolean isPresent = optional.isPresent();
        HybridStoreConfig hybridStoreConfig = null;
        if (isPresent) {
            hybridStoreConfig = optional.get();
        }
        VeniceStoreVersionConfig defaultMockVeniceStoreVersionConfig = getDefaultMockVeniceStoreVersionConfig(consumer);
        Store store = (Store) Mockito.mock(Store.class);
        VersionImpl versionImpl = new VersionImpl(storeNameWithoutVersionInfo, 1, "1", i);
        ((Store) Mockito.doReturn(storeNameWithoutVersionInfo).when(store)).getName();
        versionImpl.setPartitionerConfig(partitionerConfig);
        ((Store) Mockito.doReturn(partitionerConfig).when(store)).getPartitionerConfig();
        versionImpl.setIncrementalPushEnabled(z);
        ((Store) Mockito.doReturn(Boolean.valueOf(z)).when(store)).isIncrementalPushEnabled();
        versionImpl.setHybridStoreConfig(hybridStoreConfig);
        ((Store) Mockito.doReturn(hybridStoreConfig).when(store)).getHybridStoreConfig();
        ((Store) Mockito.doReturn(Boolean.valueOf(isPresent)).when(store)).isHybrid();
        versionImpl.setBufferReplayEnabledForHybrid(true);
        versionImpl.setNativeReplicationEnabled(z2);
        ((Store) Mockito.doReturn(Boolean.valueOf(z2)).when(store)).isNativeReplicationEnabled();
        versionImpl.setPushStreamSourceAddress("");
        ((Store) Mockito.doReturn("").when(store)).getPushStreamSourceAddress();
        ((Store) Mockito.doReturn(false).when(store)).isWriteComputationEnabled();
        ((Store) Mockito.doReturn(1).when(store)).getPartitionCount();
        ((Store) Mockito.doReturn(false).when(store)).isHybridStoreDiskQuotaEnabled();
        ((Store) Mockito.doReturn(-1).when(store)).getCurrentVersion();
        ((Store) Mockito.doReturn(1).when(store)).getBootstrapToOnlineTimeoutInHours();
        versionImpl.setActiveActiveReplicationEnabled(z3);
        ((Store) Mockito.doReturn(Boolean.valueOf(z3)).when(store)).isActiveActiveReplicationEnabled();
        versionImpl.setRmdVersionId(1);
        ((Store) Mockito.doReturn(Optional.of(versionImpl)).when(store)).getVersion(Mockito.anyInt());
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStoreOrThrow(storeNameWithoutVersionInfo);
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStore(storeNameWithoutVersionInfo);
        return new MockStoreVersionConfigs(store, versionImpl, defaultMockVeniceStoreVersionConfig);
    }

    private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder(PollStrategy pollStrategy, Set<Integer> set, Optional<DiskUsage> optional, int i, Map<String, Object> map, Boolean bool) {
        DiskUsage diskUsage;
        ((StorageEngineRepository) Mockito.doReturn(new DeepCopyStorageEngine(this.mockAbstractStorageEngine)).when(this.mockStorageEngineRepository)).getLocalStorageEngine(this.topic);
        this.inMemoryLocalKafkaConsumer = new MockInMemoryConsumer(this.inMemoryLocalKafkaBroker, pollStrategy, this.mockLocalKafkaConsumer);
        this.inMemoryRemoteKafkaConsumer = new MockInMemoryConsumer(this.inMemoryRemoteKafkaBroker, pollStrategy, this.mockRemoteKafkaConsumer);
        ((PubSubConsumerAdapterFactory) Mockito.doAnswer(invocationOnMock -> {
            return ((VeniceProperties) invocationOnMock.getArgument(0, VeniceProperties.class)).toProperties().getProperty("kafka.bootstrap.servers").equals(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer()) ? this.inMemoryRemoteKafkaConsumer : this.inMemoryLocalKafkaConsumer;
        }).when(this.mockFactory)).create((VeniceProperties) Mockito.any(), ArgumentMatchers.anyBoolean(), (PubSubMessageDeserializer) Mockito.any(), (String) Mockito.any());
        this.mockWriterFactory = (VeniceWriterFactory) Mockito.mock(VeniceWriterFactory.class);
        ((VeniceWriterFactory) Mockito.doReturn((Object) null).when(this.mockWriterFactory)).createVeniceWriter((VeniceWriterOptions) Mockito.any());
        LOGGER.info("mockStorageMetadataService: {}", this.mockStorageMetadataService.getClass().getName());
        InternalAvroSpecificSerializer serializer = AvroProtocolDefinition.PARTITION_STATE.getSerializer();
        if (this.mockStorageMetadataService.getClass() != InMemoryStorageMetadataService.class) {
            IntListIterator it = PartitionUtils.getSubPartitions(set, i).iterator();
            while (it.hasNext()) {
                ((StorageMetadataService) Mockito.doReturn(new OffsetRecord(serializer)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, ((Integer) it.next()).intValue());
            }
        }
        DeepCopyStorageMetadataService deepCopyStorageMetadataService = new DeepCopyStorageMetadataService(this.mockStorageMetadataService);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(Arrays.asList(this.mockLogNotifier, this.mockPartitionStatusNotifier, this.mockLeaderFollowerStateModelNotifier));
        if (optional.isPresent()) {
            diskUsage = optional.get();
        } else {
            diskUsage = (DiskUsage) Mockito.mock(DiskUsage.class);
            ((DiskUsage) Mockito.doReturn(false).when(diskUsage)).isDiskFull(Mockito.anyLong());
        }
        this.veniceServerConfig = buildVeniceServerConfig(new HashMap(map));
        MetricsRepository metricsRepository = (MetricsRepository) Mockito.mock(MetricsRepository.class);
        ((MetricsRepository) Mockito.doReturn((Sensor) Mockito.mock(Sensor.class)).when(metricsRepository)).sensor(Mockito.anyString(), (Sensor[]) Mockito.any());
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.localKafkaConsumerService = getConsumerAssignmentStrategy().constructor.construct(this.mockFactory, properties, 10L, 1, this.mockBandwidthThrottler, this.mockRecordsThrottler, this.kafkaClusterBasedRecordThrottler, metricsRepository, this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), 1000L, (TopicExistenceChecker) Mockito.mock(TopicExistenceChecker.class), bool.booleanValue(), pubSubDeserializer, SystemTime.INSTANCE, this.kafkaConsumerServiceStats, false);
        this.localKafkaConsumerService.start();
        Properties properties2 = new Properties();
        properties2.put("kafka.bootstrap.servers", this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        this.remoteKafkaConsumerService = getConsumerAssignmentStrategy().constructor.construct(this.mockFactory, properties2, 10L, 1, this.mockBandwidthThrottler, this.mockRecordsThrottler, this.kafkaClusterBasedRecordThrottler, metricsRepository, this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), 1000L, (TopicExistenceChecker) Mockito.mock(TopicExistenceChecker.class), bool.booleanValue(), pubSubDeserializer, SystemTime.INSTANCE, this.kafkaConsumerServiceStats, false);
        this.remoteKafkaConsumerService.start();
        ((EventThrottler) Mockito.doReturn(100L).when(this.mockBandwidthThrottler)).getMaxRatePerSecond();
        prepareAggKafkaConsumerServiceMock();
        return StoreIngestionTaskFactory.builder().setVeniceWriterFactory(this.mockWriterFactory).setStorageEngineRepository(this.mockStorageEngineRepository).setStorageMetadataService(deepCopyStorageMetadataService).setLeaderFollowerNotifiersQueue(concurrentLinkedQueue).setSchemaRepository(this.mockSchemaRepo).setMetadataRepository(this.mockMetadataRepo).setTopicManagerRepository(this.mockTopicManagerRepository).setHostLevelIngestionStats(this.mockAggStoreIngestionStats).setVersionedDIVStats(this.mockVersionedDIVStats).setVersionedIngestionStats(this.mockVersionedStorageIngestionStats).setStoreBufferService(this.storeBufferService).setServerConfig(this.veniceServerConfig).setDiskUsage(diskUsage).setAggKafkaConsumerService(this.aggKafkaConsumerService).setCompressorFactory(new StorageEngineBackedCompressorFactory(this.mockStorageMetadataService)).setPubSubTopicRepository(pubSubTopicRepository).setPartitionStateSerializer(serializer);
    }

    abstract KafkaConsumerService.ConsumerAssignmentStrategy getConsumerAssignmentStrategy();

    abstract boolean isStoreWriterBufferAfterLeaderLogicEnabled();

    private void prepareAggKafkaConsumerServiceMock() {
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock -> {
            KafkaConsumerService kafkaConsumerService;
            int i;
            String str = (String) invocationOnMock.getArgument(0, String.class);
            StoreIngestionTask storeIngestionTask = (StoreIngestionTask) invocationOnMock.getArgument(1, StoreIngestionTask.class);
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock.getArgument(2, PubSubTopicPartition.class);
            long longValue = ((Long) invocationOnMock.getArgument(3, Long.class)).longValue();
            boolean equals = str.equals(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
            if (equals) {
                kafkaConsumerService = this.localKafkaConsumerService;
                i = 0;
            } else {
                kafkaConsumerService = this.remoteKafkaConsumerService;
                i = 1;
            }
            StorePartitionDataReceiver storePartitionDataReceiver = new StorePartitionDataReceiver(storeIngestionTask, pubSubTopicPartition, str, i);
            kafkaConsumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, longValue, storePartitionDataReceiver);
            if (equals) {
                this.localConsumedDataReceiver = storePartitionDataReceiver;
                return null;
            }
            this.remoteConsumedDataReceiver = storePartitionDataReceiver;
            return null;
        }).when(this.aggKafkaConsumerService)).subscribeConsumerFor(Mockito.anyString(), (StoreIngestionTask) Mockito.any(), (PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock2 -> {
            PubSubTopic pubSubTopic = (PubSubTopic) invocationOnMock2.getArgument(0, PubSubTopic.class);
            return Boolean.valueOf(this.localKafkaConsumerService.hasAnySubscriptionFor(pubSubTopic) || this.remoteKafkaConsumerService.hasAnySubscriptionFor(pubSubTopic));
        }).when(this.aggKafkaConsumerService)).hasAnyConsumerAssignedForVersionTopic((PubSubTopic) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock3 -> {
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock3.getArgument(1, PubSubTopicPartition.class);
            return Boolean.valueOf(this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition) || this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition));
        }).when(this.aggKafkaConsumerService)).hasConsumerAssignedFor((PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock4 -> {
            String str = (String) invocationOnMock4.getArgument(0, String.class);
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock4.getArgument(2, PubSubTopicPartition.class);
            return str.equals(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer()) ? Boolean.valueOf(this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) : Boolean.valueOf(this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition));
        }).when(this.aggKafkaConsumerService)).hasConsumerAssignedFor(Mockito.anyString(), (PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock5 -> {
            PubSubTopic pubSubTopic = (PubSubTopic) invocationOnMock5.getArgument(0, PubSubTopic.class);
            Set set = (Set) invocationOnMock5.getArgument(1, Set.class);
            this.localKafkaConsumerService.batchUnsubscribe(pubSubTopic, set);
            this.remoteKafkaConsumerService.batchUnsubscribe(pubSubTopic, set);
            return null;
        }).when(this.aggKafkaConsumerService)).batchUnsubscribeConsumerFor((PubSubTopic) Mockito.any(), Mockito.anySet());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock6 -> {
            PubSubTopic pubSubTopic = (PubSubTopic) invocationOnMock6.getArgument(0, PubSubTopic.class);
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock6.getArgument(1, PubSubTopicPartition.class);
            if (this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                this.localKafkaConsumerService.unSubscribe(pubSubTopic, pubSubTopicPartition);
            }
            if (!this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                return null;
            }
            this.remoteKafkaConsumerService.unSubscribe(pubSubTopic, pubSubTopicPartition);
            return null;
        }).when(this.aggKafkaConsumerService)).unsubscribeConsumerFor((PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock7 -> {
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock7.getArgument(1, PubSubTopicPartition.class);
            if (this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                this.inMemoryLocalKafkaConsumer.resetOffset(pubSubTopicPartition);
            }
            if (!this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                return null;
            }
            this.inMemoryRemoteKafkaConsumer.resetOffset(pubSubTopicPartition);
            return null;
        }).when(this.aggKafkaConsumerService)).resetOffsetFor((PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock8 -> {
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock8.getArgument(1, PubSubTopicPartition.class);
            if (this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                this.inMemoryLocalKafkaConsumer.pause(pubSubTopicPartition);
            }
            if (!this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                return null;
            }
            this.inMemoryRemoteKafkaConsumer.pause(pubSubTopicPartition);
            return null;
        }).when(this.aggKafkaConsumerService)).pauseConsumerFor((PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock9 -> {
            return this.localKafkaConsumerService.hasAnySubscriptionFor((PubSubTopic) invocationOnMock9.getArgument(0, PubSubTopic.class)) ? Collections.singleton(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer()) : Collections.emptySet();
        }).when(this.aggKafkaConsumerService)).getKafkaUrlsFor((PubSubTopic) Mockito.any());
        ((AggKafkaConsumerService) Mockito.doAnswer(invocationOnMock10 -> {
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) invocationOnMock10.getArgument(1, PubSubTopicPartition.class);
            if (this.inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                this.inMemoryLocalKafkaConsumer.resume(pubSubTopicPartition);
            }
            if (!this.inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) {
                return null;
            }
            this.inMemoryRemoteKafkaConsumer.resume(pubSubTopicPartition);
            return null;
        }).when(this.aggKafkaConsumerService)).resumeConsumerFor((PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
    }

    void setDefaultStoreVersionStateSupplier() {
        setStoreVersionStateSupplier(new StoreVersionState());
    }

    void setupMockAbstractStorageEngine(AbstractStoragePartition abstractStoragePartition) {
        this.mockAbstractStorageEngine = (AbstractStorageEngine) Mockito.mock(AbstractStorageEngine.class);
        ((AbstractStorageEngine) Mockito.doReturn(abstractStoragePartition).when(this.mockAbstractStorageEngine)).createStoragePartition((StoragePartitionConfig) Mockito.any());
        ((AbstractStorageEngine) Mockito.doReturn(true).when(this.mockAbstractStorageEngine)).checkDatabaseIntegrity(Mockito.anyInt(), (Map) Mockito.any(), (StoragePartitionConfig) Mockito.any());
    }

    void setStoreVersionStateSupplier(StoreVersionState storeVersionState) {
        this.storeVersionStateSupplier = () -> {
            return storeVersionState;
        };
        AbstractStoragePartition abstractStoragePartition = (AbstractStoragePartition) Mockito.mock(AbstractStoragePartition.class);
        ((AbstractStoragePartition) Mockito.doReturn(AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer().serialize((String) null, storeVersionState)).when(abstractStoragePartition)).get((byte[]) Mockito.any(byte[].class));
        setupMockAbstractStorageEngine(abstractStoragePartition);
        ((AbstractStorageEngine) Mockito.doReturn(storeVersionState).when(this.mockAbstractStorageEngine)).getStoreVersionState();
        ((StorageMetadataService) Mockito.doReturn(storeVersionState).when(this.mockStorageMetadataService)).getStoreVersionState(this.topic);
    }

    void setStoreVersionStateSupplier(boolean z) {
        StoreVersionState storeVersionState = new StoreVersionState();
        storeVersionState.sorted = z;
        setStoreVersionStateSupplier(storeVersionState);
    }

    private PubSubTopicPartitionOffset getTopicPartitionOffsetPair(PubSubProduceResult pubSubProduceResult) {
        return new PubSubTopicPartitionOffset(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(pubSubProduceResult.getTopic()), pubSubProduceResult.getPartition()), Long.valueOf(pubSubProduceResult.getOffset()));
    }

    private PubSubTopicPartitionOffset getTopicPartitionOffsetPair(String str, int i, long j) {
        return new PubSubTopicPartitionOffset(new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(str), i), Long.valueOf(j));
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessing(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) this.localVeniceWriter.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
        PubSubProduceResult pubSubProduceResult2 = (PubSubProduceResult) this.localVeniceWriter.delete(deleteKeyFoo, 2L, (PubSubProducerCallback) null).get();
        LinkedList linkedList = new LinkedList();
        linkedList.add(new RandomPollStrategy());
        LinkedList linkedList2 = new LinkedList();
        linkedList2.add(getTopicPartitionOffsetPair(pubSubProduceResult));
        linkedList.add(new ArbitraryOrderingPollStrategy(linkedList2));
        runTest((PollStrategy) new CompositePollStrategy(linkedList), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            verifyPutAndDelete(1, z, true);
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).put(this.topic, 1, TestUtils.getOffsetRecord(pubSubProduceResult2.getOffset()));
            ((AggVersionedIngestionStats) Mockito.verify(this.mockVersionedStorageIngestionStats, Mockito.timeout(TEST_TIMEOUT_MS).atLeast(3))).recordConsumedRecordEndToEndProcessingLatency((String) Mockito.any(), Mockito.eq(1), Mockito.anyDouble(), Mockito.anyLong());
        }, z);
        ((AggKafkaConsumerService) Mockito.verify(this.aggKafkaConsumerService)).unsubscribeAll(this.pubSubTopic);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testAmplificationFactor(boolean z) throws Exception {
        this.inMemoryLocalKafkaBroker.createTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo), 5);
        this.mockStorageMetadataService = new InMemoryStorageMetadataService();
        AbstractStoragePartition abstractStoragePartition = (AbstractStoragePartition) Mockito.mock(AbstractStoragePartition.class);
        ((AbstractStorageEngine) Mockito.doReturn(abstractStoragePartition).when(this.mockAbstractStorageEngine)).getPartitionOrThrow(Mockito.anyInt());
        ((AbstractStorageEngine) Mockito.doReturn(new ReentrantReadWriteLock()).when(this.mockAbstractStorageEngine)).getRWLockForPartitionOrThrow(Mockito.anyInt());
        ((AbstractStoragePartition) Mockito.doReturn(putKeyFooReplicationMetadataWithValueSchemaIdBytesDefault).when(abstractStoragePartition)).getReplicationMetadata(putKeyFoo);
        ((AbstractStoragePartition) Mockito.doReturn(deleteKeyFooReplicationMetadataWithValueSchemaIdBytes).when(abstractStoragePartition)).getReplicationMetadata(deleteKeyFoo);
        ((ReadOnlySchemaRepository) Mockito.doReturn(new SchemaEntry(1, "\"string\"")).when(this.mockSchemaRepo)).getSupersetOrLatestValueSchema(storeNameWithoutVersionInfo);
        VeniceWriter veniceWriter = getVeniceWriter(this.topic, new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker), 2);
        VeniceWriter veniceWriter2 = getVeniceWriter(Version.composeRealTimeTopic(storeNameWithoutVersionInfo), new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker), 1);
        runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            veniceWriter.broadcastStartOfPush(new HashMap());
            veniceWriter.broadcastEndOfPush(new HashMap());
            ((VeniceWriterFactory) Mockito.doReturn(veniceWriter).when(this.mockWriterFactory)).createVeniceWriter((VeniceWriterOptions) Mockito.any(VeniceWriterOptions.class));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).completed(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
            veniceWriter.broadcastTopicSwitch(Collections.singletonList(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer()), Version.composeRealTimeTopic(storeNameWithoutVersionInfo), Long.valueOf(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10L)), new HashMap());
            this.storeIngestionTaskUnderTest.promoteToLeader(this.fooTopicPartition, new LeaderFollowerPartitionStateModel.LeaderSessionIdChecker(PUT_KEY_FOO_OFFSET, new AtomicLong(PUT_KEY_FOO_OFFSET)));
            try {
                veniceWriter2.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
                veniceWriter2.delete(deleteKeyFoo, 2L, (PubSubProducerCallback) null).get();
                verifyPutAndDelete(2, z, false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, Optional.of(new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), false, Optional.empty(), z, 2, Collections.singletonMap("server.promotion.to.leader.replica.delay.seconds", 3L));
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testMissingMessagesForTopicWithLogCompactionEnabled(boolean z) throws Exception {
        Mockito.when(Boolean.valueOf(this.mockTopicManager.isTopicCompactionEnabled(this.pubSubTopic))).thenReturn(true);
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) this.localVeniceWriter.put(putKeyFoo, putValueToCorrupt, 1).get();
        this.localVeniceWriter.put(putKeyFoo, putValue, 1).get();
        PubSubProduceResult pubSubProduceResult2 = (PubSubProduceResult) this.localVeniceWriter.put(putKeyFoo2, putValueToCorrupt, 1).get();
        PubSubProduceResult pubSubProduceResult3 = (PubSubProduceResult) this.localVeniceWriter.put(putKeyFoo2, putValue, 1).get();
        LinkedList linkedList = new LinkedList();
        linkedList.add(getTopicPartitionOffsetPair(this.topic, 1, -1L));
        linkedList.add(getTopicPartitionOffsetPair(this.topic, 1, EMPTY_POLL_SLEEP_MS));
        linkedList.add(getTopicPartitionOffsetPair(pubSubProduceResult));
        linkedList.add(getTopicPartitionOffsetPair(pubSubProduceResult2));
        runTest((PollStrategy) new ArbitraryOrderingPollStrategy(linkedList), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo2, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).put(this.topic, 1, TestUtils.getOffsetRecord(pubSubProduceResult3.getOffset()));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessingWithExistingSchemaId(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            ((ReadOnlySchemaRepository) Mockito.verify(this.mockSchemaRepo, Mockito.timeout(TEST_TIMEOUT_MS))).hasValueSchema(storeNameWithoutVersionInfo, 1);
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).put(this.topic, 1, TestUtils.getOffsetRecord(offset));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessingWithTemporarilyNotAvailableSchemaId(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 2);
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        Mockito.when(Boolean.valueOf(this.mockSchemaRepo.hasValueSchema(storeNameWithoutVersionInfo, 2))).thenReturn(false, new Boolean[]{false, true});
        ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            ((ReadOnlySchemaRepository) Mockito.verify(this.mockSchemaRepo, Mockito.timeout(TEST_TIMEOUT_MS).atLeast(3))).hasValueSchema(storeNameWithoutVersionInfo, 2);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(2, putValue).serialize()));
            ((ReadOnlySchemaRepository) Mockito.verify(this.mockSchemaRepo, Mockito.timeout(TEST_TIMEOUT_MS))).hasValueSchema(storeNameWithoutVersionInfo, 1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).put(this.topic, 1, TestUtils.getOffsetRecord(offset));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessingWithNonExistingSchemaId(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 2);
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        ((ReadOnlySchemaRepository) Mockito.doReturn(false).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 2);
        ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            ((ReadOnlySchemaRepository) Mockito.verify(this.mockSchemaRepo, Mockito.after(TEST_TIMEOUT_MS).never())).hasValueSchema(storeNameWithoutVersionInfo, 1);
            ((ReadOnlySchemaRepository) Mockito.verify(this.mockSchemaRepo, Mockito.atLeastOnce())).hasValueSchema(storeNameWithoutVersionInfo, 2);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).put(Mockito.eq(1), (byte[]) Mockito.any(), (byte[]) Mockito.any(byte[].class));
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.atMost(2))).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.any(OffsetRecord.class));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReportStartWhenRestarting(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(2L)).when(this.mockStorageMetadataService)).getLastOffset(Mockito.anyString(), Mockito.anyInt());
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).started(this.topic, 2);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testNotifier(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        long offset2 = getOffset(this.localVeniceWriter.put(putKeyBar, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 1, offset + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 2, offset2 + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 1, offset + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 2, offset2 + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((VeniceNotifier) Mockito.verify(this.mockLeaderFollowerStateModelNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).catchUpVersionTopicOffsetLag(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockLeaderFollowerStateModelNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).catchUpVersionTopicOffsetLag(this.topic, 2);
            ((VeniceNotifier) Mockito.verify(this.mockLeaderFollowerStateModelNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 1, offset + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((VeniceNotifier) Mockito.verify(this.mockLeaderFollowerStateModelNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).completed(this.topic, 2, offset2 + PUT_KEY_FOO_OFFSET, "STANDBY");
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService)).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.eq(TestUtils.getOffsetRecord(offset + PUT_KEY_FOO_OFFSET, true)));
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService)).put((String) Mockito.eq(this.topic), Mockito.eq(2), (OffsetRecord) Mockito.eq(TestUtils.getOffsetRecord(offset2 + PUT_KEY_FOO_OFFSET, true)));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 2);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 2, offset2);
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.atLeastOnce())).started(this.topic, 2);
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
            ((VeniceNotifier) Mockito.verify(this.mockPartitionStatusNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 2, offset2);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReadyToServePartition(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            Store store = (Store) Mockito.mock(Store.class);
            ((Store) Mockito.doReturn(true).when(store)).isHybrid();
            ((Store) Mockito.doReturn(Optional.of(new VersionImpl("storeName", 1))).when(store)).getVersion(1);
            ((Store) Mockito.doReturn(storeNameWithoutVersionInfo).when(store)).getName();
            ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStoreOrThrow(storeNameWithoutVersionInfo);
        }, () -> {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).preparePartitionForReading(2);
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.atLeastOnce())).preparePartitionForReading(1);
            });
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReadyToServePartitionValidateIngestionSuccess(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        Store store = (Store) Mockito.mock(Store.class);
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStore(storeNameWithoutVersionInfo);
        ((Store) Mockito.doReturn(false).when(store)).isHybrid();
        ((Store) Mockito.doReturn(storeNameWithoutVersionInfo).when(store)).getName();
        this.mockAbstractStorageEngine.addStoragePartition(1);
        AbstractStoragePartition abstractStoragePartition = (AbstractStoragePartition) Mockito.mock(AbstractStoragePartition.class);
        ((AbstractStorageEngine) Mockito.doReturn(abstractStoragePartition).when(this.mockAbstractStorageEngine)).getPartitionOrThrow(1);
        ((AbstractStoragePartition) Mockito.doReturn(true).when(abstractStoragePartition)).validateBatchIngestion();
        new StoragePartitionConfig(this.topic, 1);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).preparePartitionForReading(1);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReadyToServePartitionWriteOnly(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        Store store = (Store) Mockito.mock(Store.class);
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStore(storeNameWithoutVersionInfo);
        ((Store) Mockito.doReturn(true).when(store)).isHybrid();
        this.mockAbstractStorageEngine.addStoragePartition(1);
        ((Store) Mockito.doReturn(storeNameWithoutVersionInfo).when(store)).getName();
        new StoragePartitionConfig(this.topic, 1).setWriteOnlyConfig(true);
        new StoragePartitionConfig(this.topic, 2);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).preparePartitionForReading(1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).preparePartitionForReading(2);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testResetPartition(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1).get();
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            this.storeIngestionTaskUnderTest.resetPartitionConsumptionOffset(this.fooTopicPartition);
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).clearOffset(this.topic, 1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS).times(2))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testResetPartitionAfterUnsubscription(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1).get();
        ((PubSubConsumerAdapter) Mockito.doThrow(new Throwable[]{new UnsubscribedTopicPartitionException(this.fooTopicPartition)}).when(this.mockLocalKafkaConsumer)).resetOffset(this.fooTopicPartition);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            this.storeIngestionTaskUnderTest.unSubscribePartition(this.fooTopicPartition);
            this.storeIngestionTaskUnderTest.resetPartitionConsumptionOffset(this.fooTopicPartition);
            ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.timeout(TEST_TIMEOUT_MS))).unSubscribe(this.fooTopicPartition);
            ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.timeout(TEST_TIMEOUT_MS).times(0))).resetOffset(this.fooTopicPartition);
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).clearOffset(this.topic, 1);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testDetectionOfMissingRecord(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        long offset2 = getOffset(this.localVeniceWriter.put(putKeyBar, putValue, 1));
        this.localVeniceWriter.put(putKeyBar, putValue, 1);
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest((PollStrategy) new FilteringPollStrategy(new RandomPollStrategy(), Utils.setOf(new PubSubTopicPartitionOffset[]{new PubSubTopicPartitionOffset(this.barTopicPartition, Long.valueOf(offset2))})), Utils.setOf(new Integer[]{1, 2}), () -> {
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(2), (String) Mockito.argThat(new NonEmptyStringMatcher()), (Exception) Mockito.argThat(new ExceptionClassMatcher(MissingDataException.class)));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 2);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testSkippingOfDuplicateRecord(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        long offset2 = getOffset(this.localVeniceWriter.put(putKeyBar, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest((PollStrategy) new DuplicatingPollStrategy(new RandomPollStrategy(), Utils.mutableSetOf(new PubSubTopicPartitionOffset[]{new PubSubTopicPartitionOffset(new PubSubTopicPartitionImpl(this.pubSubTopic, 2), Long.valueOf(offset2))})), Utils.setOf(new Integer[]{1, 2}), () -> {
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).endOfPushReceived(this.topic, 2, offset2);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.after(TEST_TIMEOUT_MS).never())).endOfPushReceived(this.topic, 2, offset2 + PUT_KEY_FOO_OFFSET);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 2);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testThrottling(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        this.localVeniceWriter.delete(deleteKeyFoo, (PubSubProducerCallback) null);
        runTest((PollStrategy) new RandomPollStrategy(1), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            ((EventThrottler) Mockito.verify(this.mockRecordsThrottler, Mockito.timeout(TEST_TIMEOUT_MS).times(4))).maybeThrottle(1.0d);
            ((EventThrottler) Mockito.verify(this.mockBandwidthThrottler, Mockito.timeout(TEST_TIMEOUT_MS).times(4))).maybeThrottle(Mockito.anyDouble());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testBadMessageTypesFailFast(boolean z) throws Exception {
        int i = 99;
        try {
            MessageType.valueOf(99);
            Assert.fail("The message type 99 is valid. This test needs to be updated in order to send an invalid message type...");
        } catch (VeniceMessageException e) {
        }
        try {
            ControlMessageType.valueOf(99);
            Assert.fail("The control message type 99 is valid. This test needs to be updated in order to send an invalid control message type...");
        } catch (VeniceMessageException e2) {
        }
        this.localVeniceWriter = getVeniceWriter(new TransformingProducerAdapter(new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker), (str, kafkaKey, kafkaMessageEnvelope, i2) -> {
            switch (i2) {
                case ObjectCacheBackendTest.STORE_VERSION /* 1 */:
                    kafkaMessageEnvelope.messageType = i;
                    break;
                case 2:
                    if (MessageType.valueOf(kafkaMessageEnvelope) == MessageType.CONTROL_MESSAGE) {
                        ControlMessage controlMessage = (ControlMessage) kafkaMessageEnvelope.payloadUnion;
                        if (ControlMessageType.valueOf(controlMessage) == ControlMessageType.START_OF_SEGMENT) {
                            controlMessage.controlMessageType = i;
                            kafkaMessageEnvelope.payloadUnion = controlMessage;
                            break;
                        }
                    }
                    break;
            }
            return new TransformingProducerAdapter.SendMessageParameters(this.topic, kafkaKey, kafkaMessageEnvelope, i2);
        }));
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        this.localVeniceWriter.put(putKeyBar, putValue, 1);
        runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
            ((KafkaConsumerServiceStats) Mockito.verify(this.kafkaConsumerServiceStats, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).recordPollError();
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testCorruptMessagesDoNotFailFastAfterEOP(boolean z) throws Exception {
        VeniceWriter veniceWriter = getVeniceWriter(new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker));
        VeniceWriter corruptedVeniceWriter = getCorruptedVeniceWriter(putValueToCorrupt, this.inMemoryLocalKafkaBroker);
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(veniceWriter.put(putKeyBar, putValue, 1));
        veniceWriter.close();
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        this.isCurrentVersion = () -> {
            return true;
        };
        getOffset(corruptedVeniceWriter.put(putKeyBar, putValueToCorrupt, 1));
        long offset2 = getOffset(corruptedVeniceWriter.put(putKeyBar, putValue, 1));
        corruptedVeniceWriter.close();
        LOGGER.info("lastOffsetBeforeEOP: {}, lastOffset: {}", Long.valueOf(offset), Long.valueOf(offset2));
        try {
            runTest(Utils.setOf(new Integer[]{2}), () -> {
                TestUtils.waitForNonDeterministicCompletion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
                    for (Object[] objArr : this.mockNotifierProgress) {
                        if (objArr[0].equals(this.topic) && objArr[1].equals(2) && ((Long) objArr[2]).longValue() >= offset) {
                            return true;
                        }
                    }
                    return false;
                });
                TestUtils.waitForNonDeterministicCompletion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
                    for (Object[] objArr : this.mockNotifierCompleted) {
                        if (objArr[0].equals(this.topic) && objArr[1].equals(2) && ((Long) objArr[2]).longValue() > offset) {
                            return true;
                        }
                    }
                    return false;
                });
                for (Object[] objArr : this.mockNotifierError) {
                    Assert.assertFalse(objArr[0].equals(this.topic) && objArr[1].equals(2) && ((String) objArr[2]).length() > 0 && (objArr[3] instanceof CorruptDataException));
                }
            }, z);
        } catch (VerifyError e) {
            StringBuilder sb = new StringBuilder();
            URL[] uRLs = ((URLClassLoader) ClassLoader.getSystemClassLoader()).getURLs();
            sb.append("VerifyError, possibly from junit or mockito version conflict. \nPrinting junit on classpath:\n");
            Arrays.asList(uRLs).stream().filter(url -> {
                return url.getFile().contains("junit");
            }).forEach(url2 -> {
                sb.append(url2 + "\n");
            });
            sb.append("Printing mockito on classpath:\n");
            Arrays.asList(uRLs).stream().filter(url3 -> {
                return url3.getFile().contains("mockito");
            }).forEach(url4 -> {
                sb.append(url4 + "\n");
            });
            throw new VeniceException(sb.toString(), e);
        }
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testDIVErrorMessagesNotFailFastAfterEOP(boolean z) throws Exception {
        VeniceWriter corruptedVeniceWriter = getCorruptedVeniceWriter(putValueToCorrupt, this.inMemoryLocalKafkaBroker);
        corruptedVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(corruptedVeniceWriter.put(putKeyBar, putValue, 1));
        corruptedVeniceWriter.broadcastEndOfPush(new HashMap());
        this.isCurrentVersion = () -> {
            return true;
        };
        getOffset(corruptedVeniceWriter.put(putKeyFoo, putValueToCorrupt, 1));
        corruptedVeniceWriter.endSegment(1, true);
        long offset2 = getOffset(corruptedVeniceWriter.put(putKeyFoo, putValue, 1));
        long offset3 = getOffset(corruptedVeniceWriter.put(putKeyFoo2, putValue, 1));
        corruptedVeniceWriter.close();
        FilteringPollStrategy filteringPollStrategy = new FilteringPollStrategy(new RandomPollStrategy(), Utils.setOf(new PubSubTopicPartitionOffset[]{new PubSubTopicPartitionOffset(new PubSubTopicPartitionImpl(this.pubSubTopic, 1), Long.valueOf(offset2))}));
        LOGGER.info("lastOffsetBeforeEOP: {}, lastOffset: {}", Long.valueOf(offset), Long.valueOf(offset3));
        runTest((PollStrategy) filteringPollStrategy, Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            for (Object[] objArr : this.mockNotifierError) {
                Assert.assertFalse(objArr[0].equals(this.topic) && objArr[1].equals(1) && ((String) objArr[2]).length() > 0 && (objArr[3] instanceof FatalDataValidationException));
            }
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, invocationCount = 100, skipFailedInvocations = true)
    public void testCorruptMessagesFailFast(boolean z) throws Exception {
        VeniceWriter corruptedVeniceWriter = getCorruptedVeniceWriter(putValueToCorrupt, this.inMemoryLocalKafkaBroker);
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(corruptedVeniceWriter.put(putKeyFoo, putValue, 1));
        getOffset(corruptedVeniceWriter.put(putKeyBar, putValueToCorrupt, 1));
        corruptedVeniceWriter.close();
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).completed((String) Mockito.eq(this.topic), Mockito.eq(1), LongEqualOrGreaterThanMatcher.get(offset), (String) Mockito.eq("STANDBY"));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(2), (String) Mockito.argThat(new NonEmptyStringMatcher()), (Exception) Mockito.argThat(new ExceptionClassMatcher(CorruptDataException.class)));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).completed((String) Mockito.eq(this.topic), Mockito.eq(2), Mockito.anyLong());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testSubscribeCompletedPartition(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(100L, true)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, 1);
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).completed(this.topic, 1, 100L, "STANDBY");
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testSubscribeCompletedPartitionUnsubscribe(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        HashMap hashMap = new HashMap();
        hashMap.put("server.unsub.after.batch.push", true);
        runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
            Store store = (Store) Mockito.mock(Store.class);
            ((Store) Mockito.doReturn(1).when(store)).getCurrentVersion();
            ((Store) Mockito.doReturn(Optional.of(new VersionImpl("storeName", 1, Version.numberBasedDummyPushId(1)))).when(store)).getVersion(1);
            ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStoreOrThrow(storeNameWithoutVersionInfo);
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(100L, true)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, 1);
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(10000L))).completed(this.topic, 1, 100L, "STANDBY");
            ((AggKafkaConsumerService) Mockito.verify(this.aggKafkaConsumerService, Mockito.timeout(10000L))).batchUnsubscribeConsumerFor(this.pubSubTopic, Collections.singleton(this.fooTopicPartition));
            ((AggKafkaConsumerService) Mockito.verify(this.aggKafkaConsumerService, Mockito.never())).unsubscribeConsumerFor(this.pubSubTopic, this.barTopicPartition);
            ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.timeout(10000L))).batchUnsubscribe(Collections.singleton(this.fooTopicPartition));
            ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.never())).unSubscribe(this.barTopicPartition);
        }, this.hybridStoreConfig, false, Optional.empty(), z, 1, hashMap);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testCompleteCalledWhenUnsubscribeAfterBatchPushDisabled(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            Store store = (Store) Mockito.mock(Store.class);
            this.storeIngestionTaskUnderTest.unSubscribePartition(this.fooTopicPartition);
            ((Store) Mockito.doReturn(1).when(store)).getCurrentVersion();
            ((Store) Mockito.doReturn(Optional.of(new VersionImpl("storeName", 1, Version.numberBasedDummyPushId(1)))).when(store)).getVersion(1);
            ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStoreOrThrow(storeNameWithoutVersionInfo);
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(10L, true)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, 1);
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).completed(this.topic, 1, 10L, "STANDBY");
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testUnsubscribeConsumption(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).started(this.topic, 1);
            this.storeIngestionTaskUnderTest.unSubscribePartition(this.fooTopicPartition);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).stopped(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKillConsumption(boolean z) throws Exception {
        Thread thread = new Thread(() -> {
            do {
                this.localVeniceWriter.put(putKeyFoo, putValue, 1);
                this.localVeniceWriter.put(putKeyBar, putValue, 1);
            } while (!Utils.sleep(READ_CYCLE_DELAY_MS));
        });
        try {
            runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
                this.localVeniceWriter.broadcastStartOfPush(new HashMap());
                thread.start();
            }, () -> {
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).started(this.topic, 1);
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).started(this.topic, 2);
                this.storeIngestionTaskUnderTest.kill();
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(1), (String) Mockito.argThat(new NonEmptyStringMatcher()), (Exception) Mockito.argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class)));
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(2), (String) Mockito.argThat(new NonEmptyStringMatcher()), (Exception) Mockito.argThat(new ExceptionClassMatcher(VeniceIngestionTaskKilledException.class)));
                TestUtils.waitForNonDeterministicCompletion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
                    return !this.storeIngestionTaskUnderTest.isRunning();
                });
            }, z);
        } finally {
            TestUtils.shutdownThread(thread);
        }
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKillActionPriority(boolean z) throws Exception {
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            this.localVeniceWriter.broadcastStartOfPush(new HashMap());
            this.localVeniceWriter.put(putKeyFoo, putValue, 1);
            this.storeIngestionTaskUnderTest.resetPartitionConsumptionOffset(this.fooTopicPartition);
            this.storeIngestionTaskUnderTest.kill();
        }, () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.after(TEST_TIMEOUT_MS).never())).getLastOffset(this.topic, 1);
            TestUtils.waitForNonDeterministicCompletion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
                return !this.storeIngestionTaskUnderTest.consumerHasAnySubscription();
            });
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).clearOffset(this.topic, 1);
            TestUtils.waitForNonDeterministicCompletion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
                return !this.storeIngestionTaskUnderTest.isRunning();
            });
        }, z);
    }

    private byte[] getNumberedKey(int i) {
        return ByteBuffer.allocate(putKeyFoo.length + 4).put(putKeyFoo).putInt(i).array();
    }

    private byte[] getNumberedValue(int i) {
        return ByteBuffer.allocate(putValue.length + 4).put(putValue).putInt(i).array();
    }

    @Test(dataProvider = "Two-True-and-False", invocationCount = 3, skipFailedInvocations = true, dataProviderClass = DataProviderUtils.class)
    public void testDataValidationCheckPointing(boolean z, boolean z2) throws Exception {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        setStoreVersionStateSupplier(z);
        this.localVeniceWriter.broadcastStartOfPush(z, new HashMap());
        for (int i = 0; i < 1000; i++) {
            byte[] numberedKey = getNumberedKey(i);
            byte[] numberedValue = getNumberedValue(i);
            PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) this.localVeniceWriter.put(numberedKey, numberedValue, 1).get();
            hashMap.put(Integer.valueOf(pubSubProduceResult.getPartition()), Long.valueOf(pubSubProduceResult.getOffset()));
            hashMap2.put(new Pair(Integer.valueOf(pubSubProduceResult.getPartition()), new ByteArray(numberedKey)), new ByteArray(numberedValue));
        }
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        Assert.assertEquals(hashMap2.size(), 1000, "We did not produce as many unique records as we expected!");
        Assert.assertFalse(hashMap.isEmpty(), "There should be at least one partition getting anything published into it!");
        Set<Integer> of = Utils.setOf(new Integer[]{1});
        AtomicInteger atomicInteger = new AtomicInteger(0);
        runTest((PollStrategy) new BlockingObserverPollStrategy(new RandomPollStrategy(false), pubSubTopicPartitionOffset -> {
            if (pubSubTopicPartitionOffset == null || pubSubTopicPartitionOffset.getOffset() == null) {
                LOGGER.info("Received null OffsetRecord!");
            } else {
                if (atomicInteger.incrementAndGet() % 100 != 0) {
                    LOGGER.info("TopicPartition: {}, Offset: {}", pubSubTopicPartitionOffset.getPubSubTopicPartition(), pubSubTopicPartitionOffset.getOffset());
                    return;
                }
                LOGGER.info("Restarting consumer after consuming {} messages so far.", Integer.valueOf(atomicInteger.get()));
                of.stream().forEach(num -> {
                    this.storeIngestionTaskUnderTest.unSubscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopic, num.intValue()));
                });
                of.stream().forEach(num2 -> {
                    this.storeIngestionTaskUnderTest.subscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopic, num2.intValue()), Optional.empty());
                });
            }
        }), of, () -> {
        }, () -> {
            hashMap.entrySet().stream().filter(entry -> {
                return of.contains(entry.getKey());
            }).forEach(entry2 -> {
                int intValue = ((Integer) entry2.getKey()).intValue();
                long longValue = ((Long) entry2.getValue()).longValue();
                LOGGER.info("Verifying completed was called for partition {} and offset {} or greater.", Integer.valueOf(intValue), Long.valueOf(longValue));
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(10000L).atLeastOnce())).completed((String) Mockito.eq(this.topic), Mockito.eq(intValue), LongEqualOrGreaterThanMatcher.get(longValue), (String) Mockito.eq("STANDBY"));
            });
            of.stream().forEach(num -> {
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).error((String) Mockito.eq(this.topic), ((Integer) Mockito.eq(num)).intValue(), Mockito.anyString(), (Exception) Mockito.any());
            });
            of.stream().forEach(num2 -> {
                PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(this.topic), num2.intValue());
                ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.timeout(10000L).atLeast(11))).subscribe((PubSubTopicPartition) Mockito.eq(pubSubTopicPartitionImpl), Mockito.anyLong());
                ((PubSubConsumerAdapter) Mockito.verify(this.mockLocalKafkaConsumer, Mockito.timeout(10000L).atLeast(10))).unSubscribe((PubSubTopicPartition) Mockito.eq(pubSubTopicPartitionImpl));
                if (z) {
                    StoragePartitionConfig storagePartitionConfig = new StoragePartitionConfig(this.topic, num2.intValue());
                    storagePartitionConfig.setDeferredWrite(true);
                    ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.atLeast(1))).beginBatchWrite((StoragePartitionConfig) Mockito.eq(storagePartitionConfig), (Map) Mockito.any(), (Optional) Mockito.eq(Optional.empty()));
                    ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.times(1))).endBatchWrite(new StoragePartitionConfig(this.topic, num2.intValue()));
                }
            });
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.atLeast(hashMap2.size()))).put(Mockito.anyInt(), (byte[]) Mockito.any(), (ByteBuffer) Mockito.any(ByteBuffer.class));
            hashMap2.entrySet().stream().forEach(entry3 -> {
                ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.atLeastOnce())).put(((Integer) ((Pair) entry3.getKey()).getFirst()).intValue(), ((ByteArray) ((Pair) entry3.getKey()).getSecond()).get(), ByteBuffer.wrap(ValueRecord.create(1, ((ByteArray) entry3.getValue()).get()).serialize()));
            });
            of.stream().forEach(num3 -> {
                Assert.assertNotNull(this.storeIngestionTaskUnderTest.getPartitionConsumptionState(num3.intValue()));
                Assert.assertTrue(this.storeIngestionTaskUnderTest.getPartitionConsumptionState(num3.intValue()).getLatestProcessedUpstreamRTOffsetMap().isEmpty());
            });
        }, z2);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKillAfterPartitionIsCompleted(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.after(TEST_TIMEOUT_MS).never())).error((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyString(), (Exception) Mockito.any());
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).started(this.topic, 1);
            this.storeIngestionTaskUnderTest.kill();
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testNeverReportProgressBeforeStart(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        runTest((PollStrategy) new RandomPollStrategy(1), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.after(TEST_TIMEOUT_MS).never())).progress(this.topic, 1, EMPTY_POLL_SLEEP_MS);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeastOnce())).started(this.topic, 1);
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.after(TEST_TIMEOUT_MS).never())).progress((String) Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testOffsetPersistent(boolean z) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(Long.valueOf(READ_CYCLE_DELAY_MS));
        }
        this.databaseSyncBytesIntervalForTransactionalMode = 1000L;
        try {
            this.localVeniceWriter.broadcastStartOfPush(new HashMap());
            this.localVeniceWriter.broadcastEndOfPush(new HashMap());
            runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
            }, () -> {
                ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS).times(2))).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.any());
            }, Optional.of(new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), false, Optional.empty(), z, 1, Collections.emptyMap());
            this.databaseSyncBytesIntervalForTransactionalMode = PUT_KEY_FOO_OFFSET;
        } catch (Throwable th) {
            this.databaseSyncBytesIntervalForTransactionalMode = PUT_KEY_FOO_OFFSET;
            throw th;
        }
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessingWithSortedInput(boolean z) throws Exception {
        setStoreVersionStateSupplier(true);
        this.localVeniceWriter.broadcastStartOfPush(true, new HashMap());
        PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) this.localVeniceWriter.put(putKeyFoo, putValue, 1).get();
        PubSubProduceResult pubSubProduceResult2 = (PubSubProduceResult) this.localVeniceWriter.delete(deleteKeyFoo, (PubSubProducerCallback) null).get();
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            verifyPutAndDelete(1, z, true);
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).put(this.topic, 1, TestUtils.getOffsetRecord(pubSubProduceResult2.getOffset() + PUT_KEY_FOO_OFFSET, true));
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.times(1))).put(this.topic, 1, TestUtils.getOffsetRecord(pubSubProduceResult.getOffset() - PUT_KEY_FOO_OFFSET));
            StoragePartitionConfig storagePartitionConfig = new StoragePartitionConfig(this.topic, 1);
            storagePartitionConfig.setDeferredWrite(true);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.times(1))).beginBatchWrite((StoragePartitionConfig) Mockito.eq(storagePartitionConfig), (Map) Mockito.any(), (Optional) Mockito.eq(Optional.empty()));
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.times(1))).endBatchWrite(new StoragePartitionConfig(this.topic, 1));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testVeniceMessagesProcessingWithSortedInputVerifyChecksum(boolean z) throws Exception {
        this.databaseChecksumVerificationEnabled = true;
        ((RocksDBServerConfig) Mockito.doReturn(false).when(this.rocksDBServerConfig)).isRocksDBPlainTableFormatEnabled();
        setStoreVersionStateSupplier(true);
        this.localVeniceWriter.broadcastStartOfPush(true, new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1).get();
        Optional checkSum = CheckSum.getInstance(CheckSumType.MD5);
        ((CheckSum) checkSum.get()).update(putKeyFoo);
        ((CheckSum) checkSum.get()).update(1);
        ((CheckSum) checkSum.get()).update(putValue);
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(TEST_TIMEOUT_MS))).put(1, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            StoragePartitionConfig storagePartitionConfig = new StoragePartitionConfig(this.topic, 1);
            storagePartitionConfig.setDeferredWrite(true);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Optional.class);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.times(1))).beginBatchWrite((StoragePartitionConfig) Mockito.eq(storagePartitionConfig), (Map) Mockito.any(), (Optional) forClass.capture());
            Optional optional = (Optional) forClass.getValue();
            Assert.assertTrue(optional.isPresent());
            Assert.assertTrue(Arrays.equals((byte[]) ((Supplier) optional.get()).get(), ((CheckSum) checkSum.get()).getCheckSum()));
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testDelayedTransitionToOnlineInHybridMode(boolean z) throws Exception {
        this.mockStorageMetadataService = new InMemoryStorageMetadataService();
        this.hybridStoreConfig = Optional.of(new HybridStoreConfigImpl(10L, 20L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));
        runTest(ALL_PARTITIONS, () -> {
            this.localVeniceWriter.broadcastStartOfPush(Collections.emptyMap());
            for (int i = 0; i < 100; i++) {
                try {
                    this.localVeniceWriter.put(getNumberedKey(i), getNumberedValue(i), 1).get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new VeniceException(e);
                }
            }
            this.localVeniceWriter.broadcastEndOfPush(Collections.emptyMap());
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeast(ALL_PARTITIONS.size()))).started((String) Mockito.eq(this.topic), Mockito.anyInt());
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).completed(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
            this.localVeniceWriter.broadcastTopicSwitch(Collections.singletonList(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer()), Version.composeRealTimeTopic(storeNameWithoutVersionInfo), Long.valueOf(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10L)), Collections.emptyMap());
            for (int i = 0; i < 100; i++) {
                try {
                    this.localVeniceWriter.put(getNumberedKey(i), getNumberedValue(i), 1).get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new VeniceException(e);
                }
            }
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS).atLeast(ALL_PARTITIONS.size()))).completed(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong(), Mockito.anyString());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testStoreIngestionTaskRespectsDiskUsage(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        DiskUsage diskUsage = (DiskUsage) Mockito.mock(DiskUsage.class);
        ((DiskUsage) Mockito.doReturn(true).when(diskUsage)).isDiskFull(Mockito.anyLong());
        ((DiskUsage) Mockito.doReturn("mock disk full disk usage").when(diskUsage)).getDiskStatus();
        ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
        runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                if (!this.mockNotifierEOPReceived.isEmpty()) {
                    LOGGER.info("EOP was received, and therefore this test cannot perform its assertions.");
                    return;
                }
                Assert.assertFalse(this.mockNotifierError.isEmpty(), "Disk Usage should have triggered an ingestion error");
                String str = (String) this.mockNotifierError.stream().map(objArr -> {
                    return ((Exception) objArr[3]).getMessage();
                }).collect(Collectors.joining());
                Assert.assertTrue(str.contains("Disk is full"), "Expecting disk full error, found following error messages instead: " + str);
            });
        }, Optional.empty(), false, Optional.of(diskUsage), z, 1, Collections.emptyMap());
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, invocationCount = MemoryBoundBlockingQueueTest.MeasurableObject.SIZE)
    public void testIncrementalPush(boolean z) throws Exception {
        setStoreVersionStateSupplier(true);
        this.localVeniceWriter.broadcastStartOfPush(true, new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        String valueOf = String.valueOf(System.currentTimeMillis());
        this.localVeniceWriter.broadcastStartOfIncrementalPush(valueOf, new HashMap());
        long offset2 = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        this.localVeniceWriter.broadcastEndOfIncrementalPush(valueOf, new HashMap());
        runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
        }, () -> {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService)).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.eq(TestUtils.getOffsetRecord(offset + PUT_KEY_FOO_OFFSET, true)));
                ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService)).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.eq(TestUtils.getOffsetRecord(offset2 - PUT_KEY_FOO_OFFSET, true)));
                ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService)).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.eq(TestUtils.getOffsetRecord(offset2 + PUT_KEY_FOO_OFFSET, true)));
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).endOfIncrementalPushReceived(this.topic, 1, offset2, valueOf);
            });
        }, Optional.of(new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), true, Optional.empty(), z, 1, Collections.emptyMap());
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testSchemaCacheWarming(boolean z) throws Exception {
        setStoreVersionStateSupplier(true);
        this.localVeniceWriter.broadcastStartOfPush(true, new HashMap());
        long offset = getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        SchemaEntry schemaEntry = new SchemaEntry(1, STRING_SCHEMA);
        runTest(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), () -> {
            Store store = (Store) Mockito.mock(Store.class);
            ((Store) Mockito.doReturn(true).when(store)).isReadComputationEnabled();
            ((ReadOnlySchemaRepository) Mockito.doReturn(true).when(this.mockSchemaRepo)).hasValueSchema(storeNameWithoutVersionInfo, 1);
            ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(this.mockMetadataRepo)).getStoreOrThrow(storeNameWithoutVersionInfo);
            ((ReadOnlySchemaRepository) Mockito.doReturn(schemaEntry).when(this.mockSchemaRepo)).getValueSchema(Mockito.anyString(), Mockito.anyInt());
            ((Store) Mockito.doReturn(Optional.of(new VersionImpl("storeName", 1))).when(store)).getVersion(1);
        }, () -> {
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).started(this.topic, 1);
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.atLeastOnce())).endOfPushReceived(this.topic, 1, offset);
                ((VeniceNotifier) Mockito.verify(this.mockLogNotifier)).completed((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.longThat(l -> {
                    return l.longValue() == offset + PUT_KEY_FOO_OFFSET || l.longValue() == offset + 2;
                }), (String) Mockito.eq("STANDBY"));
            });
        }, Optional.empty(), false, Optional.empty(), z, 1, Collections.singletonMap("server.num.schema.fast.class.warmup", 1));
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReportErrorWithEmptyPcsMap(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        ((AggVersionedIngestionStats) Mockito.doThrow(new Throwable[]{new VeniceException("fake exception")}).when(this.mockVersionedStorageIngestionStats)).resetIngestionTaskPushTimeoutGauge(Mockito.anyString(), Mockito.anyInt());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyString(), (Exception) Mockito.any());
        }, z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 25000)
    public void testPartitionExceptionIsolation(boolean z) throws Exception {
        this.localVeniceWriter.broadcastStartOfPush(new HashMap());
        getOffset(this.localVeniceWriter.put(putKeyFoo, putValue, 1));
        long offset = getOffset(this.localVeniceWriter.put(putKeyBar, putValue, 1));
        this.localVeniceWriter.broadcastEndOfPush(new HashMap());
        ((AbstractStorageEngine) Mockito.doThrow(new Throwable[]{new VeniceException("fake storage engine exception")}).when(this.mockAbstractStorageEngine)).put(Mockito.eq(1), (byte[]) Mockito.any(), (ByteBuffer) Mockito.any(ByteBuffer.class));
        runTest(Utils.setOf(new Integer[]{1, 2}), () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).completed((String) Mockito.eq(this.topic), Mockito.eq(2), LongEqualOrGreaterThanMatcher.get(offset), (String) Mockito.eq("STANDBY"));
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).error((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyString(), (Exception) Mockito.any());
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).completed((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyLong());
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).stopped((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyLong());
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.never())).error((String) Mockito.eq(this.topic), Mockito.eq(2), Mockito.anyString(), (Exception) Mockito.any());
            Assert.assertTrue(this.storeIngestionTaskUnderTest.isRunning(), "The StoreIngestionTask should still be running");
            Assert.assertNull(this.storeIngestionTaskUnderTest.getPartitionIngestionExceptionList().get(1), "Exception for the errored partition should be cleared after unsubscription");
        }, z);
        for (int i = 0; i < 10000; i++) {
            this.storeIngestionTaskUnderTest.setIngestionException(0, new VeniceException("new fake looooooooooooooooong exception"));
        }
    }

    private VeniceServerConfig buildVeniceServerConfig(Map<String, Object> map) {
        PropertyBuilder propertyBuilder = new PropertyBuilder();
        propertyBuilder.put("cluster.name", "");
        propertyBuilder.put("zookeeper.address", "");
        propertyBuilder.put("server.promotion.to.leader.replica.delay.seconds", 500L);
        propertyBuilder.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        propertyBuilder.put("server.hybrid.quota.enforcement.enabled", false);
        propertyBuilder.put("server.database.checksum.verification.enabled", Boolean.valueOf(this.databaseChecksumVerificationEnabled));
        propertyBuilder.put("server.local.consumer.config.prefix.", VeniceProperties.empty());
        propertyBuilder.put("server.remote.consumer.config.prefix.", VeniceProperties.empty());
        Objects.requireNonNull(propertyBuilder);
        map.forEach(propertyBuilder::put);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("name", "dev");
        hashMap2.put("url", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        hashMap.put(String.valueOf(0), hashMap2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("name", "remote");
        hashMap3.put("url", this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        hashMap.put(String.valueOf(1), hashMap3);
        return new VeniceServerConfig(propertyBuilder.build(), hashMap);
    }

    private void verifyPutAndDelete(int i, boolean z, boolean z2) {
        VenicePartitioner venicePartitioner = getVenicePartitioner(i);
        int partitionId = venicePartitioner.getPartitionId(putKeyFoo, 10);
        int partitionId2 = venicePartitioner.getPartitionId(deleteKeyFoo, 10);
        if (!z || z2) {
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(100000L))).put(partitionId, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()));
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(100000L))).delete(partitionId2, deleteKeyFoo);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).putWithReplicationMetadata(partitionId, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()), putKeyFooReplicationMetadataWithValueSchemaIdBytes);
            ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).deleteWithReplicationMetadata(partitionId2, deleteKeyFoo, deleteKeyFooReplicationMetadataWithValueSchemaIdBytes);
            return;
        }
        ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(100000L))).putWithReplicationMetadata(partitionId, putKeyFoo, ByteBuffer.wrap(ValueRecord.create(1, putValue).serialize()), putKeyFooReplicationMetadataWithValueSchemaIdBytes);
        ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.timeout(100000L))).deleteWithReplicationMetadata(partitionId2, deleteKeyFoo, deleteKeyFooReplicationMetadataWithValueSchemaIdBytes);
        ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).put(Mockito.eq(partitionId), (byte[]) Mockito.eq(putKeyFoo), (ByteBuffer) Mockito.any(ByteBuffer.class));
        ((AbstractStorageEngine) Mockito.verify(this.mockAbstractStorageEngine, Mockito.never())).delete(Mockito.eq(partitionId2), (byte[]) Mockito.eq(deleteKeyFoo));
    }

    @Test
    public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, InterruptedException {
        VenicePartitioner venicePartitioner = getVenicePartitioner(1);
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(venicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(1);
        MockStoreVersionConfigs mockStoreVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfigImpl, Optional.of(new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), false, false, true);
        Store store = mockStoreVersionConfigs.store;
        Version version = mockStoreVersionConfigs.version;
        VeniceStoreVersionConfig veniceStoreVersionConfig = mockStoreVersionConfigs.storeVersionConfig;
        HashMap hashMap = new HashMap();
        hashMap.put("server.enable.live.config.based.kafka.throttling", true);
        StoreIngestionTaskFactory build = getIngestionTaskFactoryBuilder(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), Optional.empty(), 1, hashMap, true).build();
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.storeIngestionTaskUnderTest = build.getNewIngestionTask(store, version, properties, this.isCurrentVersion, veniceStoreVersionConfig, PartitionUtils.getLeaderSubPartition(1, 1), false, Optional.empty());
        AtomicLong atomicLong = new AtomicLong(10L);
        TestMockTime testMockTime = new TestMockTime();
        EventThrottler eventThrottler = new EventThrottler(testMockTime, -1L, 1000L, "local_throttler", true, EventThrottler.REJECT_STRATEGY);
        Objects.requireNonNull(atomicLong);
        EventThrottler eventThrottler2 = new EventThrottler(testMockTime, atomicLong::get, 1000L, "remote_throttler", true, EventThrottler.REJECT_STRATEGY);
        this.kafkaUrlToRecordsThrottler.put(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), eventThrottler);
        this.kafkaUrlToRecordsThrottler.put(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer(), eventThrottler2);
        String composeRealTimeTopic = Version.composeRealTimeTopic(storeNameWithoutVersionInfo);
        PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(composeRealTimeTopic), 1);
        this.inMemoryLocalKafkaBroker.createTopic(composeRealTimeTopic, 2);
        this.inMemoryRemoteKafkaBroker.createTopic(composeRealTimeTopic, 2);
        this.aggKafkaConsumerService.subscribeConsumerFor(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), this.storeIngestionTaskUnderTest, pubSubTopicPartitionImpl, EMPTY_POLL_SLEEP_MS);
        this.aggKafkaConsumerService.subscribeConsumerFor(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer(), this.storeIngestionTaskUnderTest, pubSubTopicPartitionImpl, EMPTY_POLL_SLEEP_MS);
        VeniceWriter veniceWriter = getVeniceWriter(composeRealTimeTopic, new MockInMemoryProducerAdapter(this.inMemoryLocalKafkaBroker), 1);
        VeniceWriter veniceWriter2 = getVeniceWriter(composeRealTimeTopic, new MockInMemoryProducerAdapter(this.inMemoryRemoteKafkaBroker), 1);
        long j = READ_CYCLE_DELAY_MS;
        for (int i = 0; i < READ_CYCLE_DELAY_MS; i++) {
            veniceWriter.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
        }
        for (int i2 = 0; i2 < READ_CYCLE_DELAY_MS; i2++) {
            veniceWriter2.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
        }
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertEquals(this.localConsumedDataReceiver.receivedRecordsCount(), j);
            Assert.assertEquals(this.remoteConsumedDataReceiver.receivedRecordsCount(), j);
        });
        atomicLong.set(EMPTY_POLL_SLEEP_MS);
        for (int i3 = 0; i3 < READ_CYCLE_DELAY_MS; i3++) {
            veniceWriter.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
        }
        for (int i4 = 0; i4 < READ_CYCLE_DELAY_MS; i4++) {
            veniceWriter2.put(putKeyFoo, putValue, 1, 2L, (PubSubProducerCallback) null).get();
        }
        Long valueOf = Long.valueOf(READ_CYCLE_DELAY_MS * 2);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertTrue(this.localConsumedDataReceiver.receivedRecordsCount() == valueOf.longValue());
            Assert.assertTrue(this.remoteConsumedDataReceiver.receivedRecordsCount() == j);
        });
        int size = Mockito.mockingDetails(this.mockRemoteKafkaConsumer).getInvocations().size();
        atomicLong.set(10L);
        testMockTime.sleep(1000L);
        Assert.assertEquals(size, Mockito.mockingDetails(this.mockRemoteKafkaConsumer).getInvocations().size(), "Remote consumer should not poll for new records but return previously cached records");
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testIsReadyToServe(boolean z) {
        VenicePartitioner venicePartitioner = getVenicePartitioner(1);
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(venicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(1);
        MockStoreVersionConfigs mockStoreVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfigImpl, Optional.of(new HybridStoreConfigImpl(100L, 100L, 100L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), false, true, true);
        Store store = mockStoreVersionConfigs.store;
        Version version = mockStoreVersionConfigs.version;
        VeniceStoreVersionConfig veniceStoreVersionConfig = mockStoreVersionConfigs.storeVersionConfig;
        StoreIngestionTaskFactory build = getIngestionTaskFactoryBuilder(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), Optional.empty(), 1, new HashMap(), false).setIsDaVinciClient(z).setAggKafkaConsumerService(this.aggKafkaConsumerService).build();
        TopicManager topicManager = (TopicManager) Mockito.mock(TopicManager.class);
        ((TopicManagerRepository) Mockito.doReturn(this.mockTopicManager).when(this.mockTopicManagerRepository)).getTopicManager(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        ((TopicManagerRepository) Mockito.doReturn(topicManager).when(this.mockTopicManagerRepository)).getTopicManager(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        ((TopicManager) Mockito.doReturn(true).when(this.mockTopicManager)).containsTopic((PubSubTopic) Mockito.any());
        ((TopicManager) Mockito.doReturn(true).when(topicManager)).containsTopic((PubSubTopic) Mockito.any());
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.storeIngestionTaskUnderTest = build.getNewIngestionTask(store, version, properties, this.isCurrentVersion, veniceStoreVersionConfig, PartitionUtils.getLeaderSubPartition(1, 1), false, Optional.empty());
        String composeRealTimeTopic = Version.composeRealTimeTopic(store.getName());
        PubSubTopic topic = pubSubTopicRepository.getTopic(composeRealTimeTopic);
        TopicSwitch topicSwitch = new TopicSwitch();
        topicSwitch.sourceKafkaServers = new ArrayList();
        topicSwitch.sourceKafkaServers.add(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        topicSwitch.sourceTopicName = composeRealTimeTopic;
        TopicSwitchWrapper topicSwitchWrapper = new TopicSwitchWrapper(topicSwitch, pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString()));
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getLocalVersionTopicOffset();
        ((OffsetRecord) Mockito.doReturn(topic).when(offsetRecord)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getUpstreamOffset(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getUpstreamOffset(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        OffsetRecord offsetRecord2 = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord2)).getLocalVersionTopicOffset();
        ((OffsetRecord) Mockito.doReturn(topic).when(offsetRecord2)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord2)).getUpstreamOffset(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord2)).getUpstreamOffset(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(System.currentTimeMillis() - 3600000)).when(offsetRecord2)).getLatestProducerProcessingTimeInMs();
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState)).isEndOfPushReceived();
        Assert.assertFalse(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState));
        PartitionConsumptionState partitionConsumptionState2 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState2)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState2)).isComplete();
        Assert.assertTrue(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState2));
        PartitionConsumptionState partitionConsumptionState3 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState3)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState3)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState3)).isWaitingForReplicationLag();
        Assert.assertTrue(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState3));
        PartitionConsumptionState partitionConsumptionState4 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState4)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState4)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState4)).isWaitingForReplicationLag();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState4)).isHybrid();
        ((PartitionConsumptionState) Mockito.doReturn((Object) null).when(partitionConsumptionState4)).getTopicSwitch();
        Assert.assertFalse(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState4));
        PartitionConsumptionState partitionConsumptionState5 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState5)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState5)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState5)).isWaitingForReplicationLag();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState5)).isHybrid();
        ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState5)).getTopicSwitch();
        ((PartitionConsumptionState) Mockito.doReturn(offsetRecord).when(partitionConsumptionState5)).getOffsetRecord();
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(this.mockTopicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(topicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((PartitionConsumptionState) Mockito.doReturn(0).when(partitionConsumptionState5)).getPartition();
        ((PartitionConsumptionState) Mockito.doReturn(0).when(partitionConsumptionState5)).getUserPartition();
        this.storeIngestionTaskUnderTest.setPartitionConsumptionState(0, partitionConsumptionState5);
        ((PartitionConsumptionState) Mockito.doReturn(LeaderFollowerStateType.LEADER).when(partitionConsumptionState5)).getLeaderFollowerState();
        Assert.assertTrue(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState5));
        PartitionConsumptionState partitionConsumptionState6 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState6)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState6)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState6)).isWaitingForReplicationLag();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState6)).isHybrid();
        ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState6)).getTopicSwitch();
        ((PartitionConsumptionState) Mockito.doReturn(offsetRecord2).when(partitionConsumptionState6)).getOffsetRecord();
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(this.mockTopicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(150L).when(topicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((AggKafkaConsumerService) Mockito.doReturn(150L).when(this.aggKafkaConsumerService)).getLatestOffsetFor(Mockito.anyString(), (PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        Assert.assertEquals(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState6), z);
        PartitionConsumptionState partitionConsumptionState7 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState7)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState7)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState7)).isWaitingForReplicationLag();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState7)).isHybrid();
        ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState7)).getTopicSwitch();
        ((PartitionConsumptionState) Mockito.doReturn(offsetRecord2).when(partitionConsumptionState7)).getOffsetRecord();
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(this.mockTopicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(topicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(Long.valueOf(System.currentTimeMillis() - 172800000)).when(this.mockTopicManager)).getProducerTimestampOfLastDataRecord((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(Long.valueOf(System.currentTimeMillis())).when(topicManager)).getProducerTimestampOfLastDataRecord((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        Assert.assertEquals(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState7), z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testActiveActiveStoreIsReadyToServe(boolean z) {
        VenicePartitioner venicePartitioner = getVenicePartitioner(1);
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(venicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(1);
        MockStoreVersionConfigs mockStoreVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfigImpl, Optional.of(new HybridStoreConfigImpl(100L, 100L, -1L, DataReplicationPolicy.ACTIVE_ACTIVE, BufferReplayPolicy.REWIND_FROM_EOP)), false, true, true);
        Store store = mockStoreVersionConfigs.store;
        Version version = mockStoreVersionConfigs.version;
        VeniceStoreVersionConfig veniceStoreVersionConfig = mockStoreVersionConfigs.storeVersionConfig;
        StoreIngestionTaskFactory build = getIngestionTaskFactoryBuilder(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), Optional.empty(), 1, new HashMap(), false).setIsDaVinciClient(z).setAggKafkaConsumerService(this.aggKafkaConsumerService).build();
        TopicManager topicManager = (TopicManager) Mockito.mock(TopicManager.class);
        ((TopicManagerRepository) Mockito.doReturn(this.mockTopicManager).when(this.mockTopicManagerRepository)).getTopicManager(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        ((TopicManagerRepository) Mockito.doReturn(topicManager).when(this.mockTopicManagerRepository)).getTopicManager(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        ((TopicManager) Mockito.doReturn(true).when(this.mockTopicManager)).containsTopic((PubSubTopic) Mockito.any());
        ((TopicManager) Mockito.doReturn(true).when(topicManager)).containsTopic((PubSubTopic) Mockito.any());
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.storeIngestionTaskUnderTest = build.getNewIngestionTask(store, version, properties, this.isCurrentVersion, veniceStoreVersionConfig, PartitionUtils.getLeaderSubPartition(1, 1), false, Optional.empty());
        String composeRealTimeTopic = Version.composeRealTimeTopic(store.getName());
        PubSubTopic topic = pubSubTopicRepository.getTopic(composeRealTimeTopic);
        TopicSwitch topicSwitch = new TopicSwitch();
        topicSwitch.sourceKafkaServers = new ArrayList();
        topicSwitch.sourceKafkaServers.add(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        topicSwitch.sourceKafkaServers.add(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        topicSwitch.sourceTopicName = composeRealTimeTopic;
        TopicSwitchWrapper topicSwitchWrapper = new TopicSwitchWrapper(topicSwitch, pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString()));
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getLocalVersionTopicOffset();
        ((OffsetRecord) Mockito.doReturn(topic).when(offsetRecord)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getUpstreamOffset(this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        ((OffsetRecord) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(offsetRecord)).getUpstreamOffset(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState)).isEndOfPushReceived();
        ((PartitionConsumptionState) Mockito.doReturn(false).when(partitionConsumptionState)).isComplete();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState)).isWaitingForReplicationLag();
        ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState)).isHybrid();
        ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState)).getTopicSwitch();
        ((PartitionConsumptionState) Mockito.doReturn(offsetRecord).when(partitionConsumptionState)).getOffsetRecord();
        ((TopicManager) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(this.mockTopicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((TopicManager) Mockito.doReturn(150L).when(topicManager)).getPartitionLatestOffsetAndRetry((PubSubTopicPartition) Mockito.any(), Mockito.anyInt());
        ((AggKafkaConsumerService) Mockito.doReturn(150L).when(this.aggKafkaConsumerService)).getLatestOffsetFor(Mockito.anyString(), (PubSubTopic) Mockito.any(), (PubSubTopicPartition) Mockito.any());
        ((PartitionConsumptionState) Mockito.doReturn(0).when(partitionConsumptionState)).getPartition();
        ((PartitionConsumptionState) Mockito.doReturn(0).when(partitionConsumptionState)).getUserPartition();
        this.storeIngestionTaskUnderTest.setPartitionConsumptionState(0, partitionConsumptionState);
        Assert.assertEquals(this.storeIngestionTaskUnderTest.isReadyToServe(partitionConsumptionState), z);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testProcessTopicSwitch(boolean z) {
        VenicePartitioner venicePartitioner = getVenicePartitioner(1);
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(venicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(1);
        MockStoreVersionConfigs mockStoreVersionConfigs = setupStoreAndVersionMocks(2, partitionerConfigImpl, Optional.of(new HybridStoreConfigImpl(100L, 100L, 100L, DataReplicationPolicy.AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP)), false, true, false);
        Store store = mockStoreVersionConfigs.store;
        Version version = mockStoreVersionConfigs.version;
        VeniceStoreVersionConfig veniceStoreVersionConfig = mockStoreVersionConfigs.storeVersionConfig;
        ((StorageEngineRepository) Mockito.doReturn(new DeepCopyStorageEngine(this.mockAbstractStorageEngine)).when(this.mockStorageEngineRepository)).getLocalStorageEngine(this.topic);
        StoreIngestionTaskFactory build = getIngestionTaskFactoryBuilder(new RandomPollStrategy(), Utils.setOf(new Integer[]{1}), Optional.empty(), 1, new HashMap(), false).setIsDaVinciClient(z).build();
        int leaderSubPartition = PartitionUtils.getLeaderSubPartition(1, 1);
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.inMemoryLocalKafkaBroker.getKafkaBootstrapServer());
        this.storeIngestionTaskUnderTest = build.getNewIngestionTask(store, version, properties, this.isCurrentVersion, veniceStoreVersionConfig, leaderSubPartition, false, Optional.empty());
        TopicManager topicManager = (TopicManager) Mockito.mock(TopicManager.class);
        ((TopicManagerRepository) Mockito.doReturn(topicManager).when(this.mockTopicManagerRepository)).getTopicManager(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        TopicSwitch topicSwitch = new TopicSwitch();
        topicSwitch.sourceKafkaServers = new ArrayList();
        topicSwitch.sourceKafkaServers.add(this.inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
        topicSwitch.sourceTopicName = Version.composeRealTimeTopic(store.getName());
        topicSwitch.rewindStartTimestamp = System.currentTimeMillis();
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageUnion = topicSwitch;
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn((OffsetRecord) Mockito.mock(OffsetRecord.class)).when(partitionConsumptionState)).getOffsetRecord();
        ((PartitionConsumptionState) Mockito.doReturn(1).when(partitionConsumptionState)).getUserPartition();
        ((PartitionConsumptionState) Mockito.doReturn(1).when(partitionConsumptionState)).getPartition();
        this.storeIngestionTaskUnderTest.getStatusReportAdapter().initializePartitionReportStatus(1);
        this.storeIngestionTaskUnderTest.processTopicSwitch(controlMessage, 1, 10L, partitionConsumptionState);
        ((TopicManager) Mockito.verify(topicManager, z ? Mockito.never() : Mockito.times(1))).getPartitionOffsetByTime((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testUpdateConsumedUpstreamRTOffsetMapDuringRTSubscription(boolean z) {
        String uniqueString = Utils.getUniqueString("store");
        String composeKafkaTopic = Version.composeKafkaTopic(uniqueString, 1);
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        ((VeniceStoreVersionConfig) Mockito.doReturn(composeKafkaTopic).when(veniceStoreVersionConfig)).getStoreVersionName();
        Version version = (Version) Mockito.mock(Version.class);
        ((Version) Mockito.doReturn(1).when(version)).getPartitionCount();
        ((Version) Mockito.doReturn(VersionStatus.STARTED).when(version)).getStatus();
        ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        Store store = (Store) Mockito.mock(Store.class);
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(readOnlyStoreRepository)).getStoreOrThrow((String) Mockito.eq(uniqueString));
        ((Store) Mockito.doReturn(false).when(store)).isHybridStoreDiskQuotaEnabled();
        ((Store) Mockito.doReturn(Optional.of(version)).when(store)).getVersion(1);
        ((Version) Mockito.doReturn(Boolean.valueOf(z)).when(version)).isActiveActiveReplicationEnabled();
        Properties properties = (Properties) Mockito.mock(Properties.class);
        ((Properties) Mockito.doReturn("localhost").when(properties)).getProperty((String) Mockito.eq("kafka.bootstrap.servers"));
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        VeniceProperties veniceProperties = (VeniceProperties) Mockito.mock(VeniceProperties.class);
        ((VeniceProperties) Mockito.doReturn(true).when(veniceProperties)).isEmpty();
        ((VeniceServerConfig) Mockito.doReturn(veniceProperties).when(veniceServerConfig)).getKafkaConsumerConfigsForLocalConsumption();
        ((VeniceServerConfig) Mockito.doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig)).getKafkaClusterUrlToIdMap();
        ((VeniceServerConfig) Mockito.doReturn(Int2ObjectMaps.emptyMap()).when(veniceServerConfig)).getKafkaClusterIdToUrlMap();
        LeaderFollowerStoreIngestionTask newIngestionTask = TestUtils.getStoreIngestionTaskBuilder(uniqueString).setTopicManagerRepository(this.mockTopicManagerRepository).setStorageMetadataService(this.mockStorageMetadataService).setMetadataRepository(readOnlyStoreRepository).setTopicManagerRepository(this.mockTopicManagerRepository).setServerConfig(veniceServerConfig).setPubSubTopicRepository(pubSubTopicRepository).build().getNewIngestionTask(store, version, properties, () -> {
            return true;
        }, veniceStoreVersionConfig, 0, false, Optional.empty());
        TopicSwitch topicSwitch = new TopicSwitch();
        topicSwitch.sourceKafkaServers = Collections.singletonList("localhost");
        topicSwitch.sourceTopicName = "test_rt";
        topicSwitch.rewindStartTimestamp = System.currentTimeMillis();
        TopicSwitchWrapper topicSwitchWrapper = new TopicSwitchWrapper(topicSwitch, pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString()));
        PubSubTopic topic = pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString());
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        ((PartitionConsumptionState) Mockito.doReturn(LeaderFollowerStateType.IN_TRANSITION_FROM_STANDBY_TO_LEADER).when(partitionConsumptionState)).getLeaderFollowerState();
        ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState)).getTopicSwitch();
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(pubSubTopicRepository.getTopic("test_rt")).when(offsetRecord)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
        ((PartitionConsumptionState) Mockito.doReturn(1000L).when(partitionConsumptionState)).getLeaderOffset(Mockito.anyString(), (PubSubTopicRepository) Mockito.any());
        ((PartitionConsumptionState) Mockito.doReturn(offsetRecord).when(partitionConsumptionState)).getOffsetRecord();
        newIngestionTask.startConsumingAsLeaderInTransitionFromStandby(partitionConsumptionState);
        ((PartitionConsumptionState) Mockito.verify(partitionConsumptionState, Mockito.times(1))).updateLeaderConsumedUpstreamRTOffset((String) Mockito.eq(z ? "localhost" : ""), Mockito.eq(1000L));
        PubSubTopic topic2 = pubSubTopicRepository.getTopic("test_rt");
        Supplier supplier = () -> {
            PartitionConsumptionState partitionConsumptionState2 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
            ((PartitionConsumptionState) Mockito.doReturn(LeaderFollowerStateType.LEADER).when(partitionConsumptionState2)).getLeaderFollowerState();
            ((PartitionConsumptionState) Mockito.doReturn(topicSwitchWrapper).when(partitionConsumptionState2)).getTopicSwitch();
            OffsetRecord offsetRecord2 = (OffsetRecord) Mockito.mock(OffsetRecord.class);
            ((OffsetRecord) Mockito.doReturn(topic2).when(offsetRecord2)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
            System.out.println(offsetRecord2.getLeaderTopic((PubSubTopicRepository) null));
            ((OffsetRecord) Mockito.doReturn(1000L).when(offsetRecord2)).getUpstreamOffset(Mockito.anyString());
            if (z) {
                ((PartitionConsumptionState) Mockito.doReturn(1000L).when(partitionConsumptionState2)).getLatestProcessedUpstreamRTOffsetWithNoDefault(Mockito.anyString());
            } else {
                ((PartitionConsumptionState) Mockito.doReturn(1000L).when(partitionConsumptionState2)).getLatestProcessedUpstreamRTOffset(Mockito.anyString());
            }
            ((PartitionConsumptionState) Mockito.doReturn(offsetRecord2).when(partitionConsumptionState2)).getOffsetRecord();
            System.out.println("inside mock" + offsetRecord2.getLeaderTopic((PubSubTopicRepository) null));
            return partitionConsumptionState2;
        };
        PartitionConsumptionState partitionConsumptionState2 = (PartitionConsumptionState) supplier.get();
        newIngestionTask.leaderExecuteTopicSwitch(partitionConsumptionState2, topicSwitch, topic);
        ((PartitionConsumptionState) Mockito.verify(partitionConsumptionState2, Mockito.times(1))).updateLeaderConsumedUpstreamRTOffset((String) Mockito.eq(z ? "localhost" : ""), Mockito.eq(1000L));
        Supplier supplier2 = () -> {
            PartitionConsumptionState partitionConsumptionState3 = (PartitionConsumptionState) supplier.get();
            if (z) {
                ((PartitionConsumptionState) Mockito.doReturn(-1L).when(partitionConsumptionState3)).getLatestProcessedUpstreamRTOffsetWithNoDefault(Mockito.anyString());
            } else {
                ((PartitionConsumptionState) Mockito.doReturn(-1L).when(partitionConsumptionState3)).getLatestProcessedUpstreamRTOffset(Mockito.anyString());
            }
            ((PartitionConsumptionState) Mockito.doReturn(new PubSubTopicPartitionImpl(topic2, 0)).when(partitionConsumptionState3)).getSourceTopicPartition((PubSubTopic) Mockito.any());
            return partitionConsumptionState3;
        };
        PartitionConsumptionState partitionConsumptionState3 = (PartitionConsumptionState) supplier2.get();
        newIngestionTask.leaderExecuteTopicSwitch(partitionConsumptionState3, topicSwitch, topic);
        ((PartitionConsumptionState) Mockito.verify(partitionConsumptionState3, Mockito.never())).updateLeaderConsumedUpstreamRTOffset(Mockito.anyString(), Mockito.anyLong());
        PartitionConsumptionState partitionConsumptionState4 = (PartitionConsumptionState) supplier2.get();
        topicSwitch.rewindStartTimestamp = EMPTY_POLL_SLEEP_MS;
        newIngestionTask.leaderExecuteTopicSwitch(partitionConsumptionState4, topicSwitch, topic);
        ((PartitionConsumptionState) Mockito.verify(partitionConsumptionState4, Mockito.never())).updateLeaderConsumedUpstreamRTOffset(Mockito.anyString(), Mockito.anyLong());
    }

    @Test
    public void testLeaderShouldSubscribeToCorrectVTOffset() {
        StoreIngestionTaskFactory.Builder builder = (StoreIngestionTaskFactory.Builder) Mockito.mock(StoreIngestionTaskFactory.Builder.class);
        StorageEngineRepository storageEngineRepository = (StorageEngineRepository) Mockito.mock(StorageEngineRepository.class);
        ((StorageEngineRepository) Mockito.doReturn(new DeepCopyStorageEngine(this.mockAbstractStorageEngine)).when(storageEngineRepository)).getLocalStorageEngine(Mockito.anyString());
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(storageEngineRepository).when(builder)).getStorageEngineRepository();
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(VeniceProperties.empty()).when(veniceServerConfig)).getKafkaConsumerConfigsForLocalConsumption();
        ((VeniceServerConfig) Mockito.doReturn(VeniceProperties.empty()).when(veniceServerConfig)).getKafkaConsumerConfigsForRemoteConsumption();
        ((VeniceServerConfig) Mockito.doReturn(Object2IntMaps.emptyMap()).when(veniceServerConfig)).getKafkaClusterUrlToIdMap();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(veniceServerConfig).when(builder)).getServerConfig();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(Mockito.mock(ReadOnlyStoreRepository.class)).when(builder)).getMetadataRepo();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(Mockito.mock(ReadOnlySchemaRepository.class)).when(builder)).getSchemaRepo();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(Mockito.mock(AggKafkaConsumerService.class)).when(builder)).getAggKafkaConsumerService();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(this.mockAggStoreIngestionStats).when(builder)).getIngestionStats();
        ((StoreIngestionTaskFactory.Builder) Mockito.doReturn(pubSubTopicRepository).when(builder)).getPubSubTopicRepository();
        Version version = (Version) Mockito.mock(Version.class);
        ((Version) Mockito.doReturn(1).when(version)).getPartitionCount();
        ((Version) Mockito.doReturn((Object) null).when(version)).getPartitionerConfig();
        ((Version) Mockito.doReturn(VersionStatus.ONLINE).when(version)).getStatus();
        ((Version) Mockito.doReturn(true).when(version)).isNativeReplicationEnabled();
        ((Version) Mockito.doReturn("localhost").when(version)).getPushStreamSourceAddress();
        Store store = (Store) Mockito.mock(Store.class);
        ((Store) Mockito.doReturn(Optional.of(version)).when(store)).getVersion(Mockito.eq(1));
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        ((VeniceStoreVersionConfig) Mockito.doReturn("testStore_v1").when(veniceStoreVersionConfig)).getStoreVersionName();
        LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = (LeaderFollowerStoreIngestionTask) Mockito.spy(new LeaderFollowerStoreIngestionTask(builder, store, version, (Properties) Mockito.mock(Properties.class), (BooleanSupplier) Mockito.mock(BooleanSupplier.class), veniceStoreVersionConfig, -1, false, Optional.empty()));
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(pubSubTopicRepository.getTopic("testStore_v1")).when(offsetRecord)).getLeaderTopic((PubSubTopicRepository) Mockito.any());
        PartitionConsumptionState partitionConsumptionState = new PartitionConsumptionState(0, 1, offsetRecord, false);
        partitionConsumptionState.updateLatestProcessedLocalVersionTopicOffset(100L);
        partitionConsumptionState.updateLatestProcessedUpstreamVersionTopicOffset(200L);
        ((LeaderFollowerStoreIngestionTask) Mockito.doCallRealMethod().when(leaderFollowerStoreIngestionTask)).startConsumingAsLeader((PartitionConsumptionState) Mockito.any());
        ((LeaderFollowerStoreIngestionTask) Mockito.doReturn(false).when(leaderFollowerStoreIngestionTask)).shouldNewLeaderSwitchToRemoteConsumption((PartitionConsumptionState) Mockito.any());
        HashSet hashSet = new HashSet();
        hashSet.add("localhost");
        ((LeaderFollowerStoreIngestionTask) Mockito.doReturn(hashSet).when(leaderFollowerStoreIngestionTask)).getConsumptionSourceKafkaAddress((PartitionConsumptionState) Mockito.any());
        partitionConsumptionState.setConsumeRemotely(false);
        leaderFollowerStoreIngestionTask.startConsumingAsLeader(partitionConsumptionState);
        ((LeaderFollowerStoreIngestionTask) Mockito.verify(leaderFollowerStoreIngestionTask, Mockito.times(1))).consumerSubscribe((PubSubTopicPartition) Mockito.any(), Mockito.eq(100L), Mockito.anyString());
        partitionConsumptionState.setConsumeRemotely(true);
        leaderFollowerStoreIngestionTask.startConsumingAsLeader(partitionConsumptionState);
        ((LeaderFollowerStoreIngestionTask) Mockito.verify(leaderFollowerStoreIngestionTask, Mockito.times(1))).consumerSubscribe((PubSubTopicPartition) Mockito.any(), Mockito.eq(200L), Mockito.anyString());
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testWrappedInterruptExceptionDuringGracefulShutdown(boolean z) throws Exception {
        this.hybridStoreConfig = Optional.of(new HybridStoreConfigImpl(10L, 20L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));
        VeniceException veniceException = new VeniceException("Wrapped interruptedException", new InterruptedException());
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(PUT_KEY_FOO_OFFSET, true)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, 1);
            ((AggKafkaConsumerService) Mockito.doThrow(new Throwable[]{veniceException}).when(this.aggKafkaConsumerService)).unsubscribeConsumerFor((PubSubTopic) Mockito.eq(this.pubSubTopic), (PubSubTopicPartition) Mockito.any());
        }, () -> {
            ((VeniceNotifier) Mockito.verify(this.mockLogNotifier, Mockito.timeout(TEST_TIMEOUT_MS))).restarted((String) Mockito.eq(this.topic), Mockito.eq(1), Mockito.anyLong());
            this.storeIngestionTaskUnderTest.close();
            ((AggKafkaConsumerService) Mockito.verify(this.aggKafkaConsumerService, Mockito.timeout(TEST_TIMEOUT_MS))).unsubscribeConsumerFor((PubSubTopic) Mockito.eq(this.pubSubTopic), (PubSubTopicPartition) Mockito.any());
        }, z);
        Assert.assertEquals(this.mockNotifierError.size(), 0);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testOffsetSyncBeforeGracefulShutDown(boolean z) throws Exception {
        this.localVeniceWriter.put(putKeyFoo, putValue, 1);
        this.localVeniceWriter.put(putKeyFoo2, putValue, 1);
        this.hybridStoreConfig = Optional.of(new HybridStoreConfigImpl(10L, 20L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));
        runTest(Utils.setOf(new Integer[]{1}), () -> {
            ((StorageMetadataService) Mockito.doReturn(TestUtils.getOffsetRecord(EMPTY_POLL_SLEEP_MS, true)).when(this.mockStorageMetadataService)).getLastOffset(this.topic, 1);
        }, () -> {
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS))).getLastOffset(this.topic, 1);
            PartitionConsumptionState partitionConsumptionState = this.storeIngestionTaskUnderTest.getPartitionConsumptionState(1);
            OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
            Assert.assertEquals(partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), EMPTY_POLL_SLEEP_MS);
            Assert.assertEquals(offsetRecord.getLocalVersionTopicOffset(), EMPTY_POLL_SLEEP_MS);
            ((HostLevelIngestionStats) Mockito.verify(this.mockStoreIngestionStats, Mockito.timeout(TEST_TIMEOUT_MS).times(2))).recordTotalRecordsConsumed();
            Assert.assertEquals(partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), 2L);
            Assert.assertEquals(offsetRecord.getLocalVersionTopicOffset(), EMPTY_POLL_SLEEP_MS);
            this.storeIngestionTaskUnderTest.close();
            ((StorageMetadataService) Mockito.verify(this.mockStorageMetadataService, Mockito.timeout(TEST_TIMEOUT_MS).times(1))).put((String) Mockito.eq(this.topic), Mockito.eq(1), (OffsetRecord) Mockito.any());
            Assert.assertEquals(offsetRecord.getLocalVersionTopicOffset(), 2L);
        }, z, veniceStoreVersionConfig -> {
            ((VeniceStoreVersionConfig) Mockito.doReturn(100000L).when(veniceStoreVersionConfig)).getDatabaseSyncBytesIntervalForTransactionalMode();
        });
        Assert.assertEquals(this.mockNotifierError.size(), 0);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testProduceToStoreBufferService(boolean z) throws Exception {
        byte[] bArr = new byte[1];
        KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, bArr);
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.messageType = MessageType.PUT.getValue();
        Put put = new Put();
        put.putValue = ByteBuffer.allocate(10);
        put.putValue.position(4);
        put.replicationMetadataPayload = ByteBuffer.allocate(10);
        kafkaMessageEnvelope.payloadUnion = put;
        kafkaMessageEnvelope.producerMetadata = new ProducerMetadata();
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(kafkaKey, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(this.pubSubTopic, 1), EMPTY_POLL_SLEEP_MS, EMPTY_POLL_SLEEP_MS, 0);
        HostLevelIngestionStats hostLevelIngestionStats = (HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class);
        Mockito.when(this.mockAggStoreIngestionStats.getStoreStats(Mockito.anyString())).thenReturn(hostLevelIngestionStats);
        LeaderProducedRecordContext leaderProducedRecordContext = (LeaderProducedRecordContext) Mockito.mock(LeaderProducedRecordContext.class);
        Mockito.when(leaderProducedRecordContext.getMessageType()).thenReturn(MessageType.PUT);
        Mockito.when(leaderProducedRecordContext.getValueUnion()).thenReturn(put);
        Mockito.when(leaderProducedRecordContext.getKeyBytes()).thenReturn(bArr);
        runTest(Collections.singleton(1), () -> {
            TestUtils.waitForNonDeterministicAssertion(READ_CYCLE_DELAY_MS, TimeUnit.SECONDS, () -> {
                Assert.assertTrue(this.storeIngestionTaskUnderTest.hasAnySubscription());
            });
            Runnable runnable = () -> {
                try {
                    this.storeIngestionTaskUnderTest.produceToStoreBufferService(immutablePubSubMessage, leaderProducedRecordContext, 1, this.localKafkaConsumerService.kafkaUrl, System.nanoTime(), System.currentTimeMillis());
                } catch (InterruptedException e) {
                    throw new VeniceException(e);
                }
            };
            verifyStats(hostLevelIngestionStats, 0, 0);
            runnable.run();
            int i = 0 + 1;
            int i2 = 0 + 1;
            verifyStats(hostLevelIngestionStats, i, i2);
            this.storeIngestionTaskUnderTest.disableMetricsEmission();
            runnable.run();
            int i3 = i2 + 1;
            verifyStats(hostLevelIngestionStats, i, i3);
            this.storeIngestionTaskUnderTest.enableMetricsEmission();
            runnable.run();
            int i4 = i + 1;
            verifyStats(hostLevelIngestionStats, i4, i3 + 1);
            long currentTimeMillis = System.currentTimeMillis();
            long j = 10000;
            ((AggVersionedIngestionStats) Mockito.verify(this.mockVersionedStorageIngestionStats, Mockito.timeout(1000L).times(i4 + 1))).recordConsumedRecordEndToEndProcessingLatency(Mockito.anyString(), Mockito.anyInt(), ArgumentMatchers.doubleThat(d -> {
                return d.doubleValue() >= 0.0d && d.doubleValue() < 1000.0d;
            }), ArgumentMatchers.longThat(l -> {
                return l.longValue() > currentTimeMillis - j && l.longValue() < currentTimeMillis + j;
            }));
        }, z);
    }

    private void verifyStats(HostLevelIngestionStats hostLevelIngestionStats, int i, int i2) {
        ((HostLevelIngestionStats) Mockito.verify(hostLevelIngestionStats, Mockito.times(i))).recordConsumerRecordsQueuePutLatency(Mockito.anyDouble(), Mockito.anyLong());
        ((HostLevelIngestionStats) Mockito.verify(hostLevelIngestionStats, Mockito.timeout(1000L).times(i2))).recordTotalRecordsConsumed();
        ((HostLevelIngestionStats) Mockito.verify(hostLevelIngestionStats, Mockito.timeout(1000L).times(i2))).recordTotalBytesConsumed(Mockito.anyLong());
        ((AggVersionedIngestionStats) Mockito.verify(this.mockVersionedStorageIngestionStats, Mockito.timeout(1000L).times(i2))).recordRecordsConsumed(Mockito.anyString(), Mockito.anyInt());
        ((AggVersionedIngestionStats) Mockito.verify(this.mockVersionedStorageIngestionStats, Mockito.timeout(1000L).times(i2))).recordBytesConsumed(Mockito.anyString(), Mockito.anyInt(), Mockito.anyLong());
    }

    @Test
    public void testShouldPersistRecord() throws Exception {
        ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage((Object) null, (Object) null, new PubSubTopicPartitionImpl(this.pubSubTopic, 1), EMPTY_POLL_SLEEP_MS, EMPTY_POLL_SLEEP_MS, 0);
        runTest(Collections.singleton(1), () -> {
            Assert.assertFalse(this.storeIngestionTaskUnderTest.shouldPersistRecord(immutablePubSubMessage, (PartitionConsumptionState) null));
        }, false);
        HashMap hashMap = new HashMap();
        hashMap.put("freeze.ingestion.if.ready.to.serve.or.local.data.exists", true);
        Supplier supplier = () -> {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
            Mockito.when(Boolean.valueOf(partitionConsumptionState.isSubscribed())).thenReturn(true);
            Mockito.when(Boolean.valueOf(partitionConsumptionState.isErrorReported())).thenReturn(false);
            Mockito.when(Boolean.valueOf(partitionConsumptionState.isCompletionReported())).thenReturn(true);
            return partitionConsumptionState;
        };
        runTest(new RandomPollStrategy(), Collections.singleton(1), () -> {
        }, () -> {
            Assert.assertFalse(this.storeIngestionTaskUnderTest.shouldPersistRecord(immutablePubSubMessage, (PartitionConsumptionState) supplier.get()));
        }, this.hybridStoreConfig, false, Optional.empty(), false, 1, hashMap);
        runTest(Collections.singleton(1), () -> {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) supplier.get();
            PubSubTopic topic = pubSubTopicRepository.getTopic("blah_v1");
            OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
            Mockito.when(offsetRecord.getLeaderTopic((PubSubTopicRepository) Mockito.any())).thenReturn(topic);
            Mockito.when(partitionConsumptionState.getOffsetRecord()).thenReturn(offsetRecord);
            Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER);
            Assert.assertFalse(this.storeIngestionTaskUnderTest.shouldPersistRecord(immutablePubSubMessage, partitionConsumptionState));
        }, false);
        runTest(Collections.singleton(1), () -> {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) supplier.get();
            ImmutablePubSubMessage immutablePubSubMessage2 = new ImmutablePubSubMessage((Object) null, (Object) null, new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("blah_v1"), 1), EMPTY_POLL_SLEEP_MS, EMPTY_POLL_SLEEP_MS, 0);
            Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.STANDBY);
            Assert.assertFalse(this.storeIngestionTaskUnderTest.shouldPersistRecord(immutablePubSubMessage2, partitionConsumptionState));
        }, false);
    }

    @Test
    public void testShouldProduceToVersionTopic() throws Exception {
        runTest(Collections.singleton(1), () -> {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
            LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask = this.storeIngestionTaskUnderTest;
            Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.STANDBY);
            Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(partitionConsumptionState));
            Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER);
            OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
            Mockito.when(offsetRecord.getLeaderTopic((PubSubTopicRepository) Mockito.any())).thenReturn(this.pubSubTopic);
            Mockito.when(partitionConsumptionState.getOffsetRecord()).thenReturn(offsetRecord);
            Assert.assertFalse(leaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(partitionConsumptionState));
            Mockito.when(offsetRecord.getLeaderTopic((PubSubTopicRepository) Mockito.any())).thenReturn(pubSubTopicRepository.getTopic("blah_rt"));
            Assert.assertTrue(leaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(partitionConsumptionState));
            Mockito.when(offsetRecord.getLeaderTopic((PubSubTopicRepository) Mockito.any())).thenReturn(this.pubSubTopic);
            Mockito.when(Boolean.valueOf(partitionConsumptionState.consumeRemotely())).thenReturn(true);
            Assert.assertTrue(leaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(partitionConsumptionState));
        }, false);
    }

    private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(Consumer<VeniceStoreVersionConfig> consumer) {
        VeniceStoreVersionConfig veniceStoreVersionConfig = (VeniceStoreVersionConfig) Mockito.mock(VeniceStoreVersionConfig.class);
        ((VeniceStoreVersionConfig) Mockito.doReturn(this.topic).when(veniceStoreVersionConfig)).getStoreVersionName();
        ((VeniceStoreVersionConfig) Mockito.doReturn(0).when(veniceStoreVersionConfig)).getTopicOffsetCheckIntervalMs();
        ((VeniceStoreVersionConfig) Mockito.doReturn(Long.valueOf(READ_CYCLE_DELAY_MS)).when(veniceStoreVersionConfig)).getKafkaReadCycleDelayMs();
        ((VeniceStoreVersionConfig) Mockito.doReturn(Long.valueOf(EMPTY_POLL_SLEEP_MS)).when(veniceStoreVersionConfig)).getKafkaEmptyPollSleepMs();
        ((VeniceStoreVersionConfig) Mockito.doReturn(Long.valueOf(this.databaseSyncBytesIntervalForTransactionalMode)).when(veniceStoreVersionConfig)).getDatabaseSyncBytesIntervalForTransactionalMode();
        ((VeniceStoreVersionConfig) Mockito.doReturn(Long.valueOf(this.databaseSyncBytesIntervalForDeferredWriteMode)).when(veniceStoreVersionConfig)).getDatabaseSyncBytesIntervalForDeferredWriteMode();
        ((VeniceStoreVersionConfig) Mockito.doReturn(false).when(veniceStoreVersionConfig)).isReadOnlyForBatchOnlyStoreEnabled();
        consumer.accept(veniceStoreVersionConfig);
        return veniceStoreVersionConfig;
    }

    static {
        StoreIngestionTask.SCHEMA_POLLING_DELAY_MS = 100L;
        IngestionNotificationDispatcher.PROGRESS_REPORT_INTERVAL = -1L;
        ALL_PARTITIONS = new HashSet();
        for (int i = 0; i < 10; i++) {
            ALL_PARTITIONS.add(Integer.valueOf(i));
        }
        STRING_SCHEMA = Schema.parse("\"string\"");
        putKeyFoo = getRandomKey(1);
        putKeyFoo2 = getRandomKey(1);
        putKeyBar = getRandomKey(2);
        putValue = new VeniceAvroKafkaSerializer(STRING_SCHEMA).serialize((String) null, "TestValuePut");
        putValueToCorrupt = "Please corrupt me!".getBytes(StandardCharsets.UTF_8);
        deleteKeyFoo = getRandomKey(1);
        REPLICATION_METADATA_SCHEMA = RmdSchemaGenerator.generateMetadataSchema(STRING_SCHEMA, 1);
        REPLICATION_METADATA_SERIALIZER = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(REPLICATION_METADATA_SCHEMA);
        putKeyFooReplicationMetadataWithValueSchemaIdBytesDefault = createReplicationMetadataWithValueSchemaId(PUT_KEY_FOO_OFFSET, PUT_KEY_FOO_OFFSET, 1);
        putKeyFooReplicationMetadataWithValueSchemaIdBytes = createReplicationMetadataWithValueSchemaId(2L, PUT_KEY_FOO_OFFSET, 1);
        deleteKeyFooReplicationMetadataWithValueSchemaIdBytes = createReplicationMetadataWithValueSchemaId(2L, 2L, 1);
    }
}
