package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaFailStop.class */
/**
 * {@link KafkaFailureHandler} that, for every failed record, writes a log entry for the channel,
 * forwards the failure to the {@code reportFailure} callback and answers with a failed {@link Uni}.
 *
 * <p>This class does not commit offsets or stop anything on its own; whatever happens after the
 * failure is reported is decided by the callback supplied at construction time.
 */
public class KafkaFailStop implements KafkaFailureHandler {
    /** Channel name passed to the log entry written for each handled failure. */
    private final String channel;

    /** Callback invoked with each handled failure and the flag {@code true}. */
    private final BiConsumer<Throwable, Boolean> reportFailure;

    /**
     * Factory registered under the {@link KafkaFailureHandler.Strategy#FAIL} identifier that
     * builds {@link KafkaFailStop} instances.
     */
    @ApplicationScoped
    @Identifier(KafkaFailureHandler.Strategy.FAIL)
    public static class Factory implements KafkaFailureHandler.Factory {
        /**
         * Creates a handler bound to the channel named in the given configuration.
         *
         * @param config incoming connector configuration; only its channel name is read
         * @param vertx unused by this strategy
         * @param consumer unused by this strategy
         * @param reportFailure callback handed to the created handler
         * @return a new {@link KafkaFailStop}
         */
        @Override
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx,
                KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure) {
            return new KafkaFailStop(config.getChannel(), reportFailure);
        }
    }

    /**
     * Creates the handler.
     *
     * @param channel channel name used when logging a handled failure
     * @param reportFailure callback invoked with each handled failure
     */
    public <K, V> KafkaFailStop(String channel, BiConsumer<Throwable, Boolean> reportFailure) {
        this.channel = channel;
        this.reportFailure = reportFailure;
    }

    /**
     * Logs the nack for this channel, reports the failure, and returns a {@link Uni} that fails
     * with the same throwable.
     *
     * @param record the record whose processing failed; must not be {@code null}
     * @param reason the failure to report and to propagate through the returned {@link Uni}
     * @param metadata unused by this strategy
     * @return a {@link Uni} failing with {@code reason}, emitted via
     *         {@code record.runOnMessageContext}
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
        KafkaLogging.log.messageNackedFailStop(this.channel);
        // NOTE(review): the meaning of the `true` flag is defined by the callback's owner,
        // which is not visible here; presumably it marks the failure as fatal - confirm.
        this.reportFailure.accept(reason, true);
        // The explicit <Void> witness keeps the pipeline fully typed instead of a raw Uni.
        // Evaluating the bound method reference throws NullPointerException when record is
        // null, so no separate null check is needed before it.
        return Uni.createFrom().<Void> failure(reason)
                .emitOn(record::runOnMessageContext);
    }
}
