package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.vertx.mutiny.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import javax.enterprise.context.ApplicationScoped;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaLatestCommit.class */
public class KafkaLatestCommit extends ContextHolder implements KafkaCommitHandler {
    private KafkaConsumer<?, ?> consumer;
    private final Map<TopicPartition, Long> offsets;

    @Identifier(KafkaCommitHandler.Strategy.LATEST)
    @ApplicationScoped
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaLatestCommit$Factory.class */
    public static class Factory implements KafkaCommitHandler.Factory {
        @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler.Factory
        public KafkaLatestCommit create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
            return new KafkaLatestCommit(vertx, kafkaConnectorIncomingConfiguration, kafkaConsumer);
        }

        @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler.Factory
        public /* bridge */ /* synthetic */ KafkaCommitHandler create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer kafkaConsumer, BiConsumer biConsumer) {
            return create(kafkaConnectorIncomingConfiguration, vertx, (KafkaConsumer<?, ?>) kafkaConsumer, (BiConsumer<Throwable, Boolean>) biConsumer);
        }
    }

    public KafkaLatestCommit(Vertx vertx, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaConsumer<?, ?> kafkaConsumer) {
        super(vertx, ((Integer) kafkaConnectorIncomingConfiguration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000)).intValue());
        this.offsets = new HashMap();
        this.consumer = kafkaConsumer;
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        runOnContext(() -> {
            HashMap hashMap = new HashMap();
            TopicPartition topicPartition = TopicPartitions.getTopicPartition(incomingKafkaRecord);
            Long l = this.offsets.get(topicPartition);
            if (l == null || l.longValue() < incomingKafkaRecord.getOffset() + 1) {
                this.offsets.put(topicPartition, Long.valueOf(incomingKafkaRecord.getOffset() + 1));
                hashMap.put(topicPartition, new OffsetAndMetadata(incomingKafkaRecord.getOffset() + 1, (String) null));
                this.consumer.commitAsync(hashMap).subscribe().with(r1 -> {
                }, th -> {
                    KafkaLogging.log.failedToCommitAsync(topicPartition, incomingKafkaRecord.getOffset() + 1);
                });
            }
        });
        Uni voidItem = Uni.createFrom().voidItem();
        Objects.requireNonNull(incomingKafkaRecord);
        return voidItem.runSubscriptionOn(incomingKafkaRecord::runOnMessageContext);
    }
}
