package org.apache.pulsar.broker.transaction;

import com.google.common.collect.Sets;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.impl.ClientCnxTest;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Exercises transaction-coordinator operations (commit, add produced partition, add
 * subscription) against a single broker that has token authentication and authorization enabled.
 *
 * <p>Every parameterized test creates a transaction with a client authenticated as
 * {@code "client"} and then issues the coordinator request through a second client authenticated
 * as the actor supplied by {@link #actors()}. The request is expected to succeed for
 * {@code "client"} and {@code "admin"} and to be rejected with a
 * {@link TransactionCoordinatorClientException} for {@code "client2"}.
 */
@Test(groups = {"broker"})
public class AuthenticatedTransactionProducerConsumerTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(AuthenticatedTransactionProducerConsumerTest.class);
    private static final String TOPIC = "tnx/ns1/txn-auth";

    /** Role that creates every transaction in these tests and holds all actions on the test namespace. */
    private static final String OWNER_ROLE = "client";
    /** Role that is only granted {@code consume} on the system namespace. */
    private static final String UNRELATED_ROLE = "client2";
    /** Role registered as broker super user in {@link #setup()}. */
    private static final String SUPER_USER_ROLE = "admin";

    /** RSA key pair generated per test instance; its private key signs every token used here. */
    private final KeyPair kp = KeyPairGenerator.getInstance("RSA").generateKeyPair();
    /** Public key of {@link #kp}, base64-encoded behind a {@code data:;base64,} prefix. */
    private final String TOKEN_PUBLIC_KEY =
            "data:;base64," + Base64.getEncoder().encodeToString(this.kp.getPublic().getEncoded());
    /** Token for the super-user role, used by the broker-internal client and the shared test clients. */
    private final String ADMIN_TOKEN = generateToken(this.kp, SUPER_USER_ROLE);

    /**
     * @throws NoSuchAlgorithmException if the RSA key pair generator used by the field
     *     initializers is not available
     */
    AuthenticatedTransactionProducerConsumerTest() throws NoSuchAlgorithmException {
    }

    /**
     * Creates a signed JWT whose subject is {@code str} and which expires one hour from now.
     *
     * @param keyPair key pair whose private key signs the token
     * @param str subject (role name) to place in the token
     * @return the compact serialized token
     */
    private String generateToken(KeyPair keyPair, String str) {
        PrivateKey signingKey = keyPair.getPrivate();
        Date expiration = new Date(System.currentTimeMillis() + Duration.ofHours(1L).toMillis());
        return Jwts.builder()
                .setSubject(str)
                .setExpiration(expiration)
                .signWith(signingKey, SignatureAlgorithm.forSigningKey(signingKey))
                .compact();
    }

    /**
     * Configures authentication/authorization on the broker configuration, starts one broker via
     * the base class, and grants the permissions the tests rely on.
     */
    @BeforeMethod(alwaysRun = true)
    public void setup() throws Exception {
        this.conf.setAuthenticationEnabled(true);
        this.conf.setAuthorizationEnabled(true);

        HashSet<String> superUserRoles = new HashSet<>();
        superUserRoles.add(SUPER_USER_ROLE);
        this.conf.setSuperUserRoles(superUserRoles);

        HashSet<String> authenticationProviders = new HashSet<>();
        authenticationProviders.add(AuthenticationProviderToken.class.getName());
        this.conf.setAuthenticationProviders(authenticationProviders);

        // Publish the verification key under the "tokenPublicKey" property of the broker config.
        Properties properties = new Properties();
        properties.setProperty("tokenPublicKey", this.TOKEN_PUBLIC_KEY);
        this.conf.setProperties(properties);

        // The broker's own client authenticates with the super-user token.
        this.conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
        this.conf.setBrokerClientAuthenticationParameters("token:" + this.ADMIN_TOKEN);

        setBrokerCount(1);
        internalSetup();
        setUpBase(1, 1, TOPIC, 1);

        grantTxnLookupToRole(OWNER_ROLE);
        // NOTE(review): presumably ClientCnxTest.NAMESPACE is the namespace of TOPIC ("tnx/ns1") - confirm.
        this.admin.namespaces()
                .grantPermissionOnNamespace(ClientCnxTest.NAMESPACE, OWNER_ROLE, EnumSet.allOf(AuthAction.class));
        grantTxnLookupToRole(UNRELATED_ROLE);
    }

    /**
     * Grants {@code str} the {@code consume} action on the system namespace.
     *
     * @param str role receiving the permission
     * @throws Exception if the admin call fails
     */
    private void grantTxnLookupToRole(String str) throws Exception {
        this.admin.namespaces().grantPermissionOnNamespace(
                NamespaceName.SYSTEM_NAMESPACE.toString(), str, Sets.newHashSet(AuthAction.consume));
    }

    /** Builds the base class' shared client with transactions enabled and the super-user token. */
    @Override
    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
        return clientBuilder
                .enableTransaction(true)
                .authentication(AuthenticationFactory.token(this.ADMIN_TOKEN))
                .build();
    }

    /** Builds the base class' admin client authenticated with the super-user token. */
    @Override
    public PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder pulsarAdminBuilder) throws PulsarClientException {
        return pulsarAdminBuilder.authentication(AuthenticationFactory.token(this.ADMIN_TOKEN)).build();
    }

    /** Delegates teardown to the base class after every test method. */
    @AfterMethod(alwaysRun = true)
    protected void cleanup() {
        super.internalCleanup();
    }

    /**
     * Supplies every combination of acting role and "remove the coordinator's metadata store
     * before the request" flag.
     *
     * @return rows of {@code {role, removeStoreFirst}}
     */
    @DataProvider(name = "actors")
    public Object[][] actors() {
        return new Object[][]{
                {OWNER_ROLE, true},
                {OWNER_ROLE, false},
                {UNRELATED_ROLE, true},
                {UNRELATED_ROLE, false},
                {SUPER_USER_ROLE, true},
                {SUPER_USER_ROLE, false},
        };
    }

    /**
     * Commits a transaction created by {@code "client"} through a client authenticated as
     * {@code actor} and checks both the outcome and the visibility of the transactional message.
     *
     * @param actor role whose client sends the commit request
     * @param removeStoreFirst whether the coordinator's metadata store is removed before the commit
     */
    @Test(dataProvider = "actors")
    public void testEndTxn(String actor, boolean removeStoreFirst) throws Exception {
        try (PulsarClient ownerClient = newTransactionClient(OWNER_ROLE);
             PulsarClient actorClient = newTransactionClient(actor)) {
            Transaction transaction = newTransaction(ownerClient);
            try (Consumer<String> consumer = ownerClient.newConsumer(Schema.STRING)
                         .subscriptionName("test")
                         .topic(TOPIC)
                         .subscribe();
                 Producer<String> producer = ownerClient.newProducer(Schema.STRING)
                         .sendTimeout(60, TimeUnit.SECONDS)
                         .topic(TOPIC)
                         .create()) {
                producer.newMessage().value("beforetxn").send();
                consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction);
                producer.newMessage(transaction).value("message").send();
                if (removeStoreFirst) {
                    removeCoordinatorStore(transaction);
                }

                Throwable failure = syncGetException(
                        ((PulsarClientImpl) actorClient).getTcClient().commitAsync(transaction.getTxnID()));

                if (isExpectedToSucceed(actor)) {
                    Assert.assertNull(failure);
                    Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message");
                } else {
                    Assert.assertNotNull(failure);
                    Assert.assertTrue(failure instanceof TransactionCoordinatorClientException,
                            failure.getClass().getName());
                    // The rejected commit must not have published the transactional message ...
                    Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
                    // ... while the owner itself can still commit and make it visible.
                    transaction.commit().get();
                    Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message");
                }
            }
        }
    }

    /**
     * Adds a produced partition to a transaction created by {@code "client"} through a client
     * authenticated as {@code actor} and checks the transaction metadata afterwards.
     *
     * @param actor role whose client sends the request
     * @param removeStoreFirst whether the coordinator's metadata store is removed before the request
     */
    @Test(dataProvider = "actors")
    public void testAddPartitionToTxn(String actor, boolean removeStoreFirst) throws Exception {
        try (PulsarClient ownerClient = newTransactionClient(OWNER_ROLE);
             PulsarClient actorClient = newTransactionClient(actor)) {
            Transaction transaction = newTransaction(ownerClient);
            if (removeStoreFirst) {
                removeCoordinatorStore(transaction);
            }

            Throwable failure = syncGetException(((PulsarClientImpl) actorClient).getTcClient()
                    .addPublishPartitionToTxnAsync(transaction.getTxnID(), List.of(TOPIC)));
            TxnMeta txnMeta = this.pulsarServiceList.get(0)
                    .getTransactionMetadataStoreService()
                    .getTxnMeta(transaction.getTxnID())
                    .get();

            if (isExpectedToSucceed(actor)) {
                Assert.assertNull(failure);
                Assert.assertEquals(txnMeta.producedPartitions(), List.of(TOPIC));
            } else {
                Assert.assertNotNull(failure);
                Assert.assertTrue(failure instanceof TransactionCoordinatorClientException);
                Assert.assertTrue(txnMeta.producedPartitions().isEmpty());
            }
        }
    }

    /**
     * Adds a subscription to a transaction created by {@code "client"} through a client
     * authenticated as {@code actor} and checks the transaction metadata afterwards.
     *
     * @param actor role whose client sends the request
     * @param removeStoreFirst whether the coordinator's metadata store is removed before the request
     */
    @Test(dataProvider = "actors")
    public void testAddSubscriptionToTxn(String actor, boolean removeStoreFirst) throws Exception {
        try (PulsarClient ownerClient = newTransactionClient(OWNER_ROLE);
             PulsarClient actorClient = newTransactionClient(actor)) {
            Transaction transaction = newTransaction(ownerClient);
            if (removeStoreFirst) {
                removeCoordinatorStore(transaction);
            }

            Throwable failure = syncGetException(((PulsarClientImpl) actorClient).getTcClient()
                    .addSubscriptionToTxnAsync(transaction.getTxnID(), TOPIC, "sub"));
            TxnMeta txnMeta = this.pulsarServiceList.get(0)
                    .getTransactionMetadataStoreService()
                    .getTxnMeta(transaction.getTxnID())
                    .get();

            if (isExpectedToSucceed(actor)) {
                Assert.assertNull(failure);
                Assert.assertEquals(txnMeta.ackedPartitions().size(), 1);
            } else {
                Assert.assertNotNull(failure);
                Assert.assertTrue(failure instanceof TransactionCoordinatorClientException);
                Assert.assertTrue(txnMeta.ackedPartitions().isEmpty());
            }
        }
    }

    /**
     * Building a transaction-enabled client without any credentials must fail with an
     * authentication exception somewhere in the cause chain.
     */
    @Test
    public void testNoAuth() throws Exception {
        try (PulsarClient ignored = PulsarClient.builder()
                .serviceUrl(this.pulsarServiceList.get(0).getBrokerServiceUrl())
                .enableTransaction(true)
                .build()) {
            // AssertionError is not an Exception, so the catch below does not swallow this failure.
            Assert.fail("should have failed");
        } catch (Exception e) {
            Assert.assertTrue(Exceptions.areExceptionsPresentInChain(
                    e, PulsarClientException.AuthenticationException.class));
        }
    }

    /**
     * Builds a transaction-enabled client for the first broker, authenticated with a fresh token
     * for {@code role}. The caller is responsible for closing it.
     */
    private PulsarClient newTransactionClient(String role) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(this.pulsarServiceList.get(0).getBrokerServiceUrl())
                .authentication(AuthenticationFactory.token(generateToken(this.kp, role)))
                .enableTransaction(true)
                .build();
    }

    /** Opens a transaction with a 60 second timeout on {@code client} and waits for it. */
    private static Transaction newTransaction(PulsarClient client) throws Exception {
        return client.newTransaction().withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
    }

    /** Asks the first broker to remove the metadata store of the coordinator handling {@code transaction}. */
    private void removeCoordinatorStore(Transaction transaction) {
        this.pulsarServiceList.get(0)
                .getTransactionMetadataStoreService()
                .removeTransactionMetadataStore(
                        TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
    }

    /** Returns whether the tests expect {@code actor}'s coordinator request to be accepted. */
    private static boolean isExpectedToSucceed(String actor) {
        return OWNER_ROLE.equals(actor) || SUPER_USER_ROLE.equals(actor);
    }

    /**
     * Waits for {@code completableFuture} and reports how it failed, if it did.
     *
     * @param completableFuture future to wait on
     * @return {@code null} on normal completion; the unwrapped cause when the future completed
     *     exceptionally; or the {@link InterruptedException} if the wait was interrupted (the
     *     thread's interrupt flag is restored in that case)
     */
    private static Throwable syncGetException(CompletableFuture<?> completableFuture) {
        try {
            completableFuture.get();
            return null;
        } catch (InterruptedException e) {
            // Keep the interruption observable to the test runner instead of silently consuming it.
            Thread.currentThread().interrupt();
            return e;
        } catch (ExecutionException e) {
            return FutureUtil.unwrapCompletionException(e);
        }
    }
}
