package org.apache.pulsar.functions.instance;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.prometheus.client.CollectorRegistry;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/instance/JavaInstanceRunnable.class */
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
    // Class-wide logger; assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    // Configuration of this instance; supplies the function details, instance id/name and cluster name.
    private final InstanceConfig instanceConfig;
    // Used by loadJars() to register the function archive and obtain its class loader; unregistered in close().
    private final FunctionCacheManager fnCache;
    // Path of the function archive; loadJars() tries it as a NAR first and falls back to a plain JAR.
    private final String jarFile;
    // Pulsar client handed to the context, the built-in source/sink and the log appender.
    private final PulsarClientImpl client;
    // Appender publishing logs to the function's log topic; stays null when no log topic is configured.
    private LogAppender logAppender;
    // When null, setupStateStore() falls back to StateStoreProvider.NULL.
    private final String stateStorageServiceUrl;
    private StateStoreProvider stateStoreProvider;
    private StateManager stateManager;
    // Wrapper around the user function; created at the end of setup(), cleared in close().
    private JavaInstance javaInstance;
    // Throwable that terminated run(), exposed through getDeathException().
    private Throwable deathException;
    // Statistics collector; created in setup() and set back to null by close().
    private ComponentStatsManager stats;
    // Record most recently returned by readInput() inside the run() loop.
    private Record<?> currentRecord;
    private Source source;
    private Sink sink;
    private final SecretsProvider secretsProvider;
    // May be null on construction; setup() then creates a fresh registry.
    private CollectorRegistry collectorRegistry;
    // Label values: tenant, tenant/namespace, name, instance id, cluster name, fully qualified function name.
    private final String[] metricsLabels;
    private InstanceCache instanceCache;
    private final Function.FunctionDetails.ComponentType componentType;
    // Properties passed to the built-in PulsarSource / PulsarSink.
    private final Map<String, String> properties;
    // Context class loader of the thread that constructed this object; restored after every call into user code.
    private final ClassLoader instanceClassLoader = Thread.currentThread().getContextClassLoader();
    // Class loader returned by loadJars(); installed as context class loader around calls into user code.
    private ClassLoader functionClassLoader;
    // Directory passed to the function cache when the archive is registered as a NAR.
    private String narExtractionDirectory;
    // Compiler-generated assertion flag made explicit by the decompiler; true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Switch lookup tables recovered by the decompiler (originally the compiler-generated class
     * {@code JavaInstanceRunnable$3}, renamed because that is not a valid source-level class name;
     * its access was widened from package-private).
     *
     * <p>Each array is indexed by an enum constant's {@code ordinal()} and holds the case number
     * used by the corresponding {@code switch} in {@code setupInput}. Entries that are never
     * assigned keep the default value {@code 0}, which matches no case label and therefore selects
     * the {@code default} branch of those switches.
     */
    public static /* synthetic */ class AnonymousClass3 {
        // Case numbers for Function.SubscriptionPosition: EARLIEST -> 1, everything else -> 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionPosition;
        // Case numbers for Function.SubscriptionType: FAILOVER -> 1, KEY_SHARED -> 2, everything else -> 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionType = new int[Function.SubscriptionType.values().length];

        static {
            // Each assignment is wrapped individually: a NoSuchFieldError is ignored and simply
            // leaves that entry at 0 (presumably to tolerate an enum constant missing at runtime).
            try {
                $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionType[Function.SubscriptionType.FAILOVER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionType[Function.SubscriptionType.KEY_SHARED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionPosition = new int[Function.SubscriptionPosition.values().length];
            try {
                $SwitchMap$org$apache$pulsar$functions$proto$Function$SubscriptionPosition[Function.SubscriptionPosition.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager functionCacheManager, String str, PulsarClient pulsarClient, String str2, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String str3) {
        this.instanceConfig = instanceConfig;
        this.fnCache = functionCacheManager;
        this.jarFile = str;
        this.client = (PulsarClientImpl) pulsarClient;
        this.stateStorageServiceUrl = str2;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = str3;
        this.metricsLabels = new String[]{instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace()), instanceConfig.getFunctionDetails().getName(), String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())};
        this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
        this.properties = InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.collectorRegistry = collectorRegistry;
    }

    /**
     * Performs the one-time initialisation of the instance: statistics, logging context, class
     * loading, instantiation of the user class, state store, context, sink, source and log handler.
     *
     * @throws Exception if any initialisation step fails
     * @throws RuntimeException if the user class implements neither the Pulsar nor the
     *                          {@code java.util.function} Function interface
     */
    private synchronized void setup() throws Exception {
        this.instanceCache = InstanceCache.getInstanceCache();
        if (this.collectorRegistry == null) {
            this.collectorRegistry = new CollectorRegistry();
        }
        this.stats = ComponentStatsManager.getStatsManager(
                this.collectorRegistry,
                this.metricsLabels,
                this.instanceCache.getScheduledExecutorService(),
                this.componentType);

        // Tag every log line emitted by this thread with the function identity.
        final Function.FunctionDetails details = this.instanceConfig.getFunctionDetails();
        ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(details));
        ThreadContext.put("functionname", details.getName());
        ThreadContext.put("instance", this.instanceConfig.getInstanceName());
        log.info("Starting Java Instance {} : \n Details = {}", details.getName(), details);

        this.functionClassLoader = loadJars();

        // The window executor is resolved through the instance class loader; every other class
        // comes from the function archive.
        final String userClassName = details.getClassName();
        final ClassLoader loaderForUserClass;
        if (userClassName.equals(WindowFunctionExecutor.class.getName())) {
            loaderForUserClass = this.instanceClassLoader;
        } else {
            loaderForUserClass = this.functionClassLoader;
        }
        final Object userFunction = Reflections.createInstance(userClassName, loaderForUserClass);

        final boolean isPulsarFunction = userFunction instanceof org.apache.pulsar.functions.api.Function;
        final boolean isJavaFunction = userFunction instanceof java.util.function.Function;
        if (!isPulsarFunction && !isJavaFunction) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }

        setupStateStore();
        final ContextImpl context = setupContext();
        // The sink is opened before the source.
        setupOutput(context);
        setupInput(context);
        setupLogHandler();
        this.javaInstance = new JavaInstance(context, userFunction, this.instanceConfig);
    }

    /**
     * Builds the context object handed to the user function, the source and the sink.
     * The function logger is named {@code "function-" + <function name>}.
     *
     * @return a new context bound to this instance's client, stats and state manager
     */
    ContextImpl setupContext() {
        final String loggerName = "function-" + this.instanceConfig.getFunctionDetails().getName();
        final Logger functionLogger = LoggerFactory.getILoggerFactory().getLogger(loggerName);
        return new ContextImpl(
                this.instanceConfig,
                functionLogger,
                this.client,
                this.secretsProvider,
                this.collectorRegistry,
                this.metricsLabels,
                this.componentType,
                this.stats,
                this.stateManager);
    }

    /**
     * Main loop of the instance: initialises it, then forever reads a record, invokes the user
     * function and processes the result.
     *
     * <p>The loop only ends through a {@link Throwable}. That throwable is logged, stored so that
     * {@link #getDeathException()} can report it, and counted as a system exception. Cleanup runs
     * exactly once, in the {@code finally} block and therefore after the exception has been
     * recorded; {@link #close()} discards the stats object, so it must not run earlier.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            setup();
            while (true) {
                this.currentRecord = readInput();
                this.stats.incrTotalReceived();
                // With at-most-once semantics and auto-ack, the record is acknowledged before processing.
                if (this.instanceConfig.getFunctionDetails().getProcessingGuarantees() == Function.ProcessingGuarantees.ATMOST_ONCE && this.instanceConfig.getFunctionDetails().getAutoAck()) {
                    this.currentRecord.ack();
                }
                addLogTopicHandler();
                CompletableFuture<JavaExecutionResult> pendingResult = null;
                this.stats.setLastInvocation(System.currentTimeMillis());
                this.stats.processTimeStart();
                boolean handled = true;
                // User code runs with the function class loader as context class loader.
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
                try {
                    pendingResult = this.javaInstance.handleMessage(this.currentRecord, this.currentRecord.getValue());
                } catch (Exception e) {
                    log.warn("Function currentRecord {} failed to handleMessage with exception {}", this.currentRecord, e);
                    handled = false;
                } finally {
                    // Restore the loader even when a non-Exception Throwable escapes.
                    Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
                }
                this.stats.processTimeEnd();
                removeLogTopicHandler();
                if (!handled || pendingResult == null) {
                    this.currentRecord.fail();
                } else {
                    try {
                        processResult(this.currentRecord, pendingResult);
                    } catch (Exception e2) {
                        log.warn("Failed to process result of message {} exception {}", this.currentRecord, e2);
                        this.currentRecord.fail();
                    }
                }
            }
        } catch (Throwable th) {
            log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName(), this.instanceConfig.getInstanceId()), th);
            this.deathException = th;
            // stats is still null when setup() failed before creating it.
            if (this.stats != null) {
                this.stats.incrSysExceptions(th);
            }
        } finally {
            log.info("Closing instance");
            close();
        }
    }

    /**
     * Registers the function archive with the function cache and returns the resulting class loader.
     *
     * <p>If the file may be a NAR archive it is registered as one first. A
     * {@link FileNotFoundException} during that attempt is logged and the file is then registered
     * as a plain JAR instead.
     *
     * @return the class loader the function cache holds for this function id; never {@code null}
     * @throws Exception if registration fails or the cache has no class loader for the function
     */
    private ClassLoader loadJars() throws Exception {
        boolean loadedAsNar = false;
        if (FileUtils.mayBeANarArchive(new File(this.jarFile))) {
            try {
                log.info("Trying Loading file as NAR file: {}", this.jarFile);
                this.fnCache.registerFunctionInstanceWithArchive(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), this.jarFile, this.narExtractionDirectory);
                loadedAsNar = true;
            } catch (FileNotFoundException e) {
                // The exception is passed as the trailing argument so the logger records the
                // cause; a stringified extra argument without a placeholder would be dropped.
                log.error("The file {} does not look like a .nar file", this.jarFile, e);
            }
        }
        if (!loadedAsNar) {
            log.info("Load file as simple JAR file: {}", this.jarFile);
            this.fnCache.registerFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), Arrays.asList(this.jarFile), Collections.emptyList());
        }
        // Look the class loader up once and reuse it for both the log line and the result.
        final ClassLoader functionLoader = this.fnCache.getClassLoader(this.instanceConfig.getFunctionId());
        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", this.instanceConfig.getFunctionDetails().getName(), functionLoader);
        if (functionLoader == null) {
            throw new Exception("No function class loader available.");
        }
        return functionLoader;
    }

    /**
     * Creates the state manager and, when a state storage service URL is configured, a
     * BookKeeper-backed state store registered with it. Without a URL the
     * {@code StateStoreProvider.NULL} provider is used and no store is registered.
     *
     * @throws Exception if the provider or the store cannot be initialised
     */
    private void setupStateStore() throws Exception {
        this.stateManager = new InstanceStateManager();
        if (this.stateStorageServiceUrl == null) {
            this.stateStoreProvider = StateStoreProvider.NULL;
        } else {
            // The field is assigned before init() so that close() can release the provider
            // even if initialisation fails midway.
            this.stateStoreProvider = new BKStateStoreProviderImpl();
            final Map<String, Object> providerConfig = new HashMap<>();
            providerConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL, this.stateStorageServiceUrl);
            this.stateStoreProvider.init(providerConfig, this.instanceConfig.getFunctionDetails());

            final StateStore functionStore = this.stateStoreProvider.getStateStore(
                    this.instanceConfig.getFunctionDetails().getTenant(),
                    this.instanceConfig.getFunctionDetails().getNamespace(),
                    this.instanceConfig.getFunctionDetails().getName());
            functionStore.init(new StateStoreContextImpl());
            this.stateManager.registerStore(functionStore);
        }
    }

    /**
     * Attaches a completion callback that finishes the handling of one record.
     *
     * <p>On failure (the future completed exceptionally or the result carries a user exception)
     * the exception is logged and counted and the record is failed. Otherwise a non-null result is
     * written to the sink, a null result acknowledges the record when auto-ack is enabled, and the
     * success counter is incremented.
     *
     * @param record            the record that was processed
     * @param completableFuture the pending result of the user function
     * @throws Exception declared for callers; not thrown by the visible body
     */
    private void processResult(Record record, CompletableFuture<JavaExecutionResult> completableFuture) throws Exception {
        completableFuture.whenComplete((outcome, failure) -> {
            // A failure of the future itself takes precedence over a user exception in the result.
            final Throwable problem = failure != null ? failure : outcome.getUserException();
            if (problem != null) {
                log.warn("Encountered exception when processing message {}", record, problem);
                this.stats.incrUserExceptions(problem);
                record.fail();
                return;
            }

            final Object output = outcome.getResult();
            if (output != null) {
                sendOutputMessage(record, output);
            } else if (this.instanceConfig.getFunctionDetails().getAutoAck()) {
                record.ack();
            }
            this.stats.incrTotalProcessedSuccessfully();
        });
    }

    /**
     * Writes one function result to the sink.
     *
     * <p>For any sink other than the built-in {@link PulsarSink} the function class loader is
     * installed as context class loader for the duration of the write; the instance class loader
     * is always restored afterwards.
     *
     * @param record the source record the result belongs to
     * @param obj    the value produced by the function
     * @throws RuntimeException wrapping any exception thrown by the sink
     */
    private void sendOutputMessage(Record record, Object obj) {
        final Thread current = Thread.currentThread();
        final boolean builtInSink = this.sink instanceof PulsarSink;
        if (!builtInSink) {
            current.setContextClassLoader(this.functionClassLoader);
        }
        try {
            this.sink.write(new SinkRecord(record, obj));
        } catch (Exception e) {
            log.info("Encountered exception in sink write: ", e);
            this.stats.incrSinkExceptions(e);
            throw new RuntimeException(e);
        } finally {
            current.setContextClassLoader(this.instanceClassLoader);
        }
    }

    /**
     * Reads the next record from the source and validates it.
     *
     * <p>For any source other than the built-in {@link PulsarSource} the read happens with the
     * function class loader as context class loader. The instance class loader is restored before
     * the record is validated and again on every exit path.
     *
     * @return a non-null record with a non-null value
     * @throws IllegalArgumentException if the source returned {@code null} or a record whose value
     *                                  is {@code null}; this is counted as a source exception too
     * @throws Exception any exception thrown by the source, after it has been counted and logged
     */
    private Record readInput() throws Exception {
        final Thread current = Thread.currentThread();
        if (!(this.source instanceof PulsarSource)) {
            current.setContextClassLoader(this.functionClassLoader);
        }
        try {
            final Record fetched = this.source.read();
            // Validation below runs under the instance class loader.
            current.setContextClassLoader(this.instanceClassLoader);
            if (fetched == null) {
                throw new IllegalArgumentException("The record returned by the source cannot be null");
            }
            if (fetched.getValue() == null) {
                throw new IllegalArgumentException("The value in the record returned by the source cannot be null");
            }
            return fetched;
        } catch (Exception e) {
            this.stats.incrSourceExceptions(e);
            log.error("Encountered exception in source read", e);
            throw e;
        } finally {
            current.setContextClassLoader(this.instanceClassLoader);
        }
    }

    /**
     * Releases everything the instance holds: stats, source, sink, the user function wrapper,
     * state manager and provider, the function cache registration and the log appender.
     *
     * <p>Fields that are set to {@code null} here are skipped on a repeated call. Failures while
     * closing the source or the sink are logged and do not prevent the remaining cleanup.
     */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        if (this.stats != null) {
            this.stats.close();
            this.stats = null;
        }
        if (this.source != null) {
            // Non built-in sources are closed under the function class loader.
            if (!(this.source instanceof PulsarSource)) {
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            }
            try {
                this.source.close();
            } catch (Throwable th) {
                log.error("Failed to close source {}", this.instanceConfig.getFunctionDetails().getSource().getClassName(), th);
            } finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.source = null;
        }
        if (this.sink != null) {
            // Non built-in sinks are closed under the function class loader.
            if (!(this.sink instanceof PulsarSink)) {
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            }
            try {
                this.sink.close();
            } catch (Throwable th2) {
                // Report the sink's class name, not the source's.
                log.error("Failed to close sink {}", this.instanceConfig.getFunctionDetails().getSink().getClassName(), th2);
            } finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.sink = null;
        }
        if (null != this.javaInstance) {
            this.javaInstance.close();
            this.javaInstance = null;
        }
        if (null != this.stateManager) {
            this.stateManager.close();
        }
        if (null != this.stateStoreProvider) {
            this.stateStoreProvider.close();
        }
        // instanceCache is only set once setup() has started, so it doubles as the marker for
        // "the function may have been registered with the function cache".
        if (this.instanceCache != null) {
            this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName());
            log.info("Unloading JAR files for function {}", this.instanceConfig);
            this.instanceCache = null;
        }
        if (this.logAppender != null) {
            // Detach from both logger contexts the appender may have been attached to.
            removeLogTopicAppender(LoggerContext.getContext());
            removeLogTopicAppender(LoggerContext.getContext(false));
            this.logAppender.stop();
            this.logAppender = null;
        }
    }

    /**
     * Returns the textual rendering of the current statistics.
     *
     * @return the stats as a string, or an empty string when no stats object exists
     * @throws IOException if rendering the statistics fails
     */
    public synchronized String getStatsAsString() throws IOException {
        if (this.stats == null) {
            return "";
        }
        return this.stats.getStatsAsString();
    }

    /**
     * Takes a snapshot of the current metrics and then resets them.
     *
     * @return the metrics as they were before the reset
     */
    public synchronized InstanceCommunication.MetricsData getAndResetMetrics() {
        final InstanceCommunication.MetricsData snapshot = internalGetMetrics();
        internalResetMetrics();
        return snapshot;
    }

    /**
     * Returns a snapshot of the current metrics without resetting them.
     *
     * @return the current metrics
     */
    public synchronized InstanceCommunication.MetricsData getMetrics() {
        final InstanceCommunication.MetricsData snapshot = internalGetMetrics();
        return snapshot;
    }

    /**
     * Resets the instance statistics and the user function's metrics.
     */
    public synchronized void resetMetrics() {
        this.internalResetMetrics();
    }

    /**
     * Builds the metrics message from the instance statistics and, when a user function wrapper
     * exists and reports metrics, its user-defined metrics.
     *
     * @return the assembled metrics message
     */
    private InstanceCommunication.MetricsData internalGetMetrics() {
        final InstanceCommunication.MetricsData.Builder builder = createMetricsDataBuilder();
        if (this.javaInstance != null) {
            final Map<String, Double> userMetrics = this.javaInstance.getMetrics();
            if (userMetrics != null) {
                builder.putAllUserMetrics(userMetrics);
            }
        }
        return builder.build();
    }

    /**
     * Resets the instance statistics and the user function's metrics, skipping whichever of the
     * two does not exist.
     */
    private void internalResetMetrics() {
        final ComponentStatsManager currentStats = this.stats;
        if (currentStats != null) {
            currentStats.reset();
        }
        final JavaInstance instance = this.javaInstance;
        if (instance != null) {
            instance.resetMetrics();
        }
    }

    /**
     * Creates a metrics builder pre-filled from the instance statistics (totals, one-minute
     * values, average latency and last invocation time).
     *
     * @return a builder populated from the stats, or an untouched builder when no stats exist
     */
    private InstanceCommunication.MetricsData.Builder createMetricsDataBuilder() {
        final InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder();
        if (this.stats == null) {
            return builder;
        }

        // Totals; the stats values are narrowed to long by the casts.
        builder.setProcessedSuccessfullyTotal((long) this.stats.getTotalProcessedSuccessfully());
        builder.setSystemExceptionsTotal((long) this.stats.getTotalSysExceptions());
        builder.setUserExceptionsTotal((long) this.stats.getTotalUserExceptions());
        builder.setReceivedTotal((long) this.stats.getTotalRecordsReceived());
        builder.setAvgProcessLatency(this.stats.getAvgProcessLatency());
        builder.setLastInvocation((long) this.stats.getLastInvocation());

        // One-minute values.
        builder.setProcessedSuccessfullyTotal1Min((long) this.stats.getTotalProcessedSuccessfully1min());
        builder.setSystemExceptionsTotal1Min((long) this.stats.getTotalSysExceptions1min());
        builder.setUserExceptionsTotal1Min((long) this.stats.getTotalUserExceptions1min());
        builder.setReceivedTotal1Min((long) this.stats.getTotalRecordsReceived1min());
        builder.setAvgProcessLatency1Min(this.stats.getAvgProcessLatency1min());
        return builder;
    }

    /**
     * Builds the status message of this instance: counters, the latest user, system, source and
     * sink exceptions, average latency and last invocation time.
     *
     * @return a builder populated from the stats, or an untouched builder when no stats exist
     */
    public synchronized InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
        final InstanceCommunication.FunctionStatus.Builder status = InstanceCommunication.FunctionStatus.newBuilder();
        if (this.stats == null) {
            return status;
        }

        status.setNumReceived((long) this.stats.getTotalRecordsReceived());
        status.setNumSuccessfullyProcessed((long) this.stats.getTotalProcessedSuccessfully());
        status.setNumUserExceptions((long) this.stats.getTotalUserExceptions());
        this.stats.getLatestUserExceptions().forEach(status::addLatestUserExceptions);
        status.setNumSystemExceptions((long) this.stats.getTotalSysExceptions());
        this.stats.getLatestSystemExceptions().forEach(status::addLatestSystemExceptions);
        this.stats.getLatestSourceExceptions().forEach(status::addLatestSourceExceptions);
        this.stats.getLatestSinkExceptions().forEach(status::addLatestSinkExceptions);
        status.setAverageLatency(this.stats.getAvgProcessLatency());
        status.setLastInvocationTime((long) this.stats.getLastInvocation());
        return status;
    }

    /**
     * Creates and starts the log-topic appender and attaches it to the current logger context.
     * Does nothing when the function has no log topic configured.
     */
    private void setupLogHandler() {
        final String logTopic = this.instanceConfig.getFunctionDetails().getLogTopic();
        if (logTopic != null && !logTopic.isEmpty()) {
            // The instance is discarded on purpose. NOTE(review): presumably this forces the
            // checksum class to initialise before the appender starts - confirm the intent.
            new Crc32cIntChecksum();
            this.logAppender = new LogAppender(
                    this.client,
                    this.instanceConfig.getFunctionDetails().getLogTopic(),
                    FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
            this.logAppender.start();
            setupLogTopicAppender(LoggerContext.getContext());
        }
    }

    /**
     * Attaches the log-topic appender to the logger context obtained with
     * {@code LoggerContext.getContext(false)}; a no-op when no appender exists.
     */
    private void addLogTopicHandler() {
        if (this.logAppender != null) {
            setupLogTopicAppender(LoggerContext.getContext(false));
        }
    }

    /**
     * Registers the log-topic appender with the given context's configuration and attaches it to
     * every configured logger as well as the root logger, then refreshes the loggers.
     *
     * @param loggerContext the Log4j context to modify
     */
    private void setupLogTopicAppender(LoggerContext loggerContext) {
        final Configuration config = loggerContext.getConfiguration();
        config.addAppender(this.logAppender);
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            // No level restriction and no filter.
            loggerConfig.addAppender(this.logAppender, null, null);
        }
        config.getRootLogger().addAppender(this.logAppender, null, null);
        loggerContext.updateLoggers();
    }

    /**
     * Detaches the log-topic appender from the logger context obtained with
     * {@code LoggerContext.getContext(false)}; a no-op when no appender exists.
     */
    private void removeLogTopicHandler() {
        if (this.logAppender != null) {
            removeLogTopicAppender(LoggerContext.getContext(false));
        }
    }

    /**
     * Removes the log-topic appender, looked up by name, from every configured logger and from the
     * root logger of the given context, then refreshes the loggers.
     *
     * @param loggerContext the Log4j context to modify
     */
    private void removeLogTopicAppender(LoggerContext loggerContext) {
        final Configuration config = loggerContext.getConfiguration();
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.removeAppender(this.logAppender.getName());
        }
        config.getRootLogger().removeAppender(this.logAppender.getName());
        loggerContext.updateLoggers();
    }

    /**
     * Creates and opens the source of this instance.
     *
     * <p>Without a source class name a built-in {@link PulsarSource} is configured from the source
     * spec (topics, subscription, timeouts, retry details). Otherwise the named class is
     * instantiated reflectively; the batch source executor through the instance class loader and
     * anything else through the function class loader. A non built-in source is opened with the
     * function class loader as context class loader, and the instance class loader is always
     * restored afterwards.
     *
     * @param contextImpl the context passed to {@code Source.open}
     * @throws RuntimeException if the created object does not implement {@link Source}
     * @throws Exception any exception raised while parsing the configs or opening the source
     */
    private void setupInput(ContextImpl contextImpl) throws Exception {
        final Function.SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
        final Object candidate;
        if (sourceSpec.getClassName().isEmpty()) {
            final PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();

            // One consumer config per explicitly specified input topic.
            sourceSpec.getInputSpecsMap().forEach((topic, spec) -> {
                final ConsumerConfig consumerConfig = ConsumerConfig.builder()
                        .isRegexPattern(spec.getIsRegexPattern())
                        .build();
                // A schema type, when present, wins over a serde class name.
                if (spec.getSchemaType() != null && !spec.getSchemaType().isEmpty()) {
                    consumerConfig.setSchemaType(spec.getSchemaType());
                } else if (spec.getSerdeClassName() != null && !spec.getSerdeClassName().isEmpty()) {
                    consumerConfig.setSerdeClassName(spec.getSerdeClassName());
                }
                consumerConfig.setSchemaProperties(spec.getSchemaPropertiesMap());
                consumerConfig.setConsumerProperties(spec.getConsumerPropertiesMap());
                if (spec.hasReceiverQueueSize()) {
                    consumerConfig.setReceiverQueueSize(Integer.valueOf(spec.getReceiverQueueSize().getValue()));
                }
                if (spec.hasCryptoSpec()) {
                    consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
                }
                pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig);
            });

            // Entries of the topic-to-serde map are added afterwards and replace same-named topics.
            sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serdeClassName) -> {
                final ConsumerConfig consumerConfig = ConsumerConfig.builder()
                        .serdeClassName(serdeClassName)
                        .isRegexPattern(false)
                        .build();
                pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig);
            });

            // NOTE(review): the lookup result is dereferenced unchecked, so a topics pattern
            // without an entry in the topic schema would raise a NullPointerException - confirm.
            if (!StringUtils.isEmpty(sourceSpec.getTopicsPattern())) {
                pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true);
            }

            if (StringUtils.isNotBlank(sourceSpec.getSubscriptionName())) {
                pulsarSourceConfig.setSubscriptionName(sourceSpec.getSubscriptionName());
            } else {
                pulsarSourceConfig.setSubscriptionName(InstanceUtils.getDefaultSubscriptionName(this.instanceConfig.getFunctionDetails()));
            }
            pulsarSourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));

            // Only EARLIEST is mapped explicitly; every other position means Latest.
            switch (sourceSpec.getSubscriptionPosition()) {
                case EARLIEST:
                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
                    break;
                default:
                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
                    break;
            }

            // FAILOVER and KEY_SHARED are mapped explicitly; every other type means Shared.
            switch (sourceSpec.getSubscriptionType()) {
                case FAILOVER:
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
                    break;
                case KEY_SHARED:
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
                    break;
                default:
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
                    break;
            }

            pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
            // Non-positive values leave the corresponding setting untouched.
            if (sourceSpec.getTimeoutMs() > 0) {
                pulsarSourceConfig.setTimeoutMs(Long.valueOf(sourceSpec.getTimeoutMs()));
            }
            if (sourceSpec.getNegativeAckRedeliveryDelayMs() > 0) {
                pulsarSourceConfig.setNegativeAckRedeliveryDelayMs(Long.valueOf(sourceSpec.getNegativeAckRedeliveryDelayMs()));
            }
            if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                pulsarSourceConfig.setMaxMessageRetries(Integer.valueOf(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()));
                pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
            }
            candidate = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
        } else {
            final ClassLoader sourceLoader;
            if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) {
                sourceLoader = this.instanceClassLoader;
            } else {
                sourceLoader = this.functionClassLoader;
            }
            candidate = Reflections.createInstance(sourceSpec.getClassName(), sourceLoader);
        }

        if (!(candidate instanceof Source)) {
            throw new RuntimeException("Source does not implement correct interface");
        }
        final Class<?>[] sourceTypeArguments = TypeResolver.resolveRawArguments(Source.class, candidate.getClass());
        // Decompiled form of an assertion; active only when assertions are enabled.
        if (!$assertionsDisabled && sourceTypeArguments.length <= 0) {
            throw new AssertionError();
        }

        this.source = (Source) candidate;
        final Thread current = Thread.currentThread();
        if (!(this.source instanceof PulsarSource)) {
            current.setContextClassLoader(this.functionClassLoader);
        }
        try {
            if (sourceSpec.getConfigs().isEmpty()) {
                this.source.open(new HashMap(), contextImpl);
            } else {
                // The configs string is JSON describing a map of string keys to arbitrary values.
                final Map parsedConfigs = (Map) new Gson().fromJson(sourceSpec.getConfigs(), new TypeToken<Map<String, Object>>() {
                }.getType());
                this.source.open(parsedConfigs, contextImpl);
            }
        } catch (Exception e) {
            log.error("Source open produced uncaught exception: ", e);
            throw e;
        } finally {
            current.setContextClassLoader(this.instanceClassLoader);
        }
    }

    /**
     * Creates and opens the sink of this instance.
     *
     * <p>A configured sink class name is instantiated through the function class loader. Without
     * one, an empty output topic selects {@code PulsarSinkDisable.INSTANCE} and a non-empty topic
     * selects a built-in {@link PulsarSink} configured from the sink spec. A non built-in sink is
     * opened with the function class loader as context class loader, and the instance class loader
     * is always restored afterwards.
     *
     * @param contextImpl the context passed to {@code Sink.open}
     * @throws RuntimeException if the created object does not implement {@link Sink}
     * @throws Exception any exception raised while parsing the configs or opening the sink
     */
    private void setupOutput(ContextImpl contextImpl) throws Exception {
        final Function.SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
        final Object candidate;
        if (sinkSpec.getClassName().isEmpty()) {
            if (StringUtils.isEmpty(sinkSpec.getTopic())) {
                candidate = PulsarSinkDisable.INSTANCE;
            } else {
                final PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
                pulsarSinkConfig.setTopic(sinkSpec.getTopic());
                pulsarSinkConfig.setForwardSourceMessageProperty(this.instanceConfig.getFunctionDetails().getSink().getForwardSourceMessageProperty());

                // A schema type, when present, wins over a serde class name.
                if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
                    pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
                } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
                    pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
                }
                pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
                pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());

                // NOTE(review): generated protobuf getters usually never return null, so this
                // check may always pass - confirm whether a has-check was intended.
                if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
                    final Function.ProducerSpec producerSpec = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec();
                    final ProducerConfig producerConfig = ProducerConfig.builder()
                            .maxPendingMessages(Integer.valueOf(producerSpec.getMaxPendingMessages()))
                            .maxPendingMessagesAcrossPartitions(Integer.valueOf(producerSpec.getMaxPendingMessagesAcrossPartitions()))
                            .batchBuilder(producerSpec.getBatchBuilder())
                            .useThreadLocalProducers(Boolean.valueOf(producerSpec.getUseThreadLocalProducers()))
                            .cryptoConfig(CryptoUtils.convertFromSpec(producerSpec.getCryptoSpec()))
                            .build();
                    pulsarSinkConfig.setProducerConfig(producerConfig);
                }
                candidate = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
            }
        } else {
            candidate = Reflections.createInstance(sinkSpec.getClassName(), this.functionClassLoader);
        }

        if (!(candidate instanceof Sink)) {
            throw new RuntimeException("Sink does not implement correct interface");
        }

        this.sink = (Sink) candidate;
        final Thread current = Thread.currentThread();
        if (!(this.sink instanceof PulsarSink)) {
            current.setContextClassLoader(this.functionClassLoader);
        }
        try {
            if (sinkSpec.getConfigs().isEmpty()) {
                this.sink.open(new HashMap(), contextImpl);
            } else {
                // The configs string is JSON describing a map of string keys to arbitrary values.
                final Map parsedConfigs = (Map) new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>() {
                }.getType());
                this.sink.open(parsedConfigs, contextImpl);
            }
        } catch (Exception e) {
            log.error("Sink open produced uncaught exception: ", e);
            throw e;
        } finally {
            current.setContextClassLoader(this.instanceClassLoader);
        }
    }

    /**
     * Returns the throwable that terminated {@link #run()}.
     *
     * @return the recorded throwable, or {@code null} if none has been recorded
     */
    public Throwable getDeathException() {
        final Throwable cause = this.deathException;
        return cause;
    }

    static {
        $assertionsDisabled = !JavaInstanceRunnable.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(JavaInstanceRunnable.class);
    }
}
