/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import lombok.Generated;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jdbc.JdbcAbstractSink;
import org.apache.pulsar.io.jdbc.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseJdbcAutoSchemaSink
extends JdbcAbstractSink<GenericObject> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BaseJdbcAutoSchemaSink.class);

    @Override
    public String generateUpsertQueryStatement() {
        throw new IllegalStateException("UPSERT not supported");
    }

    @Override
    public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
        throw new IllegalStateException("UPSERT not supported");
    }

    @Override
    public void bindValue(PreparedStatement statement, JdbcAbstractSink.Mutation mutation) throws Exception {
        ArrayList<JdbcUtils.ColumnId> columns = new ArrayList<JdbcUtils.ColumnId>();
        switch (mutation.getType()) {
            case INSERT: {
                columns.addAll(this.tableDefinition.getColumns());
                break;
            }
            case UPSERT: {
                columns.addAll(this.getColumnsForUpsert());
                break;
            }
            case UPDATE: {
                columns.addAll(this.tableDefinition.getNonKeyColumns());
                columns.addAll(this.tableDefinition.getKeyColumns());
                break;
            }
            case DELETE: {
                columns.addAll(this.tableDefinition.getKeyColumns());
            }
        }
        int index = 1;
        for (JdbcUtils.ColumnId columnId : columns) {
            String colName = columnId.getName();
            int colType = columnId.getType();
            if (log.isDebugEnabled()) {
                log.debug("getting value for column: {} type: {}", (Object)colName, (Object)colType);
            }
            try {
                Object obj = mutation.getValues().apply(colName);
                if (obj != null) {
                    BaseJdbcAutoSchemaSink.setColumnValue(statement, index++, obj);
                    continue;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Column {} is null", (Object)colName);
                }
                BaseJdbcAutoSchemaSink.setColumnNull(statement, index++, colType);
            }
            catch (NullPointerException e) {
                if (log.isDebugEnabled()) {
                    log.debug("Column {} is null", (Object)colName);
                }
                BaseJdbcAutoSchemaSink.setColumnNull(statement, index++, colType);
            }
        }
    }

    @Override
    public JdbcAbstractSink.Mutation createMutation(Record<GenericObject> message) {
        Function<String, Object> recordValueGetter;
        GenericObject record = (GenericObject)message.getValue();
        JdbcAbstractSink.MutationType mutationType = null;
        if (message.getSchema() != null && message.getSchema() instanceof KeyValueSchema) {
            KeyValueSchema keyValueSchema = (KeyValueSchema)message.getSchema();
            org.apache.pulsar.client.api.Schema keySchema = keyValueSchema.getKeySchema();
            org.apache.pulsar.client.api.Schema valueSchema = keyValueSchema.getValueSchema();
            KeyValue keyValue = (KeyValue)record.getNativeObject();
            GenericObject key2 = (GenericObject)keyValue.getKey();
            GenericObject value = (GenericObject)keyValue.getValue();
            boolean isDelete = false;
            if (value == null) {
                switch (this.jdbcSinkConfig.getNullValueAction()) {
                    case DELETE: {
                        isDelete = true;
                        break;
                    }
                    case FAIL: {
                        throw new IllegalArgumentException("Got record with value NULL with nullValueAction=FAIL");
                    }
                }
            }
            HashMap<String, Object> data = new HashMap<String, Object>();
            BaseJdbcAutoSchemaSink.fillKeyValueSchemaData((org.apache.pulsar.client.api.Schema<GenericObject>)keySchema, key2, data);
            if (isDelete) {
                mutationType = JdbcAbstractSink.MutationType.DELETE;
            } else {
                BaseJdbcAutoSchemaSink.fillKeyValueSchemaData((org.apache.pulsar.client.api.Schema<GenericObject>)valueSchema, value, data);
            }
            recordValueGetter = k -> data.get(k);
        } else {
            SchemaType schemaType = message.getSchema().getSchemaInfo().getType();
            if (schemaType.isPrimitive()) {
                throw new UnsupportedOperationException("Primitive schema is not supported: " + String.valueOf(schemaType));
            }
            recordValueGetter = key -> ((org.apache.pulsar.client.api.schema.GenericRecord)record).getField(key);
        }
        String action = (String)message.getProperties().get("ACTION");
        if (action != null) {
            mutationType = JdbcAbstractSink.MutationType.valueOf(action);
        } else if (mutationType == null) {
            switch (this.jdbcSinkConfig.getInsertMode()) {
                case INSERT: {
                    mutationType = JdbcAbstractSink.MutationType.INSERT;
                    break;
                }
                case UPSERT: {
                    mutationType = JdbcAbstractSink.MutationType.UPSERT;
                    break;
                }
                case UPDATE: {
                    mutationType = JdbcAbstractSink.MutationType.UPDATE;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown insert mode: " + String.valueOf((Object)this.jdbcSinkConfig.getInsertMode()));
                }
            }
        }
        return new JdbcAbstractSink.Mutation(mutationType, recordValueGetter);
    }

    private static void setColumnNull(PreparedStatement statement, int index, int type) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Setting column value to null, statement: {}, index: {}", (Object)statement.toString(), (Object)index);
        }
        statement.setNull(index, type);
    }

    private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
        log.debug("Setting column value, statement: {}, index: {}, value: {}", new Object[]{statement, index, value});
        if (value instanceof Integer) {
            statement.setInt(index, (Integer)value);
        } else if (value instanceof Long) {
            statement.setLong(index, (Long)value);
        } else if (value instanceof Double) {
            statement.setDouble(index, (Double)value);
        } else if (value instanceof Float) {
            statement.setFloat(index, ((Float)value).floatValue());
        } else if (value instanceof Boolean) {
            statement.setBoolean(index, (Boolean)value);
        } else if (value instanceof String) {
            statement.setString(index, (String)value);
        } else if (value instanceof Short) {
            statement.setShort(index, (Short)value);
        } else if (value instanceof ByteString) {
            statement.setBytes(index, ((ByteString)value).toByteArray());
        } else {
            throw new Exception("Not supported value type, need to add it. " + String.valueOf(value.getClass()));
        }
    }

    private static Object getValueFromJsonNode(JsonNode fn) {
        if (fn == null || fn.isNull()) {
            return null;
        }
        if (fn.isContainerNode()) {
            throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only first level fields.");
        }
        if (fn.isBoolean()) {
            return fn.asBoolean();
        }
        if (fn.isFloatingPointNumber()) {
            return fn.asDouble();
        }
        if (fn.isBigInteger()) {
            if (fn.canConvertToLong()) {
                return fn.asLong();
            }
            return fn.asText();
        }
        if (fn.isNumber()) {
            return fn.numberValue();
        }
        return fn.asText();
    }

    private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema, GenericObject record, Map<String, Object> data) {
        if (record == null) {
            return;
        }
        switch (schema.getSchemaInfo().getType()) {
            case JSON: {
                JsonNode jsonNode = (JsonNode)record.getNativeObject();
                Iterator fieldNames = jsonNode.fieldNames();
                while (fieldNames.hasNext()) {
                    String fieldName = (String)fieldNames.next();
                    JsonNode nodeValue = jsonNode.get(fieldName);
                    data.put(fieldName, BaseJdbcAutoSchemaSink.getValueFromJsonNode(nodeValue));
                }
                break;
            }
            case AVRO: {
                GenericRecord avroNode = (GenericRecord)record.getNativeObject();
                for (Schema.Field field : avroNode.getSchema().getFields()) {
                    String fieldName = field.name();
                    data.put(fieldName, BaseJdbcAutoSchemaSink.convertAvroField(avroNode.get(fieldName), field.schema()));
                }
                break;
            }
            default: {
                throw new IllegalArgumentException("unexpected schema type: " + String.valueOf(schema.getSchemaInfo().getType()) + " with KeyValueSchema");
            }
        }
    }

    @VisibleForTesting
    static Object convertAvroField(Object avroValue, Schema schema) {
        if (avroValue == null) {
            return null;
        }
        switch (schema.getType()) {
            case NULL: 
            case INT: 
            case LONG: 
            case DOUBLE: 
            case FLOAT: 
            case BOOLEAN: {
                return avroValue;
            }
            case ENUM: 
            case STRING: {
                return avroValue.toString();
            }
            case UNION: {
                for (Schema s : schema.getTypes()) {
                    if (s.getType() == Schema.Type.NULL) continue;
                    return BaseJdbcAutoSchemaSink.convertAvroField(avroValue, s);
                }
                throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
            }
        }
        throw new UnsupportedOperationException("Unsupported avro schema type=" + String.valueOf(schema.getType()) + " for value field schema " + schema.getName());
    }
}

