package org.apache.pulsar.client.api;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/ServiceUrlProviderTest.class */
public class ServiceUrlProviderTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ServiceUrlProviderTest.class);

    /* loaded from: input_file:org/apache/pulsar/client/api/ServiceUrlProviderTest$AutoChangedServiceUrlProvider.class */
    static class AutoChangedServiceUrlProvider extends TestServiceUrlProvider {
        public AutoChangedServiceUrlProvider(String str) {
            super(str);
        }

        public void onServiceUrlChanged(String str) throws PulsarClientException {
            getPulsarClient().updateServiceUrl(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/client/api/ServiceUrlProviderTest$TestServiceUrlProvider.class */
    public static class TestServiceUrlProvider implements ServiceUrlProvider {
        private PulsarClient pulsarClient;
        private final String serviceUrl;

        public TestServiceUrlProvider(String str) {
            this.serviceUrl = str;
        }

        public String getServiceUrl() {
            return this.serviceUrl;
        }

        public void initialize(PulsarClient pulsarClient) {
            this.pulsarClient = pulsarClient;
        }

        public PulsarClient getPulsarClient() {
            return this.pulsarClient;
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testCreateClientWithServiceUrlProvider() throws Exception {
        PulsarClientImpl build = PulsarClient.builder().serviceUrlProvider(new TestServiceUrlProvider(this.pulsar.getSafeBrokerServiceUrl())).statsInterval(1L, TimeUnit.SECONDS).build();
        try {
            Assert.assertTrue(build.getConfiguration().getServiceUrlProvider() instanceof TestServiceUrlProvider);
            Producer create = build.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
            Consumer subscribe = build.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionName("my-subscribe").subscribe();
            for (int i = 0; i < 100; i++) {
                create.send("Hello Pulsar[" + i + "]");
            }
            build.updateServiceUrl(this.pulsar.getSafeBrokerServiceUrl());
            for (int i2 = 100; i2 < 200; i2++) {
                create.send("Hello Pulsar[" + i2 + "]");
            }
            int i3 = 0;
            do {
                System.out.println((String) subscribe.receive().getValue());
                i3++;
            } while (i3 < 200);
            Assert.assertEquals(i3, 200);
            create.close();
            subscribe.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testCreateClientWithAutoChangedServiceUrlProvider() throws Exception {
        AutoChangedServiceUrlProvider autoChangedServiceUrlProvider = new AutoChangedServiceUrlProvider(this.pulsar.getSafeBrokerServiceUrl());
        PulsarClientImpl build = PulsarClient.builder().serviceUrlProvider(autoChangedServiceUrlProvider).statsInterval(1L, TimeUnit.SECONDS).build();
        try {
            Assert.assertTrue(build.getConfiguration().getServiceUrlProvider() instanceof AutoChangedServiceUrlProvider);
            ProducerImpl create = build.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic").create();
            ConsumerImpl subscribe = build.newConsumer(Schema.STRING).topic(new String[]{"persistent://my-property/my-ns/my-topic"}).subscriptionName("my-subscribe").subscribe();
            PulsarService pulsarService = this.pulsar;
            this.conf.setBrokerShutdownTimeoutMs(0L);
            this.conf.setBrokerServicePort(Optional.of(0));
            this.conf.setWebServicePort(Optional.of(0));
            restartBroker();
            PulsarService pulsarService2 = this.pulsar;
            log.info("Pulsar1 = {}, Pulsar2 = {}", pulsarService.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
            Assert.assertNotEquals(pulsarService.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
            log.info("Service url : producer = {}, consumer = {}", create.getClient().getLookup().getServiceUrl(), subscribe.getClient().getLookup().getServiceUrl());
            Assert.assertEquals(create.getClient().getLookup().getServiceUrl(), pulsarService.getSafeBrokerServiceUrl());
            Assert.assertEquals(subscribe.getClient().getLookup().getServiceUrl(), pulsarService.getSafeBrokerServiceUrl());
            log.info("Changing service url from {} to {}", pulsarService.getSafeBrokerServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
            autoChangedServiceUrlProvider.onServiceUrlChanged(pulsarService2.getSafeBrokerServiceUrl());
            log.info("Service url changed : producer = {}, consumer = {}", create.getClient().getLookup().getServiceUrl(), subscribe.getClient().getLookup().getServiceUrl());
            Assert.assertEquals(create.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
            Assert.assertEquals(subscribe.getClient().getLookup().getServiceUrl(), pulsarService2.getSafeBrokerServiceUrl());
            create.close();
            subscribe.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }
}
