package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.validator.TransactionBatchedWriteValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/PulsarService.class */
public class PulsarService implements AutoCloseable, ShutdownService {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
    private final ServiceConfiguration config;
    private NamespaceService nsService;
    private ManagedLedgerStorage managedLedgerClientFactory;
    private LeaderElectionService leaderElectionService;
    private BrokerService brokerService;
    private WebService webService;
    private WebSocketService webSocketService;
    private TopicPoliciesService topicPoliciesService;
    private BookKeeperClientFactory bkClientFactory;
    protected CompactionServiceFactory compactionServiceFactory;
    private StrategicTwoPhaseCompactor strategicCompactor;
    private ResourceUsageTransportManager resourceUsageTransportManager;
    private ResourceGroupService resourceGroupServiceManager;
    private final ScheduledExecutorService executor;
    private OrderedExecutor orderedExecutor;
    private final ScheduledExecutorService loadManagerExecutor;
    private ScheduledExecutorService compactorExecutor;
    private OrderedScheduler offloaderScheduler;
    private OffloadersCache offloadersCache;
    private LedgerOffloader defaultOffloader;
    private LedgerOffloaderStats offloaderStats;
    private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap;
    private ScheduledFuture<?> loadReportTask;
    private LoadSheddingTask loadSheddingTask;
    private ScheduledFuture<?> loadResourceQuotaTask;
    private final AtomicReference<LoadManager> loadManager;
    private PulsarAdmin adminClient;
    private PulsarClient client;
    private final String bindAddress;
    private final String advertisedAddress;
    private String webServiceAddress;
    private String webServiceAddressTls;
    private String brokerServiceUrl;
    private String brokerServiceUrlTls;
    private final String brokerVersion;
    private SchemaStorage schemaStorage;
    private SchemaRegistryService schemaRegistryService;
    private final WorkerConfig workerConfig;
    private final Optional<WorkerService> functionWorkerService;
    private ProtocolHandlers protocolHandlers;
    private final Consumer<Integer> processTerminator;
    protected final EventLoopGroup ioEventLoopGroup;
    private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
    private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
    private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
    private final Timer brokerClientSharedTimer;
    private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
    private MetricsGenerator metricsGenerator;
    private TransactionMetadataStoreService transactionMetadataStoreService;
    private TransactionBufferProvider transactionBufferProvider;
    private TransactionBufferClient transactionBufferClient;
    private HashedWheelTimer transactionTimer;
    private BrokerInterceptor brokerInterceptor;
    private AdditionalServlets brokerAdditionalServlets;
    private PackagesManagement packagesManagement;
    private PulsarPrometheusMetricsServlet metricsServlet;
    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
    private MetadataStoreExtended localMetadataStore;
    private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
    private CoordinationService coordinationService;
    private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
    private MetadataStore configurationMetadataStore;
    private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
    private boolean shouldShutdownConfigurationMetadataStore;
    private PulsarResources pulsarResources;
    private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
    private final ExecutorProvider transactionExecutorProvider;
    private String brokerId;
    private final CompletableFuture<Void> readyForIncomingRequestsFuture;
    private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests;
    private volatile State state;
    private final ReentrantLock mutex;
    private final Condition isClosedCondition;
    private volatile CompletableFuture<Void> closeFuture;
    private Map<String, AdvertisedListener> advertisedListeners;

    /* loaded from: input_file:org/apache/pulsar/broker/PulsarService$State.class */
    public enum State {
        Init,
        Started,
        Closing,
        Closed
    }

    public PulsarService(ServiceConfiguration serviceConfiguration) {
        this(serviceConfiguration, Optional.empty(), num -> {
            LOG.info("Process termination requested with code {}. Ignoring, as this constructor is intended for tests. ", num);
        });
    }

    public PulsarService(ServiceConfiguration serviceConfiguration, Optional<WorkerService> optional, Consumer<Integer> consumer) {
        this(serviceConfiguration, new WorkerConfig(), optional, consumer);
    }

    public PulsarService(ServiceConfiguration serviceConfiguration, WorkerConfig workerConfig, Optional<WorkerService> optional, Consumer<Integer> consumer) {
        this.nsService = null;
        this.managedLedgerClientFactory = null;
        this.leaderElectionService = null;
        this.brokerService = null;
        this.webService = null;
        this.webSocketService = null;
        this.topicPoliciesService = TopicPoliciesService.DISABLED;
        this.offloadersCache = new OffloadersCache();
        this.ledgerOffloaderMap = new ConcurrentHashMap();
        this.loadReportTask = null;
        this.loadSheddingTask = null;
        this.loadResourceQuotaTask = null;
        this.loadManager = new AtomicReference<>();
        this.adminClient = null;
        this.client = null;
        this.schemaStorage = null;
        this.schemaRegistryService = null;
        this.protocolHandlers = null;
        this.packagesManagement = null;
        this.readyForIncomingRequestsFuture = new CompletableFuture<>();
        this.pendingTasksBeforeReadyForIncomingRequests = new ArrayList();
        this.mutex = new ReentrantLock();
        this.isClosedCondition = this.mutex.newCondition();
        this.state = State.Init;
        PulsarConfigurationLoader.isComplete(serviceConfiguration);
        TransactionBatchedWriteValidator.validate(serviceConfiguration);
        this.config = serviceConfiguration;
        this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(serviceConfiguration);
        this.advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getAdvertisedAddress());
        this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getBindAddress());
        this.brokerVersion = PulsarVersion.getVersion();
        this.processTerminator = consumer;
        this.loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
        this.workerConfig = workerConfig;
        this.functionWorkerService = optional;
        this.executor = Executors.newScheduledThreadPool(serviceConfiguration.getNumExecutorThreadPoolSize(), new ExecutorProvider.ExtendedThreadFactory("pulsar"));
        if (serviceConfiguration.isTransactionCoordinatorEnabled()) {
            this.transactionExecutorProvider = new ExecutorProvider(getConfiguration().getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-executor");
        } else {
            this.transactionExecutorProvider = null;
        }
        this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(serviceConfiguration.getNumIOThreads(), serviceConfiguration.isEnableBusyWait(), new DefaultThreadFactory("pulsar-io"));
        this.brokerClientSharedInternalExecutorProvider = new ExecutorProvider(1, "broker-client-shared-internal-executor");
        this.brokerClientSharedExternalExecutorProvider = new ExecutorProvider(1, "broker-client-shared-external-executor");
        this.brokerClientSharedScheduledExecutorProvider = new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
        this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1L, TimeUnit.MILLISECONDS);
        this.brokerClientSharedLookupExecutorProvider = new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");
        this.offloaderStats = LedgerOffloaderStats.create(false, false, (ScheduledExecutorService) null, 0);
    }

    public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer) throws MetadataStoreException {
        return MetadataStoreFactory.create(this.config.getConfigurationMetadataStoreUrl(), MetadataStoreConfig.builder().sessionTimeoutMillis((int) this.config.getMetadataStoreSessionTimeoutMillis()).allowReadOnlyOperations(this.config.isMetadataStoreAllowReadOnlyOperations()).configFilePath(this.config.getMetadataStoreConfigPath()).batchingEnabled(this.config.isMetadataStoreBatchingEnabled()).batchingMaxDelayMillis(this.config.getMetadataStoreBatchingMaxDelayMillis()).batchingMaxOperations(this.config.getMetadataStoreBatchingMaxOperations()).batchingMaxSizeKb(this.config.getMetadataStoreBatchingMaxSizeKb()).metadataStoreName("configuration-metadata-store").synchronizer(pulsarMetadataEventSynchronizer).build());
    }

    public void closeMetadataServiceSession() throws Exception {
        this.localMetadataStore.close();
    }

    private void closeLeaderElectionService() throws Exception {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
            ExtensibleLoadManagerImpl.get(this.loadManager.get()).getLeaderElectionService().close();
        } else if (this.leaderElectionService != null) {
            this.leaderElectionService.close();
            this.leaderElectionService = null;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws PulsarServerException {
        try {
            closeAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            PulsarServerException cause = e2.getCause();
            if (cause instanceof PulsarServerException) {
                throw cause;
            }
            if (getConfiguration().getBrokerShutdownTimeoutMs() != 0 || (!(cause instanceof TimeoutException) && !(cause instanceof CancellationException))) {
                throw new PulsarServerException(cause);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Shutdown timeout ignored when timeout is 0, which is primarily used in tests to forcefully shutdown the broker", cause);
            }
        }
    }

    public CompletableFuture<Void> closeAsync() {
        this.mutex.lock();
        try {
            try {
                if (this.closeFuture != null) {
                    CompletableFuture<Void> completableFuture = this.closeFuture;
                    this.mutex.unlock();
                    return completableFuture;
                }
                LOG.info("Closing PulsarService");
                if (this.brokerService != null) {
                    this.brokerService.unloadNamespaceBundlesGracefully();
                }
                this.state = State.Closing;
                if (this.resourceUsageTransportManager != null) {
                    try {
                        this.resourceUsageTransportManager.close();
                    } catch (Exception e) {
                        LOG.warn("ResourceUsageTransportManager closing failed {}", e.getMessage());
                    }
                    this.resourceUsageTransportManager = null;
                }
                if (this.resourceGroupServiceManager != null) {
                    try {
                        this.resourceGroupServiceManager.close();
                    } catch (Exception e2) {
                        LOG.warn("ResourceGroupServiceManager closing failed {}", e2.getMessage());
                    }
                    this.resourceGroupServiceManager = null;
                }
                if (this.webService != null) {
                    try {
                        this.webService.close();
                        this.webService = null;
                    } catch (Exception e3) {
                        LOG.error("Web service closing failed", e3);
                    }
                }
                resetMetricsServlet();
                if (this.compactionServiceFactory != null) {
                    try {
                        this.compactionServiceFactory.close();
                    } catch (Exception e4) {
                        LOG.warn("CompactionServiceFactory closing failed {}", e4.getMessage());
                    }
                    this.compactionServiceFactory = null;
                }
                if (this.webSocketService != null) {
                    this.webSocketService.close();
                }
                if (this.brokerAdditionalServlets != null) {
                    this.brokerAdditionalServlets.close();
                    this.brokerAdditionalServlets = null;
                }
                GracefulExecutorServicesShutdown timeout = GracefulExecutorServicesShutdown.initiate().timeout(Duration.ofMillis((long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration().getBrokerShutdownTimeoutMs())));
                if (this.protocolHandlers != null) {
                    this.protocolHandlers.close();
                    this.protocolHandlers = null;
                }
                if (this.loadSheddingTask != null) {
                    this.loadSheddingTask.cancel();
                }
                timeout.shutdown(new ExecutorService[]{this.loadManagerExecutor});
                ArrayList arrayList = new ArrayList();
                if (this.brokerService != null) {
                    CompletableFuture<Void> closeAsync = this.brokerService.closeAsync();
                    if (this.transactionMetadataStoreService != null) {
                        arrayList.add(closeAsync.whenComplete((r4, th) -> {
                            this.transactionMetadataStoreService.close();
                            this.transactionMetadataStoreService = null;
                        }));
                    } else {
                        arrayList.add(closeAsync);
                    }
                    this.brokerService = null;
                }
                if (this.managedLedgerClientFactory != null) {
                    try {
                        this.managedLedgerClientFactory.close();
                    } catch (Exception e5) {
                        LOG.warn("ManagedLedgerClientFactory closing failed {}", e5.getMessage());
                    }
                    this.managedLedgerClientFactory = null;
                }
                if (this.bkClientFactory != null) {
                    this.bkClientFactory.close();
                    this.bkClientFactory = null;
                }
                closeLeaderElectionService();
                if (this.adminClient != null) {
                    this.adminClient.close();
                    this.adminClient = null;
                }
                if (this.transactionBufferSnapshotServiceFactory != null) {
                    this.transactionBufferSnapshotServiceFactory.close();
                    this.transactionBufferSnapshotServiceFactory = null;
                }
                if (this.transactionBufferClient != null) {
                    this.transactionBufferClient.close();
                }
                if (this.topicPoliciesService != null) {
                    this.topicPoliciesService.close();
                    this.topicPoliciesService = null;
                }
                if (this.client != null) {
                    this.client.close();
                    this.client = null;
                }
                if (this.nsService != null) {
                    this.nsService.close();
                    this.nsService = null;
                }
                timeout.shutdown(new ExecutorService[]{this.compactorExecutor});
                timeout.shutdown(new ExecutorService[]{this.offloaderScheduler});
                timeout.shutdown(new ExecutorService[]{this.executor});
                timeout.shutdown(new ExecutorService[]{this.orderedExecutor});
                LoadManager loadManager = this.loadManager.get();
                if (loadManager != null) {
                    loadManager.stop();
                }
                if (this.schemaRegistryService != null) {
                    this.schemaRegistryService.close();
                }
                this.offloadersCache.close();
                if (this.coordinationService != null) {
                    try {
                        this.coordinationService.close();
                    } catch (Exception e6) {
                        if (!(FutureUtil.unwrapCompletionException(e6) instanceof MetadataStoreException.AlreadyClosedException)) {
                            throw e6;
                        }
                    }
                }
                arrayList.add(closeLocalMetadataStore());
                if (this.configMetadataSynchronizer != null) {
                    arrayList.add(this.configMetadataSynchronizer.closeAsync());
                }
                if (this.configurationMetadataStore != null && this.shouldShutdownConfigurationMetadataStore) {
                    this.configurationMetadataStore.close();
                }
                if (this.transactionExecutorProvider != null) {
                    this.transactionExecutorProvider.shutdownNow();
                }
                MLPendingAckStoreProvider.closeBufferedWriterMetrics();
                MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
                if (this.offloaderStats != null) {
                    this.offloaderStats.close();
                }
                this.brokerClientSharedExternalExecutorProvider.shutdownNow();
                this.brokerClientSharedInternalExecutorProvider.shutdownNow();
                this.brokerClientSharedScheduledExecutorProvider.shutdownNow();
                this.brokerClientSharedLookupExecutorProvider.shutdownNow();
                this.brokerClientSharedTimer.stop();
                arrayList.add(EventLoopUtil.shutdownGracefully(this.ioEventLoopGroup));
                arrayList.add(timeout.handle());
                this.closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(arrayList));
                this.closeFuture.handle((r5, th2) -> {
                    if (th2 == null) {
                        LOG.info("Closed");
                    } else if (th2 instanceof CancellationException) {
                        LOG.info("Closed (shutdown cancelled)");
                    } else if (th2 instanceof TimeoutException) {
                        LOG.info("Closed (shutdown timeout)");
                    } else {
                        LOG.warn("Closed with errors", th2);
                    }
                    this.state = State.Closed;
                    this.isClosedCondition.signalAll();
                    return null;
                });
                CompletableFuture<Void> completableFuture2 = this.closeFuture;
                this.mutex.unlock();
                return completableFuture2;
            } catch (Exception e7) {
                CompletableFuture<Void> failedFuture = FutureUtil.failedFuture(((e7 instanceof CompletionException) && (e7.getCause() instanceof MetadataStoreException)) ? new PulsarServerException(MetadataStoreException.unwrap(e7)) : ((e7.getCause() instanceof CompletionException) && (e7.getCause().getCause() instanceof MetadataStoreException)) ? new PulsarServerException(MetadataStoreException.unwrap(e7.getCause())) : new PulsarServerException(e7));
                this.mutex.unlock();
                return failedFuture;
            }
        } catch (Throwable th3) {
            this.mutex.unlock();
            throw th3;
        }
    }

    private synchronized void resetMetricsServlet() {
        this.metricsServlet = null;
    }

    private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> completableFuture) {
        long brokerShutdownTimeoutMs = getConfiguration().getBrokerShutdownTimeoutMs();
        if (brokerShutdownTimeoutMs <= 0) {
            return completableFuture;
        }
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
        FutureUtil.addTimeoutHandling(completableFuture, Duration.ofMillis(brokerShutdownTimeoutMs), newSingleThreadScheduledExecutor, () -> {
            return FutureUtil.createTimeoutException("Timeout in close", getClass(), "close");
        });
        completableFuture.handle((r8, th) -> {
            if (th instanceof TimeoutException) {
                LOG.info("Shutdown timed out after {} ms", Long.valueOf(brokerShutdownTimeoutMs));
                LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
            }
            newSingleThreadScheduledExecutor.shutdownNow();
            return null;
        });
        return completableFuture;
    }

    public ServiceConfiguration getConfiguration() {
        return this.config;
    }

    public Optional<WorkerConfig> getWorkerConfig() {
        return this.functionWorkerService.map(workerService -> {
            return this.workerConfig;
        });
    }

    public Map<String, String> getProtocolDataToAdvertise() {
        return null == this.protocolHandlers ? Collections.emptyMap() : this.protocolHandlers.getProtocolDataToAdvertise();
    }

    public void start() throws PulsarServerException {
        ArrayList arrayList;
        LOG.info("Starting Pulsar Broker service; version: '{}'", this.brokerVersion != null ? this.brokerVersion : "unknown");
        LOG.info("Git Revision {}", PulsarVersion.getGitSha());
        LOG.info("Git Branch {}", PulsarVersion.getGitBranch());
        LOG.info("Built by {} on {} at {}", new Object[]{PulsarVersion.getBuildUser(), PulsarVersion.getBuildHost(), PulsarVersion.getBuildTime()});
        long currentTimeMillis = System.currentTimeMillis();
        this.mutex.lock();
        try {
            try {
                if (this.state != State.Init) {
                    throw new PulsarServerException("Cannot start the service once it was stopped");
                }
                if (this.config.getWebServicePort().isEmpty() && this.config.getWebServicePortTls().isEmpty()) {
                    throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
                }
                if (this.config.isAuthorizationEnabled() && !this.config.isAuthenticationEnabled()) {
                    throw new IllegalStateException("Invalid broker configuration. Authentication must be enabled with authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
                }
                if (this.config.getDefaultRetentionSizeInMB() > 0 && this.config.getBacklogQuotaDefaultLimitBytes() > 0 && this.config.getBacklogQuotaDefaultLimitBytes() >= this.config.getDefaultRetentionSizeInMB() * 1024 * 1024) {
                    throw new IllegalArgumentException(String.format("The retention size must > the backlog quota limit size, but the configured backlog quota limit bytes is %d, the retention size is %d", Long.valueOf(this.config.getBacklogQuotaDefaultLimitBytes()), Long.valueOf(this.config.getDefaultRetentionSizeInMB() * 1024 * 1024)));
                }
                if (this.config.getDefaultRetentionTimeInMinutes() > 0 && this.config.getBacklogQuotaDefaultLimitSecond() > 0 && this.config.getBacklogQuotaDefaultLimitSecond() >= this.config.getDefaultRetentionTimeInMinutes() * 60) {
                    throw new IllegalArgumentException(String.format("The retention time must > the backlog quota limit time, but the configured backlog quota limit time duration is %d, the retention time duration is %d", Integer.valueOf(this.config.getBacklogQuotaDefaultLimitSecond()), Integer.valueOf(this.config.getDefaultRetentionTimeInMinutes() * 60)));
                }
                this.localMetadataSynchronizer = StringUtils.isNotBlank(this.config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, this.config.getMetadataSyncEventTopic()) : null;
                this.localMetadataStore = createLocalMetadataStore(this.localMetadataSynchronizer);
                this.localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
                this.coordinationService = new CoordinationServiceImpl(this.localMetadataStore);
                if (this.config.isConfigurationStoreSeparated()) {
                    this.configMetadataSynchronizer = StringUtils.isNotBlank(this.config.getConfigurationMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, this.config.getConfigurationMetadataSyncEventTopic()) : null;
                    this.configurationMetadataStore = createConfigurationMetadataStore(this.configMetadataSynchronizer);
                    this.shouldShutdownConfigurationMetadataStore = true;
                } else {
                    this.configurationMetadataStore = this.localMetadataStore;
                    this.shouldShutdownConfigurationMetadataStore = false;
                }
                this.pulsarResources = newPulsarResources();
                this.orderedExecutor = newOrderedExecutor();
                this.protocolHandlers = ProtocolHandlers.load(this.config);
                this.protocolHandlers.initialize(this.config);
                this.bkClientFactory = newBookKeeperClientFactory();
                this.managedLedgerClientFactory = newManagedLedgerClientFactory();
                this.brokerService = newBrokerService(this);
                this.loadManager.set(LoadManager.create(this));
                startNamespaceService();
                this.schemaStorage = createAndStartSchemaStorage();
                this.schemaRegistryService = SchemaRegistryService.create(this.schemaStorage, this.config.getSchemaRegistryCompatibilityCheckers(), this.executor);
                OffloadPoliciesImpl create = OffloadPoliciesImpl.create(getConfiguration().getProperties());
                this.offloaderStats = LedgerOffloaderStats.create(this.config.isExposeManagedLedgerMetricsInPrometheus(), this.config.isExposeTopicLevelMetricsInPrometheus(), getOffloaderScheduler(create), this.config.getManagedLedgerStatsPeriodSeconds());
                this.defaultOffloader = createManagedLedgerOffloader(create);
                setBrokerInterceptor(newBrokerInterceptor());
                BrokerInterceptor brokerInterceptor = getBrokerInterceptor();
                if (brokerInterceptor != null) {
                    this.brokerService.setInterceptor(brokerInterceptor);
                    brokerInterceptor.initialize(this);
                }
                this.brokerService.start();
                this.brokerAdditionalServlets = AdditionalServlets.load(this.config);
                this.webService = new WebService(this);
                createMetricsServlet();
                addWebServerHandlers(this.webService, this.metricsServlet, this.config);
                this.webService.start();
                if (this.config.getBrokerServicePort().equals(Optional.of(0))) {
                    this.config.setBrokerServicePort(this.brokerService.getListenPort());
                }
                if (this.config.getBrokerServicePortTls().equals(Optional.of(0))) {
                    this.config.setBrokerServicePortTls(this.brokerService.getListenPortTls());
                }
                this.webServiceAddress = webAddress(this.config);
                this.webServiceAddressTls = webAddressTls(this.config);
                this.brokerServiceUrl = brokerUrl(this.config);
                this.brokerServiceUrlTls = brokerUrlTls(this.config);
                Optional webServicePort = this.config.getWebServicePort();
                ServiceConfiguration serviceConfiguration = this.config;
                Objects.requireNonNull(serviceConfiguration);
                this.brokerId = String.format("%s:%s", this.advertisedAddress, webServicePort.or(serviceConfiguration::getWebServicePortTls).orElseThrow());
                if (this.compactionServiceFactory == null) {
                    this.compactionServiceFactory = loadCompactionServiceFactory();
                }
                if (null != this.webSocketService) {
                    this.webSocketService.setLocalCluster(ClusterDataImpl.builder().serviceUrl(this.webServiceAddress).serviceUrlTls(this.webServiceAddressTls).brokerServiceUrl(this.brokerServiceUrl).brokerServiceUrlTls(this.brokerServiceUrlTls).build());
                }
                this.nsService.initialize();
                startLeaderElectionService();
                startLoadManagementService();
                if (this.config.isTopicLevelPoliciesEnabled() && this.config.isSystemTopicEnabled()) {
                    this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
                }
                this.topicPoliciesService.start();
                this.nsService.registerBootstrapNamespaces();
                if (this.config.isTransactionCoordinatorEnabled()) {
                    MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                    MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                    this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);
                    this.transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
                    this.transactionBufferClient = TransactionBufferClientImpl.create(this, this.transactionTimer, this.config.getTransactionBufferClientMaxConcurrentRequests(), this.config.getTransactionBufferClientOperationTimeoutInMills());
                    this.transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider.newProvider(this.config.getTransactionMetadataStoreProviderClassName()), this, this.transactionBufferClient, this.transactionTimer);
                    this.transactionBufferProvider = TransactionBufferProvider.newProvider(this.config.getTransactionBufferProviderClassName());
                    this.transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider.newProvider(this.config.getTransactionPendingAckStoreProviderClassName());
                }
                this.metricsGenerator = new MetricsGenerator(this);
                synchronized (this.pendingTasksBeforeReadyForIncomingRequests) {
                    arrayList = new ArrayList(this.pendingTasksBeforeReadyForIncomingRequests);
                    this.pendingTasksBeforeReadyForIncomingRequests.clear();
                    this.readyForIncomingRequestsFuture.complete(null);
                }
                arrayList.forEach((v0) -> {
                    v0.run();
                });
                this.protocolHandlers.start(this.brokerService);
                this.brokerService.startProtocolHandlers(this.protocolHandlers.newChannelInitializers());
                acquireSLANamespace();
                startWorkerService(this.brokerService.getAuthenticationService(), this.brokerService.getAuthorizationService());
                if (this.config.isEnablePackagesManagement()) {
                    startPackagesManagementService();
                }
                this.resourceUsageTransportManager = ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
                if (StringUtils.isNotBlank(this.config.getResourceUsageTransportClassName())) {
                    this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) Class.forName(this.config.getResourceUsageTransportClassName()).getConstructor(PulsarService.class).newInstance(this);
                }
                this.resourceGroupServiceManager = new ResourceGroupService(this);
                if (this.localMetadataSynchronizer != null) {
                    this.localMetadataSynchronizer.start();
                }
                if (this.configMetadataSynchronizer != null) {
                    this.configMetadataSynchronizer.start();
                }
                LOG.info("messaging service is ready, bootstrap_seconds={}, {}, cluster={}, configs={}", new Object[]{Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTimeMillis)), "bootstrap service " + (this.config.getWebServicePort().isPresent() ? "port = " + this.config.getWebServicePort().get() : "") + (this.config.getWebServicePortTls().isPresent() ? ", tls-port = " + this.config.getWebServicePortTls() : "") + (StringUtils.isNotEmpty(this.brokerServiceUrl) ? ", broker url= " + this.brokerServiceUrl : "") + (StringUtils.isNotEmpty(this.brokerServiceUrlTls) ? ", broker tls url= " + this.brokerServiceUrlTls : ""), this.config.getClusterName(), this.config});
                this.state = State.Started;
                this.mutex.unlock();
            } catch (Exception e) {
                LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
                PulsarServerException from = PulsarServerException.from(e);
                this.readyForIncomingRequestsFuture.completeExceptionally(from);
                throw from;
            }
        } catch (Throwable th) {
            this.mutex.unlock();
            throw th;
        }
    }

    public void runWhenReadyForIncomingRequests(Runnable runnable) {
        boolean z;
        synchronized (this.pendingTasksBeforeReadyForIncomingRequests) {
            if (this.readyForIncomingRequestsFuture.isDone()) {
                z = false;
            } else {
                this.pendingTasksBeforeReadyForIncomingRequests.add(runnable);
                z = true;
            }
        }
        if (z) {
            return;
        }
        runnable.run();
    }

    public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
        this.readyForIncomingRequestsFuture.get();
    }

    protected BrokerInterceptor newBrokerInterceptor() throws IOException {
        return BrokerInterceptors.load(this.config);
    }

    @VisibleForTesting
    protected OrderedExecutor newOrderedExecutor() {
        return OrderedExecutor.newBuilder().numThreads(this.config.getNumOrderedExecutorThreads()).name("pulsar-ordered").build();
    }

    @VisibleForTesting
    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
        return ManagedLedgerStorage.create(this.config, this.localMetadataStore, this.bkClientFactory, this.ioEventLoopGroup);
    }

    @VisibleForTesting
    protected PulsarResources newPulsarResources() {
        PulsarResources pulsarResources = new PulsarResources(this.localMetadataStore, this.configurationMetadataStore, this.config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());
        pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
        return pulsarResources;
    }

    private synchronized void createMetricsServlet() {
        this.metricsServlet = new PulsarPrometheusMetricsServlet(this, this.config.isExposeTopicLevelMetricsInPrometheus(), this.config.isExposeConsumerLevelMetricsInPrometheus(), this.config.isExposeProducerLevelMetricsInPrometheus(), this.config.isSplitTopicAndPartitionLabelInPrometheus());
        if (this.pendingMetricsProviders != null) {
            this.pendingMetricsProviders.forEach(prometheusRawMetricsProvider -> {
                this.metricsServlet.addRawMetricsProvider(prometheusRawMetricsProvider);
            });
            this.pendingMetricsProviders = null;
        }
    }

    private void addWebServerHandlers(WebService webService, PulsarPrometheusMetricsServlet pulsarPrometheusMetricsServlet, ServiceConfiguration serviceConfiguration) throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException {
        HashMap hashMap = new HashMap();
        hashMap.put("pulsar", this);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("statusFilePath", serviceConfiguration.getStatusFilePath());
        hashMap2.put("isReadyProbe", () -> {
            return Boolean.valueOf(this.state == State.Started);
        });
        webService.addRestResource("/", false, hashMap2, false, VipStatus.class);
        webService.addRestResources("/admin", true, hashMap, false, "org.apache.pulsar.broker.admin.v1");
        webService.addRestResources("/admin/v2", true, hashMap, true, "org.apache.pulsar.broker.admin.v2");
        webService.addRestResources("/admin/v3", true, hashMap, true, "org.apache.pulsar.broker.admin.v3");
        webService.addRestResource("/lookup", true, hashMap, true, TopicLookup.class, org.apache.pulsar.broker.lookup.v2.TopicLookup.class);
        webService.addRestResource("/topics", true, hashMap, true, Topics.class);
        webService.addServlet("/metrics", new ServletHolder(pulsarPrometheusMetricsServlet), serviceConfiguration.isAuthenticateMetricsEndpoint(), hashMap);
        addWebSocketServiceHandler(webService, hashMap, serviceConfiguration);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Attempting to add static directory");
        }
        webService.addStaticResources("/static", "/static");
        addBrokerAdditionalServlets(webService, hashMap, serviceConfiguration);
    }

    private void handleMetadataSessionEvent(SessionEvent sessionEvent) {
        LOG.info("Received metadata service session event: {}", sessionEvent);
        if (sessionEvent == SessionEvent.SessionLost && this.config.getZookeeperSessionExpiredPolicy() == MetadataSessionExpiredPolicy.shutdown) {
            LOG.warn("The session with metadata service was lost. Shutting down.\n{}\n", ThreadDumpUtil.buildThreadDiagnosticString());
            shutdownNow();
        }
    }

    private void addBrokerAdditionalServlets(WebService webService, Map<String, Object> map, ServiceConfiguration serviceConfiguration) {
        if (getBrokerAdditionalServlets() != null) {
            for (AdditionalServletWithClassLoader additionalServletWithClassLoader : getBrokerAdditionalServlets().getServlets().values()) {
                additionalServletWithClassLoader.loadConfig(serviceConfiguration);
                AdditionalServlet servlet = additionalServletWithClassLoader.getServlet();
                if (servlet instanceof AdditionalServletWithPulsarService) {
                    ((AdditionalServletWithPulsarService) servlet).setPulsarService(this);
                }
                webService.addServlet(additionalServletWithClassLoader.getBasePath(), additionalServletWithClassLoader.getServletHolder(), serviceConfiguration.isAuthenticationEnabled(), map);
                LOG.info("Broker add additional servlet basePath {} ", additionalServletWithClassLoader.getBasePath());
            }
        }
    }

    private void addWebSocketServiceHandler(WebService webService, Map<String, Object> map, ServiceConfiguration serviceConfiguration) throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException {
        if (serviceConfiguration.isWebSocketServiceEnabled()) {
            this.webSocketService = new WebSocketService((ClusterData) null, serviceConfiguration);
            this.webSocketService.start();
            WebSocketProducerServlet webSocketProducerServlet = new WebSocketProducerServlet(this.webSocketService);
            webService.addServlet("/ws/producer", new ServletHolder(webSocketProducerServlet), true, map);
            webService.addServlet("/ws/v2/producer", new ServletHolder(webSocketProducerServlet), true, map);
            WebSocketConsumerServlet webSocketConsumerServlet = new WebSocketConsumerServlet(this.webSocketService);
            webService.addServlet("/ws/consumer", new ServletHolder(webSocketConsumerServlet), true, map);
            webService.addServlet("/ws/v2/consumer", new ServletHolder(webSocketConsumerServlet), true, map);
            WebSocketReaderServlet webSocketReaderServlet = new WebSocketReaderServlet(this.webSocketService);
            webService.addServlet("/ws/reader", new ServletHolder(webSocketReaderServlet), true, map);
            webService.addServlet("/ws/v2/reader", new ServletHolder(webSocketReaderServlet), true, map);
        }
    }

    private void handleDeleteCluster(Notification notification) {
        if (isRunning() && ClusterResources.pathRepresentsClusterName(notification.getPath()) && notification.getType() == NotificationType.Deleted) {
            getBrokerService().closeAndRemoveReplicationClient(ClusterResources.clusterNameFromPath(notification.getPath()));
        }
    }

    public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer) throws MetadataStoreException, PulsarServerException {
        return MetadataStoreExtended.create(this.config.getMetadataStoreUrl(), MetadataStoreConfig.builder().sessionTimeoutMillis((int) this.config.getMetadataStoreSessionTimeoutMillis()).allowReadOnlyOperations(this.config.isMetadataStoreAllowReadOnlyOperations()).configFilePath(this.config.getMetadataStoreConfigPath()).batchingEnabled(this.config.isMetadataStoreBatchingEnabled()).batchingMaxDelayMillis(this.config.getMetadataStoreBatchingMaxDelayMillis()).batchingMaxOperations(this.config.getMetadataStoreBatchingMaxOperations()).batchingMaxSizeKb(this.config.getMetadataStoreBatchingMaxSizeKb()).synchronizer(pulsarMetadataEventSynchronizer).metadataStoreName("metadata-store").build());
    }

    protected CompletableFuture<Void> closeLocalMetadataStore() throws Exception {
        if (this.localMetadataStore != null) {
            this.localMetadataStore.close();
        }
        if (this.localMetadataSynchronizer == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> closeAsync = this.localMetadataSynchronizer.closeAsync();
        this.localMetadataSynchronizer = null;
        return closeAsync;
    }

    protected void startLeaderElectionService() {
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
            LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
        } else {
            this.leaderElectionService = new LeaderElectionService(this.coordinationService, getBrokerId(), getSafeWebServiceAddress(), leaderElectionState -> {
                if (leaderElectionState == LeaderElectionState.Leading) {
                    LOG.info("This broker was elected leader");
                    if (getConfiguration().isLoadBalancerEnabled()) {
                        long millis = TimeUnit.MINUTES.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
                        if (this.loadSheddingTask != null) {
                            this.loadSheddingTask.cancel();
                        }
                        if (this.loadResourceQuotaTask != null) {
                            this.loadResourceQuotaTask.cancel(false);
                        }
                        this.loadSheddingTask = new LoadSheddingTask(this.loadManager, this.loadManagerExecutor, this.config);
                        this.loadSheddingTask.start();
                        this.loadResourceQuotaTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadResourceQuotaUpdaterTask(this.loadManager), millis, millis, TimeUnit.MILLISECONDS);
                        return;
                    }
                    return;
                }
                if (this.leaderElectionService != null) {
                    Optional<LeaderBroker> currentLeader = this.leaderElectionService.getCurrentLeader();
                    if (currentLeader.isPresent()) {
                        LOG.info("This broker is a follower. Current leader is {}", currentLeader);
                    } else {
                        LOG.info("This broker is a follower. No leader has been elected yet");
                    }
                }
                if (this.loadSheddingTask != null) {
                    this.loadSheddingTask.cancel();
                    this.loadSheddingTask = null;
                }
                if (this.loadResourceQuotaTask != null) {
                    this.loadResourceQuotaTask.cancel(false);
                    this.loadResourceQuotaTask = null;
                }
            });
            this.leaderElectionService.start();
        }
    }

    protected void acquireSLANamespace() {
        boolean z;
        try {
            NamespaceName sLAMonitorNamespace = NamespaceService.getSLAMonitorNamespace(getBrokerId(), this.config);
            if (!this.pulsarResources.getNamespaceResources().namespaceExists(sLAMonitorNamespace)) {
                LOG.info("SLA Namespace = {} doesn't exist.", sLAMonitorNamespace);
                return;
            }
            try {
                z = this.nsService.registerSLANamespace();
                LOG.info("Register SLA Namespace = {}, returned - {}.", sLAMonitorNamespace, Boolean.valueOf(z));
            } catch (PulsarServerException e) {
                z = false;
            }
            if (!z) {
                this.nsService.unloadSLANamespace();
            }
        } catch (Exception e2) {
            LOG.warn("Exception while trying to unload the SLA namespace, will try to unload the namespace again after 1 minute. Exception:", e2);
            this.executor.schedule(this::acquireSLANamespace, 1L, TimeUnit.MINUTES);
        } catch (Throwable th) {
            LOG.warn("Exception while trying to unload the SLA namespace, will not try to unload the namespace again. Exception:", th);
        }
    }

    public void waitUntilClosed() throws InterruptedException {
        this.mutex.lock();
        while (this.state != State.Closed) {
            try {
                this.isClosedCondition.await();
            } finally {
                this.mutex.unlock();
            }
        }
    }

    protected void startNamespaceService() throws PulsarServerException {
        LOG.info("Starting name space service, bootstrap namespaces=" + this.config.getBootstrapNamespaces());
        this.nsService = getNamespaceServiceProvider().get();
    }

    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
        return () -> {
            return new NamespaceService(this);
        };
    }

    protected void startLoadManagementService() throws PulsarServerException {
        LOG.info("Starting load management service ...");
        this.loadManager.get().start();
        if (!this.config.isLoadBalancerEnabled() || ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
            return;
        }
        LOG.info("Starting load balancer");
        if (this.loadReportTask == null) {
            long loadBalancerReportUpdateMinIntervalMillis = this.config.getLoadBalancerReportUpdateMinIntervalMillis();
            this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadReportUpdaterTask(this.loadManager), loadBalancerReportUpdateMinIntervalMillis, loadBalancerReportUpdateMinIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void loadNamespaceTopics(NamespaceBundle namespaceBundle) {
        this.executor.submit(() -> {
            CompletableFuture<Optional<Topic>> topicIfExists;
            LOG.info("Loading all topics on bundle: {}", namespaceBundle);
            NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
            ArrayList arrayList = new ArrayList();
            long nanoTime = System.nanoTime();
            for (String str : getNamespaceService().getListOfPersistentTopics(namespaceObject).get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
                try {
                    TopicName topicName = TopicName.get(str);
                    if (namespaceBundle.includes(topicName) && !SystemTopicNames.isTransactionInternalName(topicName) && (topicIfExists = this.brokerService.getTopicIfExists(str)) != null) {
                        arrayList.add(topicIfExists);
                    }
                } catch (Throwable th) {
                    LOG.warn("Failed to preload topic {}", str, th);
                }
            }
            if (arrayList.isEmpty()) {
                return null;
            }
            FutureUtil.waitForAll(arrayList).thenRun(() -> {
                LOG.info("Loaded {} topics on {} -- time taken: {} seconds", new Object[]{Long.valueOf(arrayList.stream().filter(completableFuture -> {
                    return ((Optional) completableFuture.getNow(Optional.empty())).isPresent();
                }).count()), namespaceBundle, Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) / 1000.0d)});
            });
            return null;
        });
    }

    public String getStatusFilePath() {
        if (this.config == null) {
            return null;
        }
        return this.config.getStatusFilePath();
    }

    public InternalConfigurationData getInternalConfigurationData() {
        return new InternalConfigurationData(this.config.getMetadataStoreUrl(), this.config.getConfigurationMetadataStoreUrl(), new ClientConfiguration().getZkLedgersRootPath(), this.config.isBookkeeperMetadataStoreSeparated() ? this.config.getBookkeeperMetadataStoreUrl() : null, (String) getWorkerConfig().map((v0) -> {
            return v0.getStateStorageServiceUrl();
        }).orElse(null));
    }

    public State getState() {
        return this.state;
    }

    public boolean isRunning() {
        return this.state == State.Started || this.state == State.Init;
    }

    public LeaderElectionService getLeaderElectionService() {
        return ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this) ? ExtensibleLoadManagerImpl.get(this.loadManager.get()).getLeaderElectionService() : this.leaderElectionService;
    }

    public NamespaceService getNamespaceService() {
        return this.nsService;
    }

    public Optional<WorkerService> getWorkerServiceOpt() {
        return this.functionWorkerService;
    }

    public WorkerService getWorkerService() throws UnsupportedOperationException {
        return this.functionWorkerService.orElseThrow(() -> {
            return new UnsupportedOperationException("Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false");
        });
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    public BookKeeper getBookKeeperClient() {
        return getManagedLedgerClientFactory().getBookKeeperClient();
    }

    public ManagedLedgerFactory getManagedLedgerFactory() {
        return getManagedLedgerClientFactory().getManagedLedgerFactory();
    }

    public ManagedLedgerStorage getManagedLedgerClientFactory() {
        return this.managedLedgerClientFactory;
    }

    public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPoliciesImpl offloadPoliciesImpl) {
        return offloadPoliciesImpl == null ? getDefaultOffloader() : this.ledgerOffloaderMap.compute(namespaceName, (namespaceName2, ledgerOffloader) -> {
            if (ledgerOffloader != null) {
                try {
                    if (Objects.equals(ledgerOffloader.getOffloadPolicies(), offloadPoliciesImpl)) {
                        return ledgerOffloader;
                    }
                } catch (PulsarServerException e) {
                    LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e);
                    return new NullLedgerOffloader();
                }
            }
            if (ledgerOffloader != null) {
                ledgerOffloader.close();
            }
            return createManagedLedgerOffloader(offloadPoliciesImpl);
        });
    }

    public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPoliciesImpl) throws PulsarServerException {
        LedgerOffloader create;
        try {
            if (!StringUtils.isNotBlank(offloadPoliciesImpl.getManagedLedgerOffloadDriver())) {
                LOG.debug("No ledger offloader configured, using NULL instance");
                return NullLedgerOffloader.INSTANCE;
            }
            Preconditions.checkNotNull(offloadPoliciesImpl.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPoliciesImpl.getManagedLedgerOffloadDriver());
            synchronized (this) {
                try {
                    create = this.offloadersCache.getOrLoadOffloaders(offloadPoliciesImpl.getOffloadersDirectory(), this.config.getNarExtractionDirectory()).getOffloaderFactory(offloadPoliciesImpl.getManagedLedgerOffloadDriver()).create(offloadPoliciesImpl, Map.of("S3ManagedLedgerOffloaderSoftwareVersion".toLowerCase(), PulsarVersion.getVersion(), "S3ManagedLedgerOffloaderSoftwareGitSha".toLowerCase(), PulsarVersion.getGitSha(), "pulsarClusterName".toLowerCase(), this.config.getClusterName()), this.schemaStorage, getOffloaderScheduler(offloadPoliciesImpl), this.offloaderStats);
                } catch (IOException e) {
                    throw new PulsarServerException(e.getMessage(), e.getCause());
                }
            }
            return create;
        } catch (Throwable th) {
            throw new PulsarServerException(th);
        }
    }

    private SchemaStorage createAndStartSchemaStorage() throws Exception {
        SchemaStorage create = ((SchemaStorageFactory) Reflections.createInstance(this.config.getSchemaRegistryStorageClassName(), SchemaStorageFactory.class, Thread.currentThread().getContextClassLoader())).create(this);
        create.start();
        return create;
    }

    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    public ExecutorProvider getTransactionExecutorProvider() {
        return this.transactionExecutorProvider;
    }

    public ScheduledExecutorService getLoadManagerExecutor() {
        return this.loadManagerExecutor;
    }

    public OrderedExecutor getOrderedExecutor() {
        return this.orderedExecutor;
    }

    public BookKeeperClientFactory newBookKeeperClientFactory() {
        return new BookKeeperClientFactoryImpl();
    }

    public BookKeeperClientFactory getBookKeeperClientFactory() {
        return this.bkClientFactory;
    }

    public synchronized ScheduledExecutorService getCompactorExecutor() {
        if (this.compactorExecutor == null) {
            this.compactorExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("compaction"));
        }
        return this.compactorExecutor;
    }

    public Compactor getNullableCompactor() {
        CompactionServiceFactory compactionServiceFactory = this.compactionServiceFactory;
        if (compactionServiceFactory instanceof PulsarCompactionServiceFactory) {
            return ((PulsarCompactionServiceFactory) compactionServiceFactory).getNullableCompactor();
        }
        return null;
    }

    public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
        return new StrategicTwoPhaseCompactor(getConfiguration(), getClient(), getBookKeeperClient(), getCompactorExecutor());
    }

    public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException {
        if (this.strategicCompactor == null) {
            this.strategicCompactor = newStrategicCompactor();
        }
        return this.strategicCompactor;
    }

    protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPoliciesImpl) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder().numThreads(offloadPoliciesImpl.getManagedLedgerOffloadMaxThreads().intValue()).name("offloader").build();
        }
        return this.offloaderScheduler;
    }

    public PulsarClientImpl createClientImpl(ClientConfigurationData clientConfigurationData) throws PulsarClientException {
        return PulsarClientImpl.builder().conf(clientConfigurationData).eventLoopGroup(this.ioEventLoopGroup).timer(this.brokerClientSharedTimer).internalExecutorProvider(this.brokerClientSharedInternalExecutorProvider).externalExecutorProvider(this.brokerClientSharedExternalExecutorProvider).scheduledExecutorProvider(this.brokerClientSharedScheduledExecutorProvider).lookupExecutorProvider(this.brokerClientSharedLookupExecutorProvider).build();
    }

    public synchronized PulsarClient getClient() throws PulsarServerException {
        if (this.client == null) {
            try {
                ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
                clientConfigurationData.setMemoryLimitBytes(0L);
                clientConfigurationData.setStatsIntervalSeconds(0L);
                ClientConfigurationData clientConfigurationData2 = (ClientConfigurationData) ConfigurationDataUtils.loadData(PropertiesUtils.filterAndMapProperties(getConfiguration().getProperties(), "brokerClient_"), clientConfigurationData, ClientConfigurationData.class);
                clientConfigurationData2.setConnectionMaxIdleSeconds(-1);
                boolean isBrokerClientTlsEnabled = getConfiguration().isBrokerClientTlsEnabled();
                clientConfigurationData2.setServiceUrl(isBrokerClientTlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
                if (isBrokerClientTlsEnabled) {
                    clientConfigurationData2.setTlsCiphers(getConfiguration().getBrokerClientTlsCiphers());
                    clientConfigurationData2.setTlsProtocols(getConfiguration().getBrokerClientTlsProtocols());
                    clientConfigurationData2.setTlsAllowInsecureConnection(getConfiguration().isTlsAllowInsecureConnection());
                    clientConfigurationData2.setTlsHostnameVerificationEnable(getConfiguration().isTlsHostnameVerificationEnabled());
                    if (getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
                        clientConfigurationData2.setUseKeyStoreTls(true);
                        clientConfigurationData2.setTlsTrustStoreType(getConfiguration().getBrokerClientTlsTrustStoreType());
                        clientConfigurationData2.setTlsTrustStorePath(getConfiguration().getBrokerClientTlsTrustStore());
                        clientConfigurationData2.setTlsTrustStorePassword(getConfiguration().getBrokerClientTlsTrustStorePassword());
                        clientConfigurationData2.setTlsKeyStoreType(getConfiguration().getBrokerClientTlsKeyStoreType());
                        clientConfigurationData2.setTlsKeyStorePath(getConfiguration().getBrokerClientTlsKeyStore());
                        clientConfigurationData2.setTlsKeyStorePassword(getConfiguration().getBrokerClientTlsKeyStorePassword());
                    } else {
                        clientConfigurationData2.setTlsTrustCertsFilePath(StringUtils.isNotBlank(getConfiguration().getBrokerClientTrustCertsFilePath()) ? getConfiguration().getBrokerClientTrustCertsFilePath() : getConfiguration().getTlsTrustCertsFilePath());
                        clientConfigurationData2.setTlsKeyFilePath(getConfiguration().getBrokerClientKeyFilePath());
                        clientConfigurationData2.setTlsCertificateFilePath(getConfiguration().getBrokerClientCertificateFilePath());
                    }
                }
                if (StringUtils.isNotBlank(getConfiguration().getBrokerClientAuthenticationPlugin())) {
                    clientConfigurationData2.setAuthPluginClassName(getConfiguration().getBrokerClientAuthenticationPlugin());
                    clientConfigurationData2.setAuthParams(getConfiguration().getBrokerClientAuthenticationParameters());
                    clientConfigurationData2.setAuthParamMap((Map) null);
                    clientConfigurationData2.setAuthentication(AuthenticationFactory.create(getConfiguration().getBrokerClientAuthenticationPlugin(), getConfiguration().getBrokerClientAuthenticationParameters()));
                }
                this.client = createClientImpl(clientConfigurationData2);
            } catch (Exception e) {
                throw new PulsarServerException(e);
            }
        }
        return this.client;
    }

    public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
        if (this.adminClient == null) {
            try {
                ServiceConfiguration configuration = getConfiguration();
                String str = configuration.isBrokerClientTlsEnabled() ? this.webServiceAddressTls : this.webServiceAddress;
                if (str == null) {
                    throw new IllegalArgumentException("Web service address was not set properly , isBrokerClientTlsEnabled: " + configuration.isBrokerClientTlsEnabled() + ", webServiceAddressTls: " + this.webServiceAddressTls + ", webServiceAddress: " + this.webServiceAddress);
                }
                PulsarAdminBuilder serviceHttpUrl = PulsarAdmin.builder().serviceHttpUrl(str);
                serviceHttpUrl.loadConf(PropertiesUtils.filterAndMapProperties(this.config.getProperties(), "brokerClient_"));
                serviceHttpUrl.authentication(configuration.getBrokerClientAuthenticationPlugin(), configuration.getBrokerClientAuthenticationParameters());
                if (configuration.isBrokerClientTlsEnabled()) {
                    serviceHttpUrl.tlsCiphers(this.config.getBrokerClientTlsCiphers()).tlsProtocols(this.config.getBrokerClientTlsProtocols());
                    if (configuration.isBrokerClientTlsEnabledWithKeyStore()) {
                        serviceHttpUrl.useKeyStoreTls(true).tlsTrustStoreType(configuration.getBrokerClientTlsTrustStoreType()).tlsTrustStorePath(configuration.getBrokerClientTlsTrustStore()).tlsTrustStorePassword(configuration.getBrokerClientTlsTrustStorePassword()).tlsKeyStoreType(configuration.getBrokerClientTlsKeyStoreType()).tlsKeyStorePath(configuration.getBrokerClientTlsKeyStore()).tlsKeyStorePassword(configuration.getBrokerClientTlsKeyStorePassword());
                    } else {
                        serviceHttpUrl.tlsTrustCertsFilePath(configuration.getBrokerClientTrustCertsFilePath()).tlsKeyFilePath(configuration.getBrokerClientKeyFilePath()).tlsCertificateFilePath(configuration.getBrokerClientCertificateFilePath());
                    }
                    serviceHttpUrl.allowTlsInsecureConnection(configuration.isTlsAllowInsecureConnection()).enableTlsHostnameVerification(configuration.isTlsHostnameVerificationEnabled());
                }
                serviceHttpUrl.readTimeout(configuration.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                this.adminClient = serviceHttpUrl.build();
                LOG.info("created admin with url {} ", str);
            } catch (Exception e) {
                throw new PulsarServerException(e);
            }
        }
        return this.adminClient;
    }

    public MetricsGenerator getMetricsGenerator() {
        return this.metricsGenerator;
    }

    public TransactionMetadataStoreService getTransactionMetadataStoreService() {
        return this.transactionMetadataStoreService;
    }

    public TransactionBufferProvider getTransactionBufferProvider() {
        return this.transactionBufferProvider;
    }

    public TransactionBufferClient getTransactionBufferClient() {
        return this.transactionBufferClient;
    }

    protected String brokerUrl(ServiceConfiguration serviceConfiguration) {
        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(serviceConfiguration, "pulsar");
        if (internalListener.getBrokerServiceUrl() != null) {
            return internalListener.getBrokerServiceUrl().toString();
        }
        return null;
    }

    public static String brokerUrl(String str, int i) {
        return ServiceConfigurationUtils.brokerUrl(str, i);
    }

    public String brokerUrlTls(ServiceConfiguration serviceConfiguration) {
        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(serviceConfiguration, "pulsar+ssl");
        if (internalListener.getBrokerServiceUrlTls() != null) {
            return internalListener.getBrokerServiceUrlTls().toString();
        }
        return null;
    }

    public static String brokerUrlTls(String str, int i) {
        return ServiceConfigurationUtils.brokerUrlTls(str, i);
    }

    public String webAddress(ServiceConfiguration serviceConfiguration) {
        if (!serviceConfiguration.getWebServicePort().isPresent()) {
            return null;
        }
        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(serviceConfiguration, "http");
        return internalListener.getBrokerHttpUrl() != null ? internalListener.getBrokerHttpUrl().toString() : webAddress(ServiceConfigurationUtils.getWebServiceAddress(serviceConfiguration), getListenPortHTTP().orElseThrow().intValue());
    }

    public static String webAddress(String str, int i) {
        return String.format("http://%s:%d", str, Integer.valueOf(i));
    }

    public String webAddressTls(ServiceConfiguration serviceConfiguration) {
        if (!serviceConfiguration.getWebServicePortTls().isPresent()) {
            return null;
        }
        AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(serviceConfiguration, "https");
        return internalListener.getBrokerHttpsUrl() != null ? internalListener.getBrokerHttpsUrl().toString() : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(serviceConfiguration), getListenPortHTTPS().orElseThrow().intValue());
    }

    public static String webAddressTls(String str, int i) {
        return String.format("https://%s:%d", str, Integer.valueOf(i));
    }

    public String getSafeWebServiceAddress() {
        return this.webServiceAddressTls != null ? this.webServiceAddressTls : this.webServiceAddress;
    }

    @Deprecated
    public String getSafeBrokerServiceUrl() {
        return this.brokerServiceUrlTls != null ? this.brokerServiceUrlTls : this.brokerServiceUrl;
    }

    public String getBrokerId() {
        return (String) Objects.requireNonNull(this.brokerId, "brokerId is not initialized before start has been called");
    }

    public TopicPoliciesService getTopicPoliciesService() {
        return this.topicPoliciesService;
    }

    public ResourceUsageTransportManager getResourceUsageTransportManager() {
        return this.resourceUsageTransportManager;
    }

    public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider prometheusRawMetricsProvider) {
        if (this.metricsServlet != null) {
            this.metricsServlet.addRawMetricsProvider(prometheusRawMetricsProvider);
            return;
        }
        if (this.pendingMetricsProviders == null) {
            this.pendingMetricsProviders = new LinkedList();
        }
        this.pendingMetricsProviders.add(prometheusRawMetricsProvider);
    }

    private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws Exception {
        if (this.functionWorkerService.isPresent()) {
            if (this.workerConfig.isUseTls() || this.brokerServiceUrl == null) {
                this.workerConfig.setPulsarServiceUrl(this.brokerServiceUrlTls);
            } else {
                this.workerConfig.setPulsarServiceUrl(this.brokerServiceUrl);
            }
            if (this.workerConfig.isUseTls() || this.webServiceAddress == null) {
                this.workerConfig.setPulsarWebServiceUrl(this.webServiceAddressTls);
                this.workerConfig.setFunctionWebServiceUrl(this.webServiceAddressTls);
            } else {
                this.workerConfig.setPulsarWebServiceUrl(this.webServiceAddress);
                this.workerConfig.setFunctionWebServiceUrl(this.webServiceAddress);
            }
            LOG.info("Starting function worker service: serviceUrl = {}, webServiceUrl = {}, functionWebServiceUrl = {}", new Object[]{this.workerConfig.getPulsarServiceUrl(), this.workerConfig.getPulsarWebServiceUrl(), this.workerConfig.getFunctionWebServiceUrl()});
            this.functionWorkerService.get().initInBroker(this.config, this.workerConfig, this.pulsarResources, getInternalConfigurationData());
            this.functionWorkerService.get().start(authenticationService, authorizationService, ErrorNotifier.getShutdownServiceImpl(this));
            LOG.info("Function worker service started");
        }
    }

    public PackagesManagement getPackagesManagement() throws UnsupportedOperationException {
        if (this.packagesManagement == null) {
            throw new UnsupportedOperationException("Package Management Service is not enabled in the broker.");
        }
        return this.packagesManagement;
    }

    private void startPackagesManagementService() throws IOException {
        this.packagesManagement = new PackagesManagementImpl();
        PackagesStorageProvider newProvider = PackagesStorageProvider.newProvider(this.config.getPackagesManagementStorageProvider());
        DefaultPackagesStorageConfiguration defaultPackagesStorageConfiguration = new DefaultPackagesStorageConfiguration();
        defaultPackagesStorageConfiguration.setProperty(this.config.getProperties());
        PackagesStorage storage = newProvider.getStorage(defaultPackagesStorageConfiguration);
        storage.initialize();
        this.packagesManagement.initialize(storage);
    }

    public Optional<Integer> getListenPortHTTP() {
        return this.webService.getListenPortHTTP();
    }

    public Optional<Integer> getListenPortHTTPS() {
        return this.webService.getListenPortHTTPS();
    }

    public Optional<Integer> getBrokerListenPort() {
        return this.brokerService.getListenPort();
    }

    public Optional<Integer> getBrokerListenPortTls() {
        return this.brokerService.getListenPortTls();
    }

    public MetadataStoreExtended getLocalMetadataStore() {
        return this.localMetadataStore;
    }

    public CoordinationService getCoordinationService() {
        return this.coordinationService;
    }

    public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration serviceConfiguration, String str) throws IOException {
        WorkerConfig load = WorkerConfig.load(str);
        load.setWorkerPort((Integer) serviceConfiguration.getWebServicePort().orElse(null));
        load.setWorkerPortTls((Integer) serviceConfiguration.getWebServicePortTls().orElse(null));
        String defaultOrConfiguredAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getAdvertisedAddress());
        load.setWorkerHostname(defaultOrConfiguredAddress);
        load.setPulsarFunctionsCluster(serviceConfiguration.getClusterName());
        load.setAuthenticationEnabled(serviceConfiguration.isAuthenticationEnabled());
        load.setAuthenticationProviders(serviceConfiguration.getAuthenticationProviders());
        load.setAuthorizationEnabled(serviceConfiguration.isAuthorizationEnabled());
        load.setAuthorizationProvider(serviceConfiguration.getAuthorizationProvider());
        load.setConfigurationMetadataStoreUrl(serviceConfiguration.getConfigurationMetadataStoreUrl());
        load.setMetadataStoreSessionTimeoutMillis(serviceConfiguration.getMetadataStoreSessionTimeoutMillis());
        load.setMetadataStoreOperationTimeoutSeconds(serviceConfiguration.getMetadataStoreOperationTimeoutSeconds());
        load.setMetadataStoreCacheExpirySeconds(serviceConfiguration.getMetadataStoreCacheExpirySeconds());
        load.setTlsAllowInsecureConnection(serviceConfiguration.isTlsAllowInsecureConnection());
        load.setTlsEnabled(serviceConfiguration.isTlsEnabled());
        load.setTlsEnableHostnameVerification(serviceConfiguration.isTlsHostnameVerificationEnabled());
        load.setBrokerClientTrustCertsFilePath(serviceConfiguration.getBrokerClientTrustCertsFilePath());
        load.setTlsTrustCertsFilePath(serviceConfiguration.getTlsTrustCertsFilePath());
        load.setBrokerClientAuthenticationPlugin(serviceConfiguration.getBrokerClientAuthenticationPlugin());
        load.setBrokerClientAuthenticationParameters(serviceConfiguration.getBrokerClientAuthenticationParameters());
        load.setSuperUserRoles(serviceConfiguration.getSuperUserRoles());
        load.setProxyRoles(serviceConfiguration.getProxyRoles());
        load.setFunctionsWorkerEnablePackageManagement(serviceConfiguration.isFunctionsWorkerEnablePackageManagement());
        if (StringUtils.isBlank(load.getFunctionsWorkerServiceNarPackage())) {
            load.setFunctionsWorkerServiceNarPackage(serviceConfiguration.getFunctionsWorkerServiceNarPackage());
        }
        load.setWorkerId("c-" + serviceConfiguration.getClusterName() + "-fw-" + defaultOrConfiguredAddress + "-" + (load.getTlsEnabled() ? load.getWorkerPortTls() : load.getWorkerPort()));
        return load;
    }

    public void shutdownNow() {
        LOG.info("Invoking Pulsar service immediate shutdown");
        try {
            closeMetadataServiceSession();
        } catch (Exception e) {
            LOG.warn("Failed to close metadata service session: {}", e.getMessage());
        }
        this.processTerminator.accept(-1);
    }

    @VisibleForTesting
    protected BrokerService newBrokerService(PulsarService pulsarService) throws Exception {
        return new BrokerService(pulsarService, this.ioEventLoopGroup);
    }

    @VisibleForTesting
    public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
        this.transactionBufferProvider = transactionBufferProvider;
    }

    public void initConfigMetadataSynchronizerIfNeeded() {
        this.mutex.lock();
        try {
            String configurationMetadataSyncEventTopic = this.config.getConfigurationMetadataSyncEventTopic();
            PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer = this.configMetadataSynchronizer;
            if (!(this.configurationMetadataStore instanceof MetadataStoreExtended)) {
                LOG.info("Skip to update Metadata Synchronizer because of the Configuration Metadata Store using[{}] does not support.", this.configurationMetadataStore.getClass().getName());
                this.mutex.unlock();
                return;
            }
            if (pulsarMetadataEventSynchronizer == null && StringUtils.isBlank(configurationMetadataSyncEventTopic)) {
                LOG.info("Skip to update Metadata Synchronizer because the topic[null] does not changed.");
            }
            if (StringUtils.isNotBlank(configurationMetadataSyncEventTopic) && pulsarMetadataEventSynchronizer != null) {
                TopicName topicName = TopicName.get(configurationMetadataSyncEventTopic);
                TopicName topicName2 = TopicName.get(pulsarMetadataEventSynchronizer.getTopicName());
                if (topicName.equals(topicName2)) {
                    LOG.info("Skip to update Metadata Synchronizer because the topic[{}] does not changed.", topicName2);
                }
            }
            if (StringUtils.isBlank(configurationMetadataSyncEventTopic)) {
                this.configMetadataSynchronizer = null;
            } else {
                this.configMetadataSynchronizer = new PulsarMetadataEventSynchronizer(this, configurationMetadataSyncEventTopic);
            }
            PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer2 = this.configMetadataSynchronizer;
            this.configurationMetadataStore.updateMetadataEventSynchronizer(pulsarMetadataEventSynchronizer2);
            Runnable runnable = () -> {
                if (pulsarMetadataEventSynchronizer2 == null) {
                    return;
                }
                try {
                    pulsarMetadataEventSynchronizer2.start();
                } catch (Exception e) {
                    LOG.error("Start Metadata Synchronizer with topic {} failed.", configurationMetadataSyncEventTopic, e);
                }
            };
            this.executor.submit(() -> {
                if (pulsarMetadataEventSynchronizer != null) {
                    pulsarMetadataEventSynchronizer.closeAsync().whenComplete((r3, th) -> {
                        runnable.run();
                    });
                } else {
                    runnable.run();
                }
            });
            this.mutex.unlock();
        } catch (Throwable th) {
            this.mutex.unlock();
            throw th;
        }
    }

    private CompactionServiceFactory loadCompactionServiceFactory() {
        CompactionServiceFactory compactionServiceFactory = (CompactionServiceFactory) Reflections.createInstance(this.config.getCompactionServiceFactoryClassName(), CompactionServiceFactory.class, Thread.currentThread().getContextClassLoader());
        compactionServiceFactory.initialize(this).join();
        return compactionServiceFactory;
    }

    public CompletableFuture<TopicCompactionService> newTopicCompactionService(String str) {
        try {
            return getCompactionServiceFactory().newTopicCompactionService(str);
        } catch (Throwable th) {
            return CompletableFuture.failedFuture(th);
        }
    }

    public ServiceConfiguration getConfig() {
        return this.config;
    }

    public NamespaceService getNsService() {
        return this.nsService;
    }

    public WebService getWebService() {
        return this.webService;
    }

    public WebSocketService getWebSocketService() {
        return this.webSocketService;
    }

    public BookKeeperClientFactory getBkClientFactory() {
        return this.bkClientFactory;
    }

    public CompactionServiceFactory getCompactionServiceFactory() {
        return this.compactionServiceFactory;
    }

    public ResourceGroupService getResourceGroupServiceManager() {
        return this.resourceGroupServiceManager;
    }

    public OrderedScheduler getOffloaderScheduler() {
        return this.offloaderScheduler;
    }

    public OffloadersCache getOffloadersCache() {
        return this.offloadersCache;
    }

    public LedgerOffloader getDefaultOffloader() {
        return this.defaultOffloader;
    }

    public LedgerOffloaderStats getOffloaderStats() {
        return this.offloaderStats;
    }

    public Map<NamespaceName, LedgerOffloader> getLedgerOffloaderMap() {
        return this.ledgerOffloaderMap;
    }

    public ScheduledFuture<?> getLoadReportTask() {
        return this.loadReportTask;
    }

    public LoadSheddingTask getLoadSheddingTask() {
        return this.loadSheddingTask;
    }

    public ScheduledFuture<?> getLoadResourceQuotaTask() {
        return this.loadResourceQuotaTask;
    }

    public AtomicReference<LoadManager> getLoadManager() {
        return this.loadManager;
    }

    public String getBindAddress() {
        return this.bindAddress;
    }

    public String getAdvertisedAddress() {
        return this.advertisedAddress;
    }

    public String getWebServiceAddress() {
        return this.webServiceAddress;
    }

    public String getWebServiceAddressTls() {
        return this.webServiceAddressTls;
    }

    public String getBrokerServiceUrl() {
        return this.brokerServiceUrl;
    }

    public String getBrokerServiceUrlTls() {
        return this.brokerServiceUrlTls;
    }

    public String getBrokerVersion() {
        return this.brokerVersion;
    }

    public SchemaStorage getSchemaStorage() {
        return this.schemaStorage;
    }

    public SchemaRegistryService getSchemaRegistryService() {
        return this.schemaRegistryService;
    }

    public Optional<WorkerService> getFunctionWorkerService() {
        return this.functionWorkerService;
    }

    public ProtocolHandlers getProtocolHandlers() {
        return this.protocolHandlers;
    }

    public Consumer<Integer> getProcessTerminator() {
        return this.processTerminator;
    }

    public EventLoopGroup getIoEventLoopGroup() {
        return this.ioEventLoopGroup;
    }

    public ExecutorProvider getBrokerClientSharedInternalExecutorProvider() {
        return this.brokerClientSharedInternalExecutorProvider;
    }

    public ExecutorProvider getBrokerClientSharedExternalExecutorProvider() {
        return this.brokerClientSharedExternalExecutorProvider;
    }

    public ScheduledExecutorProvider getBrokerClientSharedScheduledExecutorProvider() {
        return this.brokerClientSharedScheduledExecutorProvider;
    }

    public Timer getBrokerClientSharedTimer() {
        return this.brokerClientSharedTimer;
    }

    public ExecutorProvider getBrokerClientSharedLookupExecutorProvider() {
        return this.brokerClientSharedLookupExecutorProvider;
    }

    public HashedWheelTimer getTransactionTimer() {
        return this.transactionTimer;
    }

    public BrokerInterceptor getBrokerInterceptor() {
        return this.brokerInterceptor;
    }

    public AdditionalServlets getBrokerAdditionalServlets() {
        return this.brokerAdditionalServlets;
    }

    public PulsarPrometheusMetricsServlet getMetricsServlet() {
        return this.metricsServlet;
    }

    public List<PrometheusRawMetricsProvider> getPendingMetricsProviders() {
        return this.pendingMetricsProviders;
    }

    public PulsarMetadataEventSynchronizer getLocalMetadataSynchronizer() {
        return this.localMetadataSynchronizer;
    }

    public TransactionBufferSnapshotServiceFactory getTransactionBufferSnapshotServiceFactory() {
        return this.transactionBufferSnapshotServiceFactory;
    }

    public MetadataStore getConfigurationMetadataStore() {
        return this.configurationMetadataStore;
    }

    public PulsarMetadataEventSynchronizer getConfigMetadataSynchronizer() {
        return this.configMetadataSynchronizer;
    }

    public boolean isShouldShutdownConfigurationMetadataStore() {
        return this.shouldShutdownConfigurationMetadataStore;
    }

    public PulsarResources getPulsarResources() {
        return this.pulsarResources;
    }

    public TransactionPendingAckStoreProvider getTransactionPendingAckStoreProvider() {
        return this.transactionPendingAckStoreProvider;
    }

    public CompletableFuture<Void> getReadyForIncomingRequestsFuture() {
        return this.readyForIncomingRequestsFuture;
    }

    public List<Runnable> getPendingTasksBeforeReadyForIncomingRequests() {
        return this.pendingTasksBeforeReadyForIncomingRequests;
    }

    public ReentrantLock getMutex() {
        return this.mutex;
    }

    public Condition getIsClosedCondition() {
        return this.isClosedCondition;
    }

    public CompletableFuture<Void> getCloseFuture() {
        return this.closeFuture;
    }

    public Map<String, AdvertisedListener> getAdvertisedListeners() {
        return this.advertisedListeners;
    }

    protected void setNsService(NamespaceService namespaceService) {
        this.nsService = namespaceService;
    }

    protected void setManagedLedgerClientFactory(ManagedLedgerStorage managedLedgerStorage) {
        this.managedLedgerClientFactory = managedLedgerStorage;
    }

    protected void setLeaderElectionService(LeaderElectionService leaderElectionService) {
        this.leaderElectionService = leaderElectionService;
    }

    protected void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    protected void setWebService(WebService webService) {
        this.webService = webService;
    }

    protected void setWebSocketService(WebSocketService webSocketService) {
        this.webSocketService = webSocketService;
    }

    protected void setTopicPoliciesService(TopicPoliciesService topicPoliciesService) {
        this.topicPoliciesService = topicPoliciesService;
    }

    protected void setBkClientFactory(BookKeeperClientFactory bookKeeperClientFactory) {
        this.bkClientFactory = bookKeeperClientFactory;
    }

    protected void setCompactionServiceFactory(CompactionServiceFactory compactionServiceFactory) {
        this.compactionServiceFactory = compactionServiceFactory;
    }

    protected void setStrategicCompactor(StrategicTwoPhaseCompactor strategicTwoPhaseCompactor) {
        this.strategicCompactor = strategicTwoPhaseCompactor;
    }

    protected void setResourceUsageTransportManager(ResourceUsageTransportManager resourceUsageTransportManager) {
        this.resourceUsageTransportManager = resourceUsageTransportManager;
    }

    protected void setResourceGroupServiceManager(ResourceGroupService resourceGroupService) {
        this.resourceGroupServiceManager = resourceGroupService;
    }

    protected void setOrderedExecutor(OrderedExecutor orderedExecutor) {
        this.orderedExecutor = orderedExecutor;
    }

    protected void setCompactorExecutor(ScheduledExecutorService scheduledExecutorService) {
        this.compactorExecutor = scheduledExecutorService;
    }

    protected void setOffloaderScheduler(OrderedScheduler orderedScheduler) {
        this.offloaderScheduler = orderedScheduler;
    }

    protected void setOffloadersCache(OffloadersCache offloadersCache) {
        this.offloadersCache = offloadersCache;
    }

    protected void setDefaultOffloader(LedgerOffloader ledgerOffloader) {
        this.defaultOffloader = ledgerOffloader;
    }

    protected void setOffloaderStats(LedgerOffloaderStats ledgerOffloaderStats) {
        this.offloaderStats = ledgerOffloaderStats;
    }

    protected void setLedgerOffloaderMap(Map<NamespaceName, LedgerOffloader> map) {
        this.ledgerOffloaderMap = map;
    }

    protected void setLoadReportTask(ScheduledFuture<?> scheduledFuture) {
        this.loadReportTask = scheduledFuture;
    }

    protected void setLoadSheddingTask(LoadSheddingTask loadSheddingTask) {
        this.loadSheddingTask = loadSheddingTask;
    }

    protected void setLoadResourceQuotaTask(ScheduledFuture<?> scheduledFuture) {
        this.loadResourceQuotaTask = scheduledFuture;
    }

    protected void setAdminClient(PulsarAdmin pulsarAdmin) {
        this.adminClient = pulsarAdmin;
    }

    protected void setClient(PulsarClient pulsarClient) {
        this.client = pulsarClient;
    }

    protected void setWebServiceAddress(String str) {
        this.webServiceAddress = str;
    }

    protected void setWebServiceAddressTls(String str) {
        this.webServiceAddressTls = str;
    }

    protected void setBrokerServiceUrl(String str) {
        this.brokerServiceUrl = str;
    }

    protected void setBrokerServiceUrlTls(String str) {
        this.brokerServiceUrlTls = str;
    }

    protected void setSchemaStorage(SchemaStorage schemaStorage) {
        this.schemaStorage = schemaStorage;
    }

    protected void setSchemaRegistryService(SchemaRegistryService schemaRegistryService) {
        this.schemaRegistryService = schemaRegistryService;
    }

    protected void setProtocolHandlers(ProtocolHandlers protocolHandlers) {
        this.protocolHandlers = protocolHandlers;
    }

    protected void setMetricsGenerator(MetricsGenerator metricsGenerator) {
        this.metricsGenerator = metricsGenerator;
    }

    protected void setTransactionMetadataStoreService(TransactionMetadataStoreService transactionMetadataStoreService) {
        this.transactionMetadataStoreService = transactionMetadataStoreService;
    }

    protected void setTransactionBufferProvider(TransactionBufferProvider transactionBufferProvider) {
        this.transactionBufferProvider = transactionBufferProvider;
    }

    protected void setTransactionBufferClient(TransactionBufferClient transactionBufferClient) {
        this.transactionBufferClient = transactionBufferClient;
    }

    protected void setTransactionTimer(HashedWheelTimer hashedWheelTimer) {
        this.transactionTimer = hashedWheelTimer;
    }

    protected void setBrokerInterceptor(BrokerInterceptor brokerInterceptor) {
        this.brokerInterceptor = brokerInterceptor;
    }

    protected void setBrokerAdditionalServlets(AdditionalServlets additionalServlets) {
        this.brokerAdditionalServlets = additionalServlets;
    }

    protected void setPackagesManagement(PackagesManagement packagesManagement) {
        this.packagesManagement = packagesManagement;
    }

    protected void setMetricsServlet(PulsarPrometheusMetricsServlet pulsarPrometheusMetricsServlet) {
        this.metricsServlet = pulsarPrometheusMetricsServlet;
    }

    protected void setPendingMetricsProviders(List<PrometheusRawMetricsProvider> list) {
        this.pendingMetricsProviders = list;
    }

    protected void setLocalMetadataStore(MetadataStoreExtended metadataStoreExtended) {
        this.localMetadataStore = metadataStoreExtended;
    }

    protected void setLocalMetadataSynchronizer(PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer) {
        this.localMetadataSynchronizer = pulsarMetadataEventSynchronizer;
    }

    protected void setCoordinationService(CoordinationService coordinationService) {
        this.coordinationService = coordinationService;
    }

    protected void setTransactionBufferSnapshotServiceFactory(TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory) {
        this.transactionBufferSnapshotServiceFactory = transactionBufferSnapshotServiceFactory;
    }

    protected void setConfigurationMetadataStore(MetadataStore metadataStore) {
        this.configurationMetadataStore = metadataStore;
    }

    protected void setConfigMetadataSynchronizer(PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer) {
        this.configMetadataSynchronizer = pulsarMetadataEventSynchronizer;
    }

    protected void setShouldShutdownConfigurationMetadataStore(boolean z) {
        this.shouldShutdownConfigurationMetadataStore = z;
    }

    protected void setPulsarResources(PulsarResources pulsarResources) {
        this.pulsarResources = pulsarResources;
    }

    protected void setTransactionPendingAckStoreProvider(TransactionPendingAckStoreProvider transactionPendingAckStoreProvider) {
        this.transactionPendingAckStoreProvider = transactionPendingAckStoreProvider;
    }

    protected void setBrokerId(String str) {
        this.brokerId = str;
    }

    protected void setState(State state) {
        this.state = state;
    }

    protected void setCloseFuture(CompletableFuture<Void> completableFuture) {
        this.closeFuture = completableFuture;
    }

    protected void setAdvertisedListeners(Map<String, AdvertisedListener> map) {
        this.advertisedListeners = map;
    }
}
