package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/MaxMessageSizeTest.class */
public class MaxMessageSizeTest {
    PulsarService pulsar;
    ServiceConfiguration configuration;
    PulsarAdmin admin;
    LocalBookkeeperEnsemble bkEnsemble;

    @BeforeMethod
    void setup() {
        try {
            this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> {
                return 0;
            });
            ServerConfiguration serverConfiguration = new ServerConfiguration();
            serverConfiguration.setNettyMaxFrameSizeBytes(10485760);
            this.bkEnsemble.startStandalone(serverConfiguration, false);
            this.configuration = new ServiceConfiguration();
            this.configuration.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            this.configuration.setAdvertisedAddress("localhost");
            this.configuration.setWebServicePort(Optional.of(0));
            this.configuration.setClusterName("max_message_test");
            this.configuration.setBrokerShutdownTimeoutMs(0L);
            this.configuration.setBrokerServicePort(Optional.of(0));
            this.configuration.setAuthorizationEnabled(false);
            this.configuration.setAuthenticationEnabled(false);
            this.configuration.setManagedLedgerMaxEntriesPerLedger(5);
            this.configuration.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.configuration.setMaxMessageSize(10485760);
            this.pulsar = new PulsarService(this.configuration);
            this.pulsar.start();
            String str = "http://127.0.0.1:" + this.pulsar.getListenPortHTTP().get();
            this.admin = PulsarAdmin.builder().serviceHttpUrl(str).build();
            this.admin.clusters().createCluster("max_message_test", ClusterData.builder().serviceUrl(str).build());
            this.admin.tenants().createTenant("test", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1"}), Sets.newHashSet(new String[]{"max_message_test"})));
            this.admin.namespaces().createNamespace("test/message", Sets.newHashSet(new String[]{"max_message_test"}));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @AfterMethod(alwaysRun = true)
    void shutdown() {
        try {
            this.pulsar.close();
            this.bkEnsemble.stop();
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    @Test
    public void testMaxMessageSetting() throws PulsarClientException {
        PulsarClient build = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl()).build();
        try {
            Producer create = build.newProducer().topic("persistent://test/message/topic1").sendTimeout(60, TimeUnit.SECONDS).create();
            Consumer subscribe = build.newConsumer().topic(new String[]{"persistent://test/message/topic1"}).subscriptionName("test1").subscribe();
            byte[] bArr = new byte[2097152];
            try {
                create.send(bArr);
            } catch (PulsarClientException e) {
                Assert.fail("Shouldn't have exception at here", e);
            }
            Assert.assertEquals(bArr, subscribe.receive().getData());
            byte[] bArr2 = new byte[5242880];
            try {
                create.send(bArr2);
            } catch (PulsarClientException e2) {
                Assert.fail("Shouldn't have exception at here", e2);
            }
            Assert.assertEquals(bArr2, subscribe.receive().getData());
            byte[] bArr3 = new byte[8388608];
            try {
                create.send(bArr3);
            } catch (PulsarClientException e3) {
                Assert.fail("Shouldn't have exception at here", e3);
            }
            Assert.assertEquals(bArr3, subscribe.receive().getData());
            try {
                create.send(new byte[10485760]);
                Assert.fail("Shouldn't send out this message");
            } catch (PulsarClientException e4) {
            }
            subscribe.unsubscribe();
            subscribe.close();
            create.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }
}
