package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.prometheus.client.Summary;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.utils.FunctionRecord;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.2.2.jar:org/apache/pulsar/functions/instance/ContextImpl.class */
/**
 * Single implementation of the function {@link Context}, the connector {@link SinkContext} and the
 * connector {@link SourceContext}. It exposes the instance configuration, user configuration, secrets,
 * state-store access, user metrics, cached output producers and input-consumer control.
 *
 * <p>NOTE(review): {@code record} and {@code userMetricsLabels} are plain, unsynchronized fields;
 * confirm the threading expectations of callers before using an instance from several threads.
 */
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
    // Instance-level configuration; source of tenant/namespace/name, topics, ids and parallelism.
    private InstanceConfig config;
    // Logger handed to user code via getLogger() and used for internal error reporting.
    private Logger logger;
    // Record currently being processed; replaced through setCurrentMessageContext().
    private Record<?> record;
    private final ClientBuilder clientBuilder;
    private final PulsarClient client;
    // Only handed out by getPulsarAdmin() when exposePulsarAdminClientEnabled is true.
    private final PulsarAdmin pulsarAdmin;
    // Producers cached by topic and shared by all threads; non-null only when thread-local producers are disabled.
    private Map<String, Producer<?>> publishProducers;
    // Per-thread producer cache keyed by topic; non-null only when the producer spec enables thread-local producers.
    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
    // Template builder that getProducer() clones for every new producer.
    private ProducerBuilderImpl<?> producerBuilder;
    private final TopicSchema topicSchema;
    private final SecretsProvider secretsProvider;
    // Parsed from the function details' secrets-map JSON; empty when none is configured.
    private final Map<String, Object> secretsMap;

    @VisibleForTesting
    StateManager stateManager;

    // Store looked up by this function's tenant/namespace/name; null means state is not enabled (see ensureStateEnabled()).
    @VisibleForTesting
    DefaultStateStore defaultStateStore;
    // Parsed from the function details' user-config JSON; empty when none is configured.
    private Map<String, Object> userConfigs;
    private ComponentStatsManager statsManager;
    // Label values common to all metrics of this instance; the user metric name is appended per metric.
    private final String[] metricsLabels;
    private final Summary userMetricsSummary;
    private final SubscriptionType subscriptionType;
    // ComponentStatsManager.metricsLabelNames plus one trailing slot that the static initializer sets to "metric".
    private static final String[] userMetricsLabelNames = (String[]) Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
    private boolean exposePulsarAdminClientEnabled;
    // Set through setInputConsumers(); while null, seek/pause/resume are rejected by getConsumer().
    private List<Consumer<?>> inputConsumers;
    private final Function.FunctionDetails.ComponentType componentType;
    // Cache of full label-value arrays keyed by user metric name, filled lazily by recordMetric().
    Map<String, String[]> userMetricsLabels = new HashMap();
    // Consumers indexed by the topic they read from; filled from inputConsumers.
    private final Map<TopicName, Consumer> topicConsumers = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.2.2.jar:org/apache/pulsar/functions/instance/ContextImpl$MessageBuilderImpl.class */
    public class MessageBuilderImpl<O> implements TypedMessageBuilder<O> {
        private TypedMessageBuilder<O> underlyingBuilder;

        MessageBuilderImpl() {
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public MessageId send() throws PulsarClientException {
            try {
                return sendAsync().get();
            } catch (Exception e) {
                throw PulsarClientException.unwrap(e);
            }
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public CompletableFuture<MessageId> sendAsync() {
            return this.underlyingBuilder.sendAsync().whenComplete((messageId, th) -> {
                if (null != th) {
                    ContextImpl.this.statsManager.incrSysExceptions(th);
                    ContextImpl.this.logger.error("Failed to publish to topic with error", th);
                }
            });
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> key(String str) {
            this.underlyingBuilder.key(str);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> keyBytes(byte[] bArr) {
            this.underlyingBuilder.keyBytes(bArr);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> orderingKey(byte[] bArr) {
            this.underlyingBuilder.orderingKey(bArr);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> value(O o) {
            this.underlyingBuilder.value(o);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> property(String str, String str2) {
            this.underlyingBuilder.property(str, str2);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> properties(Map<String, String> map) {
            this.underlyingBuilder.properties(map);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> eventTime(long j) {
            this.underlyingBuilder.eventTime(j);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> sequenceId(long j) {
            this.underlyingBuilder.sequenceId(j);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> replicationClusters(List<String> list) {
            this.underlyingBuilder.replicationClusters(list);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> disableReplication() {
            this.underlyingBuilder.disableReplication();
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> loadConf(Map<String, Object> map) {
            this.underlyingBuilder.loadConf(map);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> deliverAfter(long j, TimeUnit timeUnit) {
            this.underlyingBuilder.deliverAfter(j, timeUnit);
            return this;
        }

        @Override // org.apache.pulsar.client.api.TypedMessageBuilder
        public TypedMessageBuilder<O> deliverAt(long j) {
            this.underlyingBuilder.deliverAt(j);
            return this;
        }

        public void setUnderlyingBuilder(TypedMessageBuilder<O> typedMessageBuilder) {
            this.underlyingBuilder = typedMessageBuilder;
        }
    }

    /**
     * Builds the context for one function/connector instance.
     *
     * <p>Sets up the template producer builder (tuned from the sink's producer spec), chooses between a
     * shared and a thread-local producer cache, parses user config and secrets JSON, registers the
     * user-metrics summary under the prefix matching {@code componentType}, resolves the default state
     * store and maps the source subscription type.
     *
     * @param strArr label values shared by every metric of this instance
     * @throws PulsarClientException declared for callers; not thrown directly by the visible code
     * @throws RuntimeException if {@code componentType} is not FUNCTION, SINK or SOURCE
     */
    public ContextImpl(InstanceConfig instanceConfig, Logger logger, PulsarClient pulsarClient, SecretsProvider secretsProvider, FunctionCollectorRegistry functionCollectorRegistry, String[] strArr, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager componentStatsManager, StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder) throws PulsarClientException {
        this.config = instanceConfig;
        this.logger = logger;
        this.clientBuilder = clientBuilder;
        this.client = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.statsManager = componentStatsManager;
        this.producerBuilder = (ProducerBuilderImpl) pulsarClient.newProducer()
                .blockIfQueueFull(true)
                .enableBatching(true)
                .batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS);

        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();

        // Apply optional producer tuning from the sink spec; zero values mean "leave the default".
        boolean useThreadLocalProducers = false;
        final Function.ProducerSpec producerSpec = details.getSink().getProducerSpec();
        if (producerSpec != null) {
            final int maxPending = producerSpec.getMaxPendingMessages();
            if (maxPending != 0) {
                this.producerBuilder.maxPendingMessages(maxPending);
            }
            final int maxPendingAcrossPartitions = producerSpec.getMaxPendingMessagesAcrossPartitions();
            if (maxPendingAcrossPartitions != 0) {
                this.producerBuilder.maxPendingMessagesAcrossPartitions(maxPendingAcrossPartitions);
            }
            final String batchBuilderName = producerSpec.getBatchBuilder();
            if (batchBuilderName != null) {
                this.producerBuilder.batcherBuilder(
                        batchBuilderName.equals("KEY_BASED") ? BatcherBuilder.KEY_BASED : BatcherBuilder.DEFAULT);
            }
            useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
        }

        // Exactly one of the two producer caches is created.
        if (useThreadLocalProducers) {
            this.tlPublishProducers = new ThreadLocal<>();
        } else {
            this.publishProducers = new ConcurrentHashMap();
        }

        final String userConfigJson = details.getUserConfig();
        if (userConfigJson.isEmpty()) {
            this.userConfigs = new HashMap();
        } else {
            this.userConfigs = (Map) new Gson().fromJson(userConfigJson, new TypeToken<Map<String, Object>>() { // from class: org.apache.pulsar.functions.instance.ContextImpl.1
            }.getType());
        }

        this.secretsProvider = secretsProvider;
        final String secretsJson = details.getSecretsMap();
        if (StringUtils.isEmpty(secretsJson)) {
            this.secretsMap = new HashMap();
        } else {
            this.secretsMap = (Map) new Gson().fromJson(secretsJson, new TypeToken<Map<String, Object>>() { // from class: org.apache.pulsar.functions.instance.ContextImpl.2
            }.getType());
        }

        this.metricsLabels = strArr;

        final String metricsPrefix;
        switch (componentType) {
            case FUNCTION:
                metricsPrefix = FunctionStatsManager.PULSAR_FUNCTION_METRICS_PREFIX;
                break;
            case SINK:
                metricsPrefix = SinkStatsManager.PULSAR_SINK_METRICS_PREFIX;
                break;
            case SOURCE:
                metricsPrefix = SourceStatsManager.PULSAR_SOURCE_METRICS_PREFIX;
                break;
            default:
                throw new RuntimeException("Unknown component type: " + componentType);
        }
        final String userMetricName = metricsPrefix + ComponentStatsManager.USER_METRIC_PREFIX;
        final Summary summaryTemplate = Summary.build()
                .name(userMetricName)
                .help("User defined metric.")
                .labelNames(userMetricsLabelNames)
                .quantile(0.5d, 0.01d)
                .quantile(0.9d, 0.01d)
                .quantile(0.99d, 0.01d)
                .quantile(0.999d, 0.01d)
                .create();
        this.userMetricsSummary = (Summary) functionCollectorRegistry.registerIfNotExist(userMetricName, summaryTemplate);

        this.componentType = componentType;
        this.stateManager = stateManager;
        this.defaultStateStore = (DefaultStateStore) stateManager.getStore(details.getTenant(), details.getNamespace(), details.getName());
        this.exposePulsarAdminClientEnabled = instanceConfig.isExposePulsarAdminClientEnabled();

        // Anything other than FAILOVER / KEY_SHARED falls back to a shared subscription.
        switch (details.getSource().getSubscriptionType()) {
            case FAILOVER:
                this.subscriptionType = SubscriptionType.Failover;
                break;
            case KEY_SHARED:
                this.subscriptionType = SubscriptionType.Key_Shared;
                break;
            default:
                this.subscriptionType = SubscriptionType.Shared;
                break;
        }
    }

    /** Replaces the record that {@link #getCurrentRecord()} returns. */
    public void setCurrentMessageContext(Record<?> record) {
        Record<?> current = record;
        this.record = current;
    }

    /** @return the record last supplied through {@code setCurrentMessageContext}, or {@code null} if none was set */
    @Override // org.apache.pulsar.functions.api.Context
    public Record<?> getCurrentRecord() {
        final Record<?> current = record;
        return current;
    }

    /** @return the keys of the source's input-specs map, i.e. the configured input topic names */
    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SinkContext
    public Collection<String> getInputTopics() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getSource().getInputSpecsMap().keySet();
    }

    /** @return the topic configured on the function's sink spec */
    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SourceContext
    public String getOutputTopic() {
        final Function.SinkSpec sinkSpec = config.getFunctionDetails().getSink();
        return sinkSpec.getTopic();
    }

    /**
     * @return the sink's schema type when it is non-empty, otherwise the sink's SerDe class name
     */
    @Override // org.apache.pulsar.functions.api.Context
    public String getOutputSchemaType() {
        final Function.SinkSpec sinkSpec = config.getFunctionDetails().getSink();
        if (StringUtils.isEmpty(sinkSpec.getSchemaType())) {
            return sinkSpec.getSerDeClassName();
        }
        return sinkSpec.getSchemaType();
    }

    /** @return the tenant recorded in the function details */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public String getTenant() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getTenant();
    }

    /** @return the namespace recorded in the function details */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public String getNamespace() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getNamespace();
    }

    /** @return the name from the function details (sink view of the same value as the function name) */
    @Override // org.apache.pulsar.io.core.SinkContext
    public String getSinkName() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getName();
    }

    /** @return the name from the function details (source view of the same value as the function name) */
    @Override // org.apache.pulsar.io.core.SourceContext
    public String getSourceName() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getName();
    }

    /** @return the name recorded in the function details */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionName() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getName();
    }

    /** @return the function id held by the instance configuration */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionId() {
        final String functionId = config.getFunctionId();
        return functionId;
    }

    /** @return the instance id held by the instance configuration */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public int getInstanceId() {
        final int instanceId = config.getInstanceId();
        return instanceId;
    }

    /** @return the parallelism configured in the function details */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public int getNumInstances() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return details.getParallelism();
    }

    /** @return the function version held by the instance configuration */
    @Override // org.apache.pulsar.functions.api.Context
    public String getFunctionVersion() {
        final String version = config.getFunctionVersion();
        return version;
    }

    /** @return the logger supplied to the constructor */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public Logger getLogger() {
        final Logger current = logger;
        return current;
    }

    /**
     * Looks up a user-config entry. A String value starting with {@code $} is treated as a reference
     * to an environment variable (name = value without the leading {@code $}) and the variable's
     * value is returned instead.
     *
     * @param str config key
     * @return the resolved value; empty when the key is absent or the referenced variable is unset
     * @throws RuntimeException if reading the environment variable is denied by a security manager
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Optional<Object> getUserConfigValue(String str) {
        final Object rawValue = this.userConfigs.getOrDefault(str, null);
        final boolean isEnvReference = rawValue instanceof String && ((String) rawValue).startsWith("$");
        if (isEnvReference) {
            try {
                final String envName = ((String) rawValue).substring(1);
                return Optional.ofNullable(System.getenv(envName));
            } catch (SecurityException e) {
                throw new RuntimeException("Access to environment variable " + rawValue + " is not allowed.", e);
            }
        }
        return Optional.ofNullable(rawValue);
    }

    /**
     * @param str config key
     * @param obj fallback returned when {@link #getUserConfigValue(String)} yields no value
     * @return the resolved config value or {@code obj}
     */
    @Override // org.apache.pulsar.functions.api.Context
    public Object getUserConfigValueOrDefault(String str, Object obj) {
        final Optional<Object> resolved = getUserConfigValue(str);
        return resolved.orElse(obj);
    }

    /** @return the internal user-config map itself (not a copy); entries are returned unresolved */
    @Override // org.apache.pulsar.functions.api.Context
    public Map<String, Object> getUserConfigMap() {
        final Map<String, Object> configs = userConfigs;
        return configs;
    }

    /**
     * Resolves a secret through the configured {@link SecretsProvider}.
     *
     * @param str secret name
     * @return the provider's answer, or {@code null} when the name is not present in the secrets map
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public String getSecret(String str) {
        if (!secretsMap.containsKey(str)) {
            return null;
        }
        final Object secretSpec = secretsMap.get(str);
        return secretsProvider.provideSecret(str, secretSpec);
    }

    /**
     * @return the admin client supplied to the constructor
     * @throws IllegalStateException if exposing the admin client is disabled in the instance configuration
     */
    @Override // org.apache.pulsar.functions.api.Context
    public PulsarAdmin getPulsarAdmin() {
        if (!exposePulsarAdminClientEnabled) {
            throw new IllegalStateException("PulsarAdmin is not enabled in function worker");
        }
        return pulsarAdmin;
    }

    /**
     * Looks up a state store by name within this function's own tenant and namespace.
     *
     * @param str store name
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public <S extends StateStore> S getStateStore(String str) {
        final Function.FunctionDetails details = config.getFunctionDetails();
        return (S) getStateStore(details.getTenant(), details.getNamespace(), str);
    }

    /**
     * Looks up a state store in the state manager.
     *
     * @param str tenant
     * @param str2 namespace
     * @param str3 store name
     * @return whatever the state manager holds for that triple, cast unchecked to {@code S}
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public <S extends StateStore> S getStateStore(String str, String str2, String str3) {
        final StateManager manager = stateManager;
        return (S) manager.getStore(str, str2, str3);
    }

    /**
     * Guards the state/counter methods.
     *
     * @throws IllegalStateException (via Guava {@code checkState}) if no default state store was resolved
     */
    private void ensureStateEnabled() {
        final Function.FunctionDetails details = config.getFunctionDetails();
        final String tenant = details.getTenant();
        final String namespace = details.getNamespace();
        final String name = details.getName();
        Preconditions.checkState(defaultStateStore != null, "State %s/%s/%s is not enabled.", tenant, namespace, name);
    }

    /**
     * Asynchronously adds {@code j} to counter {@code str} in the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public CompletableFuture<Void> incrCounterAsync(String str, long j) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.incrCounterAsync(str, j);
    }

    /**
     * Adds {@code j} to counter {@code str} in the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public void incrCounter(String str, long j) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        store.incrCounter(str, j);
    }

    /**
     * Asynchronously reads counter {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public CompletableFuture<Long> getCounterAsync(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.getCounterAsync(str);
    }

    /**
     * Reads counter {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public long getCounter(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.getCounter(str);
    }

    /**
     * Asynchronously stores {@code byteBuffer} under key {@code str} in the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public CompletableFuture<Void> putStateAsync(String str, ByteBuffer byteBuffer) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.putAsync(str, byteBuffer);
    }

    /**
     * Stores {@code byteBuffer} under key {@code str} in the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public void putState(String str, ByteBuffer byteBuffer) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        store.put(str, byteBuffer);
    }

    /**
     * Asynchronously deletes key {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public CompletableFuture<Void> deleteStateAsync(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.deleteAsync(str);
    }

    /**
     * Deletes key {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public void deleteState(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        store.delete(str);
    }

    /**
     * Asynchronously reads key {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public CompletableFuture<ByteBuffer> getStateAsync(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.getAsync(str);
    }

    /**
     * Reads key {@code str} from the default state store.
     *
     * @throws IllegalStateException if state is not enabled
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public ByteBuffer getState(String str) {
        ensureStateEnabled();
        final DefaultStateStore store = defaultStateStore;
        return store.get(str);
    }

    /**
     * Publishes {@code o} to topic {@code str}, passing an empty schema/SerDe hint to the
     * three-argument overload.
     *
     * <p>The value is forwarded as-is. The decompiled code cast it to {@code String}, which would
     * throw {@link ClassCastException} for every non-String payload.
     *
     * @param str destination topic
     * @param o value to publish
     * @return future completed when the send finishes; completed exceptionally on failure
     */
    @Override // org.apache.pulsar.functions.api.Context
    public <O> CompletableFuture<Void> publish(String str, O o) {
        return publish(str, o, "");
    }

    /**
     * Publishes {@code o} to topic {@code str} using the schema that {@link TopicSchema} resolves
     * from the topic, the value and the hint {@code str2}.
     *
     * <p>The value and schema keep their generic type {@code O}. The decompiled code cast them to
     * {@code String} / {@code Schema<String>}, which would throw {@link ClassCastException} for
     * every non-String payload.
     *
     * @param str destination topic
     * @param o value to publish
     * @param str2 schema type or SerDe class name hint (may be empty)
     * @return future completed when the send finishes; completed exceptionally on failure
     */
    @Override // org.apache.pulsar.functions.api.Context
    @SuppressWarnings("unchecked") // TopicSchema returns an untyped schema chosen for this very value
    public <O> CompletableFuture<Void> publish(String str, O o, String str2) {
        Schema<O> schema = (Schema<O>) this.topicSchema.getSchema(str, (Object) o, str2, false);
        return publish(str, o, schema);
    }

    /**
     * Creates a message builder for topic {@code str}, backed by the cached producer for that topic.
     * The returned builder is a {@link MessageBuilderImpl}, so send failures are recorded in stats
     * and logged.
     *
     * @param str destination topic
     * @param schema schema for the producer and the message; when {@code null} the producer's own
     *               {@code newMessage()} is used
     * @throws PulsarClientException if the producer cannot be created
     */
    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SourceContext
    public <O> TypedMessageBuilder<O> newOutputMessage(String str, Schema<O> schema) throws PulsarClientException {
        MessageBuilderImpl<O> messageBuilderImpl = new MessageBuilderImpl<>();
        Producer<O> producer = getProducer(str, schema);
        // The decompiler emitted "schema != 0"; the reference must be compared with null.
        TypedMessageBuilder<O> underlying;
        if (schema != null) {
            underlying = producer.newMessage(schema);
        } else {
            underlying = producer.newMessage();
        }
        messageBuilderImpl.setUnderlyingBuilder(underlying);
        return messageBuilderImpl;
    }

    /** @return a new consumer builder for {@code schema}, created by the underlying Pulsar client */
    @Override // org.apache.pulsar.functions.api.Context, org.apache.pulsar.io.core.SourceContext
    public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
        final PulsarClient pulsarClient = client;
        return pulsarClient.newConsumer(schema);
    }

    /** @return a record builder produced by {@code FunctionRecord.from} for this context and {@code schema} */
    @Override // org.apache.pulsar.functions.api.Context
    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder(Schema<X> schema) {
        final Context self = this;
        return FunctionRecord.from(self, schema);
    }

    /** @return the client subscription type chosen in the constructor from the source spec */
    @Override // org.apache.pulsar.io.core.SinkContext
    public SubscriptionType getSubscriptionType() {
        final SubscriptionType type = subscriptionType;
        return type;
    }

    /**
     * Publishes {@code o} to topic {@code str} with an explicit schema.
     *
     * @return future completed with {@code null} once the message is sent; a failed future when the
     *         producer cannot be created (that failure is also logged) or the send fails
     */
    public <O> CompletableFuture<Void> publish(String str, O o, Schema<O> schema) {
        final TypedMessageBuilder<O> message;
        try {
            message = newOutputMessage(str, schema).value(o);
        } catch (PulsarClientException e) {
            this.logger.error("Failed to create Producer while doing user publish", (Throwable) e);
            return FutureUtil.failedFuture(e);
        }
        // The message id is of no interest to callers; expose only completion.
        return message.sendAsync().thenApply(ignoredId -> null);
    }

    /**
     * Records one observation of a user-defined metric.
     *
     * <p>The label array for a metric name is the instance's common labels plus the metric name in
     * the last position; it is built on first use and cached afterwards.
     *
     * @param str metric name
     * @param d observed value
     */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public void recordMetric(String str, double d) {
        String[] labels = this.userMetricsLabels.get(str);
        if (labels == null) {
            final int commonCount = this.metricsLabels.length;
            labels = (String[]) Arrays.copyOf(this.metricsLabels, commonCount + 1);
            labels[commonCount] = str;
            this.userMetricsSummary.labels(labels).observe(d);
            // Cache only after the first observation succeeded.
            this.userMetricsLabels.put(str, labels);
            return;
        }
        this.userMetricsSummary.labels(labels).observe(d);
    }

    /** @return the Pulsar client supplied to the constructor */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public PulsarClient getPulsarClient() {
        final PulsarClient pulsarClient = client;
        return pulsarClient;
    }

    /** @return the client builder supplied to the constructor */
    @Override // org.apache.pulsar.functions.api.BaseContext
    public ClientBuilder getPulsarClientBuilder() {
        final ClientBuilder builder = clientBuilder;
        return builder;
    }

    /**
     * Returns the producer for topic {@code str}, creating and caching it on first use.
     *
     * <p>With thread-local producers enabled, each thread keeps its own topic-to-producer map;
     * otherwise one concurrent map is shared. In the shared case, if another thread registered a
     * producer for the same topic first, the freshly created one is closed and the registered one
     * is returned.
     *
     * <p>Fix: in thread-local mode the newly created producer is now also returned. Previously it
     * was only stored in the map, so the first call per topic and thread returned {@code null}.
     *
     * @param str topic name, also the cache key (the schema is not part of the key)
     * @param schema schema applied when a new producer has to be created
     * @throws PulsarClientException if creating (or closing a redundant) producer fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <O> Producer<O> getProducer(String str, Schema<O> schema) throws PulsarClientException {
        // Non-null exactly when thread-local producers are enabled.
        Map<String, Producer<?>> threadLocalProducers = null;
        Producer<?> producer;
        if (this.tlPublishProducers != null) {
            threadLocalProducers = this.tlPublishProducers.get();
            if (threadLocalProducers == null) {
                threadLocalProducers = new HashMap<>();
                this.tlPublishProducers.set(threadLocalProducers);
            }
            producer = threadLocalProducers.get(str);
        } else {
            producer = this.publishProducers.get(str);
        }
        if (producer == null) {
            // NOTE(review): m8537clone() is the decompiler's name for the builder's clone method — confirm against the library.
            Producer<?> create = ((ProducerBuilderImpl) this.producerBuilder.m8537clone())
                    .schema(schema)
                    .blockIfQueueFull(true)
                    .enableBatching(true)
                    .batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS)
                    .compressionType(CompressionType.LZ4)
                    .hashingScheme(HashingScheme.Murmur3_32Hash)
                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
                    .messageRouter(FunctionResultRouter.of())
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .topic(str)
                    .properties(InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName(this.config.getFunctionDetails().getTenant(), this.config.getFunctionDetails().getNamespace(), this.config.getFunctionDetails().getName()), this.config.getInstanceId()))
                    .create();
            if (threadLocalProducers != null) {
                threadLocalProducers.put(str, create);
                producer = create;
            } else {
                Producer<?> putIfAbsent = this.publishProducers.putIfAbsent(str, create);
                if (putIfAbsent != null) {
                    // Lost the race: keep the producer that is already registered.
                    create.close();
                    producer = putIfAbsent;
                } else {
                    producer = create;
                }
            }
        }
        return (Producer<O>) producer;
    }

    /**
     * Takes a snapshot of the user metrics and then clears the summary.
     *
     * @return the snapshot taken before the reset
     */
    public Map<String, Double> getAndResetMetrics() {
        final Map<String, Double> snapshot = getMetrics();
        resetMetrics();
        return snapshot;
    }

    /** Clears the user-metrics summary; the cached label arrays are kept. */
    public void resetMetrics() {
        final Summary summary = userMetricsSummary;
        summary.clear();
    }

    /**
     * Snapshots every user metric recorded so far.
     *
     * @return a new map holding, per metric name, {@code <prefix><name>_sum}, {@code <prefix><name>_count}
     *         and one {@code <prefix><name>_<quantile>} entry per quantile reported by the summary
     */
    public Map<String, Double> getMetrics() {
        final HashMap hashMap = new HashMap();
        final String prefix = ComponentStatsManager.USER_METRIC_PREFIX;
        for (Map.Entry<String, String[]> labelEntry : this.userMetricsLabels.entrySet()) {
            final String metricName = labelEntry.getKey();
            final Summary.Child.Value snapshot = this.userMetricsSummary.labels(labelEntry.getValue()).get();
            hashMap.put(String.format("%s%s_sum", prefix, metricName), Double.valueOf(snapshot.sum));
            hashMap.put(String.format("%s%s_count", prefix, metricName), Double.valueOf(snapshot.count));
            for (Map.Entry<Double, Double> quantile : snapshot.quantiles.entrySet()) {
                hashMap.put(String.format("%s%s_%s", prefix, metricName, quantile.getKey()), quantile.getValue());
            }
        }
        return hashMap;
    }

    /**
     * Closes the cached producers and the admin client.
     *
     * <p>Producer closes are started asynchronously, the admin client (if any) is closed, and then
     * the method waits for all producer closes. Failures while waiting are logged, not rethrown.
     *
     * <p>Fixes over the decompiled original: a thread that never published has no thread-local
     * producer map, which used to cause a {@link NullPointerException}; and an interrupt received
     * while waiting is now restored on the thread instead of being swallowed.
     *
     * <p>NOTE(review): in thread-local mode only the calling thread's producers are reachable here;
     * producers created on other threads are not closed by this method.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        List<CompletableFuture<Void>> closeFutures = new LinkedList<>();
        if (this.publishProducers != null) {
            for (Producer<?> producer : this.publishProducers.values()) {
                closeFutures.add(producer.closeAsync());
            }
        }
        if (this.tlPublishProducers != null) {
            Map<String, Producer<?>> threadLocalProducers = this.tlPublishProducers.get();
            // ThreadLocal.get() yields null when this thread never created a producer.
            if (threadLocalProducers != null) {
                for (Producer<?> producer : threadLocalProducers.values()) {
                    closeFutures.add(producer.closeAsync());
                }
            }
        }
        if (this.pulsarAdmin != null) {
            this.pulsarAdmin.close();
        }
        try {
            CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            this.logger.warn("Failed to close producers", e);
        } catch (ExecutionException e) {
            this.logger.warn("Failed to close producers", e);
        }
    }

    /**
     * Seeks the consumer of the given topic partition to {@code messageId}.
     *
     * @throws PulsarClientException if no matching consumer is known or the seek fails
     */
    @Override // org.apache.pulsar.io.core.SinkContext
    public void seek(String str, int i, MessageId messageId) throws PulsarClientException {
        final Consumer<?> target = getConsumer(str, i);
        target.seek(messageId);
    }

    /**
     * Pauses the consumer of the given topic partition.
     *
     * @throws PulsarClientException if no matching consumer is known
     */
    @Override // org.apache.pulsar.io.core.SinkContext
    public void pause(String str, int i) throws PulsarClientException {
        final Consumer<?> target = getConsumer(str, i);
        target.pause();
    }

    /**
     * Resumes the consumer of the given topic partition.
     *
     * @throws PulsarClientException if no matching consumer is known
     */
    @Override // org.apache.pulsar.io.core.SinkContext
    public void resume(String str, int i) throws PulsarClientException {
        final Consumer<?> target = getConsumer(str, i);
        target.resume();
    }

    /**
     * Stores the input consumers and indexes them by topic. A multi-topics consumer is expanded
     * into its per-topic consumers; any other consumer is indexed directly. Existing index entries
     * are never overwritten.
     */
    public void setInputConsumers(List<Consumer<?>> list) {
        this.inputConsumers = list;
        for (Consumer<?> inputConsumer : list) {
            if (inputConsumer instanceof MultiTopicsConsumerImpl) {
                for (Consumer<?> child : ((MultiTopicsConsumerImpl<?>) inputConsumer).getConsumers()) {
                    this.topicConsumers.putIfAbsent(TopicName.get(child.getTopic()), child);
                }
            } else {
                this.topicConsumers.putIfAbsent(TopicName.get(inputConsumer.getTopic()), inputConsumer);
            }
        }
    }

    /**
     * Re-reads the per-topic consumers of every multi-topics input consumer and adds those not yet
     * indexed. Plain consumers are skipped; existing index entries are left untouched.
     */
    private void reloadConsumersFromMultiTopicsConsumers() {
        for (Consumer<?> inputConsumer : this.inputConsumers) {
            if (!(inputConsumer instanceof MultiTopicsConsumerImpl)) {
                continue;
            }
            for (Consumer<?> child : ((MultiTopicsConsumerImpl<?>) inputConsumer).getConsumers()) {
                this.topicConsumers.putIfAbsent(TopicName.get(child.getTopic()), child);
            }
        }
    }

    /**
     * Looks up an indexed consumer without reloading.
     *
     * <p>For partition 0 the plain topic name is tried first, then the partition-specific name;
     * for any other partition only the partition-specific name is tried.
     *
     * @return the consumer, or {@code null} when none is indexed
     */
    private Consumer<?> tryGetConsumer(String str, int i) {
        if (i == 0) {
            final Consumer<?> direct = this.topicConsumers.get(TopicName.get(str));
            if (direct != null) {
                return direct;
            }
        }
        final TopicName partitionName = TopicName.get(str).getPartition(i);
        return this.topicConsumers.get(partitionName);
    }

    /**
     * Finds the input consumer for a topic partition, refreshing the index from multi-topics
     * consumers once if the first lookup misses.
     *
     * @param str topic name
     * @param i partition index
     * @throws PulsarClientException if input consumers were never set, or no consumer matches even
     *                               after the refresh
     */
    @VisibleForTesting
    Consumer<?> getConsumer(String str, int i) throws PulsarClientException {
        if (this.inputConsumers == null) {
            throw new PulsarClientException("Getting consumer is not supported");
        }
        Consumer<?> found = tryGetConsumer(str, i);
        if (found == null) {
            reloadConsumersFromMultiTopicsConsumers();
            found = tryGetConsumer(str, i);
        }
        if (found == null) {
            throw new PulsarClientException("Consumer for topic " + str + " partition " + i + " is not found");
        }
        return found;
    }

    /**
     * Renders all fields as {@code ContextImpl(name=value, ...)}.
     *
     * <p>The closing parenthesis is written as a literal. The decompiler had substituted the
     * unrelated constant {@code DefaultExpressionEngine.DEFAULT_INDEX_END}, which has the same
     * value but made this method depend on commons-configuration.
     */
    @Override
    public String toString() {
        return "ContextImpl(config=" + this.config
                + ", logger=" + getLogger()
                + ", record=" + this.record
                + ", clientBuilder=" + this.clientBuilder
                + ", client=" + this.client
                + ", publishProducers=" + this.publishProducers
                + ", tlPublishProducers=" + this.tlPublishProducers
                + ", producerBuilder=" + this.producerBuilder
                + ", topicSchema=" + this.topicSchema
                + ", secretsProvider=" + this.secretsProvider
                + ", secretsMap=" + this.secretsMap
                + ", stateManager=" + this.stateManager
                + ", defaultStateStore=" + this.defaultStateStore
                + ", userConfigs=" + this.userConfigs
                + ", statsManager=" + this.statsManager
                + ", userMetricsLabels=" + this.userMetricsLabels
                + ", metricsLabels=" + Arrays.deepToString(this.metricsLabels)
                + ", userMetricsSummary=" + this.userMetricsSummary
                + ", subscriptionType=" + getSubscriptionType()
                + ", exposePulsarAdminClientEnabled=" + this.exposePulsarAdminClientEnabled
                + ", inputConsumers=" + this.inputConsumers
                + ", topicConsumers=" + this.topicConsumers
                + ", componentType=" + this.componentType
                + ")";
    }

    static {
        userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
    }
}
