package org.apache.pulsar.functions.instance;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.7.2.1.1.21.jar:org/apache/pulsar/functions/instance/JavaInstance.class */
public class JavaInstance implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) JavaInstance.class);
    private final ContextImpl context;
    private Function function;
    private java.util.function.Function javaUtilFunction;
    private final InstanceConfig instanceConfig;
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;

    public JavaInstance(ContextImpl contextImpl, Object obj, InstanceConfig instanceConfig) {
        this.context = contextImpl;
        this.instanceConfig = instanceConfig;
        this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
        if (obj instanceof Function) {
            this.function = (Function) obj;
        } else {
            this.javaUtilFunction = (java.util.function.Function) obj;
        }
    }

    public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object obj) {
        if (this.context != null) {
            this.context.setCurrentMessageContext(record);
        }
        CompletableFuture<JavaExecutionResult> completableFuture = new CompletableFuture<>();
        JavaExecutionResult javaExecutionResult = new JavaExecutionResult();
        try {
            Object process = this.function != null ? this.function.process(obj, this.context) : this.javaUtilFunction.apply(obj);
            if (process instanceof CompletableFuture) {
                try {
                    this.pendingAsyncRequests.put((CompletableFuture) process);
                    Object obj2 = process;
                    ((CompletableFuture) process).whenCompleteAsync((obj3, obj4) -> {
                        if (log.isDebugEnabled()) {
                            log.debug("Got result async: object: {}, throwable: {}", obj3, obj4);
                        }
                        if (obj4 == null) {
                            javaExecutionResult.setResult(obj3);
                            this.pendingAsyncRequests.remove(obj2);
                            completableFuture.complete(javaExecutionResult);
                        } else {
                            log.warn("function CompletableFuture throwable: {}", obj4);
                            javaExecutionResult.setUserException(new Exception((Throwable) obj4));
                            this.pendingAsyncRequests.remove(obj2);
                            completableFuture.complete(javaExecutionResult);
                        }
                    }, this.executor);
                } catch (InterruptedException e) {
                    log.warn("Exception while put Async requests", (Throwable) e);
                    javaExecutionResult.setUserException(e);
                    completableFuture.complete(javaExecutionResult);
                    return completableFuture;
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Got result: object: {}", process);
                }
                javaExecutionResult.setResult(process);
                completableFuture.complete(javaExecutionResult);
            }
            return completableFuture;
        } catch (Exception e2) {
            javaExecutionResult.setUserException(e2);
            completableFuture.complete(javaExecutionResult);
            return completableFuture;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.context.close();
    }

    public Map<String, Double> getAndResetMetrics() {
        return this.context.getAndResetMetrics();
    }

    public void resetMetrics() {
        this.context.resetMetrics();
    }

    public Map<String, Double> getMetrics() {
        return this.context.getMetrics();
    }

    ContextImpl getContext() {
        return this.context;
    }

    public LinkedBlockingQueue<CompletableFuture<Void>> getPendingAsyncRequests() {
        return this.pendingAsyncRequests;
    }
}
