package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionMetaDataManager.class */
public class FunctionMetaDataManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionMetaDataManager.class);
    private final SchedulerManager schedulerManager;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;
    private final ErrorNotifier errorNotifier;
    /** Tailer following the metadata topic; set by initializeTailer() and cleared when leadership is acquired. */
    private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
    /** Message property key under which the function metadata version is published. */
    private static final String versionTag = "version";

    /** In-memory view of the function metadata, keyed by tenant, then namespace, then function name. */
    @VisibleForTesting
    final Map<String, Map<String, Map<String, Function.FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
    /** Id of the last metadata-topic message that was processed or written by this manager. */
    private volatile MessageId lastMessageSeen = MessageId.earliest;
    /** Completed by initialize() once the existing topic contents have been processed. */
    private final CompletableFuture<Void> isInitialized = new CompletableFuture<>();
    /** Producer held while this worker is the leader; {@code null} otherwise. */
    private Producer<byte[]> exclusiveLeaderProducer = null;

    /**
     * Creates a manager that only stores its collaborators; no topic is read and no
     * tailer is started until {@code initialize()} / {@code start()} are called.
     *
     * @param workerConfig     worker configuration (topic name, worker id, compaction flag)
     * @param schedulerManager scheduler invoked after a metadata change on the leader
     * @param pulsarClient     client used to create readers and producers
     * @param errorNotifier    sink for errors that cannot be handled locally
     * @throws PulsarClientException declared for callers; not thrown by this body
     */
    public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, PulsarClient pulsarClient, ErrorNotifier errorNotifier) throws PulsarClientException {
        this.schedulerManager = schedulerManager;
        this.errorNotifier = errorNotifier;
        this.pulsarClient = pulsarClient;
        this.workerConfig = workerConfig;
    }

    /**
     * Replays the metadata topic from the earliest message into the in-memory map and
     * then completes the initialization future.
     *
     * <p>The reader is opened in a try-with-resources statement so it is closed even
     * when reading or processing a message fails.
     *
     * @throws RuntimeException wrapping any failure while creating the reader, reading,
     *                          processing a message or closing the reader
     */
    public synchronized void initialize() {
        try (Reader<byte[]> reader = FunctionMetaDataTopicTailer.createReader(
                this.workerConfig, this.pulsarClient.newReader(), MessageId.earliest)) {
            // Drain every message currently available on the topic.
            while (reader.hasMessageAvailable()) {
                processMetaDataTopicMessage(reader.readNext());
            }
            this.isInitialized.complete(null);
        } catch (Exception e) {
            log.error("Failed to initialize meta data store", e);
            throw new RuntimeException("Failed to initialize Metadata Manager", e);
        }
        log.info("FunctionMetaData Manager initialization complete");
    }

    /**
     * Starts the metadata topic tailer unless an exclusive leader producer is already held.
     *
     * @throws RuntimeException if the tailer cannot be created or started
     */
    public synchronized void start() {
        if (this.exclusiveLeaderProducer != null) {
            // An exclusive producer is present, so no tailer is started here.
            return;
        }
        try {
            initializeTailer();
        } catch (PulsarClientException clientError) {
            throw new RuntimeException("Could not start MetaData topic tailer", clientError);
        }
    }

    /**
     * Closes the topic tailer and the exclusive leader producer, whichever are present.
     *
     * <p>The producer is closed in a {@code finally} block so that a failure while
     * closing the tailer does not leave the producer open. Both fields are read once
     * into locals because this method is not synchronized and they can be changed by
     * other methods between a null check and the call.
     *
     * @throws Exception if closing either resource fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        final FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
        final Producer<?> producer = this.exclusiveLeaderProducer;
        try {
            if (tailer != null) {
                tailer.close();
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /**
     * Looks up the metadata of a single function.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @return the stored metadata, or {@code null} when the tenant, the namespace or the
     *         function is unknown (previously an unknown tenant or namespace caused a
     *         {@link NullPointerException})
     */
    public synchronized Function.FunctionMetaData getFunctionMetaData(String str, String str2, String str3) {
        final Map<String, Map<String, Function.FunctionMetaData>> namespaces = this.functionMetaDataMap.get(str);
        if (namespaces == null) {
            return null;
        }
        final Map<String, Function.FunctionMetaData> functions = namespaces.get(str2);
        return functions == null ? null : functions.get(str3);
    }

    /**
     * Collects the metadata of every function across all tenants and namespaces.
     *
     * @return a new list holding all stored metadata entries; empty when none are stored
     */
    public synchronized List<Function.FunctionMetaData> getAllFunctionMetaData() {
        final List<Function.FunctionMetaData> everything = new LinkedList<>();
        for (Map<String, Map<String, Function.FunctionMetaData>> namespaces : this.functionMetaDataMap.values()) {
            for (Map<String, Function.FunctionMetaData> functions : namespaces.values()) {
                everything.addAll(functions.values());
            }
        }
        return everything;
    }

    /**
     * Lists the metadata of all functions stored under one tenant and namespace.
     *
     * @param str  tenant
     * @param str2 namespace
     * @return a new collection with the matching entries; empty when the tenant or the
     *         namespace is unknown
     */
    public synchronized Collection<Function.FunctionMetaData> listFunctions(String str, String str2) {
        final LinkedList<Function.FunctionMetaData> found = new LinkedList<>();
        if (!this.functionMetaDataMap.containsKey(str)) {
            return found;
        }
        final Map<String, Map<String, Function.FunctionMetaData>> namespaces = this.functionMetaDataMap.get(str);
        if (!namespaces.containsKey(str2)) {
            return found;
        }
        for (Function.FunctionMetaData metaData : namespaces.get(str2).values()) {
            found.add(metaData);
        }
        return found;
    }

    /**
     * Tells whether metadata is stored for the given function.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @return {@code true} when an entry exists for that tenant, namespace and name
     */
    public synchronized boolean containsFunction(String str, String str2, String str3) {
        final boolean known = containsFunctionMetaData(str, str2, str3);
        return known;
    }

    /**
     * Publishes an update or a delete of a function to the metadata topic and applies it
     * to the in-memory map. Only valid while this worker holds the exclusive producer.
     *
     * <p>The request is validated first, then written to the topic, and only after a
     * successful write applied locally. Previously the map was changed before the
     * write, so a failed send left the map ahead of the topic and made a retry with the
     * same version look outdated.
     *
     * @param functionMetaData metadata of the function being updated or deleted
     * @param z                {@code true} for a delete, {@code false} for an update
     * @throws IllegalStateException    if this worker is not the leader, or if writing to
     *                                  the topic (or the follow-up scheduling) fails
     * @throws IllegalArgumentException if a stored entry already has a version greater
     *                                  than or equal to the request's version
     */
    public synchronized void updateFunctionOnLeader(Function.FunctionMetaData functionMetaData, boolean z) throws IllegalStateException, IllegalArgumentException {
        if (this.exclusiveLeaderProducer == null) {
            throw new IllegalStateException("Not the leader");
        }

        // Reject stale requests before anything is written or changed. The method is
        // synchronized, so this verdict still holds when the change is applied below.
        if (containsFunctionMetaData(functionMetaData) && isRequestOutdated(functionMetaData)) {
            throw new IllegalArgumentException(z
                    ? "Delete request ignored because it is out of date. Please try again."
                    : "Update request ignored because it is out of date. Please try again.");
        }

        final boolean compacted = this.workerConfig.getUseCompactedMetadataTopic().booleanValue();
        final byte[] payload;
        if (compacted) {
            // On the compacted topic an empty payload stands for a delete.
            payload = z ? new byte[0] : functionMetaData.toByteArray();
        } else {
            payload = Request.ServiceRequest.newBuilder()
                    .setServiceRequestType(z ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
                    .setFunctionMetaData(functionMetaData)
                    .setWorkerId(this.workerConfig.getWorkerId())
                    .setRequestId(UUID.randomUUID().toString())
                    .build()
                    .toByteArray();
        }

        try {
            TypedMessageBuilder builder = this.exclusiveLeaderProducer.newMessage()
                    .value(payload)
                    .property(versionTag, Long.toString(functionMetaData.getVersion()));
            if (compacted) {
                builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
            }
            this.lastMessageSeen = builder.send();

            // The write succeeded; now bring the local view in line with the topic.
            final boolean needsScheduling = z ? proccessDeregister(functionMetaData) : processUpdate(functionMetaData);
            if (needsScheduling) {
                this.schedulerManager.schedule();
            }
        } catch (Exception e) {
            log.error("Could not write into Function Metadata topic", e);
            throw new IllegalStateException("Internal Error updating function at the leader", e);
        }
    }

    /**
     * Creates an exclusive producer on the function metadata topic by delegating to
     * {@code WorkerUtils.createExclusiveProducerWithRetry}.
     *
     * @param supplier passed through to the helper unchanged
     * @return the producer returned by the helper
     * @throws WorkerUtils.NotLeaderAnymore propagated from the helper
     */
    public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> supplier) throws WorkerUtils.NotLeaderAnymore {
        final String topic = this.workerConfig.getFunctionMetadataTopic();
        final String producerName = this.workerConfig.getWorkerId() + "-leader";
        return WorkerUtils.createExclusiveProducerWithRetry(this.pulsarClient, topic, producerName, supplier, 1000);
    }

    /**
     * Switches this manager into leader mode: stores the exclusive producer, detaches
     * the topic tailer, waits for it to finish the remaining messages and closes it.
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored after the
     * tailer has been closed, so callers can still observe the interruption.
     *
     * <p>NOTE(review): unlike most mutators here this method is not synchronized;
     * presumably so that it does not hold the monitor needed by the tailer's message
     * processing while it waits for the tailer — confirm before adding a lock.
     *
     * @param producer exclusive producer to use for subsequent metadata writes
     */
    public void acquireLeadership(Producer<byte[]> producer) {
        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
        if (this.exclusiveLeaderProducer != null) {
            log.error("FunctionMetaData Manager entered invalid state");
            this.errorNotifier.triggerError(
                    new IllegalStateException("Exclusive leader producer already set while acquiring leadership"));
        }
        this.exclusiveLeaderProducer = producer;

        final FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
        this.functionMetaDataTopicTailer = null;
        if (tailer != null) {
            boolean interrupted = false;
            try {
                tailer.stopWhenNoMoreMessages().get();
            } catch (Exception e) {
                interrupted = e instanceof InterruptedException;
                log.error("Error while waiting for metadata tailer thread to finish", e);
                this.errorNotifier.triggerError(e);
            }
            try {
                tailer.close();
            } finally {
                if (interrupted) {
                    // Restore the flag only after close() so the close itself is unaffected.
                    Thread.currentThread().interrupt();
                }
            }
        }
        log.info("FunctionMetaDataManager done becoming leader");
    }

    /**
     * Leaves leader mode: closes and clears the exclusive producer, then starts a topic
     * tailer from the last message seen.
     *
     * <p>When no exclusive producer is held the call is logged and ignored instead of
     * failing with a {@link NullPointerException}; no additional tailer is started in
     * that case.
     *
     * <p>A {@code PulsarClientException} from closing the producer or from starting the
     * tailer is reported to the error notifier rather than thrown.
     */
    public synchronized void giveupLeadership() {
        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
        if (this.exclusiveLeaderProducer == null) {
            log.warn("FunctionMetaDataManager asked to give up leadership but holds no exclusive producer");
            return;
        }
        try {
            this.exclusiveLeaderProducer.close();
            this.exclusiveLeaderProducer = null;
            initializeTailer();
        } catch (PulsarClientException e) {
            log.error("Error closing exclusive producer", e);
            this.errorNotifier.triggerError(e);
        }
    }

    /**
     * Applies one metadata-topic message to the in-memory map, using the compacted or
     * uncompacted format according to the worker configuration, and records the
     * message id as the last message seen.
     *
     * <p>A message rejected with {@link IllegalArgumentException} (an outdated request,
     * or a version property that is not a number, since {@link NumberFormatException}
     * is a subclass) is skipped. The rejection is now logged instead of being dropped
     * silently; the message id is still recorded so the message is not re-read.
     *
     * @param message message read from the function metadata topic
     * @throws IOException if the message payload cannot be parsed
     */
    public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        try {
            if (this.workerConfig.getUseCompactedMetadataTopic().booleanValue()) {
                processCompactedMetaDataTopicMessage(message);
            } else {
                processUncompactedMetaDataTopicMessage(message);
            }
        } catch (IllegalArgumentException e) {
            // Deliberately best-effort: nothing can be done about a rejected message, but leave a trace.
            log.warn("Ignoring metadata topic message {}: {}", message.getMessageId(), e.toString());
        }
        this.lastMessageSeen = message.getMessageId();
    }

    /**
     * Handles a message from the uncompacted topic: the payload is parsed as a
     * {@code ServiceRequest} and dispatched on its request type.
     *
     * @param message message whose payload is a serialized service request
     * @throws IOException if the payload cannot be parsed
     */
    private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        final Request.ServiceRequest request = Request.ServiceRequest.parseFrom(message.getData());
        if (log.isDebugEnabled()) {
            log.debug("Received Service Request: {}", request);
        }
        switch (request.getServiceRequestType()) {
            case UPDATE:
                processUpdate(request.getFunctionMetaData());
                break;
            case DELETE:
                proccessDeregister(request.getFunctionMetaData());
                break;
            default:
                log.warn("Received request with unrecognized type: {}", request);
                break;
        }
    }

    /**
     * Handles a message from the compacted topic. The message key is the fully
     * qualified function name and the version travels in a message property; a null or
     * empty payload means deregister, anything else is parsed as function metadata and
     * applied as an update.
     *
     * @param message message read from the compacted metadata topic
     * @throws IOException           if the payload cannot be parsed as function metadata
     * @throws NumberFormatException if the version property is missing or not a number
     */
    private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        // The version is parsed first, for both branches, so a bad property rejects the message up front.
        final long version = Long.parseLong(message.getProperty(versionTag));
        final String fullyQualifiedName = message.getKey();
        final String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(fullyQualifiedName);
        final String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(fullyQualifiedName);
        final String name = FunctionCommon.extractNameFromFullyQualifiedName(fullyQualifiedName);

        final byte[] payload = message.getData();
        if (payload != null && payload.length != 0) {
            processUpdate(Function.FunctionMetaData.parseFrom(payload));
        } else {
            proccessDeregister(tenant, namespace, name, version);
        }
    }

    /** Tells whether an entry exists for the function described by the given metadata. */
    private boolean containsFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        final Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        return containsFunctionMetaData(details);
    }

    /** Tells whether an entry exists for the tenant, namespace and name of the given details. */
    private boolean containsFunctionMetaData(Function.FunctionDetails functionDetails) {
        final String tenant = functionDetails.getTenant();
        final String namespace = functionDetails.getNamespace();
        final String name = functionDetails.getName();
        return containsFunctionMetaData(tenant, namespace, name);
    }

    /**
     * Tells whether the map holds an entry at tenant / namespace / function name.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     */
    private boolean containsFunctionMetaData(String str, String str2, String str3) {
        if (!this.functionMetaDataMap.containsKey(str)) {
            return false;
        }
        final Map<String, Map<String, Function.FunctionMetaData>> namespaces = this.functionMetaDataMap.get(str);
        if (!namespaces.containsKey(str2)) {
            return false;
        }
        return namespaces.get(str2).containsKey(str3);
    }

    /**
     * Deregisters the function described by the given metadata, using its tenant,
     * namespace, name and version.
     *
     * @param functionMetaData metadata identifying the function and the request version
     * @return {@code true} if an entry was removed, {@code false} if none existed
     * @throws IllegalArgumentException if the request version is outdated
     */
    @VisibleForTesting
    synchronized boolean proccessDeregister(Function.FunctionMetaData functionMetaData) throws IllegalArgumentException {
        final Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        return proccessDeregister(details.getTenant(), details.getNamespace(), details.getName(), functionMetaData.getVersion());
    }

    /**
     * Removes a function from the in-memory map if it is present and the request is
     * newer than the stored entry.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param j    version carried by the deregister request
     * @return {@code true} if an entry was removed, {@code false} if none existed
     * @throws IllegalArgumentException if the stored version is greater than or equal to {@code j}
     */
    synchronized boolean proccessDeregister(String str, String str2, String str3, long j) throws IllegalArgumentException {
        log.debug("Process deregister request: {}/{}/{}/{}", str, str2, str3, j);
        if (!containsFunctionMetaData(str, str2, str3)) {
            // Nothing stored for this function, so there is nothing to remove.
            return false;
        }
        if (isRequestOutdated(str, str2, str3, j)) {
            if (log.isDebugEnabled()) {
                log.debug("{}/{}/{} Ignoring outdated request version: {}", str, str2, str3, j);
            }
            throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
        }
        this.functionMetaDataMap.get(str).get(str2).remove(str3);
        return true;
    }

    /**
     * Stores the given metadata, replacing an existing entry only when the request is
     * newer than what is stored.
     *
     * @param functionMetaData metadata to store
     * @return always {@code true} when the metadata was stored
     * @throws IllegalArgumentException if an entry exists whose version is greater than
     *                                  or equal to the request's version
     */
    @VisibleForTesting
    synchronized boolean processUpdate(Function.FunctionMetaData functionMetaData) throws IllegalArgumentException {
        log.debug("Process update request: {}", functionMetaData);
        if (containsFunctionMetaData(functionMetaData) && isRequestOutdated(functionMetaData)) {
            throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
        }
        // Either the function is new or the request supersedes the stored version.
        setFunctionMetaData(functionMetaData);
        return true;
    }

    /** Checks the request's version against the entry stored for the same tenant, namespace and name. */
    private boolean isRequestOutdated(Function.FunctionMetaData functionMetaData) {
        final Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        final String tenant = details.getTenant();
        final String namespace = details.getNamespace();
        final String name = details.getName();
        return isRequestOutdated(tenant, namespace, name, functionMetaData.getVersion());
    }

    /**
     * Tells whether the stored entry's version is greater than or equal to {@code j}.
     * The entry must exist; callers check with {@code containsFunctionMetaData} first.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param j    version carried by the request
     */
    private boolean isRequestOutdated(String str, String str2, String str3, long j) {
        final Function.FunctionMetaData stored = this.functionMetaDataMap.get(str).get(str2).get(str3);
        final long storedVersion = stored.getVersion();
        return storedVersion >= j;
    }

    /**
     * Stores the metadata under its tenant, namespace and function name, creating the
     * intermediate maps when they do not exist yet. No version check is performed.
     *
     * @param functionMetaData metadata to store
     */
    @VisibleForTesting
    void setFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        final Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        final Map<String, Map<String, Function.FunctionMetaData>> namespaces =
                this.functionMetaDataMap.computeIfAbsent(details.getTenant(), tenant -> new ConcurrentHashMap<>());
        final Map<String, Function.FunctionMetaData> functions =
                namespaces.computeIfAbsent(details.getNamespace(), namespace -> new ConcurrentHashMap<>());
        functions.put(details.getName(), functionMetaData);
    }

    /**
     * Creates a topic tailer positioned at the last message seen, stores it in the
     * tailer field and starts it.
     *
     * @throws PulsarClientException if the tailer cannot be created
     */
    private void initializeTailer() throws PulsarClientException {
        final FunctionMetaDataTopicTailer tailer = new FunctionMetaDataTopicTailer(
                this, this.pulsarClient.newReader(), this.workerConfig, this.lastMessageSeen, this.errorNotifier);
        this.functionMetaDataTopicTailer = tailer;
        tailer.start();
        log.info("MetaData Manager Tailer started");
    }

    /**
     * @return the id of the last metadata-topic message processed or written by this
     *         manager; {@code MessageId.earliest} until one has been
     */
    public MessageId getLastMessageSeen() {
        return lastMessageSeen;
    }

    /**
     * @return the future that {@code initialize()} completes once the existing topic
     *         contents have been processed
     */
    public CompletableFuture<Void> getIsInitialized() {
        return isInitialized;
    }
}
