/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.confluent.connect.avro.AvroData;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectSource
extends AbstractKafkaConnectSource<KeyValue<byte[], byte[]>> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectSource.class);
    private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchemaWrappedSchema> readerCache = CacheBuilder.newBuilder().maximumSize(10000L).expireAfterAccess(30L, TimeUnit.MINUTES).build();
    private boolean jsonWithEnvelope = false;
    private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
    private Map<String, Predicate<SourceRecord>> predicates = new HashMap<String, Predicate<SourceRecord>>();
    private List<PredicatedTransform> transformations = new ArrayList<PredicatedTransform>();
    private static final AvroData avroData = new AvroData(1000);

    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
            this.jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
            config.put("schemas.enable", this.jsonWithEnvelope);
        } else {
            config.put("schemas.enable", false);
        }
        log.info("jsonWithEnvelope: {}", (Object)this.jsonWithEnvelope);
        this.initPredicates(config);
        this.initTransforms(config);
        super.open(config, sourceContext);
    }

    private void initPredicates(Map<String, Object> config) {
        Object predicatesListObj = config.get("predicates");
        if (predicatesListObj != null) {
            String predicatesList = predicatesListObj.toString();
            for (String predicateName : predicatesList.split(",")) {
                String prefix = "predicates." + (predicateName = predicateName.trim()) + ".";
                String typeKey = prefix + "type";
                Object classNameObj = config.get(typeKey);
                if (classNameObj == null) continue;
                String className = classNameObj.toString();
                try {
                    Class<?> clazz = Class.forName(className);
                    Predicate predicate = (Predicate)clazz.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    Map<String, Object> predicateConfig = config.entrySet().stream().filter(e -> ((String)e.getKey()).startsWith(prefix)).collect(Collectors.toMap(e -> ((String)e.getKey()).substring(prefix.length()), Map.Entry::getValue));
                    log.info("predicate config: {}", predicateConfig);
                    predicate.configure(predicateConfig);
                    this.predicates.put(predicateName, (Predicate<SourceRecord>)predicate);
                }
                catch (Exception e2) {
                    throw new RuntimeException("Failed to instantiate predicate: " + className, e2);
                }
            }
        }
    }

    private void initTransforms(Map<String, Object> config) {
        this.transformations.clear();
        Object transformsListObj = config.get("transforms");
        if (transformsListObj != null) {
            String transformsList = transformsListObj.toString();
            for (String transformName : transformsList.split(",")) {
                String prefix = "transforms." + (transformName = transformName.trim()) + ".";
                String typeKey = prefix + "type";
                Object classNameObj = config.get(typeKey);
                if (classNameObj == null) continue;
                String className = classNameObj.toString();
                try {
                    Class<?> clazz = Class.forName(className);
                    Transformation transform = (Transformation)clazz.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    Map<String, Object> transformConfig = config.entrySet().stream().filter(e -> ((String)e.getKey()).startsWith(prefix)).collect(Collectors.toMap(e -> ((String)e.getKey()).substring(prefix.length()), Map.Entry::getValue));
                    log.info("transform config: {}", transformConfig);
                    String predicateName = (String)transformConfig.get("predicate");
                    boolean negated = Boolean.parseBoolean(String.valueOf(transformConfig.getOrDefault("negate", "false")));
                    Predicate<SourceRecord> predicate = null;
                    if (predicateName != null && (predicate = this.predicates.get(predicateName)) == null) {
                        log.warn("Transform {} references non-existent predicate: {}", (Object)transformName, (Object)predicateName);
                    }
                    transform.configure(transformConfig);
                    this.transformations.add(new PredicatedTransform(predicate, (Transformation<SourceRecord>)transform, negated));
                }
                catch (Exception e2) {
                    throw new RuntimeException("Failed to instantiate SMT: " + className, e2);
                }
            }
        }
    }

    public synchronized KafkaSourceRecord processSourceRecord(SourceRecord srcRecord) {
        SourceRecord transformedRecord = this.applyTransforms(srcRecord);
        this.offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
        if (transformedRecord == null) {
            return null;
        }
        KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord);
        return record;
    }

    public SourceRecord applyTransforms(SourceRecord record) {
        SourceRecord current = record;
        for (PredicatedTransform pt : this.transformations) {
            if (current == null) break;
            if (pt.predicate != null && pt.negated == pt.predicate.test((ConnectRecord)current)) continue;
            current = (SourceRecord)pt.transform.apply((ConnectRecord)current);
        }
        return current;
    }

    private record PredicatedTransform(Predicate<SourceRecord> predicate, Transformation<SourceRecord> transform, boolean negated) {
    }

    public class KafkaSourceRecord
    extends AbstractKafkaConnectSource.AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
    implements KVRecord<byte[], byte[]> {
        final int keySize;
        final int valueSize;
        final SourceRecord srcRecord;

        KafkaSourceRecord(SourceRecord srcRecord) {
            super(srcRecord);
            this.srcRecord = srcRecord;
            byte[] keyBytes = KafkaConnectSource.this.keyConverter.fromConnectData(srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
            this.keySize = keyBytes != null ? keyBytes.length : 0;
            this.key = keyBytes != null ? Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
            byte[] valueBytes = KafkaConnectSource.this.valueConverter.fromConnectData(srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
            this.valueSize = valueBytes != null ? valueBytes.length : 0;
            this.value = new KeyValue((Object)keyBytes, (Object)valueBytes);
            this.topicName = Optional.of(srcRecord.topic());
            if (srcRecord.keySchema() != null) {
                this.keySchema = (KafkaSchemaWrappedSchema)KafkaConnectSource.this.readerCache.getIfPresent((Object)srcRecord.keySchema());
            }
            if (srcRecord.valueSchema() != null) {
                this.valueSchema = (KafkaSchemaWrappedSchema)KafkaConnectSource.this.readerCache.getIfPresent((Object)srcRecord.valueSchema());
            }
            if (srcRecord.keySchema() != null && this.keySchema == null) {
                this.keySchema = new KafkaSchemaWrappedSchema(avroData.fromConnectSchema(srcRecord.keySchema()), KafkaConnectSource.this.keyConverter);
                KafkaConnectSource.this.readerCache.put((Object)srcRecord.keySchema(), (Object)this.keySchema);
            }
            if (srcRecord.valueSchema() != null && this.valueSchema == null) {
                this.valueSchema = new KafkaSchemaWrappedSchema(avroData.fromConnectSchema(srcRecord.valueSchema()), KafkaConnectSource.this.valueConverter);
                KafkaConnectSource.this.readerCache.put((Object)srcRecord.valueSchema(), (Object)this.valueSchema);
            }
            this.eventTime = Optional.ofNullable(srcRecord.timestamp());
            this.partitionId = Optional.of(srcRecord.sourcePartition().entrySet().stream().map(e -> (String)e.getKey() + "=" + String.valueOf(e.getValue())).collect(Collectors.joining(",")));
            this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition());
        }

        @Override
        public boolean isEmpty() {
            return ((KeyValue)this.value).getValue() == null;
        }

        public Schema<byte[]> getKeySchema() {
            if (KafkaConnectSource.this.jsonWithEnvelope || this.keySchema == null) {
                return Schema.BYTES;
            }
            return this.keySchema;
        }

        public Schema<byte[]> getValueSchema() {
            if (KafkaConnectSource.this.jsonWithEnvelope || this.valueSchema == null) {
                return Schema.BYTES;
            }
            return this.valueSchema;
        }

        public KeyValueEncodingType getKeyValueEncodingType() {
            if (KafkaConnectSource.this.jsonWithEnvelope) {
                return KeyValueEncodingType.INLINE;
            }
            return KeyValueEncodingType.SEPARATED;
        }

        @Override
        public void ack() {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("commitRecord() for record: {}", (Object)this.srcRecord);
                }
                KafkaConnectSource.this.getSourceTask().commitRecord(this.srcRecord, new RecordMetadata(new TopicPartition(this.srcRecord.topic() == null ? this.topicName.orElse("UNDEFINED") : this.srcRecord.topic(), this.srcRecord.kafkaPartition() == null ? 0 : this.srcRecord.kafkaPartition()), -1L, 0, null == this.srcRecord.timestamp() ? -1L : this.srcRecord.timestamp(), this.keySize, this.valueSize));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Source task failed to commit record, source task should resend data, will get duplicate", (Throwable)e);
                return;
            }
            super.ack();
        }
    }
}

