package io.smallrye.reactive.messaging.kafka;

import io.smallrye.reactive.messaging.spi.IncomingConnectorFactory;
import io.smallrye.reactive.messaging.spi.OutgoingConnectorFactory;
import io.vertx.reactivex.core.Vertx;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.MessagingProvider;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/KafkaMessagingProvider.class */
public class KafkaMessagingProvider implements IncomingConnectorFactory, OutgoingConnectorFactory {

    @Inject
    private Vertx vertx;
    private List<KafkaSource> sources = new CopyOnWriteArrayList();
    private List<KafkaSink> sinks = new CopyOnWriteArrayList();

    public void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        this.sources.forEach((v0) -> {
            v0.close();
        });
        this.sinks.forEach((v0) -> {
            v0.close();
        });
    }

    public Class<? extends MessagingProvider> type() {
        return Kafka.class;
    }

    public PublisherBuilder<KafkaMessage> getPublisherBuilder(Config config) {
        KafkaSource kafkaSource = new KafkaSource(this.vertx, config);
        this.sources.add(kafkaSource);
        return kafkaSource.getSource();
    }

    public SubscriberBuilder<? extends Message, Void> getSubscriberBuilder(Config config) {
        KafkaSink kafkaSink = new KafkaSink(this.vertx, config);
        this.sinks.add(kafkaSink);
        return kafkaSink.getSink();
    }
}
