package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import java.nio.charset.StandardCharsets;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.AbstractSinkRecord;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink.class */
public class PulsarSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarSink.class);
    private final PulsarClient client;
    private final PulsarSinkConfig pulsarSinkConfig;
    private final Map<String, String> properties;
    private final ClassLoader functionClassLoader;
    private ComponentStatsManager stats;

    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    private final TopicSchema topicSchema;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$Crypto.class */
    public static class Crypto {
        private CryptoKeyReader keyReader;
        private ProducerCryptoFailureAction failureAction;
        private String[] encryptionKeys;

        /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$Crypto$CryptoBuilder.class */
        public static class CryptoBuilder {
            private CryptoKeyReader keyReader;
            private ProducerCryptoFailureAction failureAction;
            private String[] encryptionKeys;

            CryptoBuilder() {
            }

            public CryptoBuilder keyReader(CryptoKeyReader cryptoKeyReader) {
                this.keyReader = cryptoKeyReader;
                return this;
            }

            public CryptoBuilder failureAction(ProducerCryptoFailureAction producerCryptoFailureAction) {
                this.failureAction = producerCryptoFailureAction;
                return this;
            }

            public CryptoBuilder encryptionKeys(String[] strArr) {
                this.encryptionKeys = strArr;
                return this;
            }

            public Crypto build() {
                return new Crypto(this.keyReader, this.failureAction, this.encryptionKeys);
            }

            public String toString() {
                return "PulsarSink.Crypto.CryptoBuilder(keyReader=" + this.keyReader + ", failureAction=" + this.failureAction + ", encryptionKeys=" + Arrays.deepToString(this.encryptionKeys) + ")";
            }
        }

        Crypto(CryptoKeyReader cryptoKeyReader, ProducerCryptoFailureAction producerCryptoFailureAction, String[] strArr) {
            this.keyReader = cryptoKeyReader;
            this.failureAction = producerCryptoFailureAction;
            this.encryptionKeys = strArr;
        }

        public static CryptoBuilder builder() {
            return new CryptoBuilder();
        }

        public CryptoKeyReader getKeyReader() {
            return this.keyReader;
        }

        public ProducerCryptoFailureAction getFailureAction() {
            return this.failureAction;
        }

        public String[] getEncryptionKeys() {
            return this.encryptionKeys;
        }

        public void setKeyReader(CryptoKeyReader cryptoKeyReader) {
            this.keyReader = cryptoKeyReader;
        }

        public void setFailureAction(ProducerCryptoFailureAction producerCryptoFailureAction) {
            this.failureAction = producerCryptoFailureAction;
        }

        public void setEncryptionKeys(String[] strArr) {
            this.encryptionKeys = strArr;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Crypto)) {
                return false;
            }
            Crypto crypto = (Crypto) obj;
            if (!crypto.canEqual(this)) {
                return false;
            }
            CryptoKeyReader keyReader = getKeyReader();
            CryptoKeyReader keyReader2 = crypto.getKeyReader();
            if (keyReader == null) {
                if (keyReader2 != null) {
                    return false;
                }
            } else if (!keyReader.equals(keyReader2)) {
                return false;
            }
            ProducerCryptoFailureAction failureAction = getFailureAction();
            ProducerCryptoFailureAction failureAction2 = crypto.getFailureAction();
            if (failureAction == null) {
                if (failureAction2 != null) {
                    return false;
                }
            } else if (!failureAction.equals(failureAction2)) {
                return false;
            }
            return Arrays.deepEquals(getEncryptionKeys(), crypto.getEncryptionKeys());
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof Crypto;
        }

        public int hashCode() {
            CryptoKeyReader keyReader = getKeyReader();
            int hashCode = (1 * 59) + (keyReader == null ? 43 : keyReader.hashCode());
            ProducerCryptoFailureAction failureAction = getFailureAction();
            return (((hashCode * 59) + (failureAction == null ? 43 : failureAction.hashCode())) * 59) + Arrays.deepHashCode(getEncryptionKeys());
        }

        public String toString() {
            return "PulsarSink.Crypto(keyReader=" + getKeyReader() + ", failureAction=" + getFailureAction() + ", encryptionKeys=" + Arrays.deepToString(getEncryptionKeys()) + ")";
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtLeastOnceProcessor.class */
    class PulsarSinkAtLeastOnceProcessor extends PulsarSink<T>.PulsarSinkAtMostOnceProcessor {
        public PulsarSinkAtLeastOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkAtMostOnceProcessor, org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, AbstractSinkRecord<T> abstractSinkRecord) {
            typedMessageBuilder.sendAsync().thenAccept(messageId -> {
                abstractSinkRecord.ack();
            }).exceptionally((Function<Throwable, ? extends Void>) getPublishErrorHandler(abstractSinkRecord, true));
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtMostOnceProcessor.class */
    class PulsarSinkAtMostOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
            if (schema instanceof AutoConsumeSchema) {
                if (PulsarSink.log.isDebugEnabled()) {
                    PulsarSink.log.debug("The Pulsar producer is not initialized until the first record is published for `AUTO_CONSUME` schema.");
                }
            } else {
                try {
                    this.publishProducers.put(PulsarSink.this.pulsarSinkConfig.getTopic(), createProducer(PulsarSink.this.client, PulsarSink.this.pulsarSinkConfig.getTopic(), null, schema));
                } catch (PulsarClientException e) {
                    PulsarSink.log.error("Failed to create Producer while doing user publish", (Throwable) e);
                    throw new RuntimeException(e);
                }
            }
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> abstractSinkRecord) {
            Schema<T> schema = abstractSinkRecord.getSchema();
            if (!abstractSinkRecord.shouldSetSchema()) {
                schema = this.schema;
            }
            return schema != null ? (TypedMessageBuilder<T>) getProducer(abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), schema).newMessage(schema) : getProducer(abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), null).newMessage();
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, AbstractSinkRecord<T> abstractSinkRecord) {
            typedMessageBuilder.sendAsync().thenAccept(messageId -> {
            }).exceptionally((Function<Throwable, ? extends Void>) getPublishErrorHandler(abstractSinkRecord, false));
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkEffectivelyOnceProcessor.class */
    class PulsarSinkEffectivelyOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        public PulsarSinkEffectivelyOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> abstractSinkRecord) {
            if (!abstractSinkRecord.getPartitionId().isPresent()) {
                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
            }
            Schema<T> schema = abstractSinkRecord.getSchema();
            if (!abstractSinkRecord.shouldSetSchema()) {
                schema = this.schema;
            }
            Producer<T> producer = getProducer(String.format("%s-%s", abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), abstractSinkRecord.getPartitionId().get()), abstractSinkRecord.getPartitionId().get(), abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), schema);
            return schema != null ? (TypedMessageBuilder<T>) producer.newMessage(schema) : producer.newMessage();
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, AbstractSinkRecord<T> abstractSinkRecord) {
            if (!abstractSinkRecord.getRecordSequence().isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
            }
            typedMessageBuilder.sequenceId(abstractSinkRecord.getRecordSequence().get().longValue());
            typedMessageBuilder.sendAsync().thenAccept(messageId -> {
                abstractSinkRecord.ack();
            }).exceptionally((Function<Throwable, ? extends Void>) getPublishErrorHandler(abstractSinkRecord, true));
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessor.class */
    private interface PulsarSinkProcessor<T> {
        TypedMessageBuilder<T> newMessage(AbstractSinkRecord<T> abstractSinkRecord);

        void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, AbstractSinkRecord<T> abstractSinkRecord);

        void close() throws Exception;
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.4.8.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessorBase.class */
    abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap();
        protected Schema schema;
        protected Crypto crypto;

        protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
            this.schema = schema;
            this.crypto = crypto;
        }

        public Producer<T> createProducer(PulsarClient pulsarClient, String str, String str2, Schema<T> schema) throws PulsarClientException {
            ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter(FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(str);
            if (str2 != null) {
                producerBuilder.producerName(str2);
            }
            if (PulsarSink.this.pulsarSinkConfig.getProducerConfig() != null) {
                ProducerConfig producerConfig = PulsarSink.this.pulsarSinkConfig.getProducerConfig();
                if (producerConfig.getMaxPendingMessages().intValue() != 0) {
                    producerBuilder.maxPendingMessages(producerConfig.getMaxPendingMessages().intValue());
                }
                if (producerConfig.getMaxPendingMessagesAcrossPartitions().intValue() != 0) {
                    producerBuilder.maxPendingMessagesAcrossPartitions(producerConfig.getMaxPendingMessagesAcrossPartitions().intValue());
                }
                if (producerConfig.getCryptoConfig() != null) {
                    producerBuilder.cryptoKeyReader(this.crypto.keyReader);
                    producerBuilder.cryptoFailureAction(this.crypto.failureAction);
                    for (String str3 : this.crypto.getEncryptionKeys()) {
                        producerBuilder.addEncryptionKey(str3);
                    }
                }
                if (producerConfig.getBatchBuilder() != null) {
                    if (producerConfig.getBatchBuilder().equals("KEY_BASED")) {
                        producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
                    } else {
                        producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
                    }
                }
            }
            return producerBuilder.properties(PulsarSink.this.properties).create();
        }

        protected Producer<T> getProducer(String str, Schema schema) {
            return getProducer(str, null, str, schema);
        }

        protected Producer<T> getProducer(String str, String str2, String str3, Schema schema) {
            return this.publishProducers.computeIfAbsent(str, str4 -> {
                try {
                    PulsarSink.log.info("Initializing producer {} on topic {} with schema {}", str2, str3, schema);
                    Producer<T> createProducer = createProducer(PulsarSink.this.client, str3, str2, schema != null ? schema : this.schema);
                    PulsarSink.log.info("Initialized producer {} on topic {} with schema {}: {} -> {}", str2, str3, schema, str, createProducer);
                    return createProducer;
                } catch (PulsarClientException e) {
                    PulsarSink.log.error("Failed to create Producer while doing user publish", (Throwable) e);
                    throw new RuntimeException(e);
                }
            });
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
            ArrayList arrayList = new ArrayList(this.publishProducers.size());
            Iterator<Map.Entry<String, Producer<T>>> it = this.publishProducers.entrySet().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getValue().closeAsync());
            }
            try {
                FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) arrayList);
            } catch (Exception e) {
                PulsarSink.log.warn("Failed to close all the producers", (Throwable) e);
            }
        }

        public Function<Throwable, Void> getPublishErrorHandler(AbstractSinkRecord<T> abstractSinkRecord, boolean z) {
            return th -> {
                String format;
                Record<?> sourceRecord = abstractSinkRecord.getSourceRecord();
                if (z) {
                    sourceRecord.fail();
                }
                String orElse = abstractSinkRecord.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
                if (sourceRecord instanceof PulsarRecord) {
                    format = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", orElse, th.getMessage(), ((PulsarRecord) sourceRecord).getMessageId());
                } else {
                    format = String.format("Failed to publish to topic [%s] with error [%s]", orElse, th.getMessage());
                    if (abstractSinkRecord.getRecordSequence().isPresent()) {
                        format = String.format(format + " with src sequence id [%s]", abstractSinkRecord.getRecordSequence().get());
                    }
                }
                PulsarSink.log.error(format);
                PulsarSink.this.stats.incrSinkExceptions(new Exception(format));
                return null;
            };
        }
    }

    public PulsarSink(PulsarClient pulsarClient, PulsarSinkConfig pulsarSinkConfig, Map<String, String> map, ComponentStatsManager componentStatsManager, ClassLoader classLoader) {
        this.client = pulsarClient;
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = map;
        this.stats = componentStatsManager;
        this.functionClassLoader = classLoader;
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);
        Schema<T> initializeSchema = initializeSchema();
        if (initializeSchema == null) {
            log.info("Since output type is null, not creating any real sink");
            return;
        }
        Crypto initializeCrypto = initializeCrypto();
        if (initializeCrypto == null) {
            log.info("crypto key reader is not provided, not enabling end to end encryption");
        }
        switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(initializeSchema, initializeCrypto);
                return;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(initializeSchema, initializeCrypto);
                return;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(initializeSchema, initializeCrypto);
                return;
            default:
                return;
        }
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) {
        AbstractSinkRecord<T> abstractSinkRecord = (AbstractSinkRecord) record;
        TypedMessageBuilder<T> newMessage = this.pulsarSinkProcessor.newMessage(abstractSinkRecord);
        if (record.getKey().isPresent() && (!(record.getSchema() instanceof KeyValueSchema) || ((KeyValueSchema) record.getSchema()).getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED)) {
            newMessage.key(record.getKey().get());
        }
        newMessage.value(record.getValue());
        if (!record.getProperties().isEmpty() && (abstractSinkRecord.shouldAlwaysSetMessageProperties() || this.pulsarSinkConfig.isForwardSourceMessageProperty())) {
            newMessage.properties(record.getProperties());
        }
        if (abstractSinkRecord.getSourceRecord() instanceof PulsarRecord) {
            PulsarRecord pulsarRecord = (PulsarRecord) abstractSinkRecord.getSourceRecord();
            newMessage.property("__pfn_input_topic__", pulsarRecord.getTopicName().get()).property("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()), StandardCharsets.UTF_8));
        } else {
            Optional<Long> eventTime = abstractSinkRecord.getSourceRecord().getEventTime();
            Objects.requireNonNull(newMessage);
            eventTime.ifPresent((v1) -> {
                r1.eventTime(v1);
            });
        }
        this.pulsarSinkProcessor.sendOutputMessage(newMessage, abstractSinkRecord);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.pulsarSinkProcessor != null) {
            this.pulsarSinkProcessor.close();
        }
    }

    @VisibleForTesting
    Schema<T> initializeSchema() throws ClassNotFoundException {
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
            return (Schema<T>) Schema.BYTES;
        }
        Class<?> loadClass = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), this.functionClassLoader);
        if (Void.class.equals(loadClass)) {
            return null;
        }
        ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setSchemaProperties(this.pulsarSinkConfig.getSchemaProperties());
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getSchemaType())) {
            consumerConfig.setSchemaType(this.pulsarSinkConfig.getSerdeClassName());
            return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), loadClass, consumerConfig, false, this.functionClassLoader);
        }
        if (GenericRecord.class.isAssignableFrom(loadClass)) {
            consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
            SchemaType valueOf = SchemaType.valueOf(this.pulsarSinkConfig.getSchemaType());
            if (SchemaType.AUTO_CONSUME != valueOf) {
                log.info("The configured schema type {} is not able to write GenericRecords. So overwrite the schema type to be {}", valueOf, SchemaType.AUTO_CONSUME);
            }
        } else {
            consumerConfig.setSchemaType(this.pulsarSinkConfig.getSchemaType());
        }
        return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), loadClass, consumerConfig, false);
    }

    @VisibleForTesting
    Crypto initializeCrypto() throws ClassNotFoundException {
        if (this.pulsarSinkConfig.getProducerConfig() == null || this.pulsarSinkConfig.getProducerConfig().getCryptoConfig() == null || org.apache.commons.lang.StringUtils.isEmpty(this.pulsarSinkConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
            return null;
        }
        CryptoConfig cryptoConfig = this.pulsarSinkConfig.getProducerConfig().getCryptoConfig();
        if (Security.getProvider("BC") == null) {
            Security.addProvider(new BouncyCastleProvider());
        }
        Crypto.CryptoBuilder encryptionKeys = Crypto.builder().failureAction(cryptoConfig.getProducerCryptoFailureAction()).encryptionKeys(cryptoConfig.getEncryptionKeys());
        encryptionKeys.keyReader(CryptoUtils.getCryptoKeyReaderInstance(cryptoConfig.getCryptoKeyReaderClassName(), cryptoConfig.getCryptoKeyReaderConfig(), this.functionClassLoader));
        return encryptionKeys.build();
    }
}
