package org.apache.pulsar.functions.worker;

import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/LeaderService.class */
/**
 * Tracks whether this worker is the leader of the function-worker cluster.
 *
 * <p>The service subscribes to the cluster coordination topic with a
 * {@link SubscriptionType#Failover} subscription and registers itself as the
 * {@link ConsumerEventListener}. When the consumer becomes active the worker runs the
 * "become leader" routine; when it becomes inactive the worker gives leadership up again.
 *
 * <p>All reads and writes of the leader flag happen while holding this instance's monitor.
 */
public class LeaderService implements AutoCloseable, ConsumerEventListener {
    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
    private static final Logger log = LoggerFactory.getLogger(LeaderService.class);
    /** Key of the consumer property that carries this worker's consumer name. */
    private static final String WORKER_IDENTIFIER = "id";

    private final String consumerName;
    private final FunctionAssignmentTailer functionAssignmentTailer;
    private final ErrorNotifier errorNotifier;
    private final SchedulerManager schedulerManager;
    private final FunctionRuntimeManager functionRuntimeManager;
    private final FunctionMetaDataManager functionMetaDataManager;
    private final MembershipManager membershipManager;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;

    /** Coordination-topic consumer; {@code null} until {@link #start()} has succeeded. */
    private Consumer<byte[]> consumer;
    private boolean isLeader = false;

    /**
     * Creates the service. No subscription is made until {@link #start()} is called.
     *
     * <p>The consumer name is derived from the worker configuration as
     * {@code workerId:hostname:port}, where the port is the TLS port when TLS is enabled
     * and the plain worker port otherwise.
     *
     * @param workerService            source of the {@link WorkerConfig}
     * @param pulsarClient             client used to subscribe to the coordination topic
     * @param functionAssignmentTailer tailer closed on gaining and restarted on losing leadership
     * @param schedulerManager         scheduler initialized and triggered when leadership is gained
     * @param functionRuntimeManager   runtime manager whose initialization is awaited before leading
     * @param functionMetaDataManager  metadata manager that acquires / gives up leadership
     * @param membershipManager        used to check whether this worker is still the leader
     * @param errorNotifier            receives any unexpected error raised by the leadership routines
     */
    public LeaderService(WorkerService workerService, PulsarClient pulsarClient, FunctionAssignmentTailer functionAssignmentTailer, SchedulerManager schedulerManager, FunctionRuntimeManager functionRuntimeManager, FunctionMetaDataManager functionMetaDataManager, MembershipManager membershipManager, ErrorNotifier errorNotifier) {
        this.workerConfig = workerService.getWorkerConfig();
        this.pulsarClient = pulsarClient;
        this.functionAssignmentTailer = functionAssignmentTailer;
        this.schedulerManager = schedulerManager;
        this.functionRuntimeManager = functionRuntimeManager;
        this.functionMetaDataManager = functionMetaDataManager;
        this.membershipManager = membershipManager;
        this.errorNotifier = errorNotifier;
        this.consumerName = String.format("%s:%s:%d",
                this.workerConfig.getWorkerId(),
                this.workerConfig.getWorkerHostname(),
                this.workerConfig.getTlsEnabled() ? this.workerConfig.getWorkerPortTls() : this.workerConfig.getWorkerPort());
    }

    /**
     * Subscribes to the cluster coordination topic with a failover subscription, registering
     * this instance as the consumer event listener.
     *
     * @throws PulsarClientException if the subscription cannot be created
     */
    public void start() throws PulsarClientException {
        this.consumer = this.pulsarClient.newConsumer()
                .topic(this.workerConfig.getClusterCoordinationTopic())
                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Failover)
                .consumerEventListener(this)
                .property(WORKER_IDENTIFIER, this.consumerName)
                .consumerName(this.consumerName)
                .subscribe();
    }

    /**
     * Runs the "become leader" routine when the coordination consumer becomes active.
     *
     * <p>Does nothing if this worker is already the leader. Any unexpected failure, at any
     * step, is logged and handed to the {@link ErrorNotifier}, and the routine stops there
     * instead of continuing with a half-initialized state.
     *
     * @param consumer    the consumer that became active
     * @param partitionId partition the event refers to (unused)
     */
    @Override
    public void becameActive(Consumer<?> consumer, int partitionId) {
        synchronized (this) {
            if (this.isLeader) {
                return;
            }
            log.info("Worker {} became the leader.", this.consumerName);
            try {
                // Block until both managers report that they finished initializing.
                this.functionMetaDataManager.getIsInitialized().get();
                this.functionRuntimeManager.getIsInitialized().get();

                // Passed to the acquireExclusiveWrite calls so they can check whether
                // this worker is still the leader.
                Supplier<Boolean> checkIsStillLeader = () ->
                        this.membershipManager.getLeader().getWorkerId().equals(this.workerConfig.getWorkerId());

                Producer<byte[]> schedulerProducer = null;
                Producer<byte[]> metaDataProducer = null;
                try {
                    schedulerProducer = this.schedulerManager.acquireExclusiveWrite(checkIsStillLeader);
                    metaDataProducer = this.functionMetaDataManager.acquireExclusiveWrite(checkIsStillLeader);
                    this.schedulerManager.initialize(schedulerProducer);
                    // Let the tailer read to the end of its topic, then stop it.
                    this.functionAssignmentTailer.triggerReadToTheEndAndExit().get();
                    this.functionAssignmentTailer.close();
                    this.functionMetaDataManager.acquireLeadership(metaDataProducer);
                    this.isLeader = true;
                    this.schedulerManager.schedule();
                } catch (WorkerUtils.NotLeaderAnymore e) {
                    log.info("Worker {} is not leader anymore. Exiting becoming leader routine.", this.consumerName);
                    // Release whichever exclusive producers were already created.
                    if (schedulerProducer != null) {
                        schedulerProducer.close();
                    }
                    if (metaDataProducer != null) {
                        metaDataProducer.close();
                    }
                }
            } catch (Throwable th) {
                log.error("Encountered error when initializing to become leader", th);
                this.errorNotifier.triggerError(th);
            }
        }
    }

    /**
     * Gives up leadership when the coordination consumer becomes inactive.
     *
     * <p>Does nothing if this worker is not currently the leader. Otherwise the leader flag is
     * cleared, the scheduler manager is closed, the function assignment tailer is restarted
     * (from the last message the scheduler produced, if there is one) and the metadata
     * manager gives up leadership. Failures are logged and handed to the {@link ErrorNotifier}.
     *
     * @param consumer    the consumer that became inactive (unused)
     * @param partitionId partition the event refers to (unused)
     */
    @Override
    public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
        if (!this.isLeader) {
            return;
        }
        log.info("Worker {} lost the leadership.", this.consumerName);
        this.isLeader = false;
        try {
            this.schedulerManager.close();
            if (this.schedulerManager.getLastMessageProduced() == null) {
                this.functionAssignmentTailer.start();
            } else {
                this.functionAssignmentTailer.startFromMessage(this.schedulerManager.getLastMessageProduced());
            }
            this.functionMetaDataManager.giveupLeadership();
        } catch (Throwable th) {
            log.error("Encountered error in routine when worker lost leadership", th);
            this.errorNotifier.triggerError(th);
        }
    }

    /**
     * @return {@code true} if this worker completed the "become leader" routine and has not
     *         lost leadership since
     */
    public synchronized boolean isLeader() {
        return this.isLeader;
    }

    /**
     * Closes the coordination-topic consumer if {@link #start()} created one.
     *
     * @throws PulsarClientException if closing the consumer fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
