package com.linkedin.venice.kafka.validation;

import com.linkedin.venice.exceptions.validation.DuplicateDataException;
import com.linkedin.venice.exceptions.validation.MissingDataException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfSegment;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.StartOfSegment;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Optional;
import org.apache.avro.specific.FixedSize;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/validation/TestProducerTracker.class */
/**
 * TestNG tests for {@link ProducerTracker#validateMessage}.
 *
 * <p>Each test feeds a hand-built sequence of records (control messages and puts) to a fresh
 * tracker and checks which ones are accepted, which raise {@link MissingDataException} or
 * {@link DuplicateDataException}, and what state the tracker exposes afterwards.
 *
 * <p>NOTE(review): the two trailing arguments of {@code validateMessage} are not named in this
 * file. They look like "end of push received" and "tolerate missing messages" flags; confirm
 * against {@link ProducerTracker}.
 */
public class TestProducerTracker {
    /**
     * Partition id used throughout: for the topic-partition, as the first {@link Segment}
     * constructor argument, and when reading state back from the tracker.
     */
    private static final int PARTITION = 0;

    /** Starting offset of every test; records are assigned consecutive offsets from here. */
    private static final long FIRST_OFFSET = 10L;

    /** Capacity of the buffer backing a control-message key: the GUID's fixed size plus 64 spare bytes. */
    private static final int CONTROL_MESSAGE_KAFKA_KEY_LENGTH = GUID.class.getAnnotation(FixedSize.class).value() + 64;

    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private ProducerTracker producerTracker;
    private GUID guid;
    private String topic;

    /** Creates a fresh GUID, topic name and {@link ProducerTracker} before each test method. */
    @BeforeMethod(alwaysRun = true)
    public void methodSetUp() {
        String guidText = "test_guid_" + System.currentTimeMillis();
        this.guid = new GUID();
        this.guid.bytes(guidText.getBytes(StandardCharsets.UTF_8));
        this.topic = "test_topic_" + System.currentTimeMillis() + "_v1";
        this.producerTracker = new ProducerTracker(this.guid, this.topic);
    }

    /**
     * Builds an envelope whose producer metadata points at {@code segment}.
     *
     * @param messageType            type stored in the envelope
     * @param guid                   producer GUID stored in the metadata
     * @param segment                supplies the segment number and, if needed, the sequence number
     * @param explicitSequenceNumber sequence number to use; when empty, the segment's own counter
     *                               is read and advanced via {@code getAndIncrementSequenceNumber()}
     * @param payload                object stored as the envelope's payload union
     * @return the populated envelope
     */
    private KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, GUID guid, Segment segment, Optional<Integer> explicitSequenceNumber, Object payload) {
        ProducerMetadata metadata = new ProducerMetadata();
        metadata.producerGUID = guid;
        metadata.segmentNumber = segment.getSegmentNumber();
        // The supplier only runs when no explicit number was given, so the segment counter
        // is left untouched for explicitly numbered messages.
        metadata.messageSequenceNumber = explicitSequenceNumber.orElseGet(segment::getAndIncrementSequenceNumber);
        metadata.messageTimestamp = System.currentTimeMillis();
        metadata.logicalTimestamp = -1L;

        KafkaMessageEnvelope envelope = new KafkaMessageEnvelope();
        envelope.messageType = messageType.getValue();
        envelope.producerMetadata = metadata;
        envelope.payloadUnion = payload;
        return envelope;
    }

    /**
     * Builds the key of a control message: GUID bytes, then segment number, then sequence
     * number, written into a buffer of {@link #CONTROL_MESSAGE_KAFKA_KEY_LENGTH} bytes.
     */
    private KafkaKey getControlMessageKey(KafkaMessageEnvelope kafkaMessageEnvelope) {
        ProducerMetadata metadata = kafkaMessageEnvelope.producerMetadata;
        byte[] keyBytes = ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH)
                .put(metadata.producerGUID.bytes())
                .putInt(metadata.segmentNumber)
                .putInt(metadata.messageSequenceNumber)
                .array();
        return new KafkaKey(MessageType.CONTROL_MESSAGE, keyBytes);
    }

    /** Wraps raw key bytes into a PUT key. */
    private KafkaKey getPutMessageKey(byte[] bArr) {
        return new KafkaKey(MessageType.PUT, bArr);
    }

    /** Builds a {@link Put} payload carrying {@code bArr} as its value, with schema id -1. */
    private Put getPutMessage(byte[] bArr) {
        Put put = new Put();
        put.putValue = ByteBuffer.wrap(bArr);
        put.schemaId = -1;
        return put;
    }

    /** Builds a start-of-segment control message that declares no checksum. */
    private ControlMessage getStartOfSegment() {
        return getStartOfSegment(CheckSumType.NONE);
    }

    /** Builds a start-of-segment control message declaring the given checksum type. */
    private ControlMessage getStartOfSegment(CheckSumType checkSumType) {
        StartOfSegment startOfSegment = new StartOfSegment();
        startOfSegment.upcomingAggregates = new ArrayList<>();
        startOfSegment.upcomingAggregates.add("baz");
        startOfSegment.checksumType = checkSumType.getValue();

        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
        controlMessage.controlMessageUnion = startOfSegment;
        controlMessage.debugInfo = new HashMap<>();
        controlMessage.debugInfo.put("foo", "bar");
        return controlMessage;
    }

    /** Builds an end-of-segment control message with a default {@link EndOfSegment} body. */
    private ControlMessage getEndOfSegment() {
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = ControlMessageType.END_OF_SEGMENT.getValue();
        controlMessage.controlMessageUnion = new EndOfSegment();
        return controlMessage;
    }

    /** Returns the topic-partition (test topic, {@link #PARTITION}) that all records are attached to. */
    private PubSubTopicPartitionImpl newTopicPartition() {
        return new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(this.topic), PARTITION);
    }

    /**
     * Wraps a key/envelope pair into a record at {@code offset}, timestamped one second after
     * the current wall clock and with a payload size of 0.
     */
    private ImmutablePubSubMessage<KafkaKey, KafkaMessageEnvelope> newRecord(KafkaKey key, KafkaMessageEnvelope envelope, PubSubTopicPartitionImpl topicPartition, long offset) {
        return new ImmutablePubSubMessage<>(key, envelope, topicPartition, offset, System.currentTimeMillis() + 1000, 0);
    }

    /** Returns the segment the tracker currently holds under {@link #PARTITION}. */
    private Segment trackedSegment() {
        return (Segment) this.producerTracker.segments.get(PARTITION);
    }

    /**
     * Sequence-number handling within one segment: a jump to sequence number 100 is rejected
     * with {@link MissingDataException} when the last argument is {@code Lazy.FALSE}; the next
     * in-order message (2) is still accepted; and the same jump is accepted when the last
     * argument is {@code Lazy.TRUE}.
     */
    @Test
    public void testSequenceNumber() {
        Segment segment = new Segment(PARTITION, 0, CheckSumType.NONE);
        PubSubTopicPartitionImpl topicPartition = newTopicPartition();
        long offset = FIRST_OFFSET;

        // Start of segment; takes its sequence number from the segment counter.
        KafkaMessageEnvelope startEnvelope = getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, this.guid, segment, Optional.empty(), getStartOfSegment());
        this.producerTracker.validateMessage(newRecord(getControlMessageKey(startEnvelope), startEnvelope, topicPartition, offset++), false, Lazy.FALSE);

        // First put; also numbered by the segment counter.
        KafkaMessageEnvelope firstEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, segment, Optional.empty(), getPutMessage("first_message".getBytes(StandardCharsets.UTF_8)));
        this.producerTracker.validateMessage(newRecord(getPutMessageKey("first_key".getBytes(StandardCharsets.UTF_8)), firstEnvelope, topicPartition, offset++), false, Lazy.FALSE);

        // Second put jumps ahead to sequence number 100 and must be rejected.
        KafkaMessageEnvelope secondEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, segment, Optional.of(100), getPutMessage("second_message".getBytes(StandardCharsets.UTF_8)));
        ImmutablePubSubMessage<KafkaKey, KafkaMessageEnvelope> gapRecord = newRecord(getPutMessageKey("second_key".getBytes(StandardCharsets.UTF_8)), secondEnvelope, topicPartition, offset++);
        Assert.assertThrows(MissingDataException.class, () -> this.producerTracker.validateMessage(gapRecord, false, Lazy.FALSE));

        // Third put carries sequence number 2 and must be accepted after the rejected jump.
        KafkaMessageEnvelope thirdEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, segment, Optional.of(2), getPutMessage("third_message".getBytes(StandardCharsets.UTF_8)));
        this.producerTracker.validateMessage(newRecord(getPutMessageKey("third_key".getBytes(StandardCharsets.UTF_8)), thirdEnvelope, topicPartition, offset++), false, Lazy.FALSE);

        // Fourth put repeats the jump to 100, this time with Lazy.TRUE, and must be accepted.
        KafkaMessageEnvelope fourthEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, segment, Optional.of(100), getPutMessage("fourth_message".getBytes(StandardCharsets.UTF_8)));
        this.producerTracker.validateMessage(newRecord(getPutMessageKey("fourth_key".getBytes(StandardCharsets.UTF_8)), fourthEnvelope, topicPartition, offset++), false, Lazy.TRUE);
    }

    /**
     * Segment-number handling: after segment 0 starts, a put from segment 2 is rejected with
     * {@link MissingDataException} under {@code Lazy.FALSE}, accepted under {@code Lazy.TRUE},
     * and rejected as a duplicate when replayed. The tracker must then report segment 2 at
     * sequence number 5.
     */
    @Test
    public void testSegmentNumber() {
        PubSubTopicPartitionImpl topicPartition = newTopicPartition();
        Segment firstSegment = new Segment(PARTITION, 0, CheckSumType.NONE);
        Segment skippedToSegment = new Segment(PARTITION, 2, CheckSumType.NONE);
        long offset = FIRST_OFFSET;

        KafkaMessageEnvelope startEnvelope = getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, this.guid, firstSegment, Optional.empty(), getStartOfSegment());
        this.producerTracker.validateMessage(newRecord(getControlMessageKey(startEnvelope), startEnvelope, topicPartition, offset++), true, Lazy.FALSE);

        KafkaMessageEnvelope putEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, skippedToSegment, Optional.of(5), getPutMessage("message".getBytes(StandardCharsets.UTF_8)));
        ImmutablePubSubMessage<KafkaKey, KafkaMessageEnvelope> putRecord = newRecord(getPutMessageKey("key".getBytes(StandardCharsets.UTF_8)), putEnvelope, topicPartition, offset++);

        Assert.assertThrows(MissingDataException.class, () -> this.producerTracker.validateMessage(putRecord, true, Lazy.FALSE));
        this.producerTracker.validateMessage(putRecord, true, Lazy.TRUE);
        // The same record a second time is now a duplicate.
        Assert.assertThrows(DuplicateDataException.class, () -> this.producerTracker.validateMessage(putRecord, true, Lazy.TRUE));

        Assert.assertEquals(trackedSegment().getSegmentNumber(), 2);
        Assert.assertEquals(trackedSegment().getSequenceNumber(), 5);
    }

    /**
     * Duplicate detection: once the tracker has advanced to sequence number 5, a put carrying
     * sequence number 1 raises {@link DuplicateDataException} and leaves the tracked sequence
     * number at 5.
     */
    @Test
    public void testDuplicateMsgsDetected() {
        PubSubTopicPartitionImpl topicPartition = newTopicPartition();
        Segment segment = new Segment(PARTITION, 0, CheckSumType.MD5);
        long offset = FIRST_OFFSET;

        KafkaMessageEnvelope startEnvelope = getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, this.guid, segment, Optional.empty(), getStartOfSegment());
        this.producerTracker.validateMessage(newRecord(getControlMessageKey(startEnvelope), startEnvelope, topicPartition, offset++), true, Lazy.FALSE);
        Assert.assertEquals(trackedSegment().getSequenceNumber(), 0);

        // End of segment sent with sequence number 5, skipping 1 to 4.
        KafkaMessageEnvelope endEnvelope = getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, this.guid, segment, Optional.of(5), getEndOfSegment());
        this.producerTracker.validateMessage(newRecord(getControlMessageKey(endEnvelope), endEnvelope, topicPartition, offset++), true, Lazy.TRUE);
        Assert.assertEquals(trackedSegment().getSequenceNumber(), 5);

        // A message with an older sequence number must be flagged as a duplicate.
        KafkaMessageEnvelope staleEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, segment, Optional.of(1), getPutMessage("first_message".getBytes(StandardCharsets.UTF_8)));
        ImmutablePubSubMessage<KafkaKey, KafkaMessageEnvelope> staleRecord = newRecord(getPutMessageKey("first_key".getBytes(StandardCharsets.UTF_8)), staleEnvelope, topicPartition, offset++);
        Assert.assertThrows(DuplicateDataException.class, () -> this.producerTracker.validateMessage(staleRecord, true, Lazy.TRUE));
        Assert.assertEquals(trackedSegment().getSequenceNumber(), 5);
    }

    /**
     * Checksum state written to the {@link OffsetRecord}: after a start-of-segment declaring
     * MD5, the producer state reports MD5. After a put from the next segment arrives with no
     * start-of-segment of its own, the producer state reports NONE with an empty checksum state.
     */
    @Test
    public void testMidSegmentCheckSumStates() {
        PubSubTopicPartitionImpl topicPartition = newTopicPartition();
        Segment firstSegment = new Segment(PARTITION, 0, CheckSumType.MD5);
        Segment secondSegment = new Segment(PARTITION, 1, CheckSumType.MD5);
        OffsetRecord offsetRecord = TestUtils.getOffsetRecord(FIRST_OFFSET);
        long offset = FIRST_OFFSET;

        KafkaMessageEnvelope startEnvelope = getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, this.guid, firstSegment, Optional.empty(), getStartOfSegment(CheckSumType.MD5));
        this.producerTracker.validateMessage(newRecord(getControlMessageKey(startEnvelope), startEnvelope, topicPartition, offset++), true, Lazy.FALSE);
        this.producerTracker.updateOffsetRecord(PARTITION, offsetRecord);
        Assert.assertEquals(offsetRecord.getProducerPartitionState(this.guid).checksumType, CheckSumType.MD5.getValue());

        // Put from segment 1, sent without a start-of-segment for that segment.
        KafkaMessageEnvelope putEnvelope = getKafkaMessageEnvelope(MessageType.PUT, this.guid, secondSegment, Optional.empty(), getPutMessage("first_message".getBytes(StandardCharsets.UTF_8)));
        this.producerTracker.validateMessage(newRecord(getPutMessageKey("first_key".getBytes(StandardCharsets.UTF_8)), putEnvelope, topicPartition, offset++), true, Lazy.TRUE);
        this.producerTracker.updateOffsetRecord(PARTITION, offsetRecord);
        Assert.assertEquals(offsetRecord.getProducerPartitionState(this.guid).checksumType, CheckSumType.NONE.getValue());
        Assert.assertEquals(offsetRecord.getProducerPartitionState(this.guid).checksumState, ByteBuffer.wrap(new byte[0]));
    }
}
