package com.linkedin.venice.hadoop.input.kafka.chunk;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.input.kafka.avro.MapperValueType;
import com.linkedin.venice.hadoop.input.kafka.chunk.ChunkAssembler;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedKeySuffixSerializer;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.storage.protocol.ChunkId;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.io.BytesWritable;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler.class */
public class TestChunkAssembler {
    private final ChunkAssembler chunkAssembler = new ChunkAssembler(false);
    private final ChunkAssembler rmdChunkingEnabledChunkAssembler = new ChunkAssembler(true);
    private static final int VALUE_SCHEMA_ID = 1234;
    private static final int VALUE_SCHEMA_ID_2 = 2234;
    private static final int CHUNK_MANIFEST_SCHEMA_ID = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
    private static final int CHUNK_VALUE_SCHEMA_ID = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
    private static final ChunkedValueManifestSerializer CHUNKED_VALUE_MANIFEST_SERIALIZER = new ChunkedValueManifestSerializer(true);
    private static final ChunkedKeySuffixSerializer CHUNKED_KEY_SUFFIX_SERIALIZER = new ChunkedKeySuffixSerializer();
    private static final RecordDeserializer<KafkaInputMapperValue> KAFKA_INPUT_MAPPER_VALUE_AVRO_SPECIFIC_DESERIALIZER = FastSerializerDeserializerFactory.getFastAvroSpecificDeserializer(KafkaInputMapperValue.SCHEMA$, KafkaInputMapperValue.class);
    private static final RecordSerializer<KafkaInputMapperValue> KAFKA_INPUT_MAPPER_VALUE_AVRO_SPECIFIC_SERIALIZER = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(KafkaInputMapperValue.SCHEMA$);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/chunk/TestChunkAssembler$ChunkInfo.class */
    public static class ChunkInfo {
        private final int totalChunkCount;
        private final int eachCountSizeInBytes;

        public ChunkInfo(int i, int i2) {
            this.totalChunkCount = i;
            this.eachCountSizeInBytes = i2;
        }
    }

    @Test
    public void testAssembleOneCompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues(createChunkBytes, new ChunkInfo(10, 20), null, 12, 34, VALUE_SCHEMA_ID, 0).iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 200));
        Assert.assertEquals(assembleAndGetValue.getReplicationMetadataVersionId(), 1);
    }

    @Test
    public void testAssembleOneCompleteLargeValueWithRmdChunking() {
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        ChunkInfo chunkInfo2 = new ChunkInfo(20, 10);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.rmdChunkingEnabledChunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues(createChunkBytes, chunkInfo, chunkInfo2, 12, 34, VALUE_SCHEMA_ID, 0).iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, chunkInfo.totalChunkCount * chunkInfo.eachCountSizeInBytes));
        Assert.assertEquals(assembleAndGetValue.getReplicationMetadataVersionId(), 1);
    }

    @Test
    public void testNoCompleteLargeValueWithMissingManifest() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.remove(0);
        Assert.assertNull(this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator()));
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Cannot assemble a large value. Missing .*")
    public void testNoCompleteLargeValueWithMissingChunk() {
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.remove(ThreadLocalRandom.current().nextInt(createKafkaInputMapperValues.size() - 2) + 1);
        this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator());
    }

    @Test
    public void testOneCompleteLargeValueAndOneIncompleteLargeValue() {
        ChunkId chunkId = new ChunkId();
        chunkId.segmentNumber = 12;
        chunkId.messageSequenceNumber = 34;
        ChunkId chunkId2 = new ChunkId();
        chunkId2.segmentNumber = 22;
        chunkId2.messageSequenceNumber = 54;
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, chunkId.segmentNumber, chunkId.messageSequenceNumber, VALUE_SCHEMA_ID, 0);
        List<BytesWritable> createKafkaInputMapperValues2 = createKafkaInputMapperValues(createChunkBytes, new ChunkInfo(20, 15), null, chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, 11);
        createKafkaInputMapperValues2.remove(0);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues2);
        arrayList.addAll(createKafkaInputMapperValues);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 200));
        Assert.assertEquals(assembleAndGetValue.getReplicationMetadataVersionId(), 1);
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Cannot assemble a large value. Missing .*")
    public void testOneCompleteLargeValueAndOneIncompleteLargeValueCase2() {
        ChunkId chunkId = new ChunkId();
        chunkId.segmentNumber = 12;
        chunkId.messageSequenceNumber = 34;
        ChunkId chunkId2 = new ChunkId();
        chunkId2.segmentNumber = 22;
        chunkId2.messageSequenceNumber = 54;
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        ChunkInfo chunkInfo2 = new ChunkInfo(20, 15);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, chunkId.segmentNumber, chunkId.messageSequenceNumber, VALUE_SCHEMA_ID, 0);
        List<BytesWritable> createKafkaInputMapperValues2 = createKafkaInputMapperValues(createChunkBytes, chunkInfo2, null, chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, 10);
        createKafkaInputMapperValues2.remove(ThreadLocalRandom.current().nextInt(createKafkaInputMapperValues2.size() - 2) + 1);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues2);
        arrayList.addAll(createKafkaInputMapperValues);
        this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
    }

    @Test
    public void testTwoCompleteLargeValues() {
        ChunkId chunkId = new ChunkId();
        chunkId.segmentNumber = 12;
        chunkId.messageSequenceNumber = 34;
        ChunkId chunkId2 = new ChunkId();
        chunkId2.segmentNumber = 22;
        chunkId2.messageSequenceNumber = 54;
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        ChunkInfo chunkInfo2 = new ChunkInfo(20, 15);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, chunkId.segmentNumber, chunkId.messageSequenceNumber, VALUE_SCHEMA_ID, 0);
        List<BytesWritable> createKafkaInputMapperValues2 = createKafkaInputMapperValues(createChunkBytes, chunkInfo2, null, chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, 10);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues2);
        arrayList.addAll(createKafkaInputMapperValues);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 300));
    }

    @Test
    public void testTwoCompleteLargeValuesWithOneDuplicatedChunk() {
        ChunkId chunkId = new ChunkId();
        chunkId.segmentNumber = 12;
        chunkId.messageSequenceNumber = 34;
        ChunkId chunkId2 = new ChunkId();
        chunkId2.segmentNumber = 22;
        chunkId2.messageSequenceNumber = 54;
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        ChunkInfo chunkInfo2 = new ChunkInfo(20, 15);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, chunkId.segmentNumber, chunkId.messageSequenceNumber, VALUE_SCHEMA_ID, 0);
        KafkaInputMapperValue deserialize = deserialize(createKafkaInputMapperValues.get(ThreadLocalRandom.current().nextInt(createKafkaInputMapperValues.size() - 1)));
        KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
        kafkaInputMapperValue.schemaId = deserialize.schemaId;
        kafkaInputMapperValue.offset = deserialize(createKafkaInputMapperValues.get(createKafkaInputMapperValues.size() - 1)).offset + 1;
        kafkaInputMapperValue.valueType = deserialize.valueType;
        kafkaInputMapperValue.value = deserialize.value;
        kafkaInputMapperValue.chunkedKeySuffix = deserialize.chunkedKeySuffix;
        kafkaInputMapperValue.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        createKafkaInputMapperValues.add(serialize(kafkaInputMapperValue));
        List<BytesWritable> createKafkaInputMapperValues2 = createKafkaInputMapperValues(createChunkBytes, chunkInfo2, null, chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, 11);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues2);
        arrayList.addAll(createKafkaInputMapperValues);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 300));
    }

    @Test
    public void testTwoCompleteLargeValuesWithOneDuplicatedManifest() {
        ChunkId chunkId = new ChunkId();
        chunkId.segmentNumber = 12;
        chunkId.messageSequenceNumber = 34;
        ChunkId chunkId2 = new ChunkId();
        chunkId2.segmentNumber = 22;
        chunkId2.messageSequenceNumber = 54;
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        ChunkInfo chunkInfo2 = new ChunkInfo(20, 15);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, chunkId.segmentNumber, chunkId.messageSequenceNumber, VALUE_SCHEMA_ID, 0);
        KafkaInputMapperValue deserialize = deserialize(createKafkaInputMapperValues.get(createKafkaInputMapperValues.size() - 1));
        KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
        kafkaInputMapperValue.schemaId = deserialize.schemaId;
        kafkaInputMapperValue.offset = deserialize(createKafkaInputMapperValues.get(createKafkaInputMapperValues.size() - 1)).offset + 1;
        kafkaInputMapperValue.valueType = deserialize.valueType;
        kafkaInputMapperValue.value = deserialize.value;
        kafkaInputMapperValue.chunkedKeySuffix = deserialize.chunkedKeySuffix;
        kafkaInputMapperValue.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        createKafkaInputMapperValues.add(serialize(kafkaInputMapperValue));
        List<BytesWritable> createKafkaInputMapperValues2 = createKafkaInputMapperValues(createChunkBytes, chunkInfo2, null, chunkId2.segmentNumber, chunkId2.messageSequenceNumber, VALUE_SCHEMA_ID_2, 11);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues2);
        arrayList.addAll(createKafkaInputMapperValues);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 300));
    }

    @Test
    public void testRegularValueAtTheEndWithCompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.add(0, createRegularValue(TestChunkingUtils.createChunkBytes(100, 23), VALUE_SCHEMA_ID_2, 11, MapperValueType.PUT));
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(100, 23));
    }

    @Test
    public void testRegularValueAtTheEndWithIncompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.remove(ThreadLocalRandom.current().nextInt(createKafkaInputMapperValues.size() - 2) + 1);
        createKafkaInputMapperValues.add(0, createRegularValue(TestChunkingUtils.createChunkBytes(100, 23), VALUE_SCHEMA_ID_2, 11, MapperValueType.PUT));
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(100, 23));
    }

    @Test
    public void testMultipleRegularValues() {
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(10, 10);
        byte[] createChunkBytes2 = TestChunkingUtils.createChunkBytes(20, 20);
        byte[] createChunkBytes3 = TestChunkingUtils.createChunkBytes(30, 30);
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(createRegularValue(createChunkBytes, VALUE_SCHEMA_ID_2, 1, MapperValueType.PUT));
        arrayList.add(createRegularValue(createChunkBytes2, VALUE_SCHEMA_ID_2, 2, MapperValueType.PUT));
        arrayList.add(createRegularValue(createChunkBytes3, VALUE_SCHEMA_ID_2, 3, MapperValueType.PUT));
        Collections.reverse(arrayList);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(TestChunkingUtils.createChunkBytes(0, 5), arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), createChunkBytes3);
    }

    @Test
    public void testRegularAndDeleteValues() {
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(10, 10);
        byte[] createChunkBytes2 = TestChunkingUtils.createChunkBytes(30, 30);
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(createRegularValue(createChunkBytes, VALUE_SCHEMA_ID_2, 1, MapperValueType.PUT));
        arrayList.add(createRegularValue(new byte[0], -1, 2, MapperValueType.DELETE));
        arrayList.add(createRegularValue(createChunkBytes2, VALUE_SCHEMA_ID_2, 3, MapperValueType.PUT));
        Collections.reverse(arrayList);
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(TestChunkingUtils.createChunkBytes(0, 5), arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), createChunkBytes2);
    }

    @Test
    public void testDeleteValueAtTheEndWithCompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.add(0, createRegularValue(new byte[0], -1, 11, MapperValueType.DELETE));
        Assert.assertNull(this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator()));
    }

    @Test
    public void testDeleteValueAtTheEndWithIncompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        List<BytesWritable> createKafkaInputMapperValues = createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 0);
        createKafkaInputMapperValues.remove(ThreadLocalRandom.current().nextInt(createKafkaInputMapperValues.size() - 2) + 1);
        createKafkaInputMapperValues.add(0, createRegularValue(new byte[0], -1, 11, MapperValueType.DELETE));
        Assert.assertNull(this.chunkAssembler.assembleAndGetValue(createChunkBytes, createKafkaInputMapperValues.iterator()));
    }

    @Test
    public void testDeleteValueAndRegularValues() {
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(10, 10);
        byte[] createChunkBytes2 = TestChunkingUtils.createChunkBytes(20, 20);
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(createRegularValue(createChunkBytes, VALUE_SCHEMA_ID_2, 1, MapperValueType.PUT));
        arrayList.add(createRegularValue(createChunkBytes2, VALUE_SCHEMA_ID_2, 2, MapperValueType.PUT));
        arrayList.add(createRegularValue(new byte[0], -1, 3, MapperValueType.DELETE));
        Collections.reverse(arrayList);
        Assert.assertNull(this.chunkAssembler.assembleAndGetValue(TestChunkingUtils.createChunkBytes(0, 5), arrayList.iterator()));
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Cannot assemble a large value. Missing .*")
    public void testRegularValueAndIncompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ArrayList arrayList = new ArrayList(12);
        arrayList.addAll(createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 1));
        arrayList.remove(ThreadLocalRandom.current().nextInt(arrayList.size() - 2) + 1);
        arrayList.add(createRegularValue(TestChunkingUtils.createChunkBytes(100, 23), VALUE_SCHEMA_ID_2, 0, MapperValueType.PUT));
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID_2);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(100, 23));
    }

    @Test
    public void testRegularValueAndCompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 1));
        arrayList.add(createRegularValue(TestChunkingUtils.createChunkBytes(100, 23), VALUE_SCHEMA_ID_2, 0, MapperValueType.PUT));
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 200));
    }

    @Test
    public void testDeleteValueAndCompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 1));
        arrayList.add(createRegularValue(new byte[0], -1, 0, MapperValueType.DELETE));
        ChunkAssembler.ValueBytesAndSchemaId assembleAndGetValue = this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator());
        Assert.assertNotNull(assembleAndGetValue);
        Assert.assertEquals(assembleAndGetValue.getSchemaID(), VALUE_SCHEMA_ID);
        Assert.assertEquals(assembleAndGetValue.getBytes(), TestChunkingUtils.createChunkBytes(0, 200));
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = ".*Cannot assemble a large value. Missing .*")
    public void testDeleteValueAndIncompleteLargeValue() {
        new ChunkedValueManifest().keysWithChunkIdSuffix = new ArrayList(10);
        ChunkInfo chunkInfo = new ChunkInfo(10, 20);
        byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(0, 5);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(createKafkaInputMapperValues(createChunkBytes, chunkInfo, null, 12, 34, VALUE_SCHEMA_ID, 1));
        arrayList.remove(ThreadLocalRandom.current().nextInt(arrayList.size() - 2) + 1);
        arrayList.add(createRegularValue(new byte[0], -1, 0, MapperValueType.DELETE));
        Assert.assertNull(this.chunkAssembler.assembleAndGetValue(createChunkBytes, arrayList.iterator()));
    }

    private BytesWritable createRegularValue(byte[] bArr, int i, int i2, MapperValueType mapperValueType) {
        KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
        kafkaInputMapperValue.chunkedKeySuffix = ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX));
        kafkaInputMapperValue.schemaId = i;
        kafkaInputMapperValue.offset = i2;
        kafkaInputMapperValue.value = ByteBuffer.wrap(bArr);
        kafkaInputMapperValue.valueType = mapperValueType;
        kafkaInputMapperValue.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        kafkaInputMapperValue.replicationMetadataVersionId = 1;
        return serialize(kafkaInputMapperValue);
    }

    private List<BytesWritable> createKafkaInputMapperValues(byte[] bArr, ChunkInfo chunkInfo, ChunkInfo chunkInfo2, int i, int i2, int i3, int i4) {
        ArrayList arrayList = chunkInfo2 == null ? new ArrayList(chunkInfo.totalChunkCount + 1) : new ArrayList(chunkInfo.totalChunkCount + chunkInfo2.totalChunkCount + 1);
        KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
        ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest();
        chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList(chunkInfo.totalChunkCount);
        ChunkedValueManifest chunkedValueManifest2 = null;
        int i5 = i4;
        int i6 = 0;
        for (int i7 = 0; i7 < chunkInfo.totalChunkCount; i7++) {
            byte[] createChunkBytes = TestChunkingUtils.createChunkBytes(i6, chunkInfo.eachCountSizeInBytes);
            KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
            kafkaInputMapperValue.valueType = MapperValueType.PUT;
            kafkaInputMapperValue.offset = i5;
            i5++;
            kafkaInputMapperValue.schemaId = CHUNK_VALUE_SCHEMA_ID;
            kafkaInputMapperValue.value = ByteBuffer.wrap(createChunkBytes);
            ChunkedKeySuffix createChunkedKeySuffix = TestChunkingUtils.createChunkedKeySuffix(i, i2, i7);
            kafkaInputMapperValue.chunkedKeySuffix = ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", createChunkedKeySuffix));
            kafkaInputMapperValue.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
            arrayList.add(serialize(kafkaInputMapperValue));
            i6 += chunkInfo.eachCountSizeInBytes;
            chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithChunkingSuffixSerializer.serializeChunkedKey(bArr, createChunkedKeySuffix));
        }
        if (chunkInfo2 != null) {
            chunkedValueManifest2 = new ChunkedValueManifest();
            chunkedValueManifest2.keysWithChunkIdSuffix = new ArrayList(chunkInfo2.totalChunkCount);
            int i8 = 0;
            for (int i9 = 0; i9 < chunkInfo2.totalChunkCount; i9++) {
                byte[] createChunkBytes2 = TestChunkingUtils.createChunkBytes(i8, chunkInfo2.eachCountSizeInBytes);
                KafkaInputMapperValue kafkaInputMapperValue2 = new KafkaInputMapperValue();
                kafkaInputMapperValue2.valueType = MapperValueType.PUT;
                kafkaInputMapperValue2.offset = i5;
                i5++;
                kafkaInputMapperValue2.schemaId = CHUNK_VALUE_SCHEMA_ID;
                kafkaInputMapperValue2.value = ByteBuffer.wrap(new byte[0]);
                ChunkedKeySuffix createChunkedKeySuffix2 = TestChunkingUtils.createChunkedKeySuffix(i, i2, i9);
                kafkaInputMapperValue2.chunkedKeySuffix = ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", createChunkedKeySuffix2));
                kafkaInputMapperValue2.replicationMetadataPayload = ByteBuffer.wrap(createChunkBytes2);
                arrayList.add(serialize(kafkaInputMapperValue2));
                i8 += chunkInfo2.eachCountSizeInBytes;
                chunkedValueManifest2.keysWithChunkIdSuffix.add(keyWithChunkingSuffixSerializer.serializeChunkedKey(bArr, createChunkedKeySuffix2));
            }
        }
        chunkedValueManifest.schemaId = i3;
        chunkedValueManifest.size = chunkInfo.totalChunkCount * chunkInfo.eachCountSizeInBytes;
        if (chunkInfo2 != null) {
            chunkedValueManifest2.schemaId = i3;
            chunkedValueManifest2.size = chunkInfo2.totalChunkCount * chunkInfo2.eachCountSizeInBytes;
        }
        KafkaInputMapperValue kafkaInputMapperValue3 = new KafkaInputMapperValue();
        kafkaInputMapperValue3.valueType = MapperValueType.PUT;
        kafkaInputMapperValue3.offset = i5;
        kafkaInputMapperValue3.schemaId = CHUNK_MANIFEST_SCHEMA_ID;
        kafkaInputMapperValue3.value = ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest));
        kafkaInputMapperValue3.chunkedKeySuffix = ByteBuffer.wrap(CHUNKED_KEY_SUFFIX_SERIALIZER.serialize("", KeyWithChunkingSuffixSerializer.NON_CHUNK_KEY_SUFFIX));
        kafkaInputMapperValue3.replicationMetadataPayload = chunkInfo2 == null ? ByteBuffer.wrap(new byte[0]) : ByteBuffer.wrap(CHUNKED_VALUE_MANIFEST_SERIALIZER.serialize("", chunkedValueManifest2));
        kafkaInputMapperValue3.replicationMetadataVersionId = 1;
        arrayList.add(serialize(kafkaInputMapperValue3));
        Collections.reverse(arrayList);
        return arrayList;
    }

    private KafkaInputMapperValue deserialize(BytesWritable bytesWritable) {
        return (KafkaInputMapperValue) KAFKA_INPUT_MAPPER_VALUE_AVRO_SPECIFIC_DESERIALIZER.deserialize(bytesWritable.copyBytes());
    }

    private BytesWritable serialize(KafkaInputMapperValue kafkaInputMapperValue) {
        return new BytesWritable(KAFKA_INPUT_MAPPER_VALUE_AVRO_SPECIFIC_SERIALIZER.serialize(kafkaInputMapperValue));
    }
}
