package org.apache.pulsar.io.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "elastic_search", type = IOType.SINK, help = "A sink connector that sends pulsar messages to elastic search", configClass = ElasticSearchConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/elasticsearch/ElasticSearchSink.class */
public class ElasticSearchSink implements Sink<GenericObject> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchSink.class);
    /** Connector configuration; loaded and validated in {@code open}. */
    private ElasticSearchConfig elasticSearchConfig;
    /** Client used for all index/delete calls; created in {@code open} and cleared in {@code close}. */
    private ElasticSearchClient elasticsearchClient;
    /** Mapper backed by sorted object nodes; only built in {@code open} when canonical key fields are enabled. */
    private ObjectMapper sortedObjectMapper;
    /** General-purpose mapper used to parse documents and serialize ids/values. */
    private final ObjectMapper objectMapper = new ObjectMapper();
    /** Field names split from the comma-separated "primary fields" setting; stays {@code null} when that setting is empty. */
    private List<String> primaryFields = null;
    /** Unpadded Base64 encoder used to render hashed document ids as text. */
    private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
    /** Matches characters in the Unicode general category C ("Other"); used by {@code sanitizeValue}. */
    private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.io.elasticsearch.ElasticSearchSink$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/io/elasticsearch/ElasticSearchSink$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$schema$SchemaType = new int[SchemaType.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT8.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT16.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT32.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.INT64.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.STRING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.JSON.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.AVRO.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$IdHashingAlgorithm = new int[ElasticSearchConfig.IdHashingAlgorithm.values().length];
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$IdHashingAlgorithm[ElasticSearchConfig.IdHashingAlgorithm.SHA256.ordinal()] = 1;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$IdHashingAlgorithm[ElasticSearchConfig.IdHashingAlgorithm.SHA512.ordinal()] = 2;
            } catch (NoSuchFieldError e9) {
            }
            $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$MalformedDocAction = new int[ElasticSearchConfig.MalformedDocAction.values().length];
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$MalformedDocAction[ElasticSearchConfig.MalformedDocAction.IGNORE.ordinal()] = 1;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$MalformedDocAction[ElasticSearchConfig.MalformedDocAction.WARN.ordinal()] = 2;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$MalformedDocAction[ElasticSearchConfig.MalformedDocAction.FAIL.ordinal()] = 3;
            } catch (NoSuchFieldError e12) {
            }
            $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$NullValueAction = new int[ElasticSearchConfig.NullValueAction.values().length];
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$NullValueAction[ElasticSearchConfig.NullValueAction.DELETE.ordinal()] = 1;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$NullValueAction[ElasticSearchConfig.NullValueAction.IGNORE.ordinal()] = 2;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$pulsar$io$elasticsearch$ElasticSearchConfig$NullValueAction[ElasticSearchConfig.NullValueAction.FAIL.ordinal()] = 3;
            } catch (NoSuchFieldError e15) {
            }
        }
    }

    /**
     * Initializes the sink from the supplied connector configuration.
     *
     * <p>Loads and validates the {@link ElasticSearchConfig}, creates the client, splits the configured
     * primary fields on commas (entries are not trimmed), and — only when canonical key fields are
     * enabled — builds a mapper whose object nodes keep their fields in a {@link TreeMap}.
     *
     * @param map raw connector configuration
     * @param sinkContext sink context; not used by this method
     * @throws Exception if loading, validating, or client construction fails
     */
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        final ElasticSearchConfig config = ElasticSearchConfig.load(map);
        this.elasticSearchConfig = config;
        config.validate();
        this.elasticsearchClient = new ElasticSearchClient(config);

        final String configuredPrimaryFields = config.getPrimaryFields();
        if (!Strings.isNullOrEmpty(configuredPrimaryFields)) {
            this.primaryFields = Arrays.asList(configuredPrimaryFields.split(","));
        }

        if (!config.isCanonicalKeyFields()) {
            return;
        }
        // Object nodes created through this factory store their children in key order.
        final JsonNodeFactory sortedNodeFactory = new JsonNodeFactory() {
            @Override
            public ObjectNode objectNode() {
                return new ObjectNode(this, new TreeMap<>());
            }
        };
        this.sortedObjectMapper = JsonMapper.builder()
                .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
                .nodeFactory(sortedNodeFactory)
                .build();
    }

    /**
     * Closes the Elasticsearch client, if one is present, and drops the reference to it.
     * Calling this when no client is set does nothing.
     */
    public void close() {
        final ElasticSearchClient client = this.elasticsearchClient;
        if (client == null) {
            return;
        }
        client.close();
        this.elasticsearchClient = null;
    }

    /**
     * Replaces the client used by this sink. Exposed for tests only.
     *
     * @param elasticSearchClient the client instance to use from now on
     */
    @VisibleForTesting
    void setElasticsearchClient(ElasticSearchClient elasticSearchClient) {
        this.elasticsearchClient = elasticSearchClient;
    }

    /**
     * Writes one record to Elasticsearch.
     *
     * <p>The id and document are extracted first (outside the error-handling block, so extraction
     * failures propagate unchanged). A non-null document is indexed, through the bulk API when bulk
     * mode is enabled. A null document is handled according to the configured null-value action:
     * DELETE removes the document when an id is available, FAIL marks the client as failed and throws
     * the client's recorded irrecoverable error, and any other action does nothing.
     *
     * <p>A {@link JsonProcessingException} raised while indexing is handled according to the configured
     * malformed-document action: IGNORE returns silently; WARN and FAIL log, mark the client as failed
     * and rethrow.
     *
     * @param record the record to index or delete
     * @throws IllegalStateException if the client already reports a failed state
     * @throws Exception any error raised while extracting, indexing, or deleting
     */
    public void write(Record<GenericObject> record) throws Exception {
        if (this.elasticsearchClient.isFailed()) {
            throw new IllegalStateException("Elasticsearch client is in FAILED status");
        }
        final Pair<String, String> idAndDoc = extractIdAndDocument(record);
        final String id = idAndDoc.getLeft();
        final String doc = idAndDoc.getRight();
        try {
            if (log.isDebugEnabled()) {
                log.debug("index doc {} {}", id, doc);
            }
            if (doc != null) {
                if (this.elasticSearchConfig.isBulkEnabled()) {
                    this.elasticsearchClient.bulkIndex(record, idAndDoc);
                } else {
                    this.elasticsearchClient.indexDocument(record, idAndDoc);
                }
            } else {
                switch (this.elasticSearchConfig.getNullValueAction()) {
                    case FAIL:
                        this.elasticsearchClient.failed(new PulsarClientException.InvalidMessageException("Unexpected null message value"));
                        throw this.elasticsearchClient.irrecoverableError.get();
                    case DELETE:
                        // Without an id there is nothing to address, so the record is skipped.
                        if (id != null) {
                            if (this.elasticSearchConfig.isBulkEnabled()) {
                                this.elasticsearchClient.bulkDelete(record, id);
                            } else {
                                this.elasticsearchClient.deleteDocument(record, id);
                            }
                        }
                        break;
                    default:
                        // IGNORE: drop the null value without touching Elasticsearch.
                        break;
                }
            }
        } catch (JsonProcessingException malformed) {
            switch (this.elasticSearchConfig.getMalformedDocAction()) {
                case WARN:
                    // NOTE(review): the message says "Ignoring", yet the client is failed and the
                    // exception rethrown exactly as for FAIL — confirm this is intended.
                    log.warn("Ignoring malformed document messageId={}",
                            record.getMessage().map(Message::getMessageId).orElse(null), malformed);
                    this.elasticsearchClient.failed(malformed);
                    throw malformed;
                case FAIL:
                    log.error("Malformed document messageId={}",
                            record.getMessage().map(Message::getMessageId).orElse(null), malformed);
                    this.elasticsearchClient.failed(malformed);
                    throw malformed;
                case IGNORE:
                default:
                    break;
            }
        } catch (Exception failure) {
            log.error("write error for {} {}:", id, doc, failure);
            throw failure;
        }
    }

    /**
     * Returns the client currently held by this sink. Exposed for tests only.
     *
     * @return the current client, or {@code null} if none is set (e.g. after {@code close})
     */
    @VisibleForTesting
    ElasticSearchClient getElasticsearchClient() {
        return this.elasticsearchClient;
    }

    /**
     * Derives the Elasticsearch document id and JSON document for a record.
     *
     * <p>When schema handling is disabled the raw message payload (decoded as UTF-8) is the document
     * and the id is {@code null}. Otherwise:
     * <ol>
     *   <li>key/value and their schemas are taken from the {@link KeyValueSchema} when the record has
     *       one, else from the record itself (in which case there is no key schema);</li>
     *   <li>the id is the stringified key, unless keys are ignored or no key/key schema exists;</li>
     *   <li>the document is the stringified value, optionally merged with the key's fields;</li>
     *   <li>configured primary fields, when present, override the id with values read from the
     *       document;</li>
     *   <li>the id is hashed when an id hashing algorithm is configured.</li>
     * </ol>
     *
     * @param record the incoming record
     * @return a pair of (id, document); either side may be {@code null}
     * @throws JsonProcessingException if the key or value cannot be serialized or re-parsed as JSON
     * @throws IllegalArgumentException if schema handling is disabled and the record has no message
     */
    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) throws JsonProcessingException {
        if (!this.elasticSearchConfig.isSchemaEnable()) {
            Message<GenericObject> message = record.getMessage().orElseThrow(
                    () -> new IllegalArgumentException("Record does not carry message information"));
            return Pair.of(null, sanitizeValue(new String(message.getData(), StandardCharsets.UTF_8)));
        }

        Object key;
        GenericObject value;
        Schema<?> keySchema = null;
        Schema<?> valueSchema;
        if (record.getSchema() instanceof KeyValueSchema) {
            // Raw types on purpose: the record is typed as GenericObject, the payload is a KeyValue.
            KeyValueSchema keyValueSchema = (KeyValueSchema) record.getSchema();
            keySchema = keyValueSchema.getKeySchema();
            valueSchema = keyValueSchema.getValueSchema();
            KeyValue keyValue = (KeyValue) record.getValue().getNativeObject();
            key = keyValue.getKey();
            value = (GenericObject) keyValue.getValue();
        } else {
            key = record.getKey().orElse(null);
            valueSchema = record.getSchema();
            value = record.getValue();
        }

        String id = null;
        if (!this.elasticSearchConfig.isKeyIgnore() && key != null && keySchema != null) {
            id = stringifyKey(keySchema, key);
        }

        String doc = null;
        if (value != null) {
            if (valueSchema == null) {
                Object nativeValue = value.getNativeObject();
                doc = nativeValue instanceof byte[]
                        ? new String((byte[]) nativeValue, StandardCharsets.UTF_8)
                        : nativeValue.toString();
            } else if (shouldCopyKeyFields(keySchema, key)) {
                doc = stringify(JsonConverter.topLevelMerge(
                        extractJsonNode(this.elasticSearchConfig, keySchema, key),
                        extractJsonNode(this.elasticSearchConfig, valueSchema, value)));
            } else {
                doc = stringifyValue(valueSchema, value);
            }
        }

        if (doc != null && this.primaryFields != null) {
            try {
                id = stringifyKey(this.objectMapper.readTree(doc), this.primaryFields);
            } catch (JsonProcessingException e) {
                log.error("Failed to read JSON", e);
                throw e;
            }
        }

        id = hashIdIfConfigured(id);

        if (log.isDebugEnabled()) {
            SchemaType schemaType = null;
            if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
                schemaType = record.getSchema().getSchemaInfo().getType();
            }
            log.debug("recordType={} schemaType={} id={} doc={}", record.getClass().getName(), schemaType, id, doc);
        }
        return Pair.of(id, sanitizeValue(doc));
    }

    /**
     * Decides whether the key's fields can be merged into the document: the option must be enabled,
     * a key and key schema must exist, and the key schema must be AVRO or JSON.
     *
     * <p>The null checks matter: records without a {@link KeyValueSchema} have no key schema, and
     * dereferencing it used to raise a NullPointerException whenever copying was enabled. A null key
     * would make {@code extractJsonNode} return null, leaving nothing to merge.
     */
    private boolean shouldCopyKeyFields(Schema<?> keySchema, Object key) {
        if (!this.elasticSearchConfig.isCopyKeyFields() || keySchema == null || key == null) {
            return false;
        }
        SchemaType keyType = keySchema.getSchemaInfo().getType();
        return keyType == SchemaType.AVRO || keyType == SchemaType.JSON;
    }

    /**
     * Returns the id hashed with the configured algorithm and encoded as unpadded Base64, or the id
     * unchanged when it is null, hashing is disabled, or conditional hashing is on and the id's UTF-8
     * form is at most 512 bytes (presumably the Elasticsearch {@code _id} size limit — confirm).
     */
    private String hashIdIfConfigured(String id) {
        ElasticSearchConfig.IdHashingAlgorithm algorithm = this.elasticSearchConfig.getIdHashingAlgorithm();
        if (id == null || algorithm == null || algorithm == ElasticSearchConfig.IdHashingAlgorithm.NONE) {
            return id;
        }
        byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
        if (this.elasticSearchConfig.isConditionalIdHashing() && idBytes.length <= 512) {
            return id;
        }
        Hasher hasher;
        switch (algorithm) {
            case SHA256:
                hasher = Hashing.sha256().newHasher();
                break;
            case SHA512:
                hasher = Hashing.sha512().newHasher();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: " + algorithm);
        }
        hasher.putBytes(idBytes);
        return this.base64Encoder.encodeToString(hasher.hash().asBytes());
    }

    /**
     * Removes every character matched by {@code nonPrintableCharactersPattern} when stripping is
     * enabled in the configuration.
     *
     * @param str the text to clean; may be {@code null}
     * @return {@code str} unchanged when it is {@code null} or stripping is disabled, else the cleaned text
     */
    private String sanitizeValue(String str) {
        if (str == null) {
            return null;
        }
        if (!this.elasticSearchConfig.isStripNonPrintableCharacters()) {
            return str;
        }
        return this.nonPrintableCharactersPattern.matcher(str).replaceAll("");
    }

    /**
     * Converts a record key to its string form according to the key schema type.
     *
     * <p>Integer types and strings are rendered directly; JSON and AVRO keys are converted to a JSON
     * node and serialized through {@code stringifyKey(JsonNode)}.
     *
     * @param schema the key schema; its type selects the conversion
     * @param obj the key; must be an instance of the Java type matching the schema type
     * @return the key as a string
     * @throws JsonProcessingException if a JSON/AVRO key cannot be serialized
     * @throws UnsupportedOperationException for any other schema type
     */
    public String stringifyKey(Schema<?> schema, Object obj) throws JsonProcessingException {
        final SchemaType keyType = schema.getSchemaInfo().getType();
        switch (keyType) {
            case INT8:
                return Byte.toString((Byte) obj);
            case INT16:
                return Short.toString((Short) obj);
            case INT32:
                return Integer.toString((Integer) obj);
            case INT64:
                return Long.toString((Long) obj);
            case STRING:
                return (String) obj;
            case JSON:
            case AVRO:
                return stringifyKey(extractJsonNode(this.elasticSearchConfig, schema, obj));
            default:
                throw new UnsupportedOperationException("Unsupported key schemaType=" + keyType);
        }
    }

    /**
     * Serializes a JSON key using all of its fields, in the order the node reports them.
     *
     * @param jsonNode the key as a JSON node
     * @return the serialized key, as produced by {@code stringifyKey(JsonNode, List)}
     * @throws JsonProcessingException if serialization fails
     */
    public String stringifyKey(JsonNode jsonNode) throws JsonProcessingException {
        // The decompiled lambda referenced an undefined variable; collect names with a method reference.
        List<String> fields = new ArrayList<>();
        jsonNode.fieldNames().forEachRemaining(fields::add);
        return stringifyKey(jsonNode, fields);
    }

    /**
     * Serializes the selected fields of a JSON node into an id string.
     *
     * <p>A single field is serialized on its own; any other number of fields is first converted to a
     * JSON array. With canonical key fields enabled the value is round-tripped through the sorted
     * mapper. A result wrapped in double quotes is returned without them.
     *
     * @param jsonNode the node holding the fields
     * @param list the names of the fields to use
     * @return the serialized id
     * @throws JsonProcessingException if serialization fails
     */
    public String stringifyKey(JsonNode jsonNode, List<String> list) throws JsonProcessingException {
        final JsonNode toConvert;
        if (list.size() == 1) {
            toConvert = jsonNode.get(list.get(0));
        } else {
            toConvert = JsonConverter.toJsonArray(jsonNode, list);
        }

        final String serializedId;
        if (this.elasticSearchConfig.isCanonicalKeyFields()) {
            final Object plainValue = this.sortedObjectMapper.treeToValue(toConvert, Object.class);
            serializedId = this.sortedObjectMapper.writeValueAsString(plainValue);
        } else {
            serializedId = this.objectMapper.writeValueAsString(toConvert);
        }

        // Strip the surrounding quotes of a JSON string so the id is the bare text.
        if (serializedId.startsWith("\"") && serializedId.endsWith("\"")) {
            return serializedId.substring(1, serializedId.length() - 1);
        }
        return serializedId;
    }

    /**
     * Converts a record value to a JSON node according to its schema and serializes it.
     *
     * @param schema the value schema
     * @param obj the value to convert
     * @return the serialized JSON document
     * @throws JsonProcessingException if serialization fails
     */
    public String stringifyValue(Schema<?> schema, Object obj) throws JsonProcessingException {
        final JsonNode valueNode = extractJsonNode(this.elasticSearchConfig, schema, obj);
        return stringify(valueNode);
    }

    /**
     * Serializes a JSON node, first removing null nodes in place when null stripping is enabled.
     *
     * @param jsonNode the node to serialize
     * @return the JSON text
     * @throws JsonProcessingException if serialization fails
     */
    public String stringify(JsonNode jsonNode) throws JsonProcessingException {
        final JsonNode toSerialize = this.elasticSearchConfig.isStripNulls() ? stripNullNodes(jsonNode) : jsonNode;
        return this.objectMapper.writeValueAsString(toSerialize);
    }

    /**
     * Recursively removes null child nodes from the given node, modifying it in place.
     *
     * @param jsonNode the node to clean
     * @return the same {@code jsonNode} instance
     */
    public static JsonNode stripNullNodes(JsonNode jsonNode) {
        for (Iterator<JsonNode> children = jsonNode.iterator(); children.hasNext();) {
            final JsonNode child = children.next();
            if (!child.isNull()) {
                stripNullNodes(child);
                continue;
            }
            children.remove();
        }
        return jsonNode;
    }

    /**
     * Converts a JSON- or AVRO-typed generic record into a JSON node.
     *
     * @param elasticSearchConfig configuration passed to the AVRO converter
     * @param schema the schema describing {@code obj}; only JSON and AVRO types are supported
     * @param obj the generic record to convert; may be {@code null}
     * @return the JSON node, or {@code null} when {@code obj} is {@code null}
     * @throws UnsupportedOperationException for any schema type other than JSON or AVRO
     */
    public static JsonNode extractJsonNode(ElasticSearchConfig elasticSearchConfig, Schema<?> schema, Object obj) {
        if (obj == null) {
            return null;
        }
        final SchemaType schemaType = schema.getSchemaInfo().getType();
        switch (schemaType) {
            case AVRO:
                return JsonConverter.toJson(elasticSearchConfig, (org.apache.avro.generic.GenericRecord) ((GenericRecord) obj).getNativeObject());
            case JSON:
                return (JsonNode) ((GenericRecord) obj).getNativeObject();
            default:
                throw new UnsupportedOperationException("Unsupported value schemaType=" + schemaType);
        }
    }
}
