package io.smallrye.reactive.messaging.kafka.transactions;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.class */
public class KafkaTransactionsImpl<T> extends MutinyEmitterImpl<T> implements KafkaTransactions<T> {
    private final KafkaClientService clientService;
    private final KafkaProducer<?, ?> producer;
    private volatile KafkaTransactionsImpl<T>.Transaction<?> currentTransaction;
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl$Transaction.class */
    private class Transaction<R> implements TransactionalEmitter<T> {
        private final Uni<Void> beforeCommit;
        private final Function<R, Uni<R>> afterCommit;
        private final Uni<Void> beforeAbort;
        private final Function<Throwable, Uni<R>> afterAbort;
        private volatile boolean abort;

        public Transaction(KafkaTransactionsImpl kafkaTransactionsImpl) {
            this(KafkaTransactionsImpl.VOID_UNI, obj -> {
                return KafkaTransactionsImpl.defaultAfterCommit(obj);
            }, KafkaTransactionsImpl.VOID_UNI, th -> {
                return KafkaTransactionsImpl.defaultAfterAbort(th);
            });
        }

        public Transaction(Uni<Void> uni, Function<R, Uni<R>> function, Uni<Void> uni2, Function<Throwable, Uni<R>> function2) {
            this.beforeCommit = uni;
            this.afterCommit = function;
            this.beforeAbort = uni2;
            this.afterAbort = function2;
        }

        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> function) {
            KafkaTransactionsImpl.this.currentTransaction = this;
            Context currentContext = Vertx.currentContext();
            Uni<Void> beginTransaction = KafkaTransactionsImpl.this.producer.beginTransaction();
            if (currentContext != null) {
                beginTransaction = beginTransaction.emitOn(runnable -> {
                    currentContext.runOnContext(r3 -> {
                        runnable.run();
                    });
                });
            }
            return beginTransaction.chain(() -> {
                return executeInTransaction(function);
            }).eventually(() -> {
                KafkaTransactionsImpl.this.currentTransaction = null;
            });
        }

        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
            return Uni.createFrom().nullItem().chain(() -> {
                return (Uni) function.apply(this);
            }).call(() -> {
                return KafkaTransactionsImpl.this.producer.flush();
            }).onFailure().call(th -> {
                return abort();
            }).onCancellation().call(() -> {
                return abort();
            }).call(() -> {
                return this.abort ? abort() : commit();
            }).onFailure().recoverWithUni(th2 -> {
                return this.afterAbort.apply(th2);
            }).onItem().transformToUni(obj -> {
                return this.afterCommit.apply(obj);
            });
        }

        private Uni<Void> commit() {
            Uni<Void> uni = this.beforeCommit;
            KafkaProducer kafkaProducer = KafkaTransactionsImpl.this.producer;
            Objects.requireNonNull(kafkaProducer);
            return uni.call(kafkaProducer::commitTransaction);
        }

        private Uni<Void> abort() {
            Uni<Void> uni = this.beforeAbort;
            KafkaProducer kafkaProducer = KafkaTransactionsImpl.this.producer;
            Objects.requireNonNull(kafkaProducer);
            Uni<Void> call = uni.call(kafkaProducer::abortTransaction);
            return this.abort ? call.chain(() -> {
                return Uni.createFrom().failure(new TransactionAbortedException());
            }) : call;
        }

        @Override // io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter
        public <M extends Message<? extends T>> void send(M m) {
            KafkaTransactionsImpl.this.send(m.withNack(th -> {
                return CompletableFuture.completedFuture(null);
            }));
        }

        @Override // io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter
        public void send(T t) {
            UniSubscribe subscribe = KafkaTransactionsImpl.this.send(t).subscribe();
            Consumer consumer = r1 -> {
            };
            KafkaLogging kafkaLogging = KafkaLogging.log;
            Objects.requireNonNull(kafkaLogging);
            subscribe.with(consumer, kafkaLogging::unableToSendRecord);
        }

        @Override // io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter
        public void markForAbort() {
            this.abort = true;
        }

        @Override // io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter
        public boolean isMarkedForAbort() {
            return this.abort;
        }
    }

    public KafkaTransactionsImpl(EmitterConfiguration emitterConfiguration, long j, KafkaClientService kafkaClientService) {
        super(emitterConfiguration, j);
        this.clientService = kafkaClientService;
        this.producer = kafkaClientService.getProducer(emitterConfiguration.name());
    }

    @Override // io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions
    public synchronized boolean isTransactionInProgress() {
        return this.currentTransaction != null;
    }

    @Override // io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions
    @CheckReturnValue
    public synchronized <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
        if (this.currentTransaction == null) {
            return new Transaction(this).execute(function);
        }
        throw KafkaExceptions.ex.transactionInProgress(this.name);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v40, types: [java.util.Map] */
    @Override // io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions
    @CheckReturnValue
    public synchronized <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> function) {
        String channel;
        HashMap hashMap;
        Optional metadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
        Optional metadata2 = message.getMetadata(IncomingKafkaRecordMetadata.class);
        if (metadata.isPresent()) {
            IncomingKafkaRecordBatchMetadata incomingKafkaRecordBatchMetadata = (IncomingKafkaRecordBatchMetadata) metadata.get();
            channel = incomingKafkaRecordBatchMetadata.getChannel();
            hashMap = (Map) incomingKafkaRecordBatchMetadata.getOffsets().entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return new OffsetAndMetadata(((OffsetAndMetadata) entry.getValue()).offset() + 1);
            }));
        } else {
            if (!metadata2.isPresent()) {
                throw KafkaExceptions.ex.noKafkaMetadataFound(message);
            }
            IncomingKafkaRecordMetadata incomingKafkaRecordMetadata = (IncomingKafkaRecordMetadata) metadata2.get();
            channel = incomingKafkaRecordMetadata.getChannel();
            hashMap = new HashMap();
            hashMap.put(new TopicPartition(incomingKafkaRecordMetadata.getTopic(), incomingKafkaRecordMetadata.getPartition()), new OffsetAndMetadata(incomingKafkaRecordMetadata.getOffset() + 1));
        }
        KafkaConsumer consumer = this.clientService.getConsumer(channel);
        if (consumer == null) {
            throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
        }
        if (this.currentTransaction != null) {
            throw KafkaExceptions.ex.transactionInProgress(this.name);
        }
        HashMap hashMap2 = hashMap;
        return new Transaction(consumer.consumerGroupMetadata().chain(consumerGroupMetadata -> {
            return this.producer.sendOffsetsToTransaction(hashMap2, consumerGroupMetadata);
        }), obj -> {
            return Uni.createFrom().item(obj);
        }, VOID_UNI, th -> {
            return consumer.resetToLastCommittedPositions().chain(() -> {
                return Uni.createFrom().failure(th);
            });
        }).execute(function);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <R> Uni<R> defaultAfterCommit(R r) {
        return Uni.createFrom().item(r);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <R> Uni<R> defaultAfterAbort(Throwable th) {
        return Uni.createFrom().failure(th);
    }
}
