package com.linkedin.venice.consumer;

import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/consumer/ConsumerTest.class */
public class ConsumerTest {
    private static final String PRODUCER_METADATA_FIELD = "producerMetadata";
    private static final String MESSAGE_TYPE_FIELD = "messageType";
    private static final String TARGET_VERSION_FIELD = "targetVersion";
    private static final String PAYLOAD_UNION_FIELD = "payloadUnion";
    private static final String NEW_FIELD = "newField";
    private static final KafkaValueSerializer REGULAR_KAFKA_VALUE_SERIALIZER = new KafkaValueSerializer();
    private static final Schema NEW_PROTOCOL_SCHEMA = ConsumerIntegrationTest.NEW_PROTOCOL_SCHEMA;
    private static final int NEW_PROTOCOL_VERSION = ConsumerIntegrationTest.NEW_PROTOCOL_VERSION;

    @Test
    void testForwardCompatibility() {
        GenericRecord messageFromNewProtocol = getMessageFromNewProtocol();
        byte[] serializeNewProtocol = ConsumerIntegrationTest.serializeNewProtocol(messageFromNewProtocol);
        try {
            REGULAR_KAFKA_VALUE_SERIALIZER.deserialize("", serializeNewProtocol);
            Assert.fail("The regular serializer should have failed to decode the message encoded with the new version. The test may have regressed.");
        } catch (VeniceMessageException e) {
        }
        SchemaReader schemaReader = (SchemaReader) Mockito.mock(SchemaReader.class);
        ((SchemaReader) Mockito.doReturn(NEW_PROTOCOL_SCHEMA).when(schemaReader)).getValueSchema(NEW_PROTOCOL_VERSION);
        KafkaValueSerializer kafkaValueSerializer = new KafkaValueSerializer();
        kafkaValueSerializer.setSchemaReader(schemaReader);
        Assert.assertEquals(kafkaValueSerializer.knownProtocols(), REGULAR_KAFKA_VALUE_SERIALIZER.knownProtocols(), "The obliviousDeserializer should not know the same as the REGULAR_KAFKA_VALUE_SERIALIZER before reading the new version.");
        Assert.assertFalse(kafkaValueSerializer.knownProtocols().contains(NEW_PROTOCOL_VERSION), "The obliviousDeserializer should not know about the new protocol ahead of time.");
        GenericRecord deserialize = kafkaValueSerializer.deserialize("", serializeNewProtocol);
        Assert.assertTrue(kafkaValueSerializer.knownProtocols().contains(NEW_PROTOCOL_VERSION), "The obliviousDeserializer should know about the new protocol after encountering it.");
        Arrays.asList(PRODUCER_METADATA_FIELD, MESSAGE_TYPE_FIELD, PAYLOAD_UNION_FIELD).stream().forEach(str -> {
            Assert.assertEquals(deserialize.get(str), messageFromNewProtocol.get(str), "Field '" + str + "' is not equal pre- and post-serialization.");
        });
        try {
            deserialize.get(NEW_FIELD);
            Assert.fail("The new field name should not be available because the reader does not want it.");
        } catch (Exception e2) {
            Assert.assertEquals(e2.getClass(), NullPointerException.class);
        }
        Assert.assertNotEquals(deserialize, messageFromNewProtocol, "The two records should not be completely equal pre- and post-serialization since the new field should be ignored.");
    }

    public static GenericRecord getMessageFromNewProtocol() {
        GenericData.Record record = new GenericData.Record(NEW_PROTOCOL_SCHEMA);
        ProducerMetadata producerMetadata = new ProducerMetadata();
        producerMetadata.producerGUID = GuidUtils.getGUID(VeniceProperties.empty());
        producerMetadata.messageTimestamp = System.currentTimeMillis();
        producerMetadata.segmentNumber = 0;
        producerMetadata.messageSequenceNumber = 0;
        record.put(PRODUCER_METADATA_FIELD, producerMetadata);
        record.put(MESSAGE_TYPE_FIELD, Integer.valueOf(MessageType.PUT.getValue()));
        Put put = new Put();
        put.schemaId = 1;
        put.putValue = ByteBuffer.allocate(1);
        put.replicationMetadataVersionId = -1;
        put.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        record.put(PAYLOAD_UNION_FIELD, put);
        record.put(NEW_FIELD, 1);
        return record;
    }
}
