package com.linkedin.davinci.validation;

import com.linkedin.venice.exceptions.validation.DataValidationException;
import com.linkedin.venice.exceptions.validation.MissingDataException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.ProducerTracker;
import com.linkedin.venice.kafka.validation.Segment;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/validation/KafkaDataIntegrityValidatorTest.class */
public class KafkaDataIntegrityValidatorTest {
    private static final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @Test
    public void testStatelessDIV() {
        String str = Utils.getUniqueString("TestStore") + "_v1";
        KafkaDataIntegrityValidator kafkaDataIntegrityValidator = new KafkaDataIntegrityValidator(str, TimeUnit.HOURS.toMillis(24L));
        GUID guid = GuidUtils.getGUID(VeniceProperties.empty());
        kafkaDataIntegrityValidator.checkMissingMessage(buildConsumerRecord(str, 0, 100L, guid, 0, 100, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(28L)), Optional.empty());
        kafkaDataIntegrityValidator.checkMissingMessage(buildConsumerRecord(str, 0, 101L, guid, 0, 101, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(27L)), Optional.empty());
        kafkaDataIntegrityValidator.checkMissingMessage(buildConsumerRecord(str, 0, 200L, guid, 0, 103, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(20L)), Optional.empty());
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> buildConsumerRecord = buildConsumerRecord(str, 0, 205L, guid, 0, 105, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(10L));
        Assert.assertThrows(MissingDataException.class, () -> {
            kafkaDataIntegrityValidator.checkMissingMessage(buildConsumerRecord, Optional.empty());
        });
        ProducerTracker.DIVErrorMetricCallback dIVErrorMetricCallback = (ProducerTracker.DIVErrorMetricCallback) Mockito.mock(ProducerTracker.DIVErrorMetricCallback.class);
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> buildConsumerRecord2 = buildConsumerRecord(str, 0, 206L, guid, 2, 1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(10L));
        Assert.assertThrows(MissingDataException.class, () -> {
            kafkaDataIntegrityValidator.checkMissingMessage(buildConsumerRecord2, Optional.of(dIVErrorMetricCallback));
        });
        ((ProducerTracker.DIVErrorMetricCallback) Mockito.verify(dIVErrorMetricCallback, Mockito.times(1))).execute((DataValidationException) Mockito.any());
    }

    private static PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> buildConsumerRecord(String str, int i, long j, GUID guid, int i2, int i3, long j2) {
        KafkaKey deserialize = new KafkaKeySerializer().deserialize(str, "key".getBytes());
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.messageType = MessageType.PUT.getValue();
        ProducerMetadata producerMetadata = new ProducerMetadata();
        producerMetadata.producerGUID = guid;
        Segment segment = new Segment(i, i2, CheckSumType.MD5);
        segment.setSequenceNumber(i3);
        producerMetadata.segmentNumber = segment.getSegmentNumber();
        producerMetadata.messageSequenceNumber = segment.getSequenceNumber();
        producerMetadata.messageTimestamp = j2;
        kafkaMessageEnvelope.producerMetadata = producerMetadata;
        kafkaMessageEnvelope.leaderMetadataFooter = new LeaderMetadata();
        kafkaMessageEnvelope.leaderMetadataFooter.upstreamOffset = -1L;
        Put put = new Put();
        put.putValue = ByteBuffer.wrap("value".getBytes());
        put.schemaId = 0;
        put.replicationMetadataVersionId = -1;
        put.replicationMetadataPayload = ByteBuffer.wrap(new byte[0]);
        kafkaMessageEnvelope.payloadUnion = put;
        return new ImmutablePubSubMessage(deserialize, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(str), i), j, j2, 0);
    }
}
