package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.view.VeniceViewWriterFactoryTest;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.storage.protocol.ChunkId;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.class */
public class ActiveActiveStoreIngestionTaskTest {
    @Test
    public void testLeaderCanSendValueChunksIntoDrainer() throws ExecutionException, InterruptedException, TimeoutException {
        ActiveActiveStoreIngestionTask activeActiveStoreIngestionTask = (ActiveActiveStoreIngestionTask) Mockito.mock(ActiveActiveStoreIngestionTask.class);
        Mockito.when(activeActiveStoreIngestionTask.getHostLevelIngestionStats()).thenReturn((HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class));
        Mockito.when(activeActiveStoreIngestionTask.getVersionIngestionStats()).thenReturn((AggVersionedIngestionStats) Mockito.mock(AggVersionedIngestionStats.class));
        Mockito.when(activeActiveStoreIngestionTask.getVersionedDIVStats()).thenReturn((AggVersionedDIVStats) Mockito.mock(AggVersionedDIVStats.class));
        Mockito.when(activeActiveStoreIngestionTask.getKafkaVersionTopic()).thenReturn("test");
        Mockito.when(activeActiveStoreIngestionTask.createProducerCallback((PubSubMessage) ArgumentMatchers.any(), (PartitionConsumptionState) ArgumentMatchers.any(), (LeaderProducedRecordContext) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyString(), ArgumentMatchers.anyLong())).thenCallRealMethod();
        Mockito.when(activeActiveStoreIngestionTask.getProduceToTopicFunction((byte[]) ArgumentMatchers.any(), (ByteBuffer) ArgumentMatchers.any(), (ByteBuffer) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyBoolean())).thenCallRealMethod();
        Mockito.when(Integer.valueOf(activeActiveStoreIngestionTask.getRmdProtocolVersionID())).thenReturn(1);
        ((ActiveActiveStoreIngestionTask) Mockito.doCallRealMethod().when(activeActiveStoreIngestionTask)).produceToLocalKafka((PubSubMessage) ArgumentMatchers.any(), (PartitionConsumptionState) ArgumentMatchers.any(), (LeaderProducedRecordContext) ArgumentMatchers.any(), (BiConsumer) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        byte[] serializeNonChunkedKey = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey("foo".getBytes());
        PubSubProducerAdapter pubSubProducerAdapter = (PubSubProducerAdapter) Mockito.mock(PubSubProducerAdapter.class);
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) ArgumentMatchers.any()))).thenReturn(1);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (TimeUnit) ArgumentMatchers.any()))).thenReturn(1);
        AtomicLong atomicLong = new AtomicLong(0L);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaKey.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
        Mockito.when(pubSubProducerAdapter.sendMessage((String) ArgumentMatchers.eq("test"), (Integer) ArgumentMatchers.any(), (KafkaKey) forClass.capture(), (KafkaMessageEnvelope) forClass2.capture(), (PubSubMessageHeaders) ArgumentMatchers.any(), (PubSubProducerCallback) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            KafkaKey kafkaKey = (KafkaKey) invocationOnMock.getArgument(2);
            KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) invocationOnMock.getArgument(3);
            PubSubProducerCallback pubSubProducerCallback = (PubSubProducerCallback) invocationOnMock.getArgument(5);
            PubSubProduceResult pubSubProduceResult = (PubSubProduceResult) Mockito.mock(PubSubProduceResult.class);
            atomicLong.addAndGet(1L);
            Mockito.when(Long.valueOf(pubSubProduceResult.getOffset())).thenReturn(Long.valueOf(atomicLong.get()));
            Mockito.when(Integer.valueOf(pubSubProduceResult.getSerializedSize())).thenReturn(Integer.valueOf(kafkaKey.getKeyLength() + (MessageType.valueOf(kafkaMessageEnvelope.messageType).equals(MessageType.PUT) ? ((Put) kafkaMessageEnvelope.payloadUnion).putValue.remaining() : 0)));
            pubSubProducerCallback.onCompletion(pubSubProduceResult, (Exception) null);
            return future;
        });
        VeniceWriter veniceWriter = new VeniceWriter(new VeniceWriterOptions.Builder("test").setPartitioner(new DefaultVenicePartitioner()).setTime(SystemTime.INSTANCE).setChunkingEnabled(true).build(), VeniceProperties.empty(), pubSubProducerAdapter);
        Mockito.when(activeActiveStoreIngestionTask.getVeniceWriter()).thenReturn(Lazy.of(() -> {
            return veniceWriter;
        }));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 50000; i++) {
            sb.append("abcdefghabcdefghabcdefghabcdefgh");
        }
        byte[] bytes = sb.toString().getBytes();
        byte[] bArr = new byte[4 + bytes.length];
        ByteUtils.writeInt(bArr, 1, 0);
        System.arraycopy(bytes, 0, bArr, 4, bytes.length);
        ByteBuffer wrap = ByteBuffer.wrap(bArr, 4, bytes.length);
        ByteBuffer wrap2 = ByteBuffer.wrap(new byte[]{10, 11});
        PubSubMessage pubSubMessage = (PubSubMessage) Mockito.mock(PubSubMessage.class);
        Mockito.when((Long) pubSubMessage.getOffset()).thenReturn(100L);
        Put put = new Put();
        put.putValue = ByteUtils.prependIntHeaderToByteBuffer(wrap, 1, true);
        put.schemaId = 1;
        put.replicationMetadataVersionId = 1;
        put.replicationMetadataPayload = wrap2;
        activeActiveStoreIngestionTask.produceToLocalKafka(pubSubMessage, (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class), LeaderProducedRecordContext.newPutRecord(0, ((Long) pubSubMessage.getOffset()).longValue(), serializeNonChunkedKey, put), activeActiveStoreIngestionTask.getProduceToTopicFunction(serializeNonChunkedKey, wrap, wrap2, 1, true), 0, "kafkaUrl", 0, 0L);
        ((PubSubProducerAdapter) Mockito.verify(pubSubProducerAdapter, Mockito.times(4))).sendMessage((String) ArgumentMatchers.any(), (Integer) ArgumentMatchers.any(), (KafkaKey) ArgumentMatchers.any(), (KafkaMessageEnvelope) ArgumentMatchers.any(), (PubSubMessageHeaders) ArgumentMatchers.any(), (PubSubProducerCallback) ArgumentMatchers.any());
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(LeaderProducedRecordContext.class);
        ((ActiveActiveStoreIngestionTask) Mockito.verify(activeActiveStoreIngestionTask, Mockito.times(3))).produceToStoreBufferService((PubSubMessage) ArgumentMatchers.any(), (LeaderProducedRecordContext) forClass3.capture(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyString(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(0)).getValueUnion(), ((KafkaMessageEnvelope) forClass2.getAllValues().get(1)).payloadUnion);
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(1)).getValueUnion(), ((KafkaMessageEnvelope) forClass2.getAllValues().get(2)).payloadUnion);
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(2)).getValueUnion(), ((KafkaMessageEnvelope) forClass2.getAllValues().get(3)).payloadUnion);
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(0)).getKeyBytes(), ((KafkaKey) forClass.getAllValues().get(1)).getKey());
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(1)).getKeyBytes(), ((KafkaKey) forClass.getAllValues().get(2)).getKey());
        Assert.assertEquals(((LeaderProducedRecordContext) forClass3.getAllValues().get(2)).getKeyBytes(), ((KafkaKey) forClass.getAllValues().get(3)).getKey());
    }

    @Test
    public void testReadingChunkedRmdFromStorage() {
        KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
        ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true);
        byte[] bytes = "foo".getBytes();
        byte[] bytes2 = "bar".getBytes();
        byte[] bytes3 = "ljl".getBytes();
        byte[] serializeNonChunkedKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(bytes);
        byte[] bArr = new byte[8];
        ByteUtils.writeInt(bArr, 2, 0);
        ByteUtils.writeInt(bArr, 666, 4);
        byte[] bArr2 = new byte[12];
        ByteUtils.writeInt(bArr2, 2, 0);
        ByteUtils.writeInt(bArr2, 666, 4);
        ByteUtils.writeInt(bArr2, 777, 8);
        byte[] bArr3 = new byte[24];
        ByteUtils.writeInt(bArr3, 2, 0);
        ByteUtils.writeInt(bArr3, 666, 4);
        ByteUtils.writeInt(bArr3, 777, 8);
        ByteUtils.writeInt(bArr3, 111, 12);
        ByteUtils.writeInt(bArr3, 222, 16);
        ByteUtils.writeInt(bArr3, 333, 20);
        byte[] bArr4 = new byte[12];
        ByteUtils.writeInt(bArr4, AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(), 0);
        ByteUtils.writeInt(bArr4, 666, 4);
        ByteUtils.writeInt(bArr4, 777, 8);
        byte[] bArr5 = new byte[16];
        ByteUtils.writeInt(bArr5, AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(), 0);
        ByteUtils.writeInt(bArr5, 111, 4);
        ByteUtils.writeInt(bArr5, 222, 8);
        ByteUtils.writeInt(bArr5, 333, 12);
        AbstractStorageEngine abstractStorageEngine = (AbstractStorageEngine) Mockito.mock(AbstractStorageEngine.class);
        ReadOnlySchemaRepository readOnlySchemaRepository = (ReadOnlySchemaRepository) Mockito.mock(ReadOnlySchemaRepository.class);
        Mockito.when(readOnlySchemaRepository.getSupersetOrLatestValueSchema(VeniceViewWriterFactoryTest.TEST_STORE)).thenReturn(new SchemaEntry(2, "\"string\""));
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        Mockito.when(Boolean.valueOf(veniceServerConfig.isComputeFastAvroEnabled())).thenReturn(false);
        ActiveActiveStoreIngestionTask activeActiveStoreIngestionTask = (ActiveActiveStoreIngestionTask) Mockito.mock(ActiveActiveStoreIngestionTask.class);
        Mockito.when(Integer.valueOf(activeActiveStoreIngestionTask.getRmdProtocolVersionID())).thenReturn(1);
        Mockito.when(activeActiveStoreIngestionTask.getCompressor()).thenReturn(Lazy.of(NoopCompressor::new));
        Mockito.when(activeActiveStoreIngestionTask.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
        Mockito.when(activeActiveStoreIngestionTask.getStoreName()).thenReturn(VeniceViewWriterFactoryTest.TEST_STORE);
        Mockito.when(activeActiveStoreIngestionTask.getStorageEngine()).thenReturn(abstractStorageEngine);
        Mockito.when(activeActiveStoreIngestionTask.getSchemaRepo()).thenReturn(readOnlySchemaRepository);
        Mockito.when(activeActiveStoreIngestionTask.getServerConfig()).thenReturn(veniceServerConfig);
        Mockito.when(activeActiveStoreIngestionTask.getRmdWithValueSchemaByteBufferFromStorage(ArgumentMatchers.anyInt(), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenCallRealMethod();
        Mockito.when(Boolean.valueOf(activeActiveStoreIngestionTask.isChunked())).thenReturn(true);
        Mockito.when(activeActiveStoreIngestionTask.getHostLevelIngestionStats()).thenReturn((HostLevelIngestionStats) Mockito.mock(HostLevelIngestionStats.class));
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, serializeNonChunkedKey)).thenReturn(bArr);
        byte[] rmdWithValueSchemaByteBufferFromStorage = activeActiveStoreIngestionTask.getRmdWithValueSchemaByteBufferFromStorage(1, bytes, 0L);
        Assert.assertNotNull(rmdWithValueSchemaByteBufferFromStorage);
        Assert.assertEquals(rmdWithValueSchemaByteBufferFromStorage, bArr);
        ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix();
        chunkedKeySuffix.isChunk = true;
        chunkedKeySuffix.chunkId = new ChunkId();
        ProducerMetadata producerMetadata = new ProducerMetadata(new GUID(), 1, 2, 100L, 200L);
        chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID;
        chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber;
        chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber;
        chunkedKeySuffix.chunkId.chunkIndex = 0;
        ByteBuffer serializeChunkedKey = keyWithChunkingSuffixSerializer.serializeChunkedKey(bytes2, chunkedKeySuffix);
        ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest();
        chunkedValueManifest.schemaId = 2;
        chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList(1);
        chunkedValueManifest.size = 8;
        chunkedValueManifest.keysWithChunkIdSuffix.add(serializeChunkedKey);
        byte[] serialize = chunkedValueManifestSerializer.serialize("testStore_v1", chunkedValueManifest);
        byte[] bArr6 = new byte[4 + serialize.length];
        System.arraycopy(serialize, 0, bArr6, 4, serialize.length);
        ByteUtils.writeInt(bArr6, AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(), 0);
        byte[] serializeNonChunkedKey2 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(bytes2);
        byte[] array = serializeChunkedKey.array();
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, serializeNonChunkedKey2)).thenReturn(bArr6);
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, array)).thenReturn(bArr4);
        byte[] rmdWithValueSchemaByteBufferFromStorage2 = activeActiveStoreIngestionTask.getRmdWithValueSchemaByteBufferFromStorage(1, bytes2, 0L);
        Assert.assertNotNull(rmdWithValueSchemaByteBufferFromStorage2);
        Assert.assertEquals(rmdWithValueSchemaByteBufferFromStorage2, bArr2);
        ChunkedKeySuffix chunkedKeySuffix2 = new ChunkedKeySuffix();
        chunkedKeySuffix2.isChunk = true;
        chunkedKeySuffix2.chunkId = new ChunkId();
        chunkedKeySuffix2.chunkId.producerGUID = producerMetadata.producerGUID;
        chunkedKeySuffix2.chunkId.segmentNumber = producerMetadata.segmentNumber;
        chunkedKeySuffix2.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber;
        chunkedKeySuffix2.chunkId.chunkIndex = 0;
        ByteBuffer serializeChunkedKey2 = keyWithChunkingSuffixSerializer.serializeChunkedKey(bytes3, chunkedKeySuffix2);
        chunkedKeySuffix2.chunkId.chunkIndex = 1;
        ByteBuffer serializeChunkedKey3 = keyWithChunkingSuffixSerializer.serializeChunkedKey(bytes3, chunkedKeySuffix2);
        ChunkedValueManifest chunkedValueManifest2 = new ChunkedValueManifest();
        chunkedValueManifest2.schemaId = 2;
        chunkedValueManifest2.keysWithChunkIdSuffix = new ArrayList(2);
        chunkedValueManifest2.size = 20;
        chunkedValueManifest2.keysWithChunkIdSuffix.add(serializeChunkedKey2);
        chunkedValueManifest2.keysWithChunkIdSuffix.add(serializeChunkedKey3);
        byte[] serialize2 = chunkedValueManifestSerializer.serialize("testStore_v1", chunkedValueManifest2);
        byte[] bArr7 = new byte[4 + serialize2.length];
        System.arraycopy(serialize2, 0, bArr7, 4, serialize2.length);
        ByteUtils.writeInt(bArr7, AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion(), 0);
        byte[] serializeNonChunkedKey3 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(bytes3);
        byte[] array2 = serializeChunkedKey2.array();
        byte[] array3 = serializeChunkedKey3.array();
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, serializeNonChunkedKey3)).thenReturn(bArr7);
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, array2)).thenReturn(bArr4);
        Mockito.when(abstractStorageEngine.getReplicationMetadata(1, array3)).thenReturn(bArr5);
        byte[] rmdWithValueSchemaByteBufferFromStorage3 = activeActiveStoreIngestionTask.getRmdWithValueSchemaByteBufferFromStorage(1, bytes3, 0L);
        Assert.assertNotNull(rmdWithValueSchemaByteBufferFromStorage3);
        Assert.assertEquals(rmdWithValueSchemaByteBufferFromStorage3, bArr3);
    }
}
