package com.datastax.oss.pulsar.functions.transforms;

import com.azure.ai.openai.OpenAIClient;
import com.datastax.oss.streaming.ai.JsonNodeSchema;
import com.datastax.oss.streaming.ai.TransformContext;
import com.datastax.oss.streaming.ai.TransformStep;
import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
import com.datastax.oss.streaming.ai.jstl.predicate.StepPredicatePair;
import com.datastax.oss.streaming.ai.model.TransformSchemaType;
import com.datastax.oss.streaming.ai.model.config.DataSourceConfig;
import com.datastax.oss.streaming.ai.model.config.OpenAIConfig;
import com.datastax.oss.streaming.ai.model.config.TransformStepConfig;
import com.datastax.oss.streaming.ai.util.TransformFunctionUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SchemaValidatorsConfig;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;
import jakarta.el.ELResolver;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/functions/transforms/TransformFunction.class */
public class TransformFunction implements Function<GenericObject, Record<GenericObject>>, TransformStep {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransformFunction.class);
    private List<StepPredicatePair> steps;
    private TransformStepConfig transformConfig;
    private QueryStepDataSource dataSource;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.datastax.oss.pulsar.functions.transforms.TransformFunction$1, reason: invalid class name */
    /* loaded from: input_file:com/datastax/oss/pulsar/functions/transforms/TransformFunction$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$schema$SchemaType;

        static {
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.INT8.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.INT16.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.INT32.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.INT64.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.FLOAT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.DOUBLE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.BOOLEAN.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.STRING.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.BYTES.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.DATE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.TIME.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.TIMESTAMP.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.INSTANT.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.LOCAL_DATE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.LOCAL_TIME.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.LOCAL_DATE_TIME.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.AVRO.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$com$datastax$oss$streaming$ai$model$TransformSchemaType[TransformSchemaType.JSON.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            $SwitchMap$org$apache$pulsar$common$schema$SchemaType = new int[SchemaType.values().length];
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT8.ordinal()] = 1;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT16.ordinal()] = 2;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT32.ordinal()] = 3;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT64.ordinal()] = 4;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.FLOAT.ordinal()] = 5;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.DOUBLE.ordinal()] = 6;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.BOOLEAN.ordinal()] = 7;
            } catch (NoSuchFieldError e25) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.STRING.ordinal()] = 8;
            } catch (NoSuchFieldError e26) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.BYTES.ordinal()] = 9;
            } catch (NoSuchFieldError e27) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.DATE.ordinal()] = 10;
            } catch (NoSuchFieldError e28) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.TIME.ordinal()] = 11;
            } catch (NoSuchFieldError e29) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.TIMESTAMP.ordinal()] = 12;
            } catch (NoSuchFieldError e30) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INSTANT.ordinal()] = 13;
            } catch (NoSuchFieldError e31) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.LOCAL_DATE.ordinal()] = 14;
            } catch (NoSuchFieldError e32) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.LOCAL_TIME.ordinal()] = 15;
            } catch (NoSuchFieldError e33) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.LOCAL_DATE_TIME.ordinal()] = 16;
            } catch (NoSuchFieldError e34) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.JSON.ordinal()] = 17;
            } catch (NoSuchFieldError e35) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.AVRO.ordinal()] = 18;
            } catch (NoSuchFieldError e36) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.PROTOBUF.ordinal()] = 19;
            } catch (NoSuchFieldError e37) {
            }
        }
    }

    /**
     * Validates the user configuration and builds the transform pipeline.
     *
     * <p>The user config map is converted to a JSON tree and checked against the
     * {@code config-schema.yaml} classpath resource. Once it passes, the map is bound to a
     * {@link TransformStepConfig}, and the OpenAI client, the data source and the step list are
     * created from it.
     *
     * @param context function context supplying the user configuration map
     * @throws IllegalArgumentException if the configuration does not satisfy the schema
     * @throws IllegalStateException if {@code config-schema.yaml} is not on the classpath
     */
    public void initialize(Context context) {
        Map<String, Object> userConfigMap = context.getUserConfigMap();
        ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
        validateConfig(objectMapper, objectMapper.convertValue(userConfigMap, JsonNode.class));
        this.transformConfig = objectMapper.convertValue(userConfigMap, TransformStepConfig.class);
        OpenAIClient openAIClient = buildOpenAIClient(this.transformConfig.getOpenai());
        this.dataSource = buildDataSource(this.transformConfig.getDatasource());
        this.steps = TransformFunctionUtil.getTransformSteps(this.transformConfig, openAIClient, this.dataSource);
    }

    /**
     * Checks {@code config} against the bundled schema and throws on the first problem found.
     *
     * <p>When the whole-document validation fails, each step is validated on its own first so the
     * error can name the offending step; the generic document-level message is the fallback.
     */
    private static void validateConfig(ObjectMapper objectMapper, JsonNode config) {
        JsonSchemaFactory schemaFactory = JsonSchemaFactory.builder(JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4))
                .objectMapper(objectMapper)
                .addUrnFactory(urn -> {
                    try {
                        return Thread.currentThread().getContextClassLoader().getResource(urn).toURI();
                    } catch (Exception ignored) {
                        // Unresolvable reference: report "no URI" and let the factory deal with it.
                        return null;
                    }
                })
                .build();
        SchemaValidatorsConfig validatorsConfig = new SchemaValidatorsConfig();
        validatorsConfig.setLosslessNarrowing(true);

        Set<ValidationMessage> errors;
        // try-with-resources so the classpath stream is released once the schema has been read.
        try (InputStream schemaStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config-schema.yaml")) {
            if (schemaStream == null) {
                throw new IllegalStateException("Schema resource 'config-schema.yaml' not found on the classpath");
            }
            errors = schemaFactory.getSchema(schemaStream, validatorsConfig).validate(config);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read schema resource 'config-schema.yaml'", e);
        }
        if (errors.isEmpty()) {
            return;
        }

        if (!config.hasNonNull("steps")) {
            throw new IllegalArgumentException("Missing config 'steps' field");
        }
        JsonNode steps = config.get("steps");
        if (!steps.isArray()) {
            throw new IllegalArgumentException("Config 'steps' field must be an array");
        }
        String stepError = findFirstStepError(schemaFactory, steps);
        if (stepError != null) {
            throw new IllegalArgumentException(stepError);
        }
        // errors is known to be non-empty here, so there is always a first message to report.
        throw new IllegalArgumentException("Configuration validation failed: " + errors.iterator().next());
    }

    /**
     * Validates each step against the schema component named after its {@code type} and returns the
     * message for the first invalid step, or {@code null} when none could be pinpointed.
     *
     * <p>This is a best-effort refinement of the error message: any exception raised while
     * inspecting the steps is logged at debug level and treated as "no step-specific message".
     */
    private static String findFirstStepError(JsonSchemaFactory schemaFactory, JsonNode steps) {
        try {
            for (JsonNode step : steps) {
                String type = step.get("type").asText();
                String schemaRef = String.format("{\"$ref\": \"config-schema.yaml#/components/schemas/%s\"}", kebabToPascal(type));
                String message = schemaFactory.getSchema(schemaRef).validate(step).stream()
                        .findFirst()
                        .map(violation -> String.format("Invalid '%s' step config: %s", type, violation))
                        .orElse(null);
                if (message != null) {
                    return message;
                }
            }
        } catch (Exception e) {
            log.debug("Exception during steps validation, ignoring", (Throwable) e);
        }
        return null;
    }

    /**
     * Closes the data source, if one was created, and then every configured transform step.
     *
     * <p>Safe to call when {@code initialize} never ran or failed before the steps were built; in
     * that case only the resources that exist are closed.
     *
     * @throws Exception if the data source or any step fails to close
     */
    @Override // com.datastax.oss.streaming.ai.TransformStep, java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.dataSource != null) {
            this.dataSource.close();
        }
        // steps is only assigned at the very end of initialize, so it can still be null here.
        if (this.steps == null) {
            return;
        }
        for (StepPredicatePair step : this.steps) {
            step.getTransformStep().close();
        }
    }

    /**
     * Transforms one incoming record and builds the output record.
     *
     * <p>A {@link TransformContext} is created from the current record, the configured steps are run
     * against it, and the result is handed to {@code send}.
     *
     * @param genericObject the incoming value; its native object is what the steps operate on
     * @param context the function context of the current invocation
     * @return the output record, or {@code null} when the steps marked the record as dropped
     * @throws Exception if a transform step or the output conversion fails
     */
    public Record<GenericObject> process(GenericObject genericObject, Context context) throws Exception {
        Object nativeObject = genericObject.getNativeObject();
        if (log.isDebugEnabled()) {
            Record<?> currentRecord = context.getCurrentRecord();
            log.debug("apply to {} {}", genericObject, nativeObject);
            // Diagnostics must not change behavior: a record without a backing message logs a null
            // schema version instead of failing the whole invocation.
            Object schemaVersion = currentRecord.getMessage().map(message -> message.getSchemaVersion()).orElse(null);
            log.debug("record with schema {} version {} {}", currentRecord.getSchema(), schemaVersion, currentRecord);
        }
        TransformContext newTransformContext = newTransformContext(context, nativeObject, this.transformConfig.isAttemptJsonConversion());
        process(newTransformContext);
        return send(context, newTransformContext);
    }

    /**
     * Builds a {@link TransformContext} from the record currently being processed.
     *
     * <p>Topic names, key, event time and a copy of the properties are taken from the record. When
     * the record schema is a {@link KeyValueSchema} and {@code obj} is a {@link KeyValue}, key and
     * value are populated separately (struct-typed parts are unwrapped to their native object) and
     * the key/value encoding type is stored in the custom context under
     * {@code "keyValueEncodingType"}. Otherwise only the value side is populated with {@code obj}.
     *
     * @param context the function context whose current record is read
     * @param obj the native object of the incoming value
     * @param z whether key and value objects should be passed through
     *     {@code TransformFunctionUtil.attemptJsonConversion}
     * @return the populated transform context
     */
    public static TransformContext newTransformContext(Context context, Object obj, boolean z) {
        Record<?> currentRecord = context.getCurrentRecord();
        TransformContext transformContext = new TransformContext();
        transformContext.setInputTopic(currentRecord.getTopicName().orElse(null));
        transformContext.setOutputTopic(currentRecord.getDestinationTopic().orElse(null));
        transformContext.setKey(currentRecord.getKey().orElse(null));
        transformContext.setEventTime(currentRecord.getEventTime().orElse(null));
        if (currentRecord.getProperties() != null) {
            // Copy so later modifications do not touch the record's own map.
            transformContext.setProperties(new HashMap<>(currentRecord.getProperties()));
        }
        // Declared as the general Schema type: non key/value schemas take the else branch below.
        Schema<?> schema = currentRecord.getSchema();
        if ((schema instanceof KeyValueSchema) && (obj instanceof KeyValue)) {
            KeyValueSchema<?, ?> keyValueSchema = (KeyValueSchema<?, ?>) schema;
            KeyValue<?, ?> keyValue = (KeyValue<?, ?>) obj;
            Schema<?> keySchema = keyValueSchema.getKeySchema();
            Schema<?> valueSchema = keyValueSchema.getValueSchema();
            transformContext.setKeySchemaType(pulsarSchemaToTransformSchemaType(keySchema));
            transformContext.setKeyNativeSchema(getNativeSchema(keySchema));
            transformContext.setKeyObject(keySchema.getSchemaInfo().getType().isStruct() ? ((GenericObject) keyValue.getKey()).getNativeObject() : keyValue.getKey());
            transformContext.setValueSchemaType(pulsarSchemaToTransformSchemaType(valueSchema));
            transformContext.setValueNativeSchema(getNativeSchema(valueSchema));
            transformContext.setValueObject(valueSchema.getSchemaInfo().getType().isStruct() ? ((GenericObject) keyValue.getValue()).getNativeObject() : keyValue.getValue());
            transformContext.getCustomContext().put("keyValueEncodingType", keyValueSchema.getKeyValueEncodingType());
        } else {
            transformContext.setValueSchemaType(pulsarSchemaToTransformSchemaType(schema));
            transformContext.setValueNativeSchema(getNativeSchema(schema));
            transformContext.setValueObject(obj);
        }
        if (z) {
            transformContext.setKeyObject(TransformFunctionUtil.attemptJsonConversion(transformContext.getKeyObject()));
            transformContext.setValueObject(TransformFunctionUtil.attemptJsonConversion(transformContext.getValueObject()));
        }
        return transformContext;
    }

    /**
     * Returns the native schema object carried by {@code schema}.
     *
     * @param schema the Pulsar schema, may be {@code null}
     * @return the native schema, or {@code null} when {@code schema} is {@code null} or exposes none
     */
    private static Object getNativeSchema(Schema<?> schema) {
        return schema == null ? null : schema.getNativeSchema().orElse(null);
    }

    /**
     * Maps the type of a Pulsar schema to the corresponding {@link TransformSchemaType}.
     *
     * @param schema the Pulsar schema, may be {@code null}
     * @return the matching transform schema type, or {@code null} when {@code schema} is {@code null}
     * @throws IllegalArgumentException if the schema type has no transform counterpart
     */
    private static TransformSchemaType pulsarSchemaToTransformSchemaType(Schema<?> schema) {
        if (schema == null) {
            return null;
        }
        // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
        SchemaType type = schema.getSchemaInfo().getType();
        switch (type) {
            case INT8:
                return TransformSchemaType.INT8;
            case INT16:
                return TransformSchemaType.INT16;
            case INT32:
                return TransformSchemaType.INT32;
            case INT64:
                return TransformSchemaType.INT64;
            case FLOAT:
                return TransformSchemaType.FLOAT;
            case DOUBLE:
                return TransformSchemaType.DOUBLE;
            case BOOLEAN:
                return TransformSchemaType.BOOLEAN;
            case STRING:
                return TransformSchemaType.STRING;
            case BYTES:
                return TransformSchemaType.BYTES;
            case DATE:
                return TransformSchemaType.DATE;
            case TIME:
                return TransformSchemaType.TIME;
            case TIMESTAMP:
                return TransformSchemaType.TIMESTAMP;
            case INSTANT:
                return TransformSchemaType.INSTANT;
            case LOCAL_DATE:
                return TransformSchemaType.LOCAL_DATE;
            case LOCAL_TIME:
                return TransformSchemaType.LOCAL_TIME;
            case LOCAL_DATE_TIME:
                return TransformSchemaType.LOCAL_DATE_TIME;
            case JSON:
                return TransformSchemaType.JSON;
            case AVRO:
                return TransformSchemaType.AVRO;
            case PROTOBUF:
                return TransformSchemaType.PROTOBUF;
            default:
                throw new IllegalArgumentException("Unsupported schema type " + type);
        }
    }

    /**
     * Runs the configured transform steps against {@code transformContext} by delegating to
     * {@code TransformFunctionUtil.processTransformSteps}.
     *
     * @param transformContext the context passed to the steps
     * @throws Exception if step processing fails
     */
    @Override // com.datastax.oss.streaming.ai.TransformStep
    public void process(TransformContext transformContext) throws Exception {
        TransformFunctionUtil.processTransformSteps(transformContext, this.steps);
    }

    /**
     * Turns a processed {@link TransformContext} into the record to emit.
     *
     * <p>When the context carries a key schema type, the output is a {@link KeyValue} with a
     * key/value schema (using the encoding type stored under {@code "keyValueEncodingType"}, or
     * {@code INLINE} if none was stored). Otherwise the output is the plain value, and the context
     * key, if any, is set as the record key.
     *
     * @param context the function context used to create the output record builder
     * @param transformContext the context holding the transformed key, value, schemas and metadata
     * @return the output record, or {@code null} when the record was marked as dropped
     * @throws IOException declared by the conversion steps performed before the record is built
     */
    public static Record<GenericObject> send(Context context, TransformContext transformContext) throws IOException {
        if (transformContext.isDropCurrentRecord()) {
            return null;
        }
        transformContext.convertAvroToBytes();
        transformContext.convertMapToStringOrBytes();

        Schema<?> outputSchema;
        Object outputValue;
        if (transformContext.getKeySchemaType() == null) {
            outputSchema = buildSchema(transformContext.getValueSchemaType(), transformContext.getValueNativeSchema());
            outputValue = transformContext.getValueObject();
        } else {
            KeyValueEncodingType encodingType = (KeyValueEncodingType) transformContext.getCustomContext().get("keyValueEncodingType");
            if (encodingType == null) {
                encodingType = KeyValueEncodingType.INLINE;
            }
            Schema<?> keyPart = buildSchema(transformContext.getKeySchemaType(), transformContext.getKeyNativeSchema());
            Schema<?> valuePart = buildSchema(transformContext.getValueSchemaType(), transformContext.getValueNativeSchema());
            outputSchema = Schema.KeyValue(keyPart, valuePart, encodingType);
            outputValue = new KeyValue(transformContext.getKeyObject(), transformContext.getValueObject());
        }

        if (log.isDebugEnabled()) {
            log.debug("output {} schema {}", outputValue, outputSchema);
        }

        FunctionRecord.FunctionRecordBuilder recordBuilder = context.newOutputRecordBuilder(outputSchema).destinationTopic(transformContext.getOutputTopic()).value(outputValue).properties(transformContext.getProperties());
        // For key/value output the key already travels inside the value, so it is not set separately.
        boolean plainValueWithKey = transformContext.getKeySchemaType() == null && transformContext.getKey() != null;
        if (plainValueWithKey) {
            recordBuilder.key(transformContext.getKey());
        }
        return recordBuilder.build();
    }

    /**
     * Creates the Pulsar schema for a transform schema type.
     *
     * @param transformSchemaType the type to build a schema for; must not be {@code null}
     * @param obj the native schema, used for {@code AVRO} and, cast to an Avro schema, for {@code JSON}
     * @return the matching Pulsar schema
     * @throws IllegalArgumentException if the type is {@code null} or has no schema mapping here
     */
    private static Schema<?> buildSchema(TransformSchemaType transformSchemaType, Object obj) {
        if (transformSchemaType == null) {
            throw new IllegalArgumentException("Schema type should not be null.");
        }
        final Schema<?> result;
        switch (transformSchemaType) {
            case INT8:
                result = Schema.INT8;
                break;
            case INT16:
                result = Schema.INT16;
                break;
            case INT32:
                result = Schema.INT32;
                break;
            case INT64:
                result = Schema.INT64;
                break;
            case FLOAT:
                result = Schema.FLOAT;
                break;
            case DOUBLE:
                result = Schema.DOUBLE;
                break;
            case BOOLEAN:
                result = Schema.BOOL;
                break;
            case STRING:
                result = Schema.STRING;
                break;
            case BYTES:
                result = Schema.BYTES;
                break;
            case DATE:
                result = Schema.DATE;
                break;
            case TIME:
                result = Schema.TIME;
                break;
            case TIMESTAMP:
                result = Schema.TIMESTAMP;
                break;
            case INSTANT:
                result = Schema.INSTANT;
                break;
            case LOCAL_DATE:
                result = Schema.LOCAL_DATE;
                break;
            case LOCAL_TIME:
                result = Schema.LOCAL_TIME;
                break;
            case LOCAL_DATE_TIME:
                result = Schema.LOCAL_DATE_TIME;
                break;
            case AVRO:
                result = Schema.NATIVE_AVRO(obj);
                break;
            case JSON:
                result = new JsonNodeSchema((org.apache.avro.Schema) obj);
                break;
            default:
                throw new IllegalArgumentException("Unsupported schema type " + transformSchemaType);
        }
        return result;
    }

    /** Matches the first character of the input and every character that directly follows a hyphen. */
    private static final Pattern KEBAB_WORD_START = Pattern.compile("(?:^|-)(.)");

    /**
     * Converts a kebab-case name such as {@code drop-fields} to PascalCase ({@code DropFields}).
     *
     * <p>Upper-casing uses {@link Locale#ROOT} so the result does not depend on the JVM default
     * locale, and the replacement is quoted so {@code $} and {@code \} in the input are kept
     * literally instead of being read as group references.
     *
     * @param str the kebab-case name; must not be {@code null}
     * @return the name with hyphens removed and each word's first character upper-cased
     */
    private static String kebabToPascal(String str) {
        return KEBAB_WORD_START.matcher(str).replaceAll(
                match -> Matcher.quoteReplacement(match.group(1).toUpperCase(Locale.ROOT)));
    }

    /**
     * Creates the OpenAI client for the given configuration by delegating to
     * {@code TransformFunctionUtil.buildOpenAIClient}.
     *
     * <p>NOTE(review): being {@code protected}, this is presumably an override point for subclasses
     * (e.g. tests supplying a stub client) — confirm against callers.
     *
     * @param openAIConfig the OpenAI section of the transform configuration
     * @return whatever the utility returns for that configuration
     */
    protected OpenAIClient buildOpenAIClient(OpenAIConfig openAIConfig) {
        return TransformFunctionUtil.buildOpenAIClient(openAIConfig);
    }

    /**
     * Creates the query data source for the given configuration by delegating to
     * {@code TransformFunctionUtil.buildDataSource}.
     *
     * <p>NOTE(review): being {@code protected}, this is presumably an override point for subclasses
     * — confirm against callers.
     *
     * @param dataSourceConfig the data source section of the transform configuration
     * @return whatever the utility returns for that configuration
     */
    protected QueryStepDataSource buildDataSource(DataSourceConfig dataSourceConfig) {
        return TransformFunctionUtil.buildDataSource(dataSourceConfig);
    }
}
