package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.MultiRepetition;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.class */
/**
 * Incoming side of a Pulsar channel.
 *
 * <p>The constructor configures and subscribes a Pulsar {@link Consumer}, then builds a
 * {@link Flow.Publisher} that repeatedly receives from that consumer (message by message, or batch by
 * batch when batch receive is enabled) until the channel is closed. The most recent receive/processing
 * failures are remembered and surfaced through {@link #isAlive(HealthReport.HealthReportBuilder)}.
 *
 * @param <T> payload type of the consumed Pulsar messages
 */
public class PulsarIncomingChannel<T> {
    /** Upper bound on the number of failures remembered for the liveness report. */
    private static final int MAX_REPORTED_FAILURES = 10;

    private final Consumer<T> consumer;
    private final Flow.Publisher<? extends Message<?>> publisher;
    private final String channel;
    private final PulsarAckHandler ackHandler;
    private final PulsarFailureHandler failureHandler;
    private final ContextInternal context;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Most recent failures, oldest first; guarded by {@code synchronized (this)}. */
    private final List<Throwable> failures = new ArrayList<>();
    private final boolean healthEnabled;
    private final boolean tracingEnabled;
    /** {@code null} when tracing is disabled. */
    private final PulsarOpenTelemetryInstrumenter instrumenter;

    /**
     * Creates the channel and subscribes its consumer.
     *
     * @param pulsarClient client used to create the consumer; also consulted to detect end of stream
     * @param vertx Vert.x instance whose delegate provides the event-loop context items are emitted on
     * @param schema schema of the consumed messages
     * @param factory creates the acknowledgement handler for the subscribed consumer
     * @param factory2 creates the failure handler for the subscribed consumer
     * @param pulsarConnectorIncomingConfiguration channel configuration
     * @param configResolver resolves the consumer configuration and converts it to a map for the builder
     * @param instance OpenTelemetry instance, used only when tracing is enabled
     * @throws PulsarClientException propagated from the Pulsar client while subscribing the consumer
     */
    public PulsarIncomingChannel(PulsarClient pulsarClient, Vertx vertx, Schema<T> schema,
            PulsarAckHandler.Factory factory, PulsarFailureHandler.Factory factory2,
            PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration,
            ConfigResolver configResolver, Instance<OpenTelemetry> instance) throws PulsarClientException {
        this.channel = pulsarConnectorIncomingConfiguration.getChannel();
        this.healthEnabled = pulsarConnectorIncomingConfiguration.getHealthEnabled();
        this.tracingEnabled = pulsarConnectorIncomingConfiguration.getTracingEnabled();
        boolean batchReceive = pulsarConnectorIncomingConfiguration.getBatchReceive();

        ConsumerBuilder<T> builder = pulsarClient.newConsumer(schema);
        ConsumerConfigurationData<?> consumerConf = configResolver.getConsumerConf(pulsarConnectorIncomingConfiguration);

        // Fill in the settings the configuration left out: subscription name, topics, consumer name.
        if (consumerConf.getSubscriptionName() == null) {
            String generatedName = UUID.randomUUID().toString();
            PulsarLogging.log.noSubscriptionName(generatedName);
            consumerConf.setSubscriptionName(generatedName);
        }
        if (!hasTopicConfig(consumerConf)) {
            // The topic attribute may hold a comma-separated list; it defaults to the channel name.
            String topics = pulsarConnectorIncomingConfiguration.getTopic().orElse(this.channel);
            consumerConf.setTopicNames(Arrays.stream(topics.split(",")).collect(Collectors.toSet()));
        }
        if (consumerConf.getConsumerName() == null) {
            consumerConf.setConsumerName(this.channel);
        }
        builder.loadConf(configResolver.configToMap(consumerConf));

        pulsarConnectorIncomingConfiguration.getDeadLetterPolicyMaxRedeliverCount()
                .ifPresent(maxRedeliverCount -> builder
                        .deadLetterPolicy(getDeadLetterPolicy(pulsarConnectorIncomingConfiguration, maxRedeliverCount)));
        // parseBackoff returns null for a malformed value; Optional.map turns that into an empty Optional,
        // so the builder is never handed a null backoff and the consumer keeps its default instead.
        pulsarConnectorIncomingConfiguration.getNegativeAckRedeliveryBackoff()
                .map(this::parseBackoff)
                .ifPresent(builder::negativeAckRedeliveryBackoff);
        pulsarConnectorIncomingConfiguration.getAckTimeoutRedeliveryBackoff()
                .map(this::parseBackoff)
                .ifPresent(builder::ackTimeoutRedeliveryBackoff);

        // These settings are applied explicitly on the builder in addition to loadConf above.
        // NOTE(review): presumably because object-valued settings do not survive the map conversion - confirm.
        if (consumerConf.getConsumerEventListener() != null) {
            builder.consumerEventListener(consumerConf.getConsumerEventListener());
        }
        if (consumerConf.getPayloadProcessor() != null) {
            builder.messagePayloadProcessor(consumerConf.getPayloadProcessor());
        }
        if (consumerConf.getKeySharedPolicy() != null) {
            builder.keySharedPolicy(consumerConf.getKeySharedPolicy());
        } else if (consumerConf.getSubscriptionType() == SubscriptionType.Key_Shared) {
            builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
        }
        if (consumerConf.getCryptoKeyReader() != null) {
            builder.cryptoKeyReader(consumerConf.getCryptoKeyReader());
        }
        if (consumerConf.getMessageCrypto() != null) {
            builder.messageCrypto(consumerConf.getMessageCrypto());
        }
        if (batchReceive && consumerConf.getBatchReceivePolicy() == null) {
            builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY);
        }

        this.consumer = builder.subscribe();
        PulsarLogging.log.createdConsumerWithConfig(this.channel, SchemaResolver.getSchemaName(schema), consumerConf);
        this.ackHandler = factory.create(this.consumer, pulsarConnectorIncomingConfiguration);
        this.failureHandler = factory2.create(this.consumer, pulsarConnectorIncomingConfiguration, this::reportFailure);
        this.context = vertx.getDelegate().createEventLoopContext();

        if (batchReceive) {
            this.publisher = batchStream(pulsarClient, schema);
        } else {
            this.publisher = messageStream(pulsarClient, schema);
        }

        if (this.tracingEnabled) {
            this.instrumenter = PulsarOpenTelemetryInstrumenter.createForSource(instance);
        } else {
            this.instrumenter = null;
        }
    }

    /**
     * Builds the stream used when batch receive is enabled: it repeats {@code batchReceiveAsync} until the
     * channel is closed, drops empty batches and wraps each batch in a {@link PulsarIncomingBatchMessage}.
     */
    private Flow.Publisher<? extends Message<?>> batchStream(PulsarClient pulsarClient, Schema<T> schema) {
        Multi<PulsarIncomingBatchMessage<T>> batches = Multi.createBy().repeating()
                .completionStage(this.consumer::batchReceiveAsync)
                .until(ignored -> this.closed.get())
                .filter(messages -> messages.size() > 0)
                .plug(stream -> {
                    if (schemaRequiresBlockingFetch(schema)) {
                        // Touch every value before the items are moved to the Vert.x context.
                        // NOTE(review): presumably to trigger schema fetching/decoding off the event loop - confirm.
                        return stream.onItem().invoke(messages -> messages.forEach(message -> message.getValue()));
                    }
                    return stream;
                })
                .emitOn(command -> this.context.runOnContext(ignored -> command.run()))
                .onItem().transform(messages -> new PulsarIncomingBatchMessage<>(messages, this.ackHandler, this.failureHandler))
                // A failure that merely signals the end of the stream completes the stream ...
                .onFailure(th -> isEndOfStream(pulsarClient, th)).recoverWithCompletion()
                // ... any other failure is logged and recorded for the liveness report.
                .onFailure().invoke(th -> {
                    PulsarLogging.log.failedToReceiveFromConsumer(this.channel, th);
                    reportFailure(th, false);
                });
        if (this.tracingEnabled) {
            return batches.onItem().invoke(this::incomingBatchTrace);
        }
        return batches;
    }

    /**
     * Builds the stream used when batch receive is disabled: it repeats {@code receiveAsync} until the
     * channel is closed and wraps each message in a {@link PulsarIncomingMessage}.
     */
    private Flow.Publisher<? extends Message<?>> messageStream(PulsarClient pulsarClient, Schema<T> schema) {
        Multi<PulsarIncomingMessage<T>> messages = Multi.createBy().repeating()
                .completionStage(this.consumer::receiveAsync)
                .until(ignored -> this.closed.get())
                .plug(stream -> {
                    if (schemaRequiresBlockingFetch(schema)) {
                        // Touch the value before the item is moved to the Vert.x context.
                        // NOTE(review): presumably to trigger schema fetching/decoding off the event loop - confirm.
                        return stream.onItem().invoke(message -> message.getValue());
                    }
                    return stream;
                })
                .emitOn(command -> this.context.runOnContext(ignored -> command.run()))
                .onItem().transform(message -> new PulsarIncomingMessage<>(message, this.ackHandler, this.failureHandler))
                // A failure that merely signals the end of the stream completes the stream ...
                .onFailure(th -> isEndOfStream(pulsarClient, th)).recoverWithCompletion()
                // ... any other failure is logged and recorded for the liveness report.
                .onFailure().invoke(th -> {
                    PulsarLogging.log.failedToReceiveFromConsumer(this.channel, th);
                    reportFailure(th, false);
                });
        if (this.tracingEnabled) {
            return messages.onItem().invoke(this::incomingTrace);
        }
        return messages;
    }

    /**
     * Tells whether message values of the given schema are eagerly read before being emitted: true when the
     * schema reports that it requires fetching schema info, or is one of the Avro / generic schema types.
     */
    private static <T> boolean schemaRequiresBlockingFetch(Schema<T> schema) {
        return schema.requireFetchingSchemaInfo()
                || schema instanceof AvroSchema
                || schema instanceof GenericAvroSchema
                || schema instanceof GenericJsonSchema
                || schema instanceof GenericProtobufNativeSchema;
    }

    /**
     * Records an incoming trace for the given message, using this channel's consumer name and the Pulsar
     * message found in the message's {@link PulsarIncomingMessageMetadata}.
     *
     * @param pulsarMessage message to trace; must carry {@link PulsarIncomingMessageMetadata}
     */
    public void incomingTrace(PulsarMessage<T> pulsarMessage) {
        PulsarIncomingMessageMetadata metadata = (PulsarIncomingMessageMetadata) pulsarMessage
                .getMetadata(PulsarIncomingMessageMetadata.class).get();
        this.instrumenter.traceIncoming(pulsarMessage, new PulsarTrace.Builder()
                .withConsumerName(this.consumer.getConsumerName())
                .withMessage(metadata.getMessage())
                .build());
    }

    /**
     * Records an incoming trace for every message of the given batch.
     *
     * @param pulsarIncomingBatchMessage batch whose messages are traced one by one
     */
    public void incomingBatchTrace(PulsarIncomingBatchMessage<T> pulsarIncomingBatchMessage) {
        for (PulsarMessage<T> message : pulsarIncomingBatchMessage.getMessages()) {
            incomingTrace(message);
        }
    }

    /**
     * Decides whether a receive failure should simply complete the stream: true when this channel is
     * closed, the consumer has reached the end of its topic, or the client is closed.
     */
    private boolean isEndOfStream(PulsarClient pulsarClient, Throwable th) {
        if (this.closed.get()) {
            return true;
        }
        if (this.consumer.hasReachedEndOfTopic()) {
            PulsarLogging.log.consumerReachedEndOfTopic(this.channel);
            return true;
        }
        if (pulsarClient.isClosed()) {
            PulsarLogging.log.clientClosed(this.channel, th);
            return true;
        }
        return false;
    }

    /**
     * Builds the dead letter policy from the channel configuration; optional topic and subscription
     * settings that are absent are passed to the policy builder as {@code null}.
     */
    private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration, Integer num) {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(num.intValue())
                .deadLetterTopic(pulsarConnectorIncomingConfiguration.getDeadLetterPolicyDeadLetterTopic().orElse(null))
                .retryLetterTopic(pulsarConnectorIncomingConfiguration.getDeadLetterPolicyRetryLetterTopic().orElse(null))
                .initialSubscriptionName(pulsarConnectorIncomingConfiguration.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
                .build();
    }

    /**
     * Parses a redelivery backoff written as {@code minDelayMs,maxDelayMs,multiplier}. Whitespace around
     * each part is ignored.
     *
     * @param str configured backoff value
     * @return the parsed backoff, or {@code null} (after logging) when the value cannot be parsed
     */
    private RedeliveryBackoff parseBackoff(String str) {
        String[] parts = str.split(",");
        try {
            return MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(Long.parseLong(parts[0].trim()))
                    .maxDelayMs(Long.parseLong(parts[1].trim()))
                    .multiplier(Double.parseDouble(parts[2].trim()))
                    .build();
        } catch (Exception e) {
            // Covers both non-numeric parts and fewer than three parts; the log call takes no cause.
            PulsarLogging.log.unableToParseRedeliveryBackoff(str, this.channel);
            return null;
        }
    }

    /**
     * Tells whether the consumer configuration already designates topics, either through a topics pattern
     * or through a non-empty set of topic names.
     */
    static boolean hasTopicConfig(ConsumerConfigurationData<?> consumerConfigurationData) {
        if (consumerConfigurationData.getTopicsPattern() != null) {
            return true;
        }
        return consumerConfigurationData.getTopicNames() != null
                && !consumerConfigurationData.getTopicNames().isEmpty();
    }

    /** Returns the publisher emitting the messages (or batches) received by this channel. */
    public Flow.Publisher<? extends Message<?>> getPublisher() {
        return this.publisher;
    }

    /** Returns the name of this channel. */
    public String getChannel() {
        return this.channel;
    }

    /** Returns the underlying Pulsar consumer. */
    public Consumer<T> getConsumer() {
        return this.consumer;
    }

    /**
     * Marks the channel as closed, which stops the receive loop, and closes the consumer. A failure to
     * close the consumer is logged and not rethrown.
     */
    public void close() {
        this.closed.set(true);
        try {
            this.consumer.close();
        } catch (PulsarClientException e) {
            PulsarLogging.log.unableToCloseConsumer(e);
        }
    }

    /**
     * Records a failure for the liveness report, evicting the oldest entry once
     * {@value #MAX_REPORTED_FAILURES} failures are stored.
     *
     * @param th failure to record
     * @param z when {@code true} the channel is closed after recording the failure
     */
    public synchronized void reportFailure(Throwable th, boolean z) {
        if (this.failures.size() == MAX_REPORTED_FAILURES) {
            this.failures.remove(0);
        }
        this.failures.add(th);
        if (z) {
            close();
        }
    }

    /**
     * Reports, when health checks are enabled, whether the consumer is connected.
     *
     * @param healthReportBuilder builder receiving the status of this channel
     */
    public void isStarted(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.consumer.isConnected());
        }
    }

    /**
     * Readiness uses the same criterion as {@link #isStarted(HealthReport.HealthReportBuilder)}.
     *
     * @param healthReportBuilder builder receiving the status of this channel
     */
    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        isStarted(healthReportBuilder);
    }

    /**
     * Reports, when health checks are enabled, the channel as alive if no failure has been recorded;
     * otherwise reports it as not alive together with the recorded failure messages, separated by
     * {@code ", "}.
     *
     * @param healthReportBuilder builder receiving the status of this channel
     */
    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.healthEnabled) {
            return;
        }
        // Copy under the same lock as reportFailure so the list is not read while being modified.
        List<Throwable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.failures);
        }
        if (snapshot.isEmpty()) {
            healthReportBuilder.add(this.channel, true);
            return;
        }
        String details = snapshot.stream()
                .map(PulsarIncomingChannel::describeFailure)
                .collect(Collectors.joining(", "));
        healthReportBuilder.add(this.channel, false, details);
    }

    /** Returns the failure's message, or its {@code toString()} when it has no message. */
    private static String describeFailure(Throwable failure) {
        String message = failure.getMessage();
        return message != null ? message : failure.toString();
    }
}
