package org.apache.pulsar.client.impl;

import io.netty.buffer.Unpooled;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/MessageChecksumTest.class */
public class MessageChecksumTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessageChecksumTest.class);

    /* loaded from: input_file:org/apache/pulsar/client/impl/MessageChecksumTest$MixedVersionScenario.class */
    enum MixedVersionScenario {
        CONNECTED_TO_NEW_THEN_OLD_VERSION,
        CONNECTED_TO_OLD_THEN_NEW_VERSION
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        internalCleanup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
        clientBuilder.connectionsPerBroker(0);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        return PulsarTestClient.create(clientBuilder);
    }

    /* JADX WARN: Type inference failed for: r0v24, types: [java.lang.Object, byte[]] */
    @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
    public void testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario mixedVersionScenario) throws Exception {
        if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
        }
        PulsarTestClient pulsarTestClient = (PulsarTestClient) this.pulsarClient;
        ProducerImpl<?> producerImpl = (ProducerImpl) this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling").subscriptionName("my-sub").subscribe();
        new CountDownLatch(2);
        producerImpl.send((ProducerImpl<?>) "message-1".getBytes());
        pulsarTestClient.dropOpSendMessages();
        ?? bytes = "message-2".getBytes();
        TypedMessageBuilder<?> value = producerImpl.newMessage().value(bytes);
        CompletableFuture<MessageId> sendAsync = value.sendAsync();
        pulsarTestClient.setPendingMessageCallback(null);
        pulsarTestClient.disconnectProducerAndRejectReconnecting(producerImpl);
        ((TypedMessageBuilderImpl) value).getContent().put(bytes.length - 1, (byte) 51);
        if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
        } else {
            resetOverridingConnectedBrokerVersion();
        }
        pulsarTestClient.allowReconnecting();
        try {
            sendAsync.get(10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            Assert.fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail", e);
        }
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-1");
        Assert.assertEquals(new String(subscribe.receive(1, TimeUnit.SECONDS).getData()), "message-3");
    }

    private void makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
        ((PulsarTestClient) this.pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
    }

    private void resetOverridingConnectedBrokerVersion() {
        ((PulsarTestClient) this.pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
    }

    private void waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producerImpl) {
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(producerImpl.getPendingQueueSize(), 1);
        });
    }

    @Test
    public void testTamperingMessageIsDetected() throws Exception {
        ProducerImpl producerImpl = (ProducerImpl) this.pulsarClient.newProducer().topic("persistent://prop/use/ns-abc/testTamperingMessageIsDetected").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        TypedMessageBuilderImpl typedMessageBuilderImpl = (TypedMessageBuilderImpl) producerImpl.newMessage().value("a message".getBytes());
        ProducerImpl.OpSendMsg create = ProducerImpl.OpSendMsg.create((MessageImpl<?>) typedMessageBuilderImpl.getMessage(), Commands.newSend(1L, 1L, 1, Commands.ChecksumType.Crc32c, typedMessageBuilderImpl.getMetadataBuilder().setProducerName("test").setSequenceId(1L).setPublishTime(10L), Unpooled.wrappedBuffer(typedMessageBuilderImpl.getContent())), 1L, (SendCallback) null);
        Assert.assertTrue(producerImpl.verifyLocalBufferIsNotCorrupted(create));
        typedMessageBuilderImpl.getContent().put(0, (byte) 98);
        Assert.assertFalse(producerImpl.verifyLocalBufferIsNotCorrupted(create));
    }
}
