package org.apache.pulsar.broker.service.persistent;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {SaslConstants.SASL_BROKER_PROTOCOL})
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ChecksumTest.class */
public class ChecksumTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void verifyChecksumStoredInManagedLedger() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic0").create();
        ManagedCursor openCursor = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/use/ns-abc/topic0").get()).getManagedLedger().openCursor("test");
        create.send("Hello".getBytes());
        List readEntriesOrWait = openCursor.readEntriesOrWait(1);
        Assert.assertEquals(readEntriesOrWait.size(), 1);
        ByteBuf dataBuffer = ((Entry) readEntriesOrWait.get(0)).getDataBuffer();
        Assert.assertTrue(Commands.hasChecksum(dataBuffer));
        Assert.assertEquals(Commands.readChecksum(dataBuffer), Crc32cIntChecksum.computeChecksum(dataBuffer));
        ((Entry) readEntriesOrWait.get(0)).release();
        create.close();
    }

    @Test
    public void verifyChecksumSentToConsumer() throws Exception {
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/topic-1").create();
        RawReader rawReader = (RawReader) RawReader.create(this.pulsarClient, "persistent://prop/use/ns-abc/topic-1", "sub").get();
        create.send("Hello".getBytes());
        ByteBuf headersAndPayload = ((RawMessage) rawReader.readNextAsync().get()).getHeadersAndPayload();
        Assert.assertTrue(Commands.hasChecksum(headersAndPayload));
        Assert.assertEquals(Commands.readChecksum(headersAndPayload), Crc32cIntChecksum.computeChecksum(headersAndPayload));
        create.close();
        rawReader.closeAsync().get();
    }
}
