package com.linkedin.venice.writer;

import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.storage.protocol.ChunkId;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/writer/VeniceWriterTest.class */
public class VeniceWriterTest {
    private PubSubBrokerWrapper pubSubBrokerWrapper;
    private TopicManager topicManager;
    private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @BeforeClass
    public void setUp() {
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker();
        this.pubSubConsumerAdapterFactory = IntegrationTestPushUtils.getVeniceConsumerFactory();
        this.topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, this.pubSubBrokerWrapper.getAddress(), this.pubSubTopicRepository).getTopicManager();
    }

    @AfterClass
    public void cleanUp() throws IOException {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.topicManager, this.pubSubBrokerWrapper});
    }

    private void testThreadSafety(int i, Consumer<VeniceWriter<KafkaKey, byte[], byte[]>> consumer) throws ExecutionException, InterruptedException {
        Map poll;
        String uniqueTopicString = TestUtils.getUniqueTopicString("topic-for-vw-thread-safety");
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(uniqueTopicString);
        this.topicManager.createTopic(topic, 1, 1, true);
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", this.pubSubBrokerWrapper.getAddress());
        properties.put("partitioner.class", DefaultVenicePartitioner.class.getName());
        ExecutorService executorService = null;
        try {
            VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(properties).createVeniceWriter(new VeniceWriterOptions.Builder(uniqueTopicString).setUseKafkaKeySerializer(true).setPartitionCount(1).build());
            try {
                executorService = Executors.newFixedThreadPool(i);
                Future[] futureArr = new Future[i];
                for (int i2 = 0; i2 < i; i2++) {
                    futureArr[i2] = executorService.submit(() -> {
                        consumer.accept(createVeniceWriter);
                    });
                }
                for (int i3 = 0; i3 < i; i3++) {
                    futureArr[i3].get();
                }
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
                TestUtils.shutdownExecutor(executorService);
                PubSubConsumerAdapter create = this.pubSubConsumerAdapterFactory.create(new VeniceProperties(properties), false, new KafkaPubSubMessageDeserializer(new OptimizedKafkaValueSerializer(), new LandFillObjectPool(KafkaMessageEnvelope::new), new LandFillObjectPool(KafkaMessageEnvelope::new)), this.pubSubBrokerWrapper.getAddress());
                try {
                    PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, 0);
                    create.subscribe(pubSubTopicPartitionImpl, -1L);
                    int i4 = -1;
                    int i5 = -1;
                    do {
                        poll = create.poll(10000L);
                        if (poll.containsKey(pubSubTopicPartitionImpl)) {
                            Iterator it = ((List) poll.get(pubSubTopicPartitionImpl)).iterator();
                            while (it.hasNext()) {
                                ProducerMetadata producerMetadata = ((KafkaMessageEnvelope) ((PubSubMessage) it.next()).getValue()).producerMetadata;
                                int i6 = producerMetadata.segmentNumber;
                                int i7 = producerMetadata.messageSequenceNumber;
                                if (i6 == i5 && i7 == i4 + 1) {
                                    i4 = i7;
                                } else if (i6 == i5 + 1 && i7 == 0) {
                                    i5 = i6;
                                    i4 = i7;
                                } else {
                                    Assert.fail("DIV Error caught.\nLast segment Number: " + i5 + ". Current segment number: " + i6 + ".\nLast sequence Number: " + i4 + ". Current sequence number: " + i7 + ".");
                                }
                            }
                        }
                    } while (!poll.isEmpty());
                    if (create != null) {
                        create.close();
                    }
                } catch (Throwable th) {
                    if (create != null) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (createVeniceWriter != null) {
                    try {
                        createVeniceWriter.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            TestUtils.shutdownExecutor(executorService);
            throw th5;
        }
    }

    @Test(invocationCount = 3)
    public void testThreadSafetyForPutMessages() throws ExecutionException, InterruptedException {
        testThreadSafety(100, veniceWriter -> {
            veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, (PubSubProducerCallback) null);
        });
    }

    @Test
    public void testCloseSegmentBasedOnElapsedTime() throws InterruptedException, ExecutionException, TimeoutException {
        PubSubProducerAdapter pubSubProducerAdapter = (PubSubProducerAdapter) Mockito.mock(PubSubProducerAdapter.class);
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any()))).thenReturn(1);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any(), Mockito.anyInt(), (TimeUnit) Mockito.any()))).thenReturn(1);
        Mockito.when(pubSubProducerAdapter.sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) Mockito.any(), (KafkaMessageEnvelope) Mockito.any(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any())).thenReturn(future);
        Properties properties = new Properties();
        properties.put("venice.writer.max.elapsed.time.for.segment.in.ms", 0);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"string\"");
        VeniceWriter veniceWriter = new VeniceWriter(new VeniceWriterOptions.Builder("test").setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer).setWriteComputeSerializer(veniceAvroKafkaSerializer).setPartitioner(new DefaultVenicePartitioner()).setTime(SystemTime.INSTANCE).build(), new VeniceProperties(properties), pubSubProducerAdapter);
        for (int i = 0; i < 1000; i++) {
            veniceWriter.put(Integer.toString(i), Integer.toString(i), 1, (PubSubProducerCallback) null);
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
        ((PubSubProducerAdapter) Mockito.verify(pubSubProducerAdapter, Mockito.atLeast(1000))).sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) Mockito.any(), (KafkaMessageEnvelope) forClass.capture(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any());
        int i2 = -1;
        for (KafkaMessageEnvelope kafkaMessageEnvelope : forClass.getAllValues()) {
            if (i2 == -1) {
                i2 = kafkaMessageEnvelope.producerMetadata.segmentNumber;
            } else {
                Assert.assertEquals(kafkaMessageEnvelope.producerMetadata.segmentNumber, i2);
            }
        }
    }

    @Test
    public void testReplicationMetadataWrittenCorrectly() throws InterruptedException, ExecutionException, TimeoutException {
        PubSubProducerAdapter pubSubProducerAdapter = (PubSubProducerAdapter) Mockito.mock(PubSubProducerAdapter.class);
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any()))).thenReturn(1);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any(), Mockito.anyInt(), (TimeUnit) Mockito.any()))).thenReturn(1);
        Mockito.when(pubSubProducerAdapter.sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) Mockito.any(), (KafkaMessageEnvelope) Mockito.any(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any())).thenReturn(future);
        Properties properties = new Properties();
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"string\"");
        VeniceWriter veniceWriter = new VeniceWriter(new VeniceWriterOptions.Builder("test").setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer).setWriteComputeSerializer(veniceAvroKafkaSerializer).setPartitioner(new DefaultVenicePartitioner()).setTime(SystemTime.INSTANCE).build(), new VeniceProperties(properties), pubSubProducerAdapter);
        long currentTimeMillis = System.currentTimeMillis();
        ByteBuffer wrap = ByteBuffer.wrap(new byte[]{10, 11});
        PutMetadata putMetadata = new PutMetadata(1, wrap);
        DeleteMetadata deleteMetadata = new DeleteMetadata(1, 1, wrap);
        veniceWriter.put(Integer.toString(1), Integer.toString(1), 1, (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, currentTimeMillis, (PutMetadata) null);
        veniceWriter.put(Integer.toString(2), Integer.toString(2), 1, (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, -2L, putMetadata);
        veniceWriter.update(Integer.toString(3), Integer.toString(2), 1, 1, (PubSubProducerCallback) null, currentTimeMillis);
        veniceWriter.delete(Integer.toString(4), (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, currentTimeMillis);
        veniceWriter.delete(Integer.toString(5), (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, deleteMetadata);
        veniceWriter.put(Integer.toString(6), Integer.toString(1), 1, (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
        ((PubSubProducerAdapter) Mockito.verify(pubSubProducerAdapter, Mockito.atLeast(2))).sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) Mockito.any(), (KafkaMessageEnvelope) forClass.capture(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any());
        Assert.assertEquals(((KafkaMessageEnvelope) forClass.getAllValues().get(0)).producerMetadata.logicalTimestamp, -1L);
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) forClass.getAllValues().get(1);
        KafkaMessageEnvelope kafkaMessageEnvelope2 = (KafkaMessageEnvelope) forClass.getAllValues().get(3);
        KafkaMessageEnvelope kafkaMessageEnvelope3 = (KafkaMessageEnvelope) forClass.getAllValues().get(4);
        Iterator it = Arrays.asList(kafkaMessageEnvelope, kafkaMessageEnvelope2, kafkaMessageEnvelope3).iterator();
        while (it.hasNext()) {
            Assert.assertEquals(((KafkaMessageEnvelope) it.next()).producerMetadata.logicalTimestamp, currentTimeMillis);
        }
        Put put = (Put) kafkaMessageEnvelope.payloadUnion;
        Assert.assertEquals(put.schemaId, 1);
        Assert.assertEquals(put.replicationMetadataVersionId, -1);
        Assert.assertEquals(put.replicationMetadataPayload, ByteBuffer.wrap(new byte[0]));
        Delete delete = (Delete) kafkaMessageEnvelope3.payloadUnion;
        Assert.assertEquals(delete.schemaId, -1);
        Assert.assertEquals(delete.replicationMetadataVersionId, -1);
        Assert.assertEquals(delete.replicationMetadataPayload, ByteBuffer.wrap(new byte[0]));
        KafkaMessageEnvelope kafkaMessageEnvelope4 = (KafkaMessageEnvelope) forClass.getAllValues().get(2);
        Assert.assertEquals(kafkaMessageEnvelope4.messageType, MessageType.PUT.getValue());
        Put put2 = (Put) kafkaMessageEnvelope4.payloadUnion;
        Assert.assertEquals(put2.schemaId, 1);
        Assert.assertEquals(put2.replicationMetadataVersionId, 1);
        Assert.assertEquals(put2.replicationMetadataPayload, ByteBuffer.wrap(new byte[]{10, 11}));
        Assert.assertEquals(kafkaMessageEnvelope4.producerMetadata.logicalTimestamp, -2L);
        KafkaMessageEnvelope kafkaMessageEnvelope5 = (KafkaMessageEnvelope) forClass.getAllValues().get(5);
        Assert.assertEquals(kafkaMessageEnvelope5.messageType, MessageType.DELETE.getValue());
        Delete delete2 = (Delete) kafkaMessageEnvelope5.payloadUnion;
        Assert.assertEquals(delete2.schemaId, 1);
        Assert.assertEquals(delete2.replicationMetadataVersionId, 1);
        Assert.assertEquals(delete2.replicationMetadataPayload, ByteBuffer.wrap(new byte[]{10, 11}));
        Assert.assertEquals(kafkaMessageEnvelope5.producerMetadata.logicalTimestamp, -2L);
        KafkaMessageEnvelope kafkaMessageEnvelope6 = (KafkaMessageEnvelope) forClass.getAllValues().get(6);
        Assert.assertEquals(kafkaMessageEnvelope6.messageType, MessageType.PUT.getValue());
        Assert.assertEquals(kafkaMessageEnvelope6.producerMetadata.logicalTimestamp, -2L);
    }

    @Test(timeOut = 10000)
    public void testReplicationMetadataChunking() throws ExecutionException, InterruptedException, TimeoutException {
        PubSubProducerAdapter pubSubProducerAdapter = (PubSubProducerAdapter) Mockito.mock(PubSubProducerAdapter.class);
        Future future = (Future) Mockito.mock(Future.class);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any()))).thenReturn(1);
        Mockito.when(Integer.valueOf(pubSubProducerAdapter.getNumberOfPartitions((String) Mockito.any(), Mockito.anyInt(), (TimeUnit) Mockito.any()))).thenReturn(1);
        Mockito.when(pubSubProducerAdapter.sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) Mockito.any(), (KafkaMessageEnvelope) Mockito.any(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any())).thenReturn(future);
        VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"string\"");
        VeniceWriter veniceWriter = new VeniceWriter(new VeniceWriterOptions.Builder("test").setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer).setWriteComputeSerializer(veniceAvroKafkaSerializer).setPartitioner(new DefaultVenicePartitioner()).setTime(SystemTime.INSTANCE).setChunkingEnabled(true).setRmdChunkingEnabled(true).build(), new VeniceProperties(new Properties()), pubSubProducerAdapter);
        ByteBuffer wrap = ByteBuffer.wrap(new byte[]{10, 11});
        PutMetadata putMetadata = new PutMetadata(1, wrap);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 50000; i++) {
            sb.append("abcdefghabcdefghabcdefghabcdefgh");
        }
        String sb2 = sb.toString();
        veniceWriter.put(Integer.toString(1), sb2, 1, (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, -2L, putMetadata);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaKey.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
        ((PubSubProducerAdapter) Mockito.verify(pubSubProducerAdapter, Mockito.atLeast(2))).sendMessage((String) Mockito.any(), (Integer) Mockito.any(), (KafkaKey) forClass.capture(), (KafkaMessageEnvelope) forClass2.capture(), (PubSubMessageHeaders) Mockito.any(), (PubSubProducerCallback) Mockito.any());
        KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
        byte[] serialize = veniceAvroKafkaSerializer.serialize("test", Integer.toString(1));
        byte[] serialize2 = veniceAvroKafkaSerializer.serialize("test", sb2);
        byte[] array = wrap.array();
        int length = 972800 - serialize.length;
        Assert.assertEquals(forClass2.getAllValues().size(), 5);
        KafkaMessageEnvelope kafkaMessageEnvelope = (KafkaMessageEnvelope) forClass2.getAllValues().get(1);
        Assert.assertEquals(kafkaMessageEnvelope.messageType, MessageType.PUT.getValue());
        Assert.assertEquals(((Put) kafkaMessageEnvelope.payloadUnion).schemaId, -10);
        Assert.assertEquals(((Put) kafkaMessageEnvelope.payloadUnion).replicationMetadataVersionId, -1);
        Assert.assertEquals(((Put) kafkaMessageEnvelope.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0));
        Assert.assertEquals(((Put) kafkaMessageEnvelope.payloadUnion).putValue.array().length, length + 4);
        Assert.assertEquals(kafkaMessageEnvelope.producerMetadata.logicalTimestamp, -1L);
        KafkaMessageEnvelope kafkaMessageEnvelope2 = (KafkaMessageEnvelope) forClass2.getAllValues().get(2);
        Assert.assertEquals(kafkaMessageEnvelope2.messageType, MessageType.PUT.getValue());
        Assert.assertEquals(((Put) kafkaMessageEnvelope2.payloadUnion).schemaId, -10);
        Assert.assertEquals(((Put) kafkaMessageEnvelope2.payloadUnion).replicationMetadataVersionId, -1);
        Assert.assertEquals(((Put) kafkaMessageEnvelope2.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0));
        Assert.assertEquals(((Put) kafkaMessageEnvelope2.payloadUnion).putValue.array().length, (serialize2.length - length) + 4);
        Assert.assertEquals(kafkaMessageEnvelope2.producerMetadata.logicalTimestamp, -1L);
        ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true);
        ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest();
        chunkedValueManifest.schemaId = 1;
        chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList(2);
        chunkedValueManifest.size = serialize2.length;
        ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix();
        chunkedKeySuffix.isChunk = true;
        chunkedKeySuffix.chunkId = new ChunkId();
        chunkedKeySuffix.chunkId.chunkIndex = 0;
        ProducerMetadata producerMetadata = kafkaMessageEnvelope.producerMetadata;
        chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID;
        chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber;
        chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber;
        ByteBuffer serializeChunkedKey = keyWithChunkingSuffixSerializer.serializeChunkedKey(serialize, chunkedKeySuffix);
        chunkedValueManifest.keysWithChunkIdSuffix.add(serializeChunkedKey);
        Assert.assertEquals(((KafkaKey) forClass.getAllValues().get(1)).getKey(), new KafkaKey(MessageType.PUT, serializeChunkedKey.array()).getKey());
        chunkedKeySuffix.chunkId.chunkIndex = 1;
        ByteBuffer serializeChunkedKey2 = keyWithChunkingSuffixSerializer.serializeChunkedKey(serialize, chunkedKeySuffix);
        chunkedValueManifest.keysWithChunkIdSuffix.add(serializeChunkedKey2);
        Assert.assertEquals(((KafkaKey) forClass.getAllValues().get(2)).getKey(), new KafkaKey(MessageType.PUT, serializeChunkedKey2.array()).getKey());
        KafkaMessageEnvelope kafkaMessageEnvelope3 = (KafkaMessageEnvelope) forClass2.getAllValues().get(3);
        Assert.assertEquals(kafkaMessageEnvelope3.messageType, MessageType.PUT.getValue());
        Assert.assertEquals(((Put) kafkaMessageEnvelope3.payloadUnion).schemaId, -10);
        Assert.assertEquals(((Put) kafkaMessageEnvelope3.payloadUnion).replicationMetadataVersionId, -1);
        Assert.assertEquals(((Put) kafkaMessageEnvelope3.payloadUnion).putValue, ByteBuffer.allocate(0));
        Assert.assertEquals(((Put) kafkaMessageEnvelope3.payloadUnion).replicationMetadataPayload.array().length, array.length + 4);
        Assert.assertEquals(kafkaMessageEnvelope3.producerMetadata.logicalTimestamp, -1L);
        ChunkedValueManifest chunkedValueManifest2 = new ChunkedValueManifest();
        chunkedValueManifest2.schemaId = 1;
        chunkedValueManifest2.keysWithChunkIdSuffix = new ArrayList(1);
        chunkedValueManifest2.size = array.length;
        ChunkedKeySuffix chunkedKeySuffix2 = new ChunkedKeySuffix();
        chunkedKeySuffix2.isChunk = true;
        chunkedKeySuffix2.chunkId = new ChunkId();
        ProducerMetadata producerMetadata2 = kafkaMessageEnvelope3.producerMetadata;
        chunkedKeySuffix2.chunkId.producerGUID = producerMetadata2.producerGUID;
        chunkedKeySuffix2.chunkId.segmentNumber = producerMetadata2.segmentNumber;
        chunkedKeySuffix2.chunkId.messageSequenceNumber = producerMetadata2.messageSequenceNumber;
        chunkedKeySuffix2.chunkId.chunkIndex = 2;
        ByteBuffer serializeChunkedKey3 = keyWithChunkingSuffixSerializer.serializeChunkedKey(serialize, chunkedKeySuffix2);
        chunkedValueManifest2.keysWithChunkIdSuffix.add(serializeChunkedKey3);
        Assert.assertEquals(((KafkaKey) forClass.getAllValues().get(3)).getKey(), new KafkaKey(MessageType.PUT, serializeChunkedKey3.array()).getKey());
        Assert.assertEquals(((KafkaKey) forClass.getAllValues().get(4)).getKey(), new KafkaKey(MessageType.PUT, keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serialize)).getKey());
        KafkaMessageEnvelope kafkaMessageEnvelope4 = (KafkaMessageEnvelope) forClass2.getAllValues().get(4);
        Assert.assertEquals(kafkaMessageEnvelope4.messageType, MessageType.PUT.getValue());
        Assert.assertEquals(((Put) kafkaMessageEnvelope4.payloadUnion).schemaId, AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
        Assert.assertEquals(((Put) kafkaMessageEnvelope4.payloadUnion).replicationMetadataVersionId, putMetadata.getRmdVersionId());
        Assert.assertEquals(((Put) kafkaMessageEnvelope4.payloadUnion).replicationMetadataPayload, ByteBuffer.wrap(chunkedValueManifestSerializer.serialize("test", chunkedValueManifest2)));
        Assert.assertEquals(((Put) kafkaMessageEnvelope4.payloadUnion).putValue, ByteBuffer.wrap(chunkedValueManifestSerializer.serialize("test", chunkedValueManifest)));
        Assert.assertEquals(kafkaMessageEnvelope4.producerMetadata.logicalTimestamp, -2L);
    }
}
