package org.apache.pulsar.broker.service;

import io.netty.buffer.Unpooled;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/EnableProxyProtocolTest.class */
public class EnableProxyProtocolTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        this.conf.setHaProxyProtocolEnabled(true);
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testSimpleProduceAndConsume() throws PulsarClientException {
        Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/testSimpleProduceAndConsume").subscriptionName("my-subscriber-name").subscribe();
        try {
            Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testSimpleProduceAndConsume").create();
            for (int i = 0; i < 100; i++) {
                try {
                    create.send(("Message-" + i).getBytes());
                } catch (Throwable th) {
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                    throw th;
                }
            }
            int i2 = 0;
            for (int i3 = 0; i3 < 100; i3++) {
                subscribe.acknowledge(subscribe.receive());
                i2++;
            }
            Assert.assertEquals(i2, 100);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test
    public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
        ((PulsarClientImpl) this.pulsarClient).getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", ((Integer) this.pulsar.getBrokerListenPort().get()).intValue())).get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes()));
        this.pulsarClient.newConsumer().topic("persistent://prop/ns-abc/testProxyProtocol").subscriptionName("my-subscriber-name").subscribe();
        Consumer consumer = (Consumer) ((Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testProxyProtocol").get()).getSubscription("my-subscriber-name").getConsumers().get(0);
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertTrue(consumer.cnx().hasHAProxyMessage());
        });
        TopicStats stats = this.admin.topics().getStats("persistent://prop/ns-abc/testProxyProtocol");
        Assert.assertEquals(stats.subscriptions.size(), 1);
        SubscriptionStats subscriptionStats = stats.subscriptions.get("my-subscriber-name");
        Assert.assertEquals(subscriptionStats.consumers.size(), 1);
        Assert.assertEquals(subscriptionStats.consumers.get(0).getAddress(), "198.51.100.22:35646");
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testProxyProtocol").create();
        TopicStats stats2 = this.admin.topics().getStats("persistent://prop/ns-abc/testProxyProtocol");
        Assert.assertEquals(stats2.publishers.size(), 1);
        Assert.assertEquals(stats2.publishers.get(0).getAddress(), "198.51.100.22:35646");
    }
}
