package org.apache.pulsar.websocket.proxy;

import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.TlsProducerConsumerBase;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.ExponentialBackOff;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.class */
public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
    protected String methodName;
    private ProxyServer proxyServer;
    private WebSocketService service;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProxyPublishConsumeTlsTest.class);

    @Override // org.apache.pulsar.client.api.TlsProducerConsumerBase, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.setup();
        super.internalSetUpForNamespace();
        WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration();
        webSocketProxyConfiguration.setWebServicePort(Optional.of(0));
        webSocketProxyConfiguration.setWebServicePortTls(Optional.of(0));
        webSocketProxyConfiguration.setBrokerClientTlsEnabled(true);
        webSocketProxyConfiguration.setTlsKeyFilePath("./src/test/resources/authentication/tls/broker-key.pem");
        webSocketProxyConfiguration.setTlsCertificateFilePath("./src/test/resources/authentication/tls/broker-cert.pem");
        webSocketProxyConfiguration.setTlsTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        webSocketProxyConfiguration.setBrokerClientTrustCertsFilePath("./src/test/resources/authentication/tls/cacert.pem");
        webSocketProxyConfiguration.setClusterName("use");
        webSocketProxyConfiguration.setConfigurationStoreServers("dummy-zk-servers");
        webSocketProxyConfiguration.setBrokerClientAuthenticationParameters("tlsCertFile:./src/test/resources/authentication/tls/client-cert.pem,tlsKeyFile:./src/test/resources/authentication/tls/client-key.pem");
        webSocketProxyConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.service = (WebSocketService) Mockito.spy(new WebSocketService(webSocketProxyConfiguration));
        ((WebSocketService) Mockito.doReturn(this.mockZooKeeperClientFactory).when(this.service)).getZooKeeperClientFactory();
        this.proxyServer = new ProxyServer(webSocketProxyConfiguration);
        WebSocketServiceStarter.start(this.proxyServer, this.service);
        log.info("Proxy Server Started");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.api.TlsProducerConsumerBase, org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.cleanup();
        this.service.close();
        this.proxyServer.stop();
        log.info("Finished Cleaning Up Test setup");
    }

    @Test(timeOut = ExponentialBackOff.DEFAULT_MAX_INTERVAL)
    public void socketTest() throws InterruptedException, GeneralSecurityException {
        String str = "wss://localhost:" + this.proxyServer.getListenPortHTTPS().get() + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
        String str2 = "wss://localhost:" + this.proxyServer.getListenPortHTTPS().get() + "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
        URI create = URI.create(str);
        URI create2 = URI.create(str2);
        SslContextFactory sslContextFactory = new SslContextFactory();
        sslContextFactory.setSslContext(SecurityUtility.createSslContext(false, SecurityUtility.loadCertificatesFromPemFile("./src/test/resources/authentication/tls/cacert.pem")));
        WebSocketClient webSocketClient = new WebSocketClient(sslContextFactory);
        SimpleConsumerSocket simpleConsumerSocket = new SimpleConsumerSocket();
        WebSocketClient webSocketClient2 = new WebSocketClient(sslContextFactory);
        try {
            try {
                webSocketClient.start();
                Future connect = webSocketClient.connect(simpleConsumerSocket, create, new ClientUpgradeRequest());
                log.info("Connecting to : {}", create);
                Assert.assertTrue(((Session) connect.get()).isOpen());
                SimpleProducerSocket simpleProducerSocket = new SimpleProducerSocket();
                ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
                webSocketClient2.start();
                Assert.assertTrue(((Session) webSocketClient2.connect(simpleProducerSocket, create2, clientUpgradeRequest).get()).isOpen());
                simpleConsumerSocket.awaitClose(1, TimeUnit.SECONDS);
                simpleProducerSocket.awaitClose(1, TimeUnit.SECONDS);
                Assert.assertTrue(simpleProducerSocket.getBuffer().size() > 0);
                Assert.assertEquals(simpleProducerSocket.getBuffer(), simpleConsumerSocket.getBuffer());
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
                try {
                    newFixedThreadPool.submit(() -> {
                        try {
                            webSocketClient.stop();
                            webSocketClient2.stop();
                            log.info("proxy clients are stopped successfully");
                        } catch (Exception e) {
                            log.error(e.getMessage());
                        }
                    }).get(2L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    log.error("failed to close clients ", (Throwable) e);
                }
                newFixedThreadPool.shutdownNow();
            } catch (Throwable th) {
                log.error(th.getMessage());
                Assert.fail(th.getMessage());
                ExecutorService newFixedThreadPool2 = Executors.newFixedThreadPool(1);
                try {
                    newFixedThreadPool2.submit(() -> {
                        try {
                            webSocketClient.stop();
                            webSocketClient2.stop();
                            log.info("proxy clients are stopped successfully");
                        } catch (Exception e2) {
                            log.error(e2.getMessage());
                        }
                    }).get(2L, TimeUnit.SECONDS);
                } catch (Exception e2) {
                    log.error("failed to close clients ", (Throwable) e2);
                }
                newFixedThreadPool2.shutdownNow();
            }
        } catch (Throwable th2) {
            ExecutorService newFixedThreadPool3 = Executors.newFixedThreadPool(1);
            try {
                newFixedThreadPool3.submit(() -> {
                    try {
                        webSocketClient.stop();
                        webSocketClient2.stop();
                        log.info("proxy clients are stopped successfully");
                    } catch (Exception e22) {
                        log.error(e22.getMessage());
                    }
                }).get(2L, TimeUnit.SECONDS);
            } catch (Exception e3) {
                log.error("failed to close clients ", (Throwable) e3);
            }
            newFixedThreadPool3.shutdownNow();
            throw th2;
        }
    }
}
