package io.smallrye.reactive.messaging.kafka.companion;

import com.opencsv.CSVReader;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import java.io.Closeable;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/ProducerBuilder.class */
/**
 * Builds {@link ProducerTask}s that send records to Kafka through a lazily created
 * {@link KafkaProducer}.
 *
 * <p>The producer and the single-threaded executor used to emit send results are created on first
 * use and released by {@link #close()}. When a {@code transactional.id} property is configured,
 * each produced stream is wrapped in a transaction that is committed on successful termination and
 * aborted otherwise (unless a custom termination callback is installed).
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class ProducerBuilder<K, V> implements Closeable {
    private static final Logger LOGGER = Logger.getLogger(ProducerBuilder.class);
    private static final String CLIENT_ID = "client.id";
    private static final String TRANSACTIONAL_ID = "transactional.id";

    private final Map<String, Object> props;
    private final Function<Map<String, Object>, KafkaProducer<K, V>> producerCreator;
    private final Duration kafkaApiTimeout;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private KafkaProducer<K, V> kafkaProducer;
    private ExecutorService executorService;
    private BiConsumer<KafkaProducer<K, V>, Throwable> onTermination = this::terminate;

    /**
     * Creates a builder whose producer is configured with serializer class names.
     *
     * @param map the producer configuration; it is kept by reference and the two serializer
     *        properties are written into it
     * @param duration the timeout passed to {@link KafkaProducer#close(Duration)}
     * @param str the value stored under {@code key.serializer}
     * @param str2 the value stored under {@code value.serializer}
     */
    public ProducerBuilder(Map<String, Object> map, Duration duration, String str, String str2) {
        this.props = map;
        this.kafkaApiTimeout = duration;
        this.props.put("key.serializer", str);
        this.props.put("value.serializer", str2);
        this.producerCreator = KafkaProducer::new;
    }

    /**
     * Creates a builder whose producer uses the given serializer instances.
     *
     * @param map the producer configuration; it is kept by reference
     * @param duration the timeout passed to {@link KafkaProducer#close(Duration)}
     * @param serializer the key serializer instance
     * @param serializer2 the value serializer instance
     */
    public ProducerBuilder(Map<String, Object> map, Duration duration, Serializer<K> serializer, Serializer<V> serializer2) {
        this.props = map;
        this.kafkaApiTimeout = duration;
        this.producerCreator = config -> new KafkaProducer<>(config, serializer, serializer2);
    }

    /**
     * Creates a builder from serdes. The serdes are retained so that {@link #fromCsv(String)} can
     * turn CSV columns into keys and values.
     *
     * @param map the producer configuration; it is kept by reference
     * @param duration the timeout passed to {@link KafkaProducer#close(Duration)}
     * @param serde the key serde
     * @param serde2 the value serde
     */
    public ProducerBuilder(Map<String, Object> map, Duration duration, Serde<K> serde, Serde<V> serde2) {
        this(map, duration, serde.serializer(), serde2.serializer());
        this.keySerde = serde;
        this.valueSerde = serde2;
    }

    /** Returns the current producer, creating it (and initializing transactions) if needed. */
    private synchronized KafkaProducer<K, V> getOrCreateProducer() {
        if (this.kafkaProducer == null) {
            this.kafkaProducer = this.producerCreator.apply(this.props);
            if (isTransactional()) {
                this.kafkaProducer.initTransactions();
            }
        }
        return this.kafkaProducer;
    }

    /** Returns the current single-threaded executor, creating it if needed. */
    private synchronized ExecutorService getOrCreateExecutor() {
        if (this.executorService == null) {
            this.executorService = Executors.newFixedThreadPool(1,
                    runnable -> new Thread(runnable, "producer-" + clientId()));
        }
        return this.executorService;
    }

    /**
     * @return the underlying producer, or {@code null} if it has not been created yet or has
     *         already been closed
     */
    public KafkaProducer<K, V> unwrap() {
        return this.kafkaProducer;
    }

    /**
     * Flushes and closes the underlying producer, if one exists, and shuts down the executor.
     * Does nothing when no producer has been created.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.kafkaProducer == null) {
            return;
        }
        LOGGER.infof("Closing producer %s", clientId());
        try {
            this.kafkaProducer.flush();
            this.kafkaProducer.close(this.kafkaApiTimeout);
        } finally {
            // Always drop the references, even if flush/close failed, so nothing is leaked.
            this.kafkaProducer = null;
            // The executor may never have been created (e.g. the producer was created but the
            // stream set-up failed before the executor was requested).
            if (this.executorService != null) {
                this.executorService.shutdown();
                this.executorService = null;
            }
        }
    }

    /**
     * Default termination callback: finishes the transaction (commit when {@code th} is
     * {@code null}, abort otherwise) for transactional producers, then closes this builder.
     */
    private void terminate(KafkaProducer<K, V> kafkaProducer, Throwable th) {
        try {
            if (isTransactional()) {
                if (th == null) {
                    kafkaProducer.commitTransaction();
                } else {
                    kafkaProducer.abortTransaction();
                }
            }
        } finally {
            // Release the producer even when finishing the transaction fails.
            close();
        }
    }

    /**
     * Sets a producer configuration property.
     *
     * @param str the property name
     * @param str2 the property value
     * @return this builder
     */
    public ProducerBuilder<K, V> withProp(String str, String str2) {
        this.props.put(str, str2);
        return this;
    }

    /**
     * Adds all given entries to the producer configuration.
     *
     * @param map the properties to add
     * @return this builder
     */
    public ProducerBuilder<K, V> withProps(Map<String, String> map) {
        this.props.putAll(map);
        return this;
    }

    /**
     * Sets the {@code client.id} property.
     *
     * @param str the client id
     * @return this builder
     */
    public ProducerBuilder<K, V> withClientId(String str) {
        return withProp(CLIENT_ID, str);
    }

    /**
     * Sets the {@code transactional.id} property, which makes this builder transactional.
     *
     * @param str the transactional id
     * @return this builder
     */
    public ProducerBuilder<K, V> withTransactionalId(String str) {
        return withProp(TRANSACTIONAL_ID, str);
    }

    /**
     * Replaces the callback invoked when a produced stream terminates. The callback receives the
     * producer and the failure ({@code null} when there is none).
     *
     * @param biConsumer the termination callback
     * @return this builder
     * @throws NullPointerException if no {@code transactional.id} property value is set
     */
    public ProducerBuilder<K, V> withOnTermination(BiConsumer<KafkaProducer<K, V>, Throwable> biConsumer) {
        Objects.requireNonNull(this.props.get(TRANSACTIONAL_ID), "transactional id");
        this.onTermination = biConsumer;
        return this;
    }

    /** @return the configured {@code client.id}, or {@code null} if none is set */
    public String clientId() {
        return (String) this.props.get(CLIENT_ID);
    }

    /** @return {@code true} if the configuration contains a {@code transactional.id} entry */
    public boolean isTransactional() {
        return this.props.containsKey(TRANSACTIONAL_ID);
    }

    /**
     * Sends one record and exposes the send callback as a {@link Uni}, emitted on this builder's
     * executor.
     */
    private Uni<RecordMetadata> record(ProducerRecord<K, V> producerRecord) {
        return Uni.createFrom().<RecordMetadata>emitter(emitter ->
                getOrCreateProducer().send(producerRecord, (metadata, failure) -> {
                    if (failure != null) {
                        emitter.fail(failure);
                    } else {
                        emitter.complete(metadata);
                    }
                }))
                .emitOn(getOrCreateExecutor())
                .invoke(() -> LOGGER.debugf("Producer %s: sent message %s", clientId(), producerRecord));
    }

    /**
     * Builds the stream that sends every record of {@code multi}, one after the other, and emits
     * the resulting metadata. For transactional producers a transaction is begun on subscription.
     * The termination callback is invoked when the stream terminates.
     *
     * @param multi the records to send
     * @return the stream of metadata for the sent records
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Multi<RecordMetadata> getProduceMulti(Multi<ProducerRecord<K, V>> multi) {
        return Multi.createFrom().deferred(() -> {
            if (isTransactional()) {
                getOrCreateProducer().beginTransaction();
            }
            return multi.onItem().transformToUniAndConcatenate(this::record)
                    .runSubscriptionOn(getOrCreateExecutor())
                    .onTermination().invoke((failure, cancelled) ->
                            this.onTermination.accept(getOrCreateProducer(), failure));
        });
    }

    /**
     * Creates a task producing one record per row of a CSV classpath resource. See
     * {@link #getProducerRecord(String[])} for the supported column layouts.
     *
     * @param str the classpath location of the CSV resource
     * @return the producer task
     * @throws NullPointerException if {@code str} is {@code null} or this builder was not created
     *         with key and value serdes
     */
    public ProducerTask fromCsv(String str) {
        Objects.requireNonNull(str, "resource path");
        Objects.requireNonNull(this.keySerde, "Producer needs to be created with key Serde");
        Objects.requireNonNull(this.valueSerde, "Producer needs to be created with value Serde");
        Multi<ProducerRecord<K, V>> rows = Multi.createFrom()
                .resource(
                        () -> new CSVReader(new InputStreamReader(getResourceAsStream(str), StandardCharsets.UTF_8)),
                        reader -> Multi.createFrom().iterable(reader)
                                .onItem().transform(this::getProducerRecord)
                                .filter(Objects::nonNull))
                .withFinalizer(Unchecked.consumer(CSVReader::close));
        return new ProducerTask(getProduceMulti(rows));
    }

    /**
     * Converts one CSV row into a record. The first column is always the topic:
     * <ul>
     * <li>1 column: {@code topic} (null value)</li>
     * <li>2 columns: {@code topic, value}</li>
     * <li>3 columns: {@code topic, key, value}</li>
     * <li>4 or more columns: {@code topic, partition, key, value}</li>
     * </ul>
     *
     * @return the record, or {@code null} for an empty row
     */
    private ProducerRecord<K, V> getProducerRecord(String[] strArr) {
        if (strArr.length < 1) {
            return null;
        }
        String topic = strArr[0];
        if (strArr.length == 1) {
            return new ProducerRecord<K, V>(topic, null);
        }
        if (strArr.length == 2) {
            return new ProducerRecord<>(topic, decode(this.valueSerde, topic, strArr[1]));
        }
        if (strArr.length == 3) {
            // The value lives in the third column; the second column is the key.
            return new ProducerRecord<>(topic,
                    decode(this.keySerde, topic, strArr[1]),
                    decode(this.valueSerde, topic, strArr[2]));
        }
        return new ProducerRecord<>(topic,
                Integer.valueOf(Integer.parseInt(strArr[1])),
                decode(this.keySerde, topic, strArr[2]),
                decode(this.valueSerde, topic, strArr[3]));
    }

    /** Deserializes the UTF-8 bytes of a CSV column with the given serde's deserializer. */
    private static <T> T decode(Serde<T> serde, String topic, String column) {
        return serde.deserializer().deserialize(topic, column.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Looks up a classpath resource using, in order, the thread context class loader, the system
     * class loader and the class loader of {@code KafkaCompanion}. For each loader the path is
     * tried as given and, if it starts with {@code /}, without that leading slash.
     *
     * @throws IllegalArgumentException if no class loader can find the resource
     */
    private InputStream getResourceAsStream(String str) {
        // Any of these may legitimately be null (no context loader set, bootstrap loader, ...).
        ClassLoader[] candidates = {
                Thread.currentThread().getContextClassLoader(),
                ClassLoader.getSystemClassLoader(),
                KafkaCompanion.class.getClassLoader()
        };
        String withoutLeadingSlash = str.startsWith("/") ? str.substring(1) : null;
        for (ClassLoader loader : candidates) {
            if (loader == null) {
                continue;
            }
            InputStream stream = loader.getResourceAsStream(str);
            if (stream == null && withoutLeadingSlash != null) {
                stream = loader.getResourceAsStream(withoutLeadingSlash);
            }
            if (stream != null) {
                return stream;
            }
        }
        throw new IllegalArgumentException("Resource '" + str + "' not found on classpath.");
    }

    /**
     * Creates a task producing the records emitted by the given stream.
     *
     * @param multi the records to send
     * @return the producer task
     * @throws NullPointerException if {@code multi} is {@code null}
     */
    public ProducerTask fromMulti(Multi<ProducerRecord<K, V>> multi) {
        Objects.requireNonNull(multi, "record multi");
        return new ProducerTask(getProduceMulti(multi));
    }

    /**
     * Creates a task producing the given records in list order.
     *
     * @param list the records to send
     * @return the producer task
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public ProducerTask fromRecords(List<ProducerRecord<K, V>> list) {
        Objects.requireNonNull(list, "records");
        return fromMulti(Multi.createFrom().iterable(list));
    }

    /**
     * Creates a task producing the given records in argument order.
     *
     * @param producerRecordArr the records to send
     * @return the producer task
     * @throws NullPointerException if {@code producerRecordArr} is {@code null}
     */
    @SafeVarargs
    public final ProducerTask fromRecords(ProducerRecord<K, V>... producerRecordArr) {
        Objects.requireNonNull(producerRecordArr, "records");
        return fromMulti(Multi.createFrom().items(producerRecordArr));
    }

    /**
     * Creates a task producing records generated from the indexes {@code 0} (inclusive) to
     * {@link Integer#MAX_VALUE} (exclusive), with the generated stream passed through
     * {@code function2}.
     *
     * @param function maps an index to the record to send
     * @param function2 a transformation plugged onto the generated record stream
     * @return the producer task
     * @throws NullPointerException if {@code function} is {@code null}
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> function, Function<Multi<ProducerRecord<K, V>>, Multi<ProducerRecord<K, V>>> function2) {
        Objects.requireNonNull(function, "record generator function");
        return fromMulti(Multi.createFrom().range(0, Integer.MAX_VALUE).onItem().transform(function).plug(function2));
    }

    /**
     * Creates a task producing generated records with no additional stream transformation.
     *
     * @param function maps an index to the record to send
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> function) {
        return usingGenerator(function, Function.identity());
    }

    /**
     * Creates a task producing generated records, bounded by {@code RecordQualifiers.until(Long)}.
     *
     * @param function maps an index to the record to send
     * @param j the bound handed to {@code RecordQualifiers.until}
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> function, long j) {
        return usingGenerator(function, RecordQualifiers.until(Long.valueOf(j)));
    }

    /**
     * Creates a task producing generated records, bounded by
     * {@code RecordQualifiers.until(Duration)}.
     *
     * @param function maps an index to the record to send
     * @param duration the bound handed to {@code RecordQualifiers.until}
     * @return the producer task
     */
    public ProducerTask usingGenerator(Function<Integer, ProducerRecord<K, V>> function, Duration duration) {
        return usingGenerator(function, RecordQualifiers.until(duration));
    }
}
