package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.shade.org.glassfish.hk2.utilities.BuilderHelper;

/* loaded from: input_file:org/apache/pulsar/sql/presto/PulsarConnectorCache.class */
public class PulsarConnectorCache {
    private static final Logger log = Logger.get(PulsarConnectorCache.class);

    @VisibleForTesting
    static PulsarConnectorCache instance;
    private final MetadataStoreExtended metadataStore;
    private final ManagedLedgerFactory managedLedgerFactory;
    private final StatsProvider statsProvider;
    private OrderedScheduler offloaderScheduler;
    private final LedgerOffloaderStats offloaderStats;
    private LedgerOffloader defaultOffloader;
    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";
    private OffloadersCache offloadersCache = new OffloadersCache();
    private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap();

    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(), MetadataStoreConfig.builder().build());
        this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
        this.statsProvider = (StatsProvider) PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(), StatsProvider.class, getClass().getClassLoader());
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        Map<String, String> statsProviderConfigs = pulsarConnectorConfig.getStatsProviderConfigs();
        Objects.requireNonNull(clientConfiguration);
        statsProviderConfigs.forEach((v1, v2) -> {
            r1.setProperty(v1, v2);
        });
        this.statsProvider.start(clientConfiguration);
        initOffloaderScheduler(pulsarConnectorConfig.getOffloadPolices());
        int managedLedgerStatsPeriodSeconds = pulsarConnectorConfig.getManagedLedgerStatsPeriodSeconds();
        this.offloaderStats = LedgerOffloaderStats.create(pulsarConnectorConfig.isExposeManagedLedgerMetricsInPrometheus(), pulsarConnectorConfig.isExposeTopicLevelMetricsInPrometheus(), this.offloaderScheduler, managedLedgerStatsPeriodSeconds);
        this.defaultOffloader = initManagedLedgerOffloader(pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
    }

    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                instance = new PulsarConnectorCache(pulsarConnectorConfig);
            }
        }
        return instance;
    }

    private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        ClientConfiguration nettyMaxFrameSizeBytes = new ClientConfiguration().setZkServers(pulsarConnectorConfig.getZookeeperUri()).setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri().replace(",", BuilderHelper.TOKEN_SEPARATOR) + "/ledgers").setClientTcpNoDelay(false).setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol()).setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval()).setStickyReadsEnabled(false).setReadEntryTimeout(60).setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()).setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()).setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()).setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + 10240);
        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
        managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
        return new ManagedLedgerFactoryImpl(this.metadataStore, nettyMaxFrameSizeBytes, managedLedgerFactoryConfig);
    }

    public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPoliciesImpl offloadPoliciesImpl, PulsarConnectorConfig pulsarConnectorConfig) {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        if (offloadPoliciesImpl == null) {
            managedLedgerConfig.setLedgerOffloader(this.defaultOffloader);
        } else {
            managedLedgerConfig.setLedgerOffloader(this.offloaderMap.compute(namespaceName, (namespaceName2, ledgerOffloader) -> {
                if (ledgerOffloader != null && Objects.equals(ledgerOffloader.getOffloadPolicies(), offloadPoliciesImpl)) {
                    return ledgerOffloader;
                }
                if (ledgerOffloader != null) {
                    ledgerOffloader.close();
                }
                return initManagedLedgerOffloader(offloadPoliciesImpl, pulsarConnectorConfig);
            }));
        }
        return managedLedgerConfig;
    }

    private void initOffloaderScheduler(OffloadPoliciesImpl offloadPoliciesImpl) {
        this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder().numThreads(offloadPoliciesImpl.getManagedLedgerOffloadMaxThreads().intValue()).name("pulsar-offloader").build();
    }

    private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPoliciesImpl, PulsarConnectorConfig pulsarConnectorConfig) {
        try {
            if (!StringUtils.isNotBlank(offloadPoliciesImpl.getManagedLedgerOffloadDriver())) {
                log.info("No ledger offloader configured, using NULL instance");
                return NullLedgerOffloader.INSTANCE;
            }
            Preconditions.checkNotNull(offloadPoliciesImpl.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPoliciesImpl.getManagedLedgerOffloadDriver());
            try {
                return this.offloadersCache.getOrLoadOffloaders(offloadPoliciesImpl.getOffloadersDirectory(), pulsarConnectorConfig.getNarExtractionDirectory()).getOffloaderFactory(offloadPoliciesImpl.getManagedLedgerOffloadDriver()).create(offloadPoliciesImpl, ImmutableMap.of(LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()), this.offloaderScheduler, this.offloaderStats);
            } catch (IOException e) {
                log.error("Failed to create offloader: ", new Object[]{e});
                throw new RuntimeException(e.getMessage(), e.getCause());
            }
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    public StatsProvider getStatsProvider() {
        return this.statsProvider;
    }

    public static void shutdown() throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance != null) {
                instance.statsProvider.stop();
                instance.managedLedgerFactory.shutdown();
                instance.metadataStore.close();
                instance.offloaderScheduler.shutdown();
                instance.offloadersCache.close();
            }
        }
    }
}
