package org.apache.pulsar.client.api;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javassist.compiler.TokenId;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest.class */
public class SimpleSchemaTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SimpleSchemaTest.class);
    private final boolean schemaValidationEnforced;

    /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$IncompatibleData.class */
    static class IncompatibleData {
        int i;
        int j;

        /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$IncompatibleData$IncompatibleDataBuilder.class */
        public static class IncompatibleDataBuilder {
            private int i;
            private int j;

            IncompatibleDataBuilder() {
            }

            public IncompatibleDataBuilder i(int i) {
                this.i = i;
                return this;
            }

            public IncompatibleDataBuilder j(int i) {
                this.j = i;
                return this;
            }

            public IncompatibleData build() {
                return new IncompatibleData(this.i, this.j);
            }

            public String toString() {
                return "SimpleSchemaTest.IncompatibleData.IncompatibleDataBuilder(i=" + this.i + ", j=" + this.j + ")";
            }
        }

        public static IncompatibleDataBuilder builder() {
            return new IncompatibleDataBuilder();
        }

        public int getI() {
            return this.i;
        }

        public int getJ() {
            return this.j;
        }

        public void setI(int i) {
            this.i = i;
        }

        public void setJ(int i) {
            this.j = i;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof IncompatibleData)) {
                return false;
            }
            IncompatibleData incompatibleData = (IncompatibleData) obj;
            return incompatibleData.canEqual(this) && getI() == incompatibleData.getI() && getJ() == incompatibleData.getJ();
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof IncompatibleData;
        }

        public int hashCode() {
            return (((1 * 59) + getI()) * 59) + getJ();
        }

        public String toString() {
            return "SimpleSchemaTest.IncompatibleData(i=" + getI() + ", j=" + getJ() + ")";
        }

        public IncompatibleData() {
        }

        public IncompatibleData(int i, int i2) {
            this.i = i;
            this.j = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$V1Data.class */
    public static class V1Data {
        int i;

        /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$V1Data$V1DataBuilder.class */
        public static class V1DataBuilder {
            private int i;

            V1DataBuilder() {
            }

            public V1DataBuilder i(int i) {
                this.i = i;
                return this;
            }

            public V1Data build() {
                return new V1Data(this.i);
            }

            public String toString() {
                return "SimpleSchemaTest.V1Data.V1DataBuilder(i=" + this.i + ")";
            }
        }

        public static V1DataBuilder builder() {
            return new V1DataBuilder();
        }

        public int getI() {
            return this.i;
        }

        public void setI(int i) {
            this.i = i;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof V1Data)) {
                return false;
            }
            V1Data v1Data = (V1Data) obj;
            return v1Data.canEqual(this) && getI() == v1Data.getI();
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof V1Data;
        }

        public int hashCode() {
            return (1 * 59) + getI();
        }

        public String toString() {
            return "SimpleSchemaTest.V1Data(i=" + getI() + ")";
        }

        public V1Data() {
        }

        public V1Data(int i) {
            this.i = i;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$V2Data.class */
    static class V2Data {
        int i;
        Integer j;

        /* loaded from: input_file:org/apache/pulsar/client/api/SimpleSchemaTest$V2Data$V2DataBuilder.class */
        public static class V2DataBuilder {
            private int i;
            private Integer j;

            V2DataBuilder() {
            }

            public V2DataBuilder i(int i) {
                this.i = i;
                return this;
            }

            public V2DataBuilder j(Integer num) {
                this.j = num;
                return this;
            }

            public V2Data build() {
                return new V2Data(this.i, this.j);
            }

            public String toString() {
                return "SimpleSchemaTest.V2Data.V2DataBuilder(i=" + this.i + ", j=" + this.j + ")";
            }
        }

        public static V2DataBuilder builder() {
            return new V2DataBuilder();
        }

        public int getI() {
            return this.i;
        }

        public Integer getJ() {
            return this.j;
        }

        public void setI(int i) {
            this.i = i;
        }

        public void setJ(Integer num) {
            this.j = num;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof V2Data)) {
                return false;
            }
            V2Data v2Data = (V2Data) obj;
            if (!v2Data.canEqual(this) || getI() != v2Data.getI()) {
                return false;
            }
            Integer j = getJ();
            Integer j2 = v2Data.getJ();
            return j == null ? j2 == null : j.equals(j2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof V2Data;
        }

        public int hashCode() {
            int i = (1 * 59) + getI();
            Integer j = getJ();
            return (i * 59) + (j == null ? 43 : j.hashCode());
        }

        public String toString() {
            return "SimpleSchemaTest.V2Data(i=" + getI() + ", j=" + getJ() + ")";
        }

        public V2Data() {
        }

        public V2Data(int i, Integer num) {
            this.i = i;
            this.j = num;
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batchingModes")
    public static Object[][] batchingModes() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "schemaValidationModes")
    public static Object[][] schemaValidationModes() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    @DataProvider(name = "topicDomain")
    public static Object[] topicDomain() {
        return new Object[]{"persistent://", "non-persistent://"};
    }

    @Factory(dataProvider = "schemaValidationModes")
    public SimpleSchemaTest(boolean z) {
        this.schemaValidationEnforced = z;
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setSchemaValidationEnforced(this.schemaValidationEnforced);
        this.isTcpLookup = true;
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testString() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name").subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://my-property/my-ns/my-topic1").create();
            for (int i = 0; i < 10; i++) {
                try {
                    create.send("my-message-" + i);
                } catch (Throwable th) {
                    if (create != null) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            for (int i2 = 0; i2 < 10; i2++) {
                Message<?> receive = subscribe.receive();
                Assert.assertEquals((String) receive.getValue(), "my-message-" + i2);
                subscribe.acknowledge(receive);
            }
            if (create != null) {
                create.close();
            }
            if (subscribe != null) {
                subscribe.close();
            }
        } catch (Throwable th3) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void newProducerNewTopicNewSchema() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            create.send(new V1Data(0));
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newProducerTopicExistsWithoutSchema() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("my-property/my-ns/schema-test").create();
        try {
            create.send("my-property/my-ns/schema-test".getBytes(StandardCharsets.UTF_8));
            if (create != null) {
                create.close();
            }
            create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
            try {
                create.send(new V1Data(0));
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void newProducerTopicExistsWithSchema() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            create.send(new V1Data(1));
            if (create != null) {
                create.close();
            }
            create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
            try {
                create.send(new V1Data(0));
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
        Producer create;
        Producer create2 = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            create2.send(new V1Data(0));
            if (create2 != null) {
                create2.close();
            }
            try {
                create = this.pulsarClient.newProducer(Schema.BYTES).topic("my-property/my-ns/schema-test").create();
                try {
                    if (this.schemaValidationEnforced) {
                        Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema if SchemaValidationEnabled is enabled");
                    } else {
                        create.send("junkdata".getBytes(StandardCharsets.UTF_8));
                    }
                    if (create != null) {
                        create.close();
                    }
                } finally {
                    if (create != null) {
                        try {
                            create.close();
                        } catch (Throwable th) {
                            th.addSuppressed(th);
                        }
                    }
                }
            } catch (PulsarClientException e) {
                if (this.schemaValidationEnforced) {
                    Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
                } else {
                    Assert.fail("Shouldn't throw IncompatibleSchemaException if SchemaValidationEnforced is disabled");
                }
            }
            try {
                Producer create3 = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic("my-property/my-ns/schema-test").create();
                try {
                    create3.send("junkdata".getBytes(StandardCharsets.UTF_8));
                    if (create3 != null) {
                        create3.close();
                    }
                } finally {
                    if (create3 != null) {
                        try {
                            create3.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                }
            } catch (PulsarClientException e2) {
                Assert.assertTrue(e2.getCause() instanceof SchemaSerializationException);
            }
            try {
                create = this.pulsarClient.newProducer(Schema.NATIVE_AVRO(new Schema.Parser().parse(new ByteArrayInputStream(Schema.AVRO(V1Data.class).getSchemaInfo().getSchema())))).topic("my-property/my-ns/schema-test").create();
                try {
                    create.send("junkdata".getBytes(StandardCharsets.UTF_8));
                    if (create != null) {
                        create.close();
                    }
                } finally {
                }
            } catch (PulsarClientException e3) {
                Assert.assertTrue(e3.getCause() instanceof SchemaSerializationException);
            }
        } catch (Throwable th3) {
            if (create2 != null) {
                try {
                    create2.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
        Schema AVRO = Schema.AVRO(V1Data.class);
        AvroWriter avroWriter = new AvroWriter(new Schema.Parser().parse(new ByteArrayInputStream(AVRO.getSchemaInfo().getSchema())));
        Schema AVRO2 = Schema.AVRO(V2Data.class);
        AvroWriter avroWriter2 = new AvroWriter(new Schema.Parser().parse(new ByteArrayInputStream(AVRO2.getSchemaInfo().getSchema())));
        Producer create = this.pulsarClient.newProducer(AVRO).topic("my-property/my-ns/schema-test").create();
        if (create != null) {
            create.close();
        }
        Producer create2 = this.pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            create2.send(new V2Data(-1, -1));
            if (create2 != null) {
                create2.close();
            }
            V1Data v1Data = new V1Data(2);
            V2Data v2Data = new V2Data(3, 5);
            byte[] write = avroWriter.write(v1Data);
            byte[] write2 = avroWriter2.write(v2Data);
            create2 = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic("my-property/my-ns/schema-test").create();
            try {
                Consumer subscribe = this.pulsarClient.newConsumer(AVRO2).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
                try {
                    Assert.expectThrows(SchemaSerializationException.class, () -> {
                        create2.send(write);
                    });
                    create2.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class))).value(write).send();
                    create2.send(write2);
                    Message receive = subscribe.receive();
                    V2Data v2Data2 = (V2Data) receive.getValue();
                    Assert.assertEquals(v1Data.i, v2Data2.i);
                    Assert.assertNull(v2Data2.j);
                    Assert.assertEquals(receive.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                    Message receive2 = subscribe.receive();
                    Assert.assertEquals(v2Data, receive2.getValue());
                    Assert.assertEquals(receive2.getSchemaVersion(), new LongSchemaVersion(1L).bytes());
                    try {
                        create2.newMessage(Schema.BYTES).value(write).send();
                        if (this.schemaValidationEnforced) {
                            Assert.fail("Shouldn't be able to send to a schema'd topic with no schema if SchemaValidationEnabled is enabled");
                        }
                        Assert.assertEquals(subscribe.receive().getSchemaVersion(), SchemaVersion.Empty.bytes());
                    } catch (PulsarClientException e) {
                        if (this.schemaValidationEnforced) {
                            Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
                        } else {
                            Assert.fail("Shouldn't throw IncompatibleSchemaException if SchemaValidationEnforced is disabled");
                        }
                    }
                    if (subscribe != null) {
                        subscribe.close();
                    }
                    if (create2 != null) {
                        create2.close();
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
        org.apache.avro.Schema parse = new Schema.Parser().parse(new ByteArrayInputStream(Schema.AVRO(V1Data.class).getSchemaInfo().getSchema()));
        AvroWriter avroWriter = new AvroWriter(parse);
        Schema AVRO = Schema.AVRO(V2Data.class);
        org.apache.avro.Schema parse2 = new Schema.Parser().parse(new ByteArrayInputStream(AVRO.getSchemaInfo().getSchema()));
        AvroWriter avroWriter2 = new AvroWriter(parse2);
        V1Data v1Data = new V1Data(2);
        V2Data v2Data = new V2Data(3, 5);
        byte[] write = avroWriter.write(v1Data);
        byte[] write2 = avroWriter2.write(v2Data);
        Producer create = this.pulsarClient.newProducer(Schema.NATIVE_AVRO(parse)).topic("my-property/my-ns/schema-test").create();
        if (create != null) {
            create.close();
        }
        Producer create2 = this.pulsarClient.newProducer(Schema.NATIVE_AVRO(parse2)).topic("my-property/my-ns/schema-test").create();
        try {
            create2.send(write2);
            if (create2 != null) {
                create2.close();
            }
            create2 = this.pulsarClient.newProducer(Schema.NATIVE_AVRO(parse)).topic("my-property/my-ns/schema-test").create();
            try {
                Consumer subscribe = this.pulsarClient.newConsumer(AVRO).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
                try {
                    create2.newMessage(Schema.NATIVE_AVRO(parse)).value(write).send();
                    create2.newMessage(Schema.NATIVE_AVRO(parse2)).value(write2).send();
                    Message receive = subscribe.receive();
                    V2Data v2Data2 = (V2Data) receive.getValue();
                    Assert.assertEquals(v1Data.i, v2Data2.i);
                    Assert.assertNull(v2Data2.j);
                    Assert.assertEquals(receive.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                    Message receive2 = subscribe.receive();
                    Assert.assertEquals(v2Data, receive2.getValue());
                    Assert.assertEquals(receive2.getSchemaVersion(), new LongSchemaVersion(1L).bytes());
                    try {
                        create2.newMessage(Schema.BYTES).value(write).send();
                        if (this.schemaValidationEnforced) {
                            Assert.fail("Shouldn't be able to send to a schema'd topic with no schema if SchemaValidationEnabled is enabled");
                        }
                        Assert.assertEquals(subscribe.receive().getSchemaVersion(), SchemaVersion.Empty.bytes());
                    } catch (PulsarClientException e) {
                        if (this.schemaValidationEnforced) {
                            Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
                        } else {
                            Assert.fail("Shouldn't throw IncompatibleSchemaException if SchemaValidationEnforced is disabled");
                        }
                    }
                    if (subscribe != null) {
                        subscribe.close();
                    }
                    if (create2 != null) {
                        create2.close();
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void newProducerForMessageOnTopicWithDifferentSchemaType() throws Exception {
        V1Data v1Data = new V1Data(2);
        V2Data v2Data = new V2Data(3, 5);
        V1Data v1Data2 = new V1Data(8);
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V2Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
            try {
                create.newMessage().value(v1Data).send();
                create.newMessage(Schema.AVRO(V2Data.class)).value(v2Data).send();
                create.newMessage(Schema.AVRO(V1Data.class)).value(v1Data2).send();
                Message receive = subscribe.receive();
                V2Data v2Data2 = (V2Data) receive.getValue();
                Assert.assertEquals(v1Data.i, v2Data2.i);
                Assert.assertNull(v2Data2.j);
                Assert.assertEquals(receive.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                Message receive2 = subscribe.receive();
                Assert.assertEquals(v2Data, receive2.getValue());
                Assert.assertEquals(receive2.getSchemaVersion(), new LongSchemaVersion(1L).bytes());
                Message receive3 = subscribe.receive();
                V2Data v2Data3 = (V2Data) receive3.getValue();
                Assert.assertEquals(v1Data2.i, v2Data3.i);
                Assert.assertNull(v2Data3.j);
                Assert.assertEquals(receive3.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                if (subscribe != null) {
                    subscribe.close();
                }
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
        Schema AVRO = Schema.AVRO(V1Data.class);
        AvroWriter avroWriter = new AvroWriter(new Schema.Parser().parse(new ByteArrayInputStream(AVRO.getSchemaInfo().getSchema())));
        Schema AVRO2 = Schema.AVRO(V2Data.class);
        AvroWriter avroWriter2 = new AvroWriter(new Schema.Parser().parse(new ByteArrayInputStream(AVRO2.getSchemaInfo().getSchema())));
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("my-property/my-ns/schema-test").create();
        try {
            Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
            for (int i = 0; i < 2; i++) {
                try {
                    V1Data v1Data = new V1Data(i);
                    V2Data v2Data = new V2Data(i, Integer.valueOf(-i));
                    byte[] write = avroWriter.write(v1Data);
                    byte[] write2 = avroWriter2.write(v2Data);
                    create.newMessage(Schema.AUTO_PRODUCE_BYTES(AVRO)).value(write).send();
                    Message<byte[]> receive = subscribe.receive();
                    Assert.assertEquals(receive.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                    Assert.assertEquals(receive.getData(), write);
                    create.newMessage(Schema.AUTO_PRODUCE_BYTES(AVRO2)).value(write2).send();
                    Message<byte[]> receive2 = subscribe.receive();
                    Assert.assertEquals(receive2.getSchemaVersion(), new LongSchemaVersion(1L).bytes());
                    Assert.assertEquals(receive2.getData(), write2);
                } finally {
                }
            }
            if (subscribe != null) {
                subscribe.close();
            }
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(this.admin.schemas().getAllSchemas("my-property/my-ns/schema-test"), Arrays.asList(AVRO.getSchemaInfo(), AVRO2.getSchemaInfo()));
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newNativeAvroProducerForMessageSchemaOnTopicInitialWithNoSchema() throws Exception {
        Schema AVRO = Schema.AVRO(V1Data.class);
        org.apache.avro.Schema parse = new Schema.Parser().parse(new ByteArrayInputStream(AVRO.getSchemaInfo().getSchema()));
        AvroWriter avroWriter = new AvroWriter(parse);
        Schema AVRO2 = Schema.AVRO(V2Data.class);
        org.apache.avro.Schema parse2 = new Schema.Parser().parse(new ByteArrayInputStream(AVRO2.getSchemaInfo().getSchema()));
        AvroWriter avroWriter2 = new AvroWriter(parse2);
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("my-property/my-ns/schema-test").create();
        try {
            Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
            for (int i = 0; i < 2; i++) {
                try {
                    V1Data v1Data = new V1Data(i);
                    V2Data v2Data = new V2Data(i, Integer.valueOf(-i));
                    byte[] write = avroWriter.write(v1Data);
                    byte[] write2 = avroWriter2.write(v2Data);
                    create.newMessage(Schema.NATIVE_AVRO(parse)).value(write).send();
                    Message<byte[]> receive = subscribe.receive();
                    Assert.assertEquals(receive.getSchemaVersion(), new LongSchemaVersion(0L).bytes());
                    Assert.assertEquals(receive.getData(), write);
                    create.newMessage(Schema.NATIVE_AVRO(parse2)).value(write2).send();
                    Message<byte[]> receive2 = subscribe.receive();
                    Assert.assertEquals(receive2.getSchemaVersion(), new LongSchemaVersion(1L).bytes());
                    Assert.assertEquals(receive2.getData(), write2);
                } finally {
                }
            }
            if (subscribe != null) {
                subscribe.close();
            }
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(this.admin.schemas().getAllSchemas("my-property/my-ns/schema-test"), Arrays.asList(AVRO.getSchemaInfo(), AVRO2.getSchemaInfo()));
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newProducerForMessageSchemaWithBatch() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V2Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic("my-property/my-ns/schema-test").enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).create();
        AvroWriter avroWriter = new AvroWriter(ReflectData.AllowNull.get().getSchema(V1Data.class));
        AvroWriter avroWriter2 = new AvroWriter(ReflectData.AllowNull.get().getSchema(V2Data.class));
        AvroWriter avroWriter3 = new AvroWriter(ReflectData.AllowNull.get().getSchema(IncompatibleData.class));
        for (int i = 0; i < 20; i++) {
            if ((i / 5) % 2 == 0) {
                create.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class))).value(avroWriter.write(new V1Data(i))).sendAsync();
            } else {
                create.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class))).value(avroWriter2.write(new V2Data(i, Integer.valueOf(i + 20)))).sendAsync();
            }
            if ((i + 1) % 3 == 0) {
                try {
                    create.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class))).value(avroWriter3.write(new IncompatibleData(-i, -i))).send();
                } catch (Exception e) {
                    Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException, e.getMessage());
                }
            }
        }
        create.flush();
        for (int i2 = 0; i2 < 20; i2++) {
            V2Data v2Data = (V2Data) subscribe.receive().getValue();
            if ((i2 / 5) % 2 == 0) {
                Assert.assertNull(v2Data.j);
                Assert.assertEquals(v2Data.i, i2);
            } else {
                Assert.assertEquals(v2Data, new V2Data(i2, Integer.valueOf(i2 + 20)));
            }
        }
        subscribe.close();
    }

    @Test
    public void newNativeAvroProducerForMessageSchemaWithBatch() throws Exception {
        org.apache.avro.Schema parse = new Schema.Parser().parse(new ByteArrayInputStream(Schema.AVRO(V1Data.class).getSchemaInfo().getSchema()));
        new AvroWriter(parse);
        org.apache.avro.Schema parse2 = new Schema.Parser().parse(new ByteArrayInputStream(Schema.AVRO(V2Data.class).getSchemaInfo().getSchema()));
        new AvroWriter(parse2);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.BYTES).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.NATIVE_AVRO(parse)).topic("my-property/my-ns/schema-test").enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).create();
        AvroWriter avroWriter = new AvroWriter(ReflectData.AllowNull.get().getSchema(V1Data.class));
        AvroWriter avroWriter2 = new AvroWriter(ReflectData.AllowNull.get().getSchema(V2Data.class));
        AvroWriter avroWriter3 = new AvroWriter(ReflectData.AllowNull.get().getSchema(IncompatibleData.class));
        for (int i = 0; i < 20; i++) {
            if ((i / 5) % 2 == 0) {
                create.newMessage(Schema.NATIVE_AVRO(parse)).value(avroWriter.write(new V1Data(i))).sendAsync();
            } else {
                create.newMessage(Schema.NATIVE_AVRO(parse2)).value(avroWriter2.write(new V2Data(i, Integer.valueOf(i + 20)))).sendAsync();
            }
            if ((i + 1) % 3 == 0) {
                try {
                    create.newMessage(Schema.NATIVE_AVRO(new Schema.Parser().parse(new ByteArrayInputStream(Schema.AVRO(IncompatibleData.class).getSchemaInfo().getSchema())))).value(avroWriter3.write(new IncompatibleData(-i, -i))).send();
                } catch (Exception e) {
                    Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException, e.getMessage());
                }
            }
        }
        create.flush();
        for (int i2 = 0; i2 < 20; i2++) {
            byte[] data = subscribe.receive().getData();
            if ((i2 / 5) % 2 == 0) {
                Assert.assertEquals(((V1Data) new AvroReader(parse).read(data)).i, i2);
            } else {
                Assert.assertEquals((V2Data) new AvroReader(parse2).read(data), new V2Data(i2, Integer.valueOf(i2 + 20)));
            }
        }
        subscribe.close();
    }

    @Test
    public void newProducerWithMultipleSchemaDisabled() throws Exception {
        AvroWriter avroWriter = new AvroWriter(ReflectData.AllowNull.get().getSchema(V1Data.class));
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("my-property/my-ns/schema-test").enableMultiSchema(false).create();
        try {
            Assert.assertThrows(PulsarClientException.InvalidMessageException.class, () -> {
                create.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class))).value(avroWriter.write(new V1Data(0))).send();
            });
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newConsumerWithSchemaOnNewTopic() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
            try {
                V1Data v1Data = new V1Data(1);
                create.send(v1Data);
                Assert.assertEquals(v1Data, subscribe.receive().getValue());
                if (create != null) {
                    create.close();
                }
                if (subscribe != null) {
                    subscribe.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newConsumerWithSchemaOnExistingTopicWithoutSchema() throws Exception {
        try {
            Producer<byte[]> create = this.pulsarClient.newProducer().topic("my-property/my-ns/schema-test").create();
            try {
                Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
                try {
                    Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
                    if (subscribe != null) {
                        subscribe.close();
                    }
                    if (create != null) {
                        create.close();
                    }
                } catch (Throwable th) {
                    if (subscribe != null) {
                        try {
                            subscribe.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (PulsarClientException e) {
            Assert.assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException);
        }
    }

    @Test
    public void newConsumerWithSchemaTopicHasSchema() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
            try {
                V1Data v1Data = new V1Data(1);
                create.send(v1Data);
                Assert.assertEquals(v1Data, subscribe.receive().getValue());
                if (subscribe != null) {
                    subscribe.close();
                }
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void newBytesConsumerWithTopicWithSchema() throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").create();
        try {
            Consumer<byte[]> subscribe = this.pulsarClient.newConsumer().topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscribe();
            try {
                create.send(new V1Data(1));
                Assert.assertTrue(subscribe.receive().getValue().length > 0);
                if (subscribe != null) {
                    subscribe.close();
                }
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void getSchemaVersionFromMessagesBatchingDisabled() throws Exception {
        getSchemaVersionFromMessages(false);
    }

    @Test
    public void getSchemaVersionFromMessagesBatchingEnabled() throws Exception {
        getSchemaVersionFromMessages(true);
    }

    private void getSchemaVersionFromMessages(boolean z) throws Exception {
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").enableBatching(z).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(V1Data.class)).topic("my-property/my-ns/schema-test").subscriptionName("sub1").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                create.send(new V1Data(1));
                Message receive = subscribe.receive();
                Assert.assertNotNull(receive.getSchemaVersion());
                Assert.assertEquals(receive.getValue(), new V1Data(1));
                if (subscribe != null) {
                    subscribe.close();
                }
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(dataProvider = "batchingModes")
    public void testAutoConsume(boolean z) throws Exception {
        String str = "my-property/my-ns/schema-test-auto-consume-" + z;
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(str).enableBatching(z).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(str).subscriptionName("sub1").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            for (int i = 0; i < 10; i++) {
                try {
                    create.sendAsync(new V1Data(i));
                } catch (Throwable th) {
                    if (subscribe != null) {
                        try {
                            subscribe.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            create.flush();
            for (int i2 = 0; i2 < 10; i2++) {
                MessageImpl receive = subscribe.receive();
                Assert.assertNotNull(receive.getSchemaVersion());
                Assert.assertEquals(((GenericRecord) receive.getValue()).getField("i"), Integer.valueOf(i2));
                Assert.assertNotNull((org.apache.avro.Schema) receive.getSchemaInternal().getNativeSchema().get());
                Assert.assertNotNull((org.apache.avro.Schema) receive.getReaderSchema().get().getNativeSchema().get());
            }
            if (subscribe != null) {
                subscribe.close();
            }
            if (create != null) {
                create.close();
            }
        } catch (Throwable th3) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test(dataProvider = "batchingModes")
    public void testAutoKeyValueConsume(boolean z) throws Exception {
        String str = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + z;
        Schema KeyValue = Schema.KeyValue(Schema.AVRO(V1Data.class), Schema.AVRO(V1Data.class), KeyValueEncodingType.SEPARATED);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AVRO(V1Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub3b").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(KeyValue).topic(str).enableBatching(z).create();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(str).subscriptionName("sub0").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                try {
                    Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub1").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                    try {
                        Consumer subscribe4 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(V1Data.class), Schema.AVRO(V1Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub2").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                        try {
                            Consumer subscribe5 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AVRO(V1Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub3").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                            try {
                                subscribe = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(V1Data.class), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub4").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                                for (int i = 0; i < 10; i++) {
                                    try {
                                        create.sendAsync(new KeyValue(new V1Data(i * 100), new V1Data(i * 1000)));
                                    } finally {
                                    }
                                }
                                create.flush();
                                for (int i2 = 0; i2 < 10; i2++) {
                                    Message<?> receive = subscribe2.receive();
                                    KeyValue keyValue = (KeyValue) ((GenericRecord) receive.getValue()).getNativeObject();
                                    Assert.assertNotNull(receive.getSchemaVersion());
                                    Assert.assertEquals(((GenericRecord) keyValue.getKey()).getField("i"), Integer.valueOf(i2 * 100));
                                    Assert.assertEquals(((GenericRecord) keyValue.getValue()).getField("i"), Integer.valueOf(i2 * 1000));
                                    subscribe2.acknowledge(receive);
                                    KeyValueSchemaImpl keyValueSchemaImpl = (Schema) receive.getReaderSchema().get();
                                    KeyValueSchemaImpl keyValueSchemaImpl2 = keyValueSchemaImpl;
                                    Assert.assertEquals(SchemaType.AVRO, keyValueSchemaImpl2.getKeySchema().getSchemaInfo().getType());
                                    Assert.assertEquals(SchemaType.AVRO, keyValueSchemaImpl2.getValueSchema().getSchemaInfo().getType());
                                    Assert.assertNotNull(keyValueSchemaImpl.getSchemaInfo());
                                }
                                for (int i3 = 0; i3 < 10; i3++) {
                                    Message<?> receive2 = subscribe3.receive();
                                    Assert.assertNotNull(receive2.getSchemaVersion());
                                    Assert.assertEquals(((GenericRecord) ((KeyValue) receive2.getValue()).getKey()).getField("i"), Integer.valueOf(i3 * 100));
                                    Assert.assertEquals(((GenericRecord) ((KeyValue) receive2.getValue()).getValue()).getField("i"), Integer.valueOf(i3 * 1000));
                                    subscribe3.acknowledge(receive2);
                                    KeyValueSchemaImpl keyValueSchemaImpl3 = receive2.getReaderSchema().get();
                                    Assert.assertNotNull(keyValueSchemaImpl3.getKeySchema());
                                    Assert.assertNotNull(keyValueSchemaImpl3.getValueSchema());
                                }
                                for (int i4 = 0; i4 < 10; i4++) {
                                    Message<?> receive3 = subscribe4.receive();
                                    Assert.assertNotNull(receive3.getSchemaVersion());
                                    Assert.assertEquals(((V1Data) ((KeyValue) receive3.getValue()).getKey()).i, i4 * 100);
                                    Assert.assertEquals(((V1Data) ((KeyValue) receive3.getValue()).getValue()).i, i4 * 1000);
                                    subscribe4.acknowledge(receive3);
                                    KeyValueSchemaImpl keyValueSchemaImpl4 = receive3.getReaderSchema().get();
                                    Assert.assertNotNull(keyValueSchemaImpl4.getKeySchema());
                                    Assert.assertNotNull(keyValueSchemaImpl4.getValueSchema());
                                }
                                for (int i5 = 0; i5 < 10; i5++) {
                                    Message<?> receive4 = subscribe5.receive();
                                    Assert.assertNotNull(receive4.getSchemaVersion());
                                    Assert.assertEquals(((GenericRecord) ((KeyValue) receive4.getValue()).getKey()).getField("i"), Integer.valueOf(i5 * 100));
                                    Assert.assertEquals(((V1Data) ((KeyValue) receive4.getValue()).getValue()).i, i5 * 1000);
                                    subscribe5.acknowledge(receive4);
                                    KeyValueSchemaImpl keyValueSchemaImpl5 = receive4.getReaderSchema().get();
                                    Assert.assertNotNull(keyValueSchemaImpl5.getKeySchema());
                                    Assert.assertNotNull(keyValueSchemaImpl5.getValueSchema());
                                }
                                for (int i6 = 0; i6 < 10; i6++) {
                                    Message<?> receive5 = subscribe.receive();
                                    Assert.assertNotNull(receive5.getSchemaVersion());
                                    Assert.assertEquals(((GenericRecord) ((KeyValue) receive5.getValue()).getKey()).getField("i"), Integer.valueOf(i6 * 100));
                                    Assert.assertEquals(((V1Data) ((KeyValue) receive5.getValue()).getValue()).i, i6 * 1000);
                                    subscribe.acknowledge(receive5);
                                    KeyValueSchemaImpl keyValueSchemaImpl6 = receive5.getReaderSchema().get();
                                    Assert.assertNotNull(keyValueSchemaImpl6.getKeySchema());
                                    Assert.assertNotNull(keyValueSchemaImpl6.getValueSchema());
                                }
                                for (int i7 = 0; i7 < 10; i7++) {
                                    Message<?> receive6 = subscribe.receive();
                                    Assert.assertNotNull(receive6.getSchemaVersion());
                                    Assert.assertEquals(((V1Data) ((KeyValue) receive6.getValue()).getKey()).i, i7 * 100);
                                    Assert.assertEquals(((GenericRecord) ((KeyValue) receive6.getValue()).getValue()).getField("i"), Integer.valueOf(i7 * 1000));
                                    subscribe.acknowledge(receive6);
                                }
                                if (subscribe != null) {
                                    subscribe.close();
                                }
                                if (subscribe5 != null) {
                                    subscribe5.close();
                                }
                                if (subscribe4 != null) {
                                    subscribe4.close();
                                }
                                if (subscribe3 != null) {
                                    subscribe3.close();
                                }
                                if (subscribe2 != null) {
                                    subscribe2.close();
                                }
                                if (create != null) {
                                    create.close();
                                }
                                if (subscribe != null) {
                                    subscribe.close();
                                }
                                Schema KeyValue2 = Schema.KeyValue(Schema.AVRO(V2Data.class), Schema.AVRO(V2Data.class), KeyValueEncodingType.SEPARATED);
                                Consumer subscribe6 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AVRO(V2Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub3b").subscribe();
                                try {
                                    Producer create2 = this.pulsarClient.newProducer(KeyValue2).topic(str).enableBatching(z).create();
                                    try {
                                        Consumer subscribe7 = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(str).subscriptionName("sub0").subscribe();
                                        try {
                                            Consumer subscribe8 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub1").subscribe();
                                            try {
                                                Consumer subscribe9 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(V2Data.class), Schema.AVRO(V2Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub2").subscribe();
                                                try {
                                                    Consumer subscribe10 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AVRO(V2Data.class), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub3").subscribe();
                                                    try {
                                                        subscribe4 = this.pulsarClient.newConsumer(Schema.KeyValue(Schema.AVRO(V2Data.class), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)).topic(str).subscriptionName("sub4").subscribe();
                                                        for (int i8 = 0; i8 < 10; i8++) {
                                                            try {
                                                                create2.sendAsync(new KeyValue(new V2Data(i8 * 100, Integer.valueOf(i8)), new V2Data(i8 * 1000, Integer.valueOf(i8 * 20))));
                                                            } finally {
                                                            }
                                                        }
                                                        create2.flush();
                                                        for (int i9 = 0; i9 < 10; i9++) {
                                                            Message receive7 = subscribe7.receive();
                                                            KeyValue keyValue2 = (KeyValue) ((GenericRecord) receive7.getValue()).getNativeObject();
                                                            Assert.assertNotNull(receive7.getSchemaVersion());
                                                            Assert.assertEquals(((GenericRecord) keyValue2.getKey()).getField("i"), Integer.valueOf(i9 * 100));
                                                            Assert.assertEquals(((GenericRecord) keyValue2.getValue()).getField("i"), Integer.valueOf(i9 * 1000));
                                                            Assert.assertEquals(((GenericRecord) keyValue2.getKey()).getField("j"), Integer.valueOf(i9));
                                                            Assert.assertEquals(((GenericRecord) keyValue2.getValue()).getField("j"), Integer.valueOf(i9 * 20));
                                                        }
                                                        for (int i10 = 0; i10 < 10; i10++) {
                                                            Message receive8 = subscribe8.receive();
                                                            Assert.assertNotNull(receive8.getSchemaVersion());
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive8.getValue()).getKey()).getField("i"), Integer.valueOf(i10 * 100));
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive8.getValue()).getValue()).getField("i"), Integer.valueOf(i10 * 1000));
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive8.getValue()).getKey()).getField("j"), Integer.valueOf(i10));
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive8.getValue()).getValue()).getField("j"), Integer.valueOf(i10 * 20));
                                                        }
                                                        for (int i11 = 0; i11 < 10; i11++) {
                                                            Message receive9 = subscribe9.receive();
                                                            Assert.assertNotNull(receive9.getSchemaVersion());
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive9.getValue()).getKey()).i, i11 * 100);
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive9.getValue()).getValue()).i, i11 * 1000);
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive9.getValue()).getKey()).j, Integer.valueOf(i11));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive9.getValue()).getValue()).j, Integer.valueOf(i11 * 20));
                                                        }
                                                        for (int i12 = 0; i12 < 10; i12++) {
                                                            Message receive10 = subscribe10.receive();
                                                            Assert.assertNotNull(receive10.getSchemaVersion());
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive10.getValue()).getKey()).getField("i"), Integer.valueOf(i12 * 100));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive10.getValue()).getValue()).i, i12 * 1000);
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive10.getValue()).getKey()).getField("j"), Integer.valueOf(i12));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive10.getValue()).getValue()).j, Integer.valueOf(i12 * 20));
                                                        }
                                                        for (int i13 = 0; i13 < 10; i13++) {
                                                            Message receive11 = subscribe6.receive();
                                                            Assert.assertNotNull(receive11.getSchemaVersion());
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive11.getValue()).getKey()).getField("i"), Integer.valueOf(i13 * 100));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive11.getValue()).getValue()).i, i13 * 1000);
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive11.getValue()).getKey()).getField("j"), Integer.valueOf(i13));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive11.getValue()).getValue()).j, Integer.valueOf(i13 * 20));
                                                        }
                                                        for (int i14 = 0; i14 < 10; i14++) {
                                                            Message receive12 = subscribe4.receive();
                                                            Assert.assertNotNull(receive12.getSchemaVersion());
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive12.getValue()).getKey()).i, i14 * 100);
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive12.getValue()).getValue()).getField("i"), Integer.valueOf(i14 * 1000));
                                                            Assert.assertEquals(((V2Data) ((KeyValue) receive12.getValue()).getKey()).j, Integer.valueOf(i14));
                                                            Assert.assertEquals(((GenericRecord) ((KeyValue) receive12.getValue()).getValue()).getField("j"), Integer.valueOf(i14 * 20));
                                                        }
                                                        if (subscribe4 != null) {
                                                            subscribe4.close();
                                                        }
                                                        if (subscribe10 != null) {
                                                            subscribe10.close();
                                                        }
                                                        if (subscribe9 != null) {
                                                            subscribe9.close();
                                                        }
                                                        if (subscribe8 != null) {
                                                            subscribe8.close();
                                                        }
                                                        if (subscribe7 != null) {
                                                            subscribe7.close();
                                                        }
                                                        if (create2 != null) {
                                                            create2.close();
                                                        }
                                                        if (subscribe6 != null) {
                                                            subscribe6.close();
                                                        }
                                                    } finally {
                                                        if (subscribe10 != null) {
                                                            try {
                                                                subscribe10.close();
                                                            } catch (Throwable th) {
                                                                th.addSuppressed(th);
                                                            }
                                                        }
                                                    }
                                                } finally {
                                                    if (subscribe9 != null) {
                                                        try {
                                                            subscribe9.close();
                                                        } catch (Throwable th2) {
                                                            th.addSuppressed(th2);
                                                        }
                                                    }
                                                }
                                            } finally {
                                                if (subscribe8 != null) {
                                                    try {
                                                        subscribe8.close();
                                                    } catch (Throwable th3) {
                                                        th.addSuppressed(th3);
                                                    }
                                                }
                                            }
                                        } finally {
                                            if (subscribe7 != null) {
                                                try {
                                                    subscribe7.close();
                                                } catch (Throwable th4) {
                                                    th.addSuppressed(th4);
                                                }
                                            }
                                        }
                                    } catch (Throwable th5) {
                                        if (create2 != null) {
                                            try {
                                                create2.close();
                                            } catch (Throwable th6) {
                                                th5.addSuppressed(th6);
                                            }
                                        }
                                        throw th5;
                                    }
                                } finally {
                                    if (subscribe6 != null) {
                                        try {
                                            subscribe6.close();
                                        } catch (Throwable th7) {
                                            th.addSuppressed(th7);
                                        }
                                    }
                                }
                            } finally {
                                if (subscribe5 != null) {
                                    try {
                                        subscribe5.close();
                                    } catch (Throwable th8) {
                                        th.addSuppressed(th8);
                                    }
                                }
                            }
                        } finally {
                            if (subscribe4 != null) {
                                try {
                                    subscribe4.close();
                                } catch (Throwable th9) {
                                    th.addSuppressed(th9);
                                }
                            }
                        }
                    } finally {
                        if (subscribe3 != null) {
                            try {
                                subscribe3.close();
                            } catch (Throwable th10) {
                                th.addSuppressed(th10);
                            }
                        }
                    }
                } finally {
                    if (subscribe2 != null) {
                        try {
                            subscribe2.close();
                        } catch (Throwable th11) {
                            th.addSuppressed(th11);
                        }
                    }
                }
            } catch (Throwable th12) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th13) {
                        th12.addSuppressed(th13);
                    }
                }
                throw th12;
            }
        } finally {
            if (subscribe != null) {
                try {
                    subscribe.close();
                } catch (Throwable th14) {
                    th.addSuppressed(th14);
                }
            }
        }
    }

    @Test
    public void testAutoKeyValueConsumeGenericObject() throws Exception {
        String str = "my-property/my-ns/schema-test-auto-keyvalue-consume-" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer(Schema.KeyValue(Schema.AVRO(V1Data.class), Schema.AVRO(V1Data.class), KeyValueEncodingType.SEPARATED)).topic(str).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(str).subscriptionName("sub0").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            for (int i = 0; i < 10; i++) {
                try {
                    create.sendAsync(new KeyValue(new V1Data(i * 100), new V1Data(i * 1000)));
                } catch (Throwable th) {
                    if (subscribe != null) {
                        try {
                            subscribe.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            create.flush();
            for (int i2 = 0; i2 < 10; i2++) {
                Message<?> receive = subscribe.receive();
                KeyValue keyValue = (KeyValue) ((GenericRecord) receive.getValue()).getNativeObject();
                Assert.assertNotNull(receive.getSchemaVersion());
                Assert.assertEquals(((GenericRecord) keyValue.getKey()).getField("i"), Integer.valueOf(i2 * 100));
                Assert.assertEquals(((GenericRecord) keyValue.getValue()).getField("i"), Integer.valueOf(i2 * 1000));
                subscribe.acknowledge(receive);
                KeyValueSchemaImpl keyValueSchemaImpl = receive.getReaderSchema().get();
                Assert.assertNotNull(keyValueSchemaImpl.getKeySchema());
                Assert.assertNotNull(keyValueSchemaImpl.getValueSchema());
                Assert.assertTrue(keyValueSchemaImpl.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V1Data"));
                Assert.assertTrue(keyValueSchemaImpl.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V1Data"));
                Assert.assertTrue(keyValueSchemaImpl.getKeySchema().getNativeSchema().isPresent());
                Assert.assertTrue(keyValueSchemaImpl.getValueSchema().getNativeSchema().isPresent());
            }
            create = this.pulsarClient.newProducer(Schema.KeyValue(Schema.AVRO(V2Data.class), Schema.AVRO(V2Data.class), KeyValueEncodingType.SEPARATED)).topic(str).create();
            for (int i3 = 0; i3 < 10; i3++) {
                try {
                    create.sendAsync(new KeyValue(new V2Data(i3 * 100, Integer.valueOf(i3)), new V2Data(i3 * 1000, Integer.valueOf(i3 * 20))));
                } catch (Throwable th3) {
                    throw th3;
                }
            }
            create.flush();
            for (int i4 = 0; i4 < 10; i4++) {
                Message receive2 = subscribe.receive();
                KeyValue keyValue2 = (KeyValue) ((GenericRecord) receive2.getValue()).getNativeObject();
                Assert.assertNotNull(receive2.getSchemaVersion());
                Assert.assertEquals(((GenericRecord) keyValue2.getKey()).getField("i"), Integer.valueOf(i4 * 100));
                Assert.assertEquals(((GenericRecord) keyValue2.getValue()).getField("i"), Integer.valueOf(i4 * 1000));
                Assert.assertEquals(((GenericRecord) keyValue2.getKey()).getField("j"), Integer.valueOf(i4));
                Assert.assertEquals(((GenericRecord) keyValue2.getValue()).getField("j"), Integer.valueOf(i4 * 20));
                KeyValueSchemaImpl keyValueSchemaImpl2 = receive2.getReaderSchema().get();
                Assert.assertNotNull(keyValueSchemaImpl2.getKeySchema());
                Assert.assertNotNull(keyValueSchemaImpl2.getValueSchema());
                Assert.assertTrue(keyValueSchemaImpl2.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V2Data"));
                Assert.assertTrue(keyValueSchemaImpl2.getValueSchema().getSchemaInfo().getSchemaDefinition().contains("V2Data"));
                Assert.assertTrue(keyValueSchemaImpl2.getKeySchema().getNativeSchema().isPresent());
                Assert.assertTrue(keyValueSchemaImpl2.getValueSchema().getNativeSchema().isPresent());
            }
            if (create != null) {
                create.close();
            }
            if (subscribe != null) {
                subscribe.close();
            }
            if (create != null) {
                create.close();
            }
        } finally {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
        }
    }

    @Test
    public void testGetSchemaByVersion() throws PulsarClientException, PulsarAdminException, ExecutionException, InterruptedException {
        PulsarClientImpl build = PulsarClient.builder().serviceUrl(this.brokerUrl.toString()).build();
        PulsarClientImpl pulsarClientImpl = this.pulsarClient;
        this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("persistent://my-property/my-ns/testGetSchemaByVersion").create();
        this.pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic("persistent://my-property/my-ns/testGetSchemaByVersion").create();
        LookupService lookup = build.getLookup();
        LookupService lookup2 = pulsarClientImpl.getLookup();
        Assert.assertTrue(lookup instanceof HttpLookupService);
        Assert.assertTrue(lookup2 instanceof BinaryProtoLookupService);
        Assert.assertEquals(this.admin.schemas().getAllSchemas("persistent://my-property/my-ns/testGetSchemaByVersion").size(), 2);
        Assert.assertTrue(((Optional) lookup.getSchema(TopicName.get("persistent://my-property/my-ns/testGetSchemaByVersion"), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
        Assert.assertTrue(((Optional) lookup.getSchema(TopicName.get("persistent://my-property/my-ns/testGetSchemaByVersion"), ByteBuffer.allocate(8).putLong(1L).array()).get()).isPresent());
        Assert.assertTrue(((Optional) lookup2.getSchema(TopicName.get("persistent://my-property/my-ns/testGetSchemaByVersion"), ByteBuffer.allocate(8).putLong(0L).array()).get()).isPresent());
        Assert.assertTrue(((Optional) lookup2.getSchema(TopicName.get("persistent://my-property/my-ns/testGetSchemaByVersion"), ByteBuffer.allocate(8).putLong(1L).array()).get()).isPresent());
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testGetNativeSchemaWithAutoConsumeWithMultiVersion() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).subscriptionName("test").topic("persistent://my-property/my-ns/testGetSchemaWithMultiVersion").subscribe();
        try {
            Producer create = this.pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic("persistent://my-property/my-ns/testGetSchemaWithMultiVersion").create();
            try {
                Producer create2 = this.pulsarClient.newProducer(Schema.AVRO(V2Data.class)).topic("persistent://my-property/my-ns/testGetSchemaWithMultiVersion").create();
                try {
                    Assert.assertEquals(this.admin.schemas().getAllSchemas("persistent://my-property/my-ns/testGetSchemaWithMultiVersion").size(), 2);
                    create.send(new V1Data());
                    create2.send(new V2Data());
                    Schema<?> schema = subscribe.receive().getReaderSchema().get();
                    Schema<?> schema2 = subscribe.receive().getReaderSchema().get();
                    log.info("schemaV1 {} {}", schema.getSchemaInfo(), schema.getNativeSchema());
                    log.info("schemaV2 {} {}", schema2.getSchemaInfo(), schema2.getNativeSchema());
                    Assert.assertTrue(schema.getSchemaInfo().getSchemaDefinition().contains("V1Data"));
                    Assert.assertTrue(schema2.getSchemaInfo().getSchemaDefinition().contains("V2Data"));
                    org.apache.avro.Schema schema3 = (org.apache.avro.Schema) schema.getNativeSchema().get();
                    org.apache.avro.Schema schema4 = (org.apache.avro.Schema) schema2.getNativeSchema().get();
                    Assert.assertNotEquals(schema3.toString(false), schema4.toString(false));
                    Assert.assertTrue(schema3.toString(false).contains("V1Data"));
                    Assert.assertTrue(schema4.toString(false).contains("V2Data"));
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                } catch (Throwable th) {
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    @Test(dataProvider = "topicDomain")
    public void testAutoCreatedSchema(String str) throws Exception {
        String str2 = str + "my-property/my-ns/testAutoCreatedSchema-1";
        String str3 = str + "my-property/my-ns/testAutoCreatedSchema-2";
        this.pulsarClient.newProducer(Schema.BYTES).topic(str2).create().close();
        try {
            this.admin.schemas().getSchemaInfo(str2);
            Assert.fail("The schema of topic1 should not exist");
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), TokenId.FloatConstant);
        }
        this.pulsarClient.newProducer(Schema.STRING).topic(str2).create().close();
        Assert.assertEquals(this.admin.schemas().getSchemaInfo(str2).getType(), SchemaType.STRING);
        this.pulsarClient.newConsumer(Schema.BYTES).topic(str3).subscriptionName("sub").subscribe().close();
        try {
            this.admin.schemas().getSchemaInfo(str3);
            Assert.fail("The schema of topic2 should not exist");
        } catch (PulsarAdminException e2) {
            Assert.assertEquals(e2.getStatusCode(), TokenId.FloatConstant);
        }
        this.pulsarClient.newConsumer(Schema.STRING).topic(str3).subscriptionName("sub").subscribe().close();
        Assert.assertEquals(this.admin.schemas().getSchemaInfo(str3).getType(), SchemaType.STRING);
    }

    @DataProvider(name = "keyEncodingType")
    public static Object[] keyEncodingType() {
        return new Object[]{KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE};
    }

    @Test(dataProvider = "keyEncodingType")
    public void testAutoKeyValueConsumeGenericObjectNullValues(KeyValueEncodingType keyValueEncodingType) throws Exception {
        String str = "my-property/my-ns/schema-test-auto-keyvalue-" + keyValueEncodingType + "-null-value-consume-" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer(Schema.KeyValue(Schema.AVRO(V1Data.class), Schema.AVRO(V1Data.class), keyValueEncodingType)).topic(str).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.AUTO_CONSUME()).topic(str).subscriptionName("sub0").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                create.send(new KeyValue(new V1Data(1), new V1Data(2)));
                create.send(new KeyValue(new V1Data(1), null));
                create.send(new KeyValue(null, new V1Data(2)));
                create.send(new KeyValue(null, null));
                Message receive = subscribe.receive();
                Assert.assertEquals(keyValueEncodingType, receive.getReaderSchema().get().getKeyValueEncodingType());
                KeyValue keyValue = (KeyValue) ((GenericRecord) receive.getValue()).getNativeObject();
                Assert.assertEquals(1, ((GenericRecord) keyValue.getKey()).getField("i"));
                Assert.assertEquals(2, ((GenericRecord) keyValue.getValue()).getField("i"));
                KeyValue keyValue2 = (KeyValue) ((GenericRecord) subscribe.receive().getValue()).getNativeObject();
                Assert.assertEquals(1, ((GenericRecord) keyValue2.getKey()).getField("i"));
                Assert.assertNull(keyValue2.getValue());
                KeyValue keyValue3 = (KeyValue) ((GenericRecord) subscribe.receive().getValue()).getNativeObject();
                Assert.assertNull(keyValue3.getKey());
                Assert.assertEquals(2, ((GenericRecord) keyValue3.getValue()).getField("i"));
                KeyValue keyValue4 = (KeyValue) ((GenericRecord) subscribe.receive().getValue()).getNativeObject();
                Assert.assertNull(keyValue4.getKey());
                Assert.assertNull(keyValue4.getValue());
                if (subscribe != null) {
                    subscribe.close();
                }
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
