package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck;
import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarExceptions;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(PulsarConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "client-configuration", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier."), @ConnectorAttribute(name = "serviceUrl", type = "string", defaultValue = "pulsar://localhost:6650", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The service URL for the Pulsar service"), @ConnectorAttribute(name = "topic", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The consumed / populated Pulsar topic. If not set, the channel name is used"), @ConnectorAttribute(name = "schema", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed `Schema` qualified with `@Identifier` and the channel name. 
As a fallback AUTO_CONSUME or AUTO_PRODUCE are used."), @ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true"), @ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true"), @ConnectorAttribute(name = "consumer-configuration", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier."), @ConnectorAttribute(name = "ack-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be `ack`, `cumulative`.", defaultValue = PulsarMessageAck.STRATEGY_NAME), @ConnectorAttribute(name = "failure-strategy", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). 
Values can be `nack` (default), `fail`, `ignore` or `reconsume-later", defaultValue = PulsarNack.STRATEGY_NAME), @ConnectorAttribute(name = "reconsumeLater.delay", type = "long", direction = ConnectorAttribute.Direction.INCOMING, description = "Default delay for reconsume failure-strategy, in seconds", defaultValue = "3"), @ConnectorAttribute(name = "negativeAck.redeliveryBackoff", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier."), @ConnectorAttribute(name = "ackTimeout.redeliveryBackoff", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier."), @ConnectorAttribute(name = "deadLetterPolicy.maxRedeliverCount", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "Maximum number of times that a message will be redelivered before being sent to the dead letter topic"), @ConnectorAttribute(name = "deadLetterPolicy.deadLetterTopic", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Name of the dead letter topic where the failing messages will be sent"), @ConnectorAttribute(name = "deadLetterPolicy.retryLetterTopic", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Name of the retry topic where the failing messages will be sent"), @ConnectorAttribute(name = "deadLetterPolicy.initialSubscriptionName", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "Name of the initial subscription name of the dead letter topic"), @ConnectorAttribute(name = "batchReceive", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether batch receive is used to consume messages", defaultValue = "false"), @ConnectorAttribute(name = 
"producer-configuration", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier."), @ConnectorAttribute(name = "max-inflight-messages", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker. Defaults to 1000 messages"), @ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "Whether the client waits for the broker to acknowledge the written record before acknowledging the message", defaultValue = "true")})
/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/PulsarConnector.class */
public class PulsarConnector implements InboundConnector, OutboundConnector, HealthReporter {
    /** Name of this connector, as referenced by the {@code @Connector} annotation on the class. */
    public static final String CONNECTOR_NAME = "smallrye-pulsar";

    /**
     * Pulsar clients keyed by the hash computed by {@code clientHash} from their client
     * configuration, so that channels resolving to the same hash reuse one client.
     */
    private final Map<String, PulsarClient> clients = new ConcurrentHashMap<>();

    /** The client serving each channel, keyed by channel name. */
    private final Map<String, PulsarClient> clientsByChannel = new ConcurrentHashMap<>();

    /** Outgoing channels created by {@code getSubscriber}. */
    private final List<PulsarOutgoingChannel<?>> outgoingChannels = new CopyOnWriteArrayList<>();

    /** Incoming channels created by {@code getPublisher}. */
    private final List<PulsarIncomingChannel<?>> incomingChannels = new CopyOnWriteArrayList<>();

    /** Injected holder from which the Vert.x instance is obtained in {@code init()}. */
    @Inject
    private ExecutionHolder executionHolder;

    /** Vert.x instance; assigned in {@code init()}, not injected directly. */
    private Vertx vertx;

    /** Resolves the Pulsar schema to use for a channel configuration. */
    @Inject
    private SchemaResolver schemaResolver;

    /** Resolves the Pulsar client configuration for a channel; also handed to each channel. */
    @Inject
    private ConfigResolver configResolver;

    /** All available ack handler factories; one is selected by the channel's ack strategy. */
    @Inject
    @Any
    private Instance<PulsarAckHandler.Factory> ackHandlerFactories;

    /** All available failure handler factories; one is selected by the channel's failure strategy. */
    @Inject
    @Any
    private Instance<PulsarFailureHandler.Factory> failureHandlerFactories;

    /** OpenTelemetry instance handle passed through to every channel. */
    @Inject
    private Instance<OpenTelemetry> openTelemetryInstance;

    /**
     * Post-construction hook: fetches the Vert.x instance from the injected
     * {@link ExecutionHolder} and stores it for later client and channel creation.
     */
    @PostConstruct
    void init() {
        vertx = executionHolder.vertx();
    }

    /**
     * Creates the incoming channel described by {@code config} and returns its publisher.
     *
     * <p>The Pulsar client is looked up by the hash of the resolved client configuration and
     * created only when no client is cached under that hash. The client is also recorded
     * under the channel name before the channel itself is built.
     *
     * @param config the channel configuration
     * @return the publisher of the newly created incoming channel
     * @throws RuntimeException the exception produced by
     *         {@code PulsarExceptions.ex.illegalStateUnableToBuildConsumer} when building the
     *         channel raises a {@link PulsarClientException}
     */
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        PulsarConnectorIncomingConfiguration ic = new PulsarConnectorIncomingConfiguration(config);
        ClientConfigurationData clientConf = configResolver.getClientConf(ic);

        PulsarClient client = clients.computeIfAbsent(clientHash(clientConf), hash -> createPulsarClient(clientConf));
        clientsByChannel.put(ic.getChannel(), client);

        try {
            // Arguments are kept inline so they are evaluated in the same order as before:
            // schema, then ack handler factory, then failure handler factory.
            PulsarIncomingChannel<?> channel = new PulsarIncomingChannel<>(
                    client,
                    vertx,
                    schemaResolver.getSchema(ic),
                    (PulsarAckHandler.Factory) CDIUtils.getInstanceById(ackHandlerFactories, ic.getAckStrategy()).get(),
                    (PulsarFailureHandler.Factory) CDIUtils.getInstanceById(failureHandlerFactories, ic.getFailureStrategy()).get(),
                    ic,
                    configResolver,
                    openTelemetryInstance);
            incomingChannels.add(channel);
            return channel.getPublisher();
        } catch (PulsarClientException cause) {
            throw PulsarExceptions.ex.illegalStateUnableToBuildConsumer(cause);
        }
    }

    /**
     * Creates the outgoing channel described by {@code config} and returns its subscriber.
     *
     * <p>The Pulsar client is looked up by the hash of the resolved client configuration and
     * created only when no client is cached under that hash. The client is also recorded
     * under the channel name before the channel itself is built.
     *
     * @param config the channel configuration
     * @return the subscriber of the newly created outgoing channel
     * @throws RuntimeException the exception produced by
     *         {@code PulsarExceptions.ex.illegalStateUnableToBuildProducer} when building the
     *         channel raises a {@link PulsarClientException}
     */
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        PulsarConnectorOutgoingConfiguration oc = new PulsarConnectorOutgoingConfiguration(config);
        ClientConfigurationData clientConf = configResolver.getClientConf(oc);

        PulsarClient client = clients.computeIfAbsent(clientHash(clientConf), hash -> createPulsarClient(clientConf));
        clientsByChannel.put(oc.getChannel(), client);

        try {
            PulsarOutgoingChannel<?> channel = new PulsarOutgoingChannel<>(
                    client,
                    schemaResolver.getSchema(oc),
                    oc,
                    configResolver,
                    openTelemetryInstance);
            outgoingChannels.add(channel);
            return channel.getSubscriber();
        } catch (PulsarClientException cause) {
            throw PulsarExceptions.ex.illegalStateUnableToBuildProducer(cause);
        }
    }

    /**
     * Computes the key under which a client is cached: {@code HashUtil.sha256} applied to
     * the string form of the given client configuration.
     *
     * @param clientConfigurationData the client configuration to derive the key from
     * @return the hash string used as key in the client cache
     */
    private String clientHash(ClientConfigurationData clientConfigurationData) {
        String configAsText = clientConfigurationData.toString();
        return HashUtil.sha256(configAsText);
    }

    /**
     * Shuts the connector down when the application scope is about to be destroyed.
     *
     * <p>Closes every incoming channel, then every outgoing channel, then every cached
     * client, and finally empties all internal registries. A {@link PulsarClientException}
     * raised while closing a client is logged and does not stop the remaining clients from
     * being closed.
     *
     * @param obj the event payload; not used
     */
    public void terminate(@Priority(50) @Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        for (PulsarIncomingChannel<?> incoming : incomingChannels) {
            incoming.close();
        }
        for (PulsarOutgoingChannel<?> outgoing : outgoingChannels) {
            outgoing.close();
        }
        for (PulsarClient client : clients.values()) {
            try {
                client.close();
            } catch (PulsarClientException closeFailure) {
                PulsarLogging.log.unableToCloseClient(closeFailure);
            }
        }

        incomingChannels.clear();
        outgoingChannels.clear();
        clients.clear();
        clientsByChannel.clear();
    }

    /**
     * Builds a new Pulsar client from the given configuration, running on the Netty event
     * loop group of the connector's Vert.x instance.
     *
     * <p>Authentication is applied to the configuration first (see {@code setAuth}), and the
     * configuration is logged before the client is instantiated.
     *
     * @param clientConfigurationData the client configuration; may be mutated by {@code setAuth}
     * @return the newly created client
     * @throws RuntimeException the exception produced by
     *         {@code PulsarExceptions.ex.illegalStateUnableToBuildClient} when a
     *         {@link PulsarClientException} is raised
     */
    private PulsarClientImpl createPulsarClient(ClientConfigurationData clientConfigurationData) {
        try {
            setAuth(clientConfigurationData);
            PulsarLogging.log.createdClientWithConfig(clientConfigurationData);
            PulsarClientImpl created = new PulsarClientImpl(clientConfigurationData, vertx.nettyEventLoopGroup());
            return created;
        } catch (PulsarClientException cause) {
            throw PulsarExceptions.ex.illegalStateUnableToBuildClient(cause);
        }
    }

    /**
     * Installs an authentication object on the configuration when an auth plugin class name
     * and auth parameters are both configured.
     *
     * <p>Nothing happens when the plugin class name is blank. Otherwise the string form of
     * the auth parameters is used when it is not blank; failing that, the parameter map is
     * used when it is not {@code null}. If neither is available the configuration is left
     * unchanged.
     *
     * @param clientConfigurationData the configuration to read from and update
     * @throws PulsarClientException if {@link AuthenticationFactory} fails to create the authentication
     */
    private void setAuth(ClientConfigurationData clientConfigurationData) throws PulsarClientException {
        String pluginClassName = clientConfigurationData.getAuthPluginClassName();
        if (Validation.isBlank(pluginClassName)) {
            return;
        }

        String params = clientConfigurationData.getAuthParams();
        if (!Validation.isBlank(params)) {
            // String parameters take precedence over the parameter map.
            clientConfigurationData.setAuthentication(AuthenticationFactory.create(pluginClassName, params));
            return;
        }

        if (clientConfigurationData.getAuthParamMap() != null) {
            clientConfigurationData.setAuthentication(
                    AuthenticationFactory.create(pluginClassName, clientConfigurationData.getAuthParamMap()));
        }
    }

    /**
     * Looks up the Pulsar client registered for a channel.
     *
     * @param str the channel name
     * @return the client recorded for that channel, or {@code null} if none is recorded
     */
    public PulsarClient getClient(String str) {
        PulsarClient registered = clientsByChannel.get(str);
        return registered;
    }

    /**
     * Returns the Pulsar consumer of the first incoming channel whose name equals {@code str}.
     *
     * <p>Implemented as a plain loop rather than {@code stream().map(...).findFirst()}:
     * {@code Stream.findFirst()} throws {@link NullPointerException} when the selected element
     * is {@code null}, so a matching channel without a consumer would have raised instead of
     * yielding {@code null}. The loop returns whatever the channel exposes.
     *
     * @param str the channel name
     * @param <T> the message payload type the caller expects; not verified at runtime
     * @return the consumer of the matching channel, or {@code null} when no incoming channel
     *         has that name
     */
    @SuppressWarnings("unchecked") // the payload type is chosen by the caller and cannot be checked here
    public <T> Consumer<T> getConsumer(String str) {
        for (PulsarIncomingChannel<?> incoming : this.incomingChannels) {
            if (incoming.getChannel().equals(str)) {
                return (Consumer<T>) incoming.getConsumer();
            }
        }
        return null;
    }

    /**
     * Returns the Pulsar producer of the first outgoing channel whose name equals {@code str}.
     *
     * <p>Implemented as a plain loop rather than {@code stream().map(...).findFirst()}:
     * {@code Stream.findFirst()} throws {@link NullPointerException} when the selected element
     * is {@code null}, so a matching channel without a producer would have raised instead of
     * yielding {@code null}. The loop returns whatever the channel exposes.
     *
     * @param str the channel name
     * @param <T> the message payload type the caller expects; not verified at runtime
     * @return the producer of the matching channel, or {@code null} when no outgoing channel
     *         has that name
     */
    @SuppressWarnings("unchecked") // the payload type is chosen by the caller and cannot be checked here
    public <T> Producer<T> getProducer(String str) {
        for (PulsarOutgoingChannel<?> outgoing : this.outgoingChannels) {
            if (outgoing.getChannel().equals(str)) {
                return (Producer<T>) outgoing.getProducer();
            }
        }
        return null;
    }

    /**
     * Lists the names of all incoming channels currently held by this connector.
     *
     * @return a newly collected set of incoming channel names
     */
    public Set<String> getConsumerChannels() {
        return incomingChannels.stream()
                .map(PulsarIncomingChannel::getChannel)
                .collect(Collectors.toSet());
    }

    /**
     * Lists the names of all outgoing channels currently held by this connector.
     *
     * @return a newly collected set of outgoing channel names
     */
    public Set<String> getProducerChannels() {
        return outgoingChannels.stream()
                .map(PulsarOutgoingChannel::getChannel)
                .collect(Collectors.toSet());
    }

    /**
     * Builds the startup health report by letting every incoming channel, then every
     * outgoing channel, contribute its started state to a shared builder.
     *
     * @return the aggregated startup report
     */
    public HealthReport getStartup() {
        HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (PulsarIncomingChannel<?> incoming : incomingChannels) {
            incoming.isStarted(report);
        }
        for (PulsarOutgoingChannel<?> outgoing : outgoingChannels) {
            outgoing.isStarted(report);
        }
        return report.build();
    }

    /**
     * Builds the readiness health report by letting every incoming channel, then every
     * outgoing channel, contribute its ready state to a shared builder.
     *
     * @return the aggregated readiness report
     */
    public HealthReport getReadiness() {
        HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (PulsarIncomingChannel<?> incoming : incomingChannels) {
            incoming.isReady(report);
        }
        for (PulsarOutgoingChannel<?> outgoing : outgoingChannels) {
            outgoing.isReady(report);
        }
        return report.build();
    }

    /**
     * Builds the liveness health report by letting every incoming channel, then every
     * outgoing channel, contribute its alive state to a shared builder.
     *
     * @return the aggregated liveness report
     */
    public HealthReport getLiveness() {
        HealthReport.HealthReportBuilder report = HealthReport.builder();
        for (PulsarIncomingChannel<?> incoming : incomingChannels) {
            incoming.isAlive(report);
        }
        for (PulsarOutgoingChannel<?> outgoing : outgoingChannels) {
            outgoing.isAlive(report);
        }
        return report.build();
    }
}
