package com.linkedin.venice.kafka.validation;

import com.linkedin.venice.annotation.Threadsafe;
import com.linkedin.venice.exceptions.validation.CorruptDataException;
import com.linkedin.venice.exceptions.validation.DataValidationException;
import com.linkedin.venice.exceptions.validation.DuplicateDataException;
import com.linkedin.venice.exceptions.validation.ImproperlyStartedSegmentException;
import com.linkedin.venice.exceptions.validation.IncomingDataAfterSegmentEndedException;
import com.linkedin.venice.exceptions.validation.MissingDataException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfSegment;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.StartOfSegment;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Threadsafe
/* loaded from: input_file:com/linkedin/venice/kafka/validation/ProducerTracker.class */
public class ProducerTracker {
    private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter(65536, RedundantExceptionFilter.DEFAULT_NO_REDUNDANT_EXCEPTION_DURATION_MS);
    private final GUID producerGUID;
    private final String topicName;
    protected final ConcurrentMap<Integer, Segment> segments = new VeniceConcurrentHashMap();
    protected final ConcurrentMap<Integer, ReentrantLock> partitionLocks = new VeniceConcurrentHashMap();
    private final Logger logger = LogManager.getLogger(toString());

    /* loaded from: input_file:com/linkedin/venice/kafka/validation/ProducerTracker$DIVErrorMetricCallback.class */
    public interface DIVErrorMetricCallback {
        void execute(DataValidationException dataValidationException);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/kafka/validation/ProducerTracker$DataFaultType.class */
    public enum DataFaultType {
        DUPLICATE(str -> {
            return new DuplicateDataException(str);
        }),
        MISSING(str2 -> {
            return new MissingDataException(str2);
        }),
        CORRUPT(str3 -> {
            return new CorruptDataException(str3);
        }),
        UNREGISTERED_PRODUCER(str4 -> {
            return new ImproperlyStartedSegmentException(str4);
        });

        final Function<String, DataValidationException> exceptionSupplier;

        DataFaultType(Function function) {
            this.exceptionSupplier = function;
        }

        DataValidationException getNewException(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage) {
            return getNewException(segment, pubSubMessage, Optional.empty());
        }

        DataValidationException getNewException(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, Optional<String> optional) {
            String valueOf;
            String valueOf2;
            ProducerMetadata producerMetadata = pubSubMessage.getValue().producerMetadata;
            MessageType valueOf3 = MessageType.valueOf(pubSubMessage.getValue());
            String name = valueOf3.name();
            if (MessageType.CONTROL_MESSAGE.equals(valueOf3)) {
                name = name + " (" + ControlMessageType.valueOf((ControlMessage) pubSubMessage.getValue().payloadUnion).name() + ")";
            }
            if (segment == null) {
                valueOf2 = "N/A (null segment)";
                valueOf = "N/A (null segment)";
            } else {
                valueOf = String.valueOf(segment.getSegmentNumber());
                valueOf2 = String.valueOf(segment.getSequenceNumber());
            }
            StringBuilder sb = new StringBuilder();
            sb.append(name() + " data detected for producer GUID: ").append(GuidUtils.getHexFromGuid(producerMetadata.producerGUID)).append("; message type: " + name).append("; partition: " + pubSubMessage.getTopicPartition().getPartitionNumber());
            if (segment != null) {
                sb.append(",; previous successful offset (in same segment): " + segment.getLastSuccessfulOffset());
            }
            sb.append("; incoming offset: " + pubSubMessage.getOffset()).append(";previous segment: " + valueOf).append("; incoming segment: " + producerMetadata.segmentNumber).append("; previous sequence number: " + valueOf2).append("; incoming sequence number: " + producerMetadata.messageSequenceNumber).append("; consumer record timestamp: " + pubSubMessage.getPubSubMessageTime() + " (" + new Date(pubSubMessage.getPubSubMessageTime()).toString() + ")").append("; producer timestamp: " + producerMetadata.messageTimestamp + " (" + new Date(producerMetadata.messageTimestamp).toString() + ")");
            if (pubSubMessage.getValue().leaderMetadataFooter != null) {
                sb.append("; leader metadata's upstream offset: " + pubSubMessage.getValue().leaderMetadataFooter.upstreamOffset).append("; leader metadata's host name: " + ((Object) pubSubMessage.getValue().leaderMetadataFooter.hostName));
            }
            if (segment != null) {
                sb.append("; aggregates: " + printMap(segment.getAggregates())).append("; debugInfo: " + printMap(segment.getDebugInfo()));
            }
            sb.append((String) optional.map(str -> {
                return "; extra info: " + str;
            }).orElse(""));
            return this.exceptionSupplier.apply(sb.toString());
        }

        private <K, V> String printMap(Map<K, V> map) {
            StringBuilder sb = new StringBuilder();
            sb.append(VectorFormat.DEFAULT_PREFIX);
            for (Map.Entry<K, V> entry : map.entrySet()) {
                sb.append("\n\t");
                sb.append(entry.getKey());
                sb.append(entry.getValue());
            }
            if (!map.isEmpty()) {
                sb.append("\n");
            }
            sb.append("}");
            return sb.toString();
        }
    }

    public ProducerTracker(GUID guid, String str) {
        this.producerGUID = guid;
        this.topicName = str;
    }

    public final String toString() {
        return ProducerTracker.class.getSimpleName() + "(GUID: " + ByteUtils.toHexString(this.producerGUID.bytes()) + ", topic: " + this.topicName + ")";
    }

    public ReentrantLock getPartitionLock(int i) {
        return this.partitionLocks.computeIfAbsent(Integer.valueOf(i), num -> {
            return new ReentrantLock();
        });
    }

    public void clearPartition(int i) {
        ReentrantLock partitionLock = getPartitionLock(i);
        partitionLock.lock();
        try {
            this.segments.remove(Integer.valueOf(i));
        } finally {
            partitionLock.unlock();
        }
    }

    public void setPartitionState(int i, ProducerPartitionState producerPartitionState) {
        setPartitionState(i, new Segment(i, producerPartitionState));
    }

    private void setPartitionState(int i, Segment segment) {
        ReentrantLock partitionLock = getPartitionLock(i);
        partitionLock.lock();
        try {
            if (this.segments.containsKey(Integer.valueOf(i))) {
                this.logger.info("{} will overwrite previous state for partition: {}, Previous state: {}, New state: {}", this, Integer.valueOf(i), this.segments.get(Integer.valueOf(i)), segment);
            } else {
                this.logger.info("{} will set state for partition: {}, New state: {}", this, Integer.valueOf(i), segment);
            }
            this.segments.put(Integer.valueOf(i), segment);
            partitionLock.unlock();
        } catch (Throwable th) {
            partitionLock.unlock();
            throw th;
        }
    }

    public void cloneProducerStates(int i, ProducerTracker producerTracker) {
        if (this.segments.containsKey(Integer.valueOf(i))) {
            ReentrantLock partitionLock = getPartitionLock(i);
            partitionLock.lock();
            try {
                producerTracker.setPartitionState(i, new Segment(this.segments.get(Integer.valueOf(i))));
                partitionLock.unlock();
            } catch (Throwable th) {
                partitionLock.unlock();
                throw th;
            }
        }
    }

    public void updateOffsetRecord(int i, OffsetRecord offsetRecord) {
        if (this.segments.containsKey(Integer.valueOf(i))) {
            ReentrantLock partitionLock = getPartitionLock(i);
            partitionLock.lock();
            try {
                Segment segment = this.segments.get(Integer.valueOf(i));
                ProducerPartitionState producerPartitionState = offsetRecord.getProducerPartitionState(this.producerGUID);
                if (producerPartitionState == null) {
                    producerPartitionState = new ProducerPartitionState();
                    producerPartitionState.aggregates = segment.getAggregates();
                    producerPartitionState.debugInfo = segment.getDebugInfo();
                }
                producerPartitionState.checksumType = segment.getCheckSumType().getValue();
                producerPartitionState.checksumState = ByteBuffer.wrap(segment.getCheckSumState());
                producerPartitionState.segmentNumber = segment.getSegmentNumber();
                producerPartitionState.messageSequenceNumber = segment.getSequenceNumber();
                producerPartitionState.messageTimestamp = segment.getLastRecordProducerTimestamp();
                producerPartitionState.segmentStatus = segment.getStatus().getValue();
                producerPartitionState.isRegistered = segment.isRegistered();
                offsetRecord.setProducerPartitionState(this.producerGUID, producerPartitionState);
                partitionLock.unlock();
            } catch (Throwable th) {
                partitionLock.unlock();
                throw th;
            }
        }
    }

    public void validateMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, Lazy<Boolean> lazy) throws DataValidationException {
        ReentrantLock partitionLock = getPartitionLock(pubSubMessage.getTopicPartition().getPartitionNumber());
        partitionLock.lock();
        try {
            Segment segment = this.segments.get(Integer.valueOf(pubSubMessage.getTopicPartition().getPartitionNumber()));
            boolean z2 = segment != null;
            Segment trackSegment = trackSegment(segment, pubSubMessage, z, lazy);
            trackSequenceNumber(trackSegment, pubSubMessage, z, lazy, z2);
            trackCheckSum(trackSegment, pubSubMessage, z, lazy);
            trackSegment.setLastSuccessfulOffset(pubSubMessage.getOffset().longValue());
            trackSegment.setNewSegment(false);
            partitionLock.unlock();
        } catch (Throwable th) {
            partitionLock.unlock();
            throw th;
        }
    }

    private Segment trackSegment(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, Lazy<Boolean> lazy) throws DuplicateDataException {
        int i = pubSubMessage.getValue().producerMetadata.segmentNumber;
        if (segment == null) {
            if (i != 0) {
                handleUnregisteredProducer("track new segment with non-zero incomingSegment=" + i, pubSubMessage, null, z);
            }
            return initializeNewSegment(pubSubMessage, z, true);
        }
        int segmentNumber = segment.getSegmentNumber();
        if (i == segmentNumber) {
            return segment;
        }
        if (i == segmentNumber + 1 && segment.isEnded()) {
            return initializeNewSegment(pubSubMessage, z, false);
        }
        if (i > segmentNumber) {
            if (lazy.get().booleanValue()) {
                return initializeNewSegment(pubSubMessage, z, true);
            }
            throw DataFaultType.MISSING.getNewException(segment, pubSubMessage);
        }
        if (i < segmentNumber) {
            throw DataFaultType.DUPLICATE.getNewException(segment, pubSubMessage);
        }
        throw new IllegalStateException("This condition should never happen. " + getClass().getSimpleName() + " may have a regression.");
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Segment initializeNewSegment(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, boolean z2) {
        CheckSumType checkSumType = CheckSumType.NONE;
        boolean z3 = true;
        Map hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        if (MessageType.valueOf(pubSubMessage.getValue()) == MessageType.CONTROL_MESSAGE) {
            ControlMessage controlMessage = (ControlMessage) pubSubMessage.getValue().payloadUnion;
            if (ControlMessageType.valueOf(controlMessage) == ControlMessageType.START_OF_SEGMENT) {
                StartOfSegment startOfSegment = (StartOfSegment) controlMessage.controlMessageUnion;
                checkSumType = CheckSumType.valueOf(startOfSegment.checksumType);
                hashMap = controlMessage.debugInfo;
                startOfSegment.upcomingAggregates.stream().forEach(charSequence -> {
                    hashMap2.put(charSequence, 0L);
                });
                z3 = false;
            }
        }
        Segment segment = new Segment(pubSubMessage.getTopicPartition().getPartitionNumber(), pubSubMessage.getValue().producerMetadata.segmentNumber, pubSubMessage.getValue().producerMetadata.messageSequenceNumber, checkSumType, hashMap, hashMap2);
        this.segments.put(Integer.valueOf(pubSubMessage.getTopicPartition().getPartitionNumber()), segment);
        if (z3) {
            handleUnregisteredProducer("initialize new segment with a non-" + ControlMessageType.START_OF_SEGMENT.name() + " message", pubSubMessage, null, z, Optional.of(Boolean.valueOf(z2)));
        } else {
            segment.registeredSegment();
        }
        return segment;
    }

    private void handleUnregisteredProducer(String str, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, Segment segment, boolean z) {
        handleUnregisteredProducer(str, pubSubMessage, segment, z, Optional.empty());
    }

    private void handleUnregisteredProducer(String str, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, Segment segment, boolean z, Optional<Boolean> optional) {
        String str2 = str + ", endOfPushReceived=" + z;
        if (optional.isPresent()) {
            str2 = str2 + ", tolerateAnyMessageType=" + optional;
        }
        if (!z || !optional.orElse(true).booleanValue()) {
            throw DataFaultType.UNREGISTERED_PRODUCER.getNewException(segment, pubSubMessage, Optional.of("Cannot " + str2));
        }
        if (REDUNDANT_LOGGING_FILTER.isRedundantException(pubSubMessage.getTopicPartition().getPubSubTopic().getName() + "-" + pubSubMessage.getTopicPartition().getPartitionNumber() + "-" + DataFaultType.UNREGISTERED_PRODUCER)) {
            return;
        }
        this.logger.warn("Will {}", str2);
    }

    private void trackSequenceNumber(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, Lazy<Boolean> lazy, boolean z2) throws MissingDataException, DuplicateDataException {
        int sequenceNumber = segment.getSequenceNumber();
        int i = pubSubMessage.getValue().producerMetadata.messageSequenceNumber;
        if (!segment.isStarted()) {
            segment.start();
            segment.setLastRecordProducerTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp);
            return;
        }
        if (segment.isNewSegment() && i == sequenceNumber) {
            if (segment.getSequenceNumber() > 0 && !lazy.get().booleanValue()) {
                throw DataFaultType.MISSING.getNewException(segment, pubSubMessage);
            }
            segment.setLastRecordProducerTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp);
            return;
        }
        if (i == sequenceNumber + 1) {
            segment.getAndIncrementSequenceNumber();
            segment.setLastRecordProducerTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp);
            return;
        }
        if (i <= sequenceNumber) {
            if (z2) {
                throw DataFaultType.DUPLICATE.getNewException(segment, pubSubMessage);
            }
            segment.setLastRecordProducerTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp);
        } else {
            if (i <= sequenceNumber + 1) {
                throw new IllegalStateException("Unreachable code!");
            }
            DataValidationException newException = DataFaultType.MISSING.getNewException(segment, pubSubMessage);
            if ((!z || segment.isRegistered()) && !lazy.get().booleanValue()) {
                throw newException;
            }
            segment.setSequenceNumber(i);
            segment.setLastRecordProducerTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp);
        }
    }

    private void trackCheckSum(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, boolean z, Lazy<Boolean> lazy) throws CorruptDataException {
        boolean z2 = true;
        try {
            z2 = segment.addToCheckSum(pubSubMessage.getKey(), pubSubMessage.getValue());
        } catch (IncomingDataAfterSegmentEndedException e) {
            if (!lazy.get().booleanValue()) {
                throw e;
            }
        }
        if (z2) {
            return;
        }
        EndOfSegment endOfSegment = (EndOfSegment) ((ControlMessage) pubSubMessage.getValue().payloadUnion).controlMessageUnion;
        if (ByteBuffer.wrap(segment.getFinalCheckSum()).equals(endOfSegment.checksumValue)) {
            segment.end(endOfSegment.finalSegment);
            return;
        }
        DataValidationException newException = DataFaultType.CORRUPT.getNewException(segment, pubSubMessage);
        if ((z && !segment.isRegistered()) || lazy.get().booleanValue()) {
            segment.end(endOfSegment.finalSegment);
        } else {
            if (!z) {
                throw newException;
            }
            segment.end(endOfSegment.finalSegment);
            throw newException;
        }
    }

    private void validateSequenceNumber(Segment segment, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, long j, Optional<DIVErrorMetricCallback> optional) throws MissingDataException {
        int sequenceNumber = segment.getSequenceNumber();
        int i = pubSubMessage.getValue().producerMetadata.messageSequenceNumber;
        if (!segment.isStarted()) {
            segment.start();
            segment.setSequenceNumber(i);
            segment.setLastRecordTimestamp(pubSubMessage.getPubSubMessageTime());
            return;
        }
        if (i == sequenceNumber + 1) {
            segment.getAndIncrementSequenceNumber();
            segment.setLastRecordTimestamp(pubSubMessage.getPubSubMessageTime());
            return;
        }
        if (i <= sequenceNumber) {
            segment.setLastRecordTimestamp(pubSubMessage.getPubSubMessageTime());
            return;
        }
        if (i <= sequenceNumber + 1) {
            throw new IllegalStateException("Unreachable code!");
        }
        long lastRecordTimestamp = segment.getLastRecordTimestamp();
        if (j <= 0 || LatencyUtils.getElapsedTimeInMs(lastRecordTimestamp) >= j) {
            segment.setSequenceNumber(i);
            segment.setLastRecordTimestamp(pubSubMessage.getPubSubMessageTime());
        } else {
            DataValidationException newException = DataFaultType.MISSING.getNewException(segment, pubSubMessage);
            this.logger.error("Encountered missing data message within the log compaction time window. Error msg: {}", newException.getMessage());
            if (optional.isPresent()) {
                optional.get().execute(newException);
            }
            throw newException;
        }
    }

    public void checkMissingMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, Optional<DIVErrorMetricCallback> optional, long j) throws DataValidationException {
        ReentrantLock partitionLock = getPartitionLock(pubSubMessage.getTopicPartition().getPartitionNumber());
        partitionLock.lock();
        try {
            try {
                Segment trackSegment = trackSegment(this.segments.get(Integer.valueOf(pubSubMessage.getTopicPartition().getPartitionNumber())), pubSubMessage, true, Lazy.FALSE);
                validateSequenceNumber(trackSegment, pubSubMessage, j, optional);
                trackSegment.setLastRecordTimestamp(pubSubMessage.getPubSubMessageTime());
                pubSubMessage.getValue();
                switch (MessageType.valueOf(r0)) {
                    case CONTROL_MESSAGE:
                        switch (ControlMessageType.valueOf((ControlMessage) r0.payloadUnion)) {
                            case END_OF_SEGMENT:
                                trackSegment.end(true);
                                break;
                        }
                }
                partitionLock.unlock();
            } catch (DuplicateDataException e) {
                partitionLock.unlock();
            } catch (MissingDataException e2) {
                this.logger.error("Encountered a missing segment. This is unacceptable even if log compaction kicks in. Error msg: {}", e2.getMessage());
                if (optional.isPresent()) {
                    optional.get().execute(e2);
                }
                throw e2;
            }
        } catch (Throwable th) {
            partitionLock.unlock();
            throw th;
        }
    }
}
