package org.apache.pulsar.schema.compatibility;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Objects;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckOnTopicLevelTest.class */
/**
 * Exercises the schema compatibility strategy that is applied to a topic when the strategy is
 * configured at topic level, at namespace level, or not configured at all.
 *
 * <p>Every scenario follows the same pattern: an AVRO producer for {@link Schemas.PersonOne} is
 * connected first, then a second AVRO producer for {@link Schemas.PersonThree} is attempted on the
 * same topic. Depending on the strategy the test expects to be in force, the second producer must
 * either connect or be rejected with
 * {@link PulsarClientException.IncompatibleSchemaException}.
 *
 * <p>NOTE(review): producers created by these tests are never closed explicitly; presumably the
 * shared client is shut down by {@code internalCleanup()} - confirm against the base class.
 */
public class SchemaTypeCompatibilityCheckOnTopicLevelTest extends MockedPulsarServiceBaseTest {
    private static final String CLUSTER_NAME = "test";
    private static final String PUBLIC_TENANT = "public";
    private static final String NAMESPACE = "test-namespace";
    /** Fully qualified namespace, i.e. {@code public/test-namespace}. */
    private static final String NAMESPACE_NAME = PUBLIC_TENANT + "/" + NAMESPACE;
    /** Fragment expected in the message of the exception raised for a rejected schema. */
    private static final String INCOMPATIBLE_SCHEMA_MESSAGE =
            "org.apache.avro.SchemaValidationException: Unable to read schema";

    /**
     * Enables topic level policies and the system topic in the broker configuration, runs the
     * base class setup, then creates the cluster, tenant and namespace used by every test.
     *
     * @throws Exception if the base setup or any admin call fails
     */
    @Override
    @BeforeMethod
    public void setup() throws Exception {
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setSystemTopicEnabled(true);
        super.internalSetup();

        this.admin.clusters().createCluster(
                CLUSTER_NAME,
                ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant(
                PUBLIC_TENANT,
                TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER_NAME)).build());
        this.admin.namespaces().createNamespace(NAMESPACE_NAME, Sets.newHashSet(CLUSTER_NAME));
    }

    /**
     * Delegates to the base class cleanup; runs even when the test method failed.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Sets {@code ALWAYS_INCOMPATIBLE} on the topic and expects the second schema to be rejected.
     */
    @Test
    public void testSetAlwaysInCompatibleStrategyOnTopicLevelAndCheckAlwaysInCompatible()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        String topicName =
                persistentTopic("testSetAlwaysInCompatibleStrategyOnTopicLevelAndCheckAlwaysInCompatible");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);
        this.pulsar.getAdminClient().topicPolicies()
                .setSchemaCompatibilityStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        awaitTopicStrategyViaBrokerAdmin(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);

        createAvroProducer(Schemas.PersonOne.class, topicName);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);
    }

    /**
     * Sets {@code ALWAYS_COMPATIBLE} on the namespace and {@code ALWAYS_INCOMPATIBLE} on the topic,
     * and expects the topic level setting to win, i.e. the second schema to be rejected.
     */
    @Test
    public void testSetAlwaysCompatibleOnNamespaceLevelAndCheckAlwaysInCompatible()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        this.admin.namespaces()
                .setSchemaCompatibilityStrategy(NAMESPACE_NAME, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        String topicName =
                persistentTopic("testSetAlwaysCompatibleOnNamespaceLevelAndCheckAlwaysInCompatible");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);
        this.pulsar.getAdminClient().topicPolicies()
                .setSchemaCompatibilityStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        awaitTopicStrategyViaBrokerAdmin(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);

        createAvroProducer(Schemas.PersonOne.class, topicName);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);
    }

    /**
     * Turns the topic level policy and system topic flags off in the configuration, sets
     * {@code ALWAYS_INCOMPATIBLE} on the namespace and expects the second schema to be rejected.
     */
    @Test
    public void testDisableTopicPoliciesAndSetAlwaysInCompatibleOnNamespaceLevel()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        this.conf.setTopicLevelPoliciesEnabled(false);
        this.conf.setSystemTopicEnabled(false);
        this.admin.namespaces()
                .setSchemaCompatibilityStrategy(NAMESPACE_NAME, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        String topicName =
                persistentTopic("testDisableTopicPoliciesAndSetAlwaysInCompatibleOnNamespaceLevel");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);

        createAvroProducer(Schemas.PersonOne.class, topicName);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);
    }

    /**
     * Turns the topic level policy and system topic flags off in the configuration, sets no
     * strategy anywhere and expects both producers to connect.
     */
    @Test
    public void testDisableTopicPoliciesWithDefaultConfig()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        this.conf.setTopicLevelPoliciesEnabled(false);
        this.conf.setSystemTopicEnabled(false);
        String topicName = persistentTopic("testDisableTopicPoliciesWithDefaultConfig");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);

        createAvroProducer(Schemas.PersonOne.class, topicName);
        createAvroProducer(Schemas.PersonThree.class, topicName);
    }

    /**
     * Sets no strategy anywhere and expects both producers to connect.
     */
    @Test
    public void testDefaultConfig()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        String topicName = persistentTopic("testDefaultConfig");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);

        createAvroProducer(Schemas.PersonOne.class, topicName);
        createAvroProducer(Schemas.PersonThree.class, topicName);
    }

    /**
     * Walks through a sequence of namespace level and topic level strategy changes and, after each
     * change, checks both the strategy reported as applied to the topic and whether the second
     * schema is accepted or rejected.
     */
    @Test
    public void testUpdateSchemaCompatibilityStrategyRepeatedly()
            throws PulsarClientException, PulsarServerException, PulsarAdminException {
        // Starting point: broker default is FULL, nothing is set on the namespace.
        Assert.assertEquals(this.conf.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL);
        String topicName = persistentTopic("testUpdateSchemaCompatibilityStrategyRepeatedly");
        this.pulsar.getAdminClient().topics().createNonPartitionedTopic(topicName);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.FULL);
        Awaitility.await().untilAsserted(() ->
                Assert.assertNull(this.admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(NAMESPACE_NAME)));
        awaitNamespaceStrategy(SchemaCompatibilityStrategy.UNDEFINED);
        createAvroProducer(Schemas.PersonOne.class, topicName);

        // Step 1: disable auto update on the namespace; the topic is expected to report
        // ALWAYS_INCOMPATIBLE while the namespace strategy itself stays UNDEFINED.
        this.admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(
                NAMESPACE_NAME, SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
        awaitNamespaceAutoUpdateStrategy(SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
        awaitNamespaceStrategy(SchemaCompatibilityStrategy.UNDEFINED);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);

        // Step 2: set ALWAYS_COMPATIBLE on the namespace; the auto update setting is expected to
        // remain stored, but the topic should now report ALWAYS_COMPATIBLE.
        this.admin.namespaces()
                .setSchemaCompatibilityStrategy(NAMESPACE_NAME, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        awaitNamespaceAutoUpdateStrategy(SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
        awaitNamespaceStrategy(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        createAvroProducer(Schemas.PersonOne.class, topicName);

        // Step 3: set ALWAYS_INCOMPATIBLE directly on the topic.
        this.admin.topicPolicies()
                .setSchemaCompatibilityStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);

        // Step 4: remove the topic level setting; the namespace value is expected to apply again.
        this.admin.topicPolicies().removeSchemaCompatibilityStrategy(topicName);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        createAvroProducer(Schemas.PersonOne.class, topicName);

        // Step 5: reset the namespace strategy to UNDEFINED; the topic is expected to report
        // ALWAYS_INCOMPATIBLE again, as in step 1.
        this.admin.namespaces()
                .setSchemaCompatibilityStrategy(NAMESPACE_NAME, SchemaCompatibilityStrategy.UNDEFINED);
        awaitNamespaceStrategy(SchemaCompatibilityStrategy.UNDEFINED);
        awaitTopicStrategy(topicName, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
        assertAvroProducerRejected(Schemas.PersonThree.class, topicName);
    }

    /**
     * Builds the full name of a persistent topic in the test namespace.
     *
     * @param localName the local (last) part of the topic name
     * @return the string form of the topic name
     */
    private static String persistentTopic(String localName) {
        return TopicName.get(TopicDomain.persistent.value(), PUBLIC_TENANT, NAMESPACE, localName).toString();
    }

    /**
     * Prepares, without connecting, a producer that uses an AVRO schema derived from a POJO with
     * "always allow null" switched on.
     *
     * @param pojo      the class the AVRO schema is derived from
     * @param topicName the topic the producer targets
     * @param <T>       the POJO type
     * @return the configured builder
     */
    private <T> ProducerBuilder<T> avroProducerBuilder(Class<T> pojo, String topicName) {
        Schema<T> avroSchema = Schema.AVRO(
                SchemaDefinition.<T>builder()
                        .withAlwaysAllowNull(true)
                        .withPojo(pojo)
                        .build());
        return this.pulsarClient.newProducer(avroSchema).topic(topicName);
    }

    /**
     * Connects an AVRO producer for the given POJO; the producer handle is intentionally dropped,
     * matching the scenarios which only care about whether the connection succeeds.
     *
     * @param pojo      the class the AVRO schema is derived from
     * @param topicName the topic the producer targets
     * @param <T>       the POJO type
     * @throws PulsarClientException if the producer cannot be created
     */
    private <T> void createAvroProducer(Class<T> pojo, String topicName) throws PulsarClientException {
        avroProducerBuilder(pojo, topicName).create();
    }

    /**
     * Asserts that connecting an AVRO producer for the given POJO fails with
     * {@link PulsarClientException.IncompatibleSchemaException} whose message contains
     * {@link #INCOMPATIBLE_SCHEMA_MESSAGE}.
     *
     * @param pojo      the class the AVRO schema is derived from
     * @param topicName the topic the producer targets
     * @param <T>       the POJO type
     */
    private <T> void assertAvroProducerRejected(Class<T> pojo, String topicName) {
        ProducerBuilder<T> rejectedBuilder = avroProducerBuilder(pojo, topicName);
        PulsarClientException.IncompatibleSchemaException failure = Assert.expectThrows(
                PulsarClientException.IncompatibleSchemaException.class, rejectedBuilder::create);
        Assert.assertTrue(failure.getMessage().contains(INCOMPATIBLE_SCHEMA_MESSAGE));
    }

    /**
     * Polls, through the broker's own admin client, until the strategy applied to the topic equals
     * the expected one.
     *
     * @param topicName the topic to query
     * @param expected  the strategy that must eventually be reported
     */
    private void awaitTopicStrategyViaBrokerAdmin(String topicName, SchemaCompatibilityStrategy expected) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.pulsar.getAdminClient().topicPolicies().getSchemaCompatibilityStrategy(topicName, true),
                expected));
    }

    /**
     * Polls, through the test admin client, until the strategy applied to the topic equals the
     * expected one.
     *
     * @param topicName the topic to query
     * @param expected  the strategy that must eventually be reported
     */
    private void awaitTopicStrategy(String topicName, SchemaCompatibilityStrategy expected) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true),
                expected));
    }

    /**
     * Polls until the schema compatibility strategy stored on the test namespace equals the
     * expected one.
     *
     * @param expected the strategy that must eventually be reported
     */
    private void awaitNamespaceStrategy(SchemaCompatibilityStrategy expected) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getSchemaCompatibilityStrategy(NAMESPACE_NAME),
                expected));
    }

    /**
     * Polls until the schema auto update strategy stored on the test namespace equals the
     * expected one.
     *
     * @param expected the auto update strategy that must eventually be reported
     */
    private void awaitNamespaceAutoUpdateStrategy(SchemaAutoUpdateCompatibilityStrategy expected) {
        Awaitility.await().untilAsserted(() -> Assert.assertEquals(
                this.admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(NAMESPACE_NAME),
                expected));
    }
}
