package org.apache.pulsar.client.api;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.ExponentialBackOff;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Exercises producing and consuming over TLS-secured connections, including
 * TLS client authentication over both the binary and the HTTP service URLs,
 * and certificates supplied through dynamically resolved input streams.
 *
 * <p>Broker/client bootstrap ({@code internalSetUpForClient},
 * {@code internalSetUpForNamespace}) and the {@code pulsar}, {@code pulsarClient}
 * and {@code methodName} members are inherited from {@link TlsProducerConsumerBase}.
 */
@Test(groups = {"broker-api"})
public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);

    /** Per-test timeout in milliseconds (same value the tests used before: 30 seconds). */
    private static final long TEST_TIMEOUT_MS = 30000L;

    /** Topic shared by every test in this class. */
    private static final String TOPIC_NAME = "persistent://my-property/use/my-ns/my-topic1";

    /** Subscription shared by every test in this class. */
    private static final String SUBSCRIPTION_NAME = "my-subscriber-name";

    /** Payload size in bytes used by the large-message test (16 KiB + 1). */
    private static final int MESSAGE_SIZE = 16385;

    /** Number of messages published and consumed by the round-trip tests. */
    private static final int MESSAGE_COUNT = 10;

    private static final String CLIENT_CERT_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
    private static final String CLIENT_KEY_PATH = "./src/test/resources/authentication/tls/client-key.pem";
    private static final String CA_CERT_PATH = "./src/test/resources/authentication/tls/cacert.pem";

    /**
     * Publishes {@link #MESSAGE_COUNT} payloads of {@link #MESSAGE_SIZE} bytes over TLS
     * and verifies each one is received intact and in order.
     *
     * @throws Exception if setup, publishing or consuming fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testTlsLargeSizeMessage() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForClient(true, this.pulsar.getBrokerServiceUrlTls());
        internalSetUpForNamespace();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(TOPIC_NAME)
                .create();

        // Every message is filled with its own index so ordering can be verified.
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            byte[] payload = new byte[MESSAGE_SIZE];
            Arrays.fill(payload, (byte) i);
            producer.send(payload);
        }

        Message<byte[]> lastMessage = null;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            lastMessage = consumer.receive(5, TimeUnit.SECONDS);
            // Fail with a readable message instead of an NPE if nothing arrived in time.
            Assert.assertNotNull(lastMessage, "Timed out waiting for message " + i);
            byte[] expected = new byte[MESSAGE_SIZE];
            Arrays.fill(expected, (byte) i);
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(lastMessage.getData(), expected);
        }

        // Acknowledge everything up to and including the last received message.
        consumer.acknowledgeCumulative(lastMessage);
        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    /**
     * Verifies over the binary (broker service) TLS URL that subscribing fails when the
     * client is set up with {@code internalSetUpForClient(false, ...)} and succeeds when
     * set up with {@code internalSetUpForClient(true, ...)}.
     *
     * @throws Exception if setup fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testTlsClientAuthOverBinaryProtocol() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForNamespace();

        internalSetUpForClient(false, this.pulsar.getBrokerServiceUrlTls());
        assertExclusiveSubscribeRejected();

        internalSetUpForClient(true, this.pulsar.getBrokerServiceUrlTls());
        assertExclusiveSubscribeAccepted();
    }

    /**
     * Same scenario as {@link #testTlsClientAuthOverBinaryProtocol()} but the client
     * is pointed at the TLS web service address.
     *
     * @throws Exception if setup fails
     */
    @Test(timeOut = TEST_TIMEOUT_MS)
    public void testTlsClientAuthOverHTTPProtocol() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        log.info("-- message size -- {}", MESSAGE_SIZE);
        internalSetUpForNamespace();

        internalSetUpForClient(false, this.pulsar.getWebServiceAddressTls());
        assertExclusiveSubscribeRejected();

        internalSetUpForClient(true, this.pulsar.getWebServiceAddressTls());
        assertExclusiveSubscribeAccepted();
    }

    /**
     * Builds a client whose certificate, key and trust material come from stream
     * suppliers, then checks a produce/consume round trip still works after the
     * topic has been closed on the broker side.
     *
     * @throws Exception if client creation, publishing or consuming fails
     */
    @Test(timeOut = 60000)
    public void testTlsCertsFromDynamicStream() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ClientBuilder clientBuilder = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrlTls())
                .enableTls(true)
                .allowTlsInsecureConnection(false)
                .operationTimeout(1000, TimeUnit.MILLISECONDS);

        // A single index (always 0 here) selects the only stream offered by each supplier.
        AtomicInteger index = new AtomicInteger(0);
        ByteArrayInputStream certStream = createByteInputStream(CLIENT_CERT_PATH);
        ByteArrayInputStream keyStream = createByteInputStream(CLIENT_KEY_PATH);
        ByteArrayInputStream trustStream = createByteInputStream(CA_CERT_PATH);
        clientBuilder.authentication(new AuthenticationTls(
                () -> getStream(index, certStream),
                () -> getStream(index, keyStream),
                () -> getStream(index, trustStream)));

        PulsarClient client = clientBuilder.build();
        try {
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(TOPIC_NAME)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscribe();

            // Close the topic on the broker while the consumer is attached.
            // NOTE(review): presumably this forces the client to reconnect and
            // re-read the TLS streams - confirm against AuthenticationTls.
            ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(TOPIC_NAME).get()).close(false);

            Producer<byte[]> producer = client.newProducer()
                    .topic(TOPIC_NAME)
                    .createAsync()
                    .get(30L, TimeUnit.SECONDS);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(("test" + i).getBytes());
            }

            Message<byte[]> lastMessage = null;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                lastMessage = consumer.receive(5, TimeUnit.SECONDS);
                // Fail with a readable message instead of an NPE if nothing arrived in time.
                Assert.assertNotNull(lastMessage, "Timed out waiting for message " + i);
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(lastMessage.getData(), ("test" + i).getBytes());
            }
            consumer.acknowledgeCumulative(lastMessage);
            consumer.close();
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            // Always release the client exactly once, whether or not the test body failed.
            client.close();
        }
    }

    /**
     * Starts with suppliers that hand out the key stream in place of the certificate
     * and of the trust certificate, then switches each supplier to the right stream
     * one at a time. Subscribing must fail until both have been switched.
     *
     * @throws Exception if client creation fails or the final subscribe fails
     */
    @Test
    public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        ClientBuilder clientBuilder = PulsarClient.builder()
                .serviceUrl(this.pulsar.getBrokerServiceUrlTls())
                .enableTls(true)
                .allowTlsInsecureConnection(false)
                .operationTimeout(1000, TimeUnit.MILLISECONDS);

        // Index 1 selects the key stream (wrong material); index 0 selects the right stream.
        AtomicInteger certIndex = new AtomicInteger(1);
        AtomicInteger keyIndex = new AtomicInteger(0);
        AtomicInteger trustIndex = new AtomicInteger(1);
        ByteArrayInputStream certStream = createByteInputStream(CLIENT_CERT_PATH);
        ByteArrayInputStream keyStream = createByteInputStream(CLIENT_KEY_PATH);
        ByteArrayInputStream trustStream = createByteInputStream(CA_CERT_PATH);
        clientBuilder.authentication(new AuthenticationTls(
                () -> getStream(certIndex, certStream, keyStream),
                () -> getStream(keyIndex, keyStream),
                () -> getStream(trustIndex, trustStream, keyStream)));

        PulsarClient client = clientBuilder.build();
        try {
            // Both the certificate and the trust certificate are wrong.
            try {
                client.newConsumer().topic(TOPIC_NAME).subscriptionName(SUBSCRIPTION_NAME).subscribe();
                Assert.fail("should have failed due to invalid tls cert");
            } catch (PulsarClientException expected) {
                // Expected: subscribing must fail with the wrong certificate material.
            }

            // Certificate fixed, trust certificate still wrong.
            certIndex.set(0);
            try {
                client.newConsumer().topic(TOPIC_NAME).subscriptionName(SUBSCRIPTION_NAME).subscribe();
                Assert.fail("should have failed due to invalid tls cert");
            } catch (PulsarClientException expected) {
                // Expected: subscribing must still fail with the wrong trust certificate.
            }

            // Everything fixed: subscribing must now succeed.
            trustIndex.set(0);
            client.newConsumer().topic(TOPIC_NAME).subscriptionName(SUBSCRIPTION_NAME).subscribe().close();
            log.info("-- Exiting {} test --", this.methodName);
        } finally {
            // Always release the client exactly once, whether or not the test body failed.
            client.close();
        }
    }

    /**
     * Attempts an exclusive subscription with the inherited client and requires it to fail.
     * An unexpected success raises an {@link AssertionError}, which is deliberately not
     * caught by the {@code catch (Exception ...)} clause below.
     */
    private void assertExclusiveSubscribeRejected() {
        try {
            this.pulsarClient.newConsumer()
                    .topic(TOPIC_NAME)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
            Assert.fail("Server should have failed the TLS handshake since client didn't .");
        } catch (Exception expected) {
            // Expected: the subscribe attempt must be rejected.
        }
    }

    /**
     * Attempts an exclusive subscription with the inherited client, requires it to
     * succeed, and closes the resulting consumer so it is not leaked.
     *
     * @throws Exception if closing the consumer fails
     */
    private void assertExclusiveSubscribeAccepted() throws Exception {
        Consumer<byte[]> consumer;
        try {
            consumer = this.pulsarClient.newConsumer()
                    .topic(TOPIC_NAME)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe();
        } catch (Exception e) {
            // Keep the cause so the failure report shows why the subscribe was rejected.
            Assert.fail("Should not fail since certs are sent.", e);
            return; // unreachable: Assert.fail always throws; needed for definite assignment
        }
        consumer.close();
    }

    /**
     * Reads the whole file into memory and exposes its bytes as a stream.
     *
     * @param str path of the file to load
     * @return an in-memory stream over the file's bytes
     * @throws IOException if the file cannot be opened or read
     */
    private ByteArrayInputStream createByteInputStream(String str) throws IOException {
        try (FileInputStream fileInput = new FileInputStream(str)) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            IOUtils.copy(fileInput, buffer);
            return new ByteArrayInputStream(buffer.toByteArray());
        }
    }

    /**
     * Returns the candidate stream selected by the current value of the index.
     *
     * @param atomicInteger index of the stream to return, read at call time
     * @param byteArrayInputStreamArr candidate streams
     * @return the stream at the current index
     */
    private ByteArrayInputStream getStream(AtomicInteger atomicInteger, ByteArrayInputStream... byteArrayInputStreamArr) {
        return byteArrayInputStreamArr[atomicInteger.intValue()];
    }
}
