package org.apache.pulsar.common.api.raw;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-common-2.8.0.1.1.28.jar:org/apache/pulsar/common/api/raw/MessageParser.class */
public final class MessageParser {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessageParser.class);
    private static final FastThreadLocal<SingleMessageMetadata> LOCAL_SINGLE_MESSAGE_METADATA = new FastThreadLocal<SingleMessageMetadata>() { // from class: org.apache.pulsar.common.api.raw.MessageParser.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.netty.util.concurrent.FastThreadLocal
        public SingleMessageMetadata initialValue() throws Exception {
            return new SingleMessageMetadata();
        }
    };

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-common-2.8.0.1.1.28.jar:org/apache/pulsar/common/api/raw/MessageParser$MessageProcessor.class */
    public interface MessageProcessor {
        void process(RawMessage rawMessage) throws IOException;
    }

    public static void parseMessage(TopicName topicName, long j, long j2, ByteBuf byteBuf, MessageProcessor messageProcessor, int i) throws IOException {
        ReferenceCountedMessageMetadata referenceCountedMessageMetadata = null;
        try {
            if (!verifyChecksum(topicName, byteBuf, j, j2)) {
                ReferenceCountUtil.safeRelease(null);
                ReferenceCountUtil.safeRelease(null);
                return;
            }
            referenceCountedMessageMetadata = ReferenceCountedMessageMetadata.get(byteBuf);
            MessageMetadata metadata = referenceCountedMessageMetadata.getMetadata();
            try {
                Commands.parseMessageMetadata(byteBuf, metadata);
                if (metadata.hasMarkerType()) {
                    ReferenceCountUtil.safeRelease(null);
                    ReferenceCountUtil.safeRelease(referenceCountedMessageMetadata);
                    return;
                }
                if (metadata.getEncryptionKeysCount() > 0) {
                    throw new IOException("Cannot parse encrypted message " + metadata + " on topic " + topicName);
                }
                ByteBuf uncompressPayloadIfNeeded = uncompressPayloadIfNeeded(topicName, metadata, byteBuf, j, j2, i);
                if (uncompressPayloadIfNeeded == null) {
                    ReferenceCountUtil.safeRelease(uncompressPayloadIfNeeded);
                    ReferenceCountUtil.safeRelease(referenceCountedMessageMetadata);
                    return;
                }
                if (metadata.getNumMessagesInBatch() != 1 || metadata.hasNumMessagesInBatch()) {
                    receiveIndividualMessagesFromBatch(referenceCountedMessageMetadata, uncompressPayloadIfNeeded, j, j2, messageProcessor);
                } else {
                    messageProcessor.process(RawMessageImpl.get(referenceCountedMessageMetadata, null, uncompressPayloadIfNeeded.retain(), j, j2, 0L));
                }
                ReferenceCountUtil.safeRelease(uncompressPayloadIfNeeded);
                ReferenceCountUtil.safeRelease(referenceCountedMessageMetadata);
            } catch (Throwable th) {
                log.warn("[{}] Failed to deserialize metadata for message {}:{} - Ignoring", topicName, Long.valueOf(j), Long.valueOf(j2));
                ReferenceCountUtil.safeRelease(null);
                ReferenceCountUtil.safeRelease(referenceCountedMessageMetadata);
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.safeRelease(null);
            ReferenceCountUtil.safeRelease(referenceCountedMessageMetadata);
            throw th2;
        }
    }

    public static boolean verifyChecksum(TopicName topicName, ByteBuf byteBuf, long j, long j2) {
        int readChecksum;
        int computeChecksum;
        if (!Commands.hasChecksum(byteBuf) || (readChecksum = Commands.readChecksum(byteBuf)) == (computeChecksum = Crc32cIntChecksum.computeChecksum(byteBuf))) {
            return true;
        }
        log.error("[{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", topicName, Long.valueOf(j), Long.valueOf(j2), Long.toHexString(readChecksum), Integer.toHexString(computeChecksum));
        return false;
    }

    public static ByteBuf uncompressPayloadIfNeeded(TopicName topicName, MessageMetadata messageMetadata, ByteBuf byteBuf, long j, long j2, int i) {
        CompressionCodec compressionCodec = CompressionCodecProvider.getCompressionCodec(messageMetadata.getCompression());
        int uncompressedSize = messageMetadata.getUncompressedSize();
        int readableBytes = byteBuf.readableBytes();
        if (readableBytes > i) {
            log.error("[{}] Got corrupted payload message size {} at {}:{}", topicName, Integer.valueOf(readableBytes), Long.valueOf(j), Long.valueOf(j2));
            return null;
        }
        try {
            return compressionCodec.decode(byteBuf, uncompressedSize);
        } catch (IOException e) {
            log.error("[{}] Failed to decompress message with {} at {}:{} : {}", topicName, messageMetadata.getCompression(), Long.valueOf(j), Long.valueOf(j2), e.getMessage(), e);
            return null;
        }
    }

    private static void receiveIndividualMessagesFromBatch(ReferenceCountedMessageMetadata referenceCountedMessageMetadata, ByteBuf byteBuf, long j, long j2, MessageProcessor messageProcessor) {
        int numMessagesInBatch = referenceCountedMessageMetadata.getMetadata().getNumMessagesInBatch();
        for (int i = 0; i < numMessagesInBatch; i++) {
            try {
                SingleMessageMetadata singleMessageMetadata = LOCAL_SINGLE_MESSAGE_METADATA.get();
                ByteBuf deSerializeSingleMessageInBatch = Commands.deSerializeSingleMessageInBatch(byteBuf, singleMessageMetadata, i, numMessagesInBatch);
                if (singleMessageMetadata.isCompactedOut()) {
                    deSerializeSingleMessageInBatch.release();
                } else {
                    messageProcessor.process(RawMessageImpl.get(referenceCountedMessageMetadata, singleMessageMetadata, deSerializeSingleMessageInBatch, j, j2, i));
                }
            } catch (IOException e) {
                log.warn("Unable to obtain messages in batch", (Throwable) e);
                return;
            }
        }
    }

    private MessageParser() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}
