package org.apache.pulsar.kafka.shade.io.confluent.kafka.formatter;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.common.KafkaException;
import kafka.common.MessageReader;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.ParsedSchema;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.SchemaProvider;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.utils.JacksonMapper;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-connect-avro-converter-shaded-2.10.2.5.jar:org/apache/pulsar/kafka/shade/io/confluent/kafka/formatter/SchemaMessageReader.class */
public abstract class SchemaMessageReader<T> implements MessageReader {
    public static final String VALUE_SCHEMA = "value.schema";
    public static final String KEY_SCHEMA = "key.schema";
    private String topic;
    private BufferedReader reader;
    private Boolean parseKey;
    private String keySeparator;
    private boolean ignoreError;
    private ParsedSchema keySchema;
    private ParsedSchema valueSchema;
    private String keySubject;
    private String valueSubject;
    private SchemaMessageSerializer<T> serializer;

    public SchemaMessageReader() {
        this.topic = null;
        this.reader = null;
        this.parseKey = false;
        this.keySeparator = "\t";
        this.ignoreError = false;
        this.keySchema = null;
        this.valueSchema = null;
        this.keySubject = null;
        this.valueSubject = null;
    }

    public SchemaMessageReader(SchemaRegistryClient schemaRegistryClient, ParsedSchema parsedSchema, ParsedSchema parsedSchema2, String str, boolean z, BufferedReader bufferedReader, boolean z2, boolean z3, boolean z4) {
        this.topic = null;
        this.reader = null;
        this.parseKey = false;
        this.keySeparator = "\t";
        this.ignoreError = false;
        this.keySchema = null;
        this.valueSchema = null;
        this.keySubject = null;
        this.valueSubject = null;
        this.keySchema = parsedSchema;
        this.valueSchema = parsedSchema2;
        this.topic = str;
        this.keySubject = str + "-key";
        this.valueSubject = str + "-value";
        this.parseKey = Boolean.valueOf(z);
        this.reader = bufferedReader;
        this.serializer = createSerializer(schemaRegistryClient, z2, z3, z4, null);
    }

    protected abstract SchemaMessageSerializer<T> createSerializer(SchemaRegistryClient schemaRegistryClient, boolean z, boolean z2, boolean z3, Serializer serializer);

    public void init(InputStream inputStream, Properties properties) {
        this.topic = properties.getProperty("topic");
        if (this.topic == null) {
            throw new ConfigException("Missing topic!");
        }
        if (properties.containsKey("parse.key")) {
            this.parseKey = Boolean.valueOf(properties.getProperty("parse.key").trim().toLowerCase().equals("true"));
        }
        if (properties.containsKey("key.separator")) {
            this.keySeparator = properties.getProperty("key.separator");
        }
        if (properties.containsKey("ignore.error")) {
            this.ignoreError = properties.getProperty("ignore.error").trim().toLowerCase().equals("true");
        }
        this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        String property = properties.getProperty("schema.registry.url");
        if (property == null) {
            throw new ConfigException("Missing schema registry url!");
        }
        SchemaRegistryClient schemaRegistryClient = getSchemaRegistryClient(properties, property);
        Serializer keySerializer = getKeySerializer(properties);
        boolean parseBoolean = properties.containsKey(AbstractKafkaSchemaSerDeConfig.NORMALIZE_SCHEMAS) ? Boolean.parseBoolean(properties.getProperty(AbstractKafkaSchemaSerDeConfig.NORMALIZE_SCHEMAS).trim()) : false;
        boolean parseBoolean2 = properties.containsKey("auto.register") ? Boolean.parseBoolean(properties.getProperty("auto.register").trim()) : properties.containsKey("auto.register.schemas") ? Boolean.parseBoolean(properties.getProperty("auto.register.schemas").trim()) : true;
        boolean parseBoolean3 = properties.containsKey(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION) ? Boolean.parseBoolean(properties.getProperty(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION).trim()) : false;
        if (this.serializer == null) {
            this.serializer = createSerializer(schemaRegistryClient, parseBoolean, parseBoolean2, parseBoolean3, keySerializer);
        }
        AbstractKafkaSchemaSerDeConfig abstractKafkaSchemaSerDeConfig = new AbstractKafkaSchemaSerDeConfig(AbstractKafkaSchemaSerDeConfig.baseConfigDef(), properties, false);
        this.valueSchema = getSchema(schemaRegistryClient, properties, false);
        this.valueSubject = getSubjectName(abstractKafkaSchemaSerDeConfig.valueSubjectNameStrategy(), this.topic, false, this.valueSchema);
        if (needsKeySchema()) {
            this.keySchema = getSchema(schemaRegistryClient, properties, true);
            this.keySubject = getSubjectName(abstractKafkaSchemaSerDeConfig.keySubjectNameStrategy(), this.topic, true, this.keySchema);
        }
    }

    private SchemaRegistryClient getSchemaRegistryClient(Properties properties, String str) {
        Map<String, Object> propertiesMap = getPropertiesMap(properties);
        List singletonList = Collections.singletonList(str);
        List singletonList2 = Collections.singletonList(getProvider());
        String validateAndMaybeGetMockScope = MockSchemaRegistry.validateAndMaybeGetMockScope(singletonList);
        return validateAndMaybeGetMockScope == null ? new CachedSchemaRegistryClient(str, 1000, (List<SchemaProvider>) singletonList2, (Map<String, ?>) propertiesMap) : MockSchemaRegistry.getClientForScope(validateAndMaybeGetMockScope, singletonList2);
    }

    private String getSubjectName(Object obj, String str, boolean z, ParsedSchema parsedSchema) {
        if (obj instanceof SubjectNameStrategy) {
            return ((SubjectNameStrategy) obj).subjectName(str, z, parsedSchema);
        }
        throw new RuntimeException("Classes extending deprecated " + org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.subject.SubjectNameStrategy.class.getCanonicalName() + " are not supported. Use classes extending " + SubjectNameStrategy.class.getCanonicalName() + " instead.");
    }

    protected abstract SchemaProvider getProvider();

    protected ParsedSchema parseSchema(SchemaRegistryClient schemaRegistryClient, String str, List<SchemaReference> list) {
        SchemaProvider provider = getProvider();
        provider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, schemaRegistryClient));
        return provider.parseSchema(str, list).get();
    }

    private ParsedSchema getSchema(SchemaRegistryClient schemaRegistryClient, Properties properties, boolean z) {
        ParsedSchema schemaById = getSchemaById(schemaRegistryClient, properties, z);
        return schemaById != null ? schemaById : parseSchema(schemaRegistryClient, getSchemaString(properties, z), getSchemaReferences(properties, z));
    }

    private ParsedSchema getSchemaById(SchemaRegistryClient schemaRegistryClient, Properties properties, boolean z) {
        String str = z ? "key.schema.id" : "value.schema.id";
        int i = 0;
        try {
            if (!properties.containsKey(str)) {
                return null;
            }
            i = Integer.parseInt(properties.getProperty(str));
            return schemaRegistryClient.getSchemaById(i);
        } catch (IOException | RestClientException e) {
            throw new SerializationException(String.format("Error retrieving schema for id %d", Integer.valueOf(i)), e);
        } catch (NumberFormatException e2) {
            throw new SerializationException(String.format("Error parsing %s as int", str), e2);
        }
    }

    private String getSchemaString(Properties properties, boolean z) {
        String str = z ? KEY_SCHEMA : VALUE_SCHEMA;
        String str2 = z ? "key.schema.file" : "value.schema.file";
        if (properties.containsKey(str)) {
            return properties.getProperty(str);
        }
        if (!properties.containsKey(str2)) {
            throw new ConfigException("Must provide the " + (z ? AvroData.KEY_FIELD : AvroData.VALUE_FIELD) + " schema in either " + str + ", " + str + ".id, or " + str2);
        }
        try {
            return new String(Files.readAllBytes(Paths.get(properties.getProperty(str2), new String[0])), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new ConfigException("Error reading schema from " + properties.getProperty(str2));
        }
    }

    private List<SchemaReference> getSchemaReferences(Properties properties, boolean z) {
        String str = z ? "key.refs" : "value.refs";
        if (!properties.containsKey(str)) {
            return Collections.emptyList();
        }
        try {
            return (List) JacksonMapper.INSTANCE.readValue(properties.getProperty(str), new TypeReference<List<SchemaReference>>() { // from class: org.apache.pulsar.kafka.shade.io.confluent.kafka.formatter.SchemaMessageReader.1
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Serializer getKeySerializer(Properties properties) throws ConfigException {
        if (!properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            return null;
        }
        try {
            return (Serializer) Class.forName((String) properties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).newInstance();
        } catch (Exception e) {
            throw new ConfigException("Error initializing Key serializer: " + e.getMessage());
        }
    }

    private boolean needsKeySchema() {
        return this.parseKey.booleanValue() && this.serializer.getKeySerializer() == null;
    }

    private Map<String, Object> getPropertiesMap(Properties properties) {
        HashMap hashMap = new HashMap();
        for (String str : properties.stringPropertyNames()) {
            hashMap.put(str, properties.getProperty(str));
        }
        return hashMap;
    }

    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String readLine = this.reader.readLine();
            if (readLine == null) {
                return null;
            }
            if (!this.parseKey.booleanValue()) {
                return new ProducerRecord<>(this.topic, this.serializer.serialize(this.valueSubject, this.topic, false, readFrom(readLine, this.valueSchema), this.valueSchema));
            }
            int indexOf = readLine.indexOf(this.keySeparator);
            if (indexOf < 0) {
                if (this.ignoreError) {
                    return new ProducerRecord<>(this.topic, this.serializer.serialize(this.valueSubject, this.topic, false, readFrom(readLine, this.valueSchema), this.valueSchema));
                }
                throw new KafkaException("No key found in line " + readLine);
            }
            String substring = readLine.substring(0, indexOf);
            return new ProducerRecord<>(this.topic, this.serializer.getKeySerializer() != null ? serializeNonSchemaKey(substring) : this.serializer.serialize(this.keySubject, this.topic, true, readFrom(substring, this.keySchema), this.keySchema), this.serializer.serialize(this.valueSubject, this.topic, false, readFrom(indexOf + this.keySeparator.length() > readLine.length() ? "" : readLine.substring(indexOf + this.keySeparator.length()), this.valueSchema), this.valueSchema));
        } catch (IOException e) {
            throw new KafkaException("Error reading from input", e);
        }
    }

    private byte[] serializeNonSchemaKey(String str) {
        Class<?> cls = this.serializer.getKeySerializer().getClass();
        if (cls == LongSerializer.class) {
            return this.serializer.serializeKey(this.topic, Long.valueOf(Long.parseLong(str)));
        }
        if (cls == IntegerSerializer.class) {
            return this.serializer.serializeKey(this.topic, Integer.valueOf(Integer.parseInt(str)));
        }
        if (cls != ShortSerializer.class) {
            return this.serializer.serializeKey(this.topic, str);
        }
        return this.serializer.serializeKey(this.topic, Short.valueOf(Short.parseShort(str)));
    }

    protected abstract T readFrom(String str, ParsedSchema parsedSchema);

    public void close() {
    }
}
