package org.apache.pulsar.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieNode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.Configuration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.class */
public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping implements ZooKeeperCacheListener<BookiesRackConfiguration>, RackChangeNotifier {
    public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
    public static final String ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE = "zk_data_cache_bk_rack_conf_instance";
    private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache = null;
    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
    private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
    private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ZkBookieRackAffinityMapping.class);
    private static final ObjectMapper jsonMapper = ObjectMapperFactory.create();

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.AbstractDNSToSwitchMapping, org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.Configurable
    public void setConf(Configuration configuration) {
        super.setConf(configuration);
        if (configuration.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE) != null) {
            this.bookieMappingCache = (ZooKeeperDataCache) configuration.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE);
            this.bookieMappingCache.registerListener(this);
        } else {
            this.bookieMappingCache = getAndSetZkCache(configuration);
            configuration.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, this.bookieMappingCache);
        }
    }

    private void updateRacksWithHost(BookiesRackConfiguration bookiesRackConfiguration) {
        BookiesRackConfiguration bookiesRackConfiguration2 = new BookiesRackConfiguration();
        HashMap hashMap = new HashMap();
        bookiesRackConfiguration.forEach((str, map) -> {
            map.forEach((str, bookieInfo) -> {
                try {
                    BookieId parse = BookieId.parse(str);
                    BookieAddressResolver bookieAddressResolver = getBookieAddressResolver();
                    if (bookieAddressResolver == null) {
                        LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                    } else {
                        BookieSocketAddress resolve = bookieAddressResolver.resolve(parse);
                        bookiesRackConfiguration2.updateBookie(str, resolve.toString(), bookieInfo);
                        hashMap.put(resolve.getSocketAddress().getHostName(), bookieInfo);
                        InetAddress address = resolve.getSocketAddress().getAddress();
                        if (null != address) {
                            String hostAddress = address.getHostAddress();
                            if (null != hostAddress) {
                                hashMap.put(hostAddress, bookieInfo);
                            }
                        } else {
                            LOG.info("Network address for {} is unresolvable yet.", str);
                        }
                    }
                } catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                    LOG.info("Network address for {} is unresolvable yet. error is {}", str, e);
                }
            });
        });
        this.racksWithHost = bookiesRackConfiguration2;
        this.bookieInfoMap = hashMap;
    }

    private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configuration configuration) {
        ZooKeeperCache zooKeeperCache = null;
        if (configuration.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
            zooKeeperCache = (ZooKeeperCache) configuration.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE);
        } else if (configuration instanceof ClientConfiguration) {
            int zkTimeout = ((ClientConfiguration) configuration).getZkTimeout();
            String zkServers = ((ClientConfiguration) configuration).getZkServers();
            String zkLedgersRootPath = ((ClientConfiguration) configuration).getZkLedgersRootPath();
            try {
                String substring = zkLedgersRootPath.substring(0, zkLedgersRootPath.contains("/") ? zkLedgersRootPath.lastIndexOf("/") : 0);
                zooKeeperCache = new ZooKeeperCache("bookies-racks", ZooKeeperClient.newBuilder().connectString(zkServers + (substring.startsWith("/") ? substring : "/" + substring)).sessionTimeoutMs(zkTimeout).build(), (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) { // from class: org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping.1
                };
                configuration.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zooKeeperCache);
            } catch (Exception e) {
                LOG.error("Error creating zookeeper client", (Throwable) e);
            }
        } else {
            LOG.error("No zk configurations available");
        }
        ZooKeeperDataCache<BookiesRackConfiguration> zkBookieRackMappingCache = getZkBookieRackMappingCache(zooKeeperCache);
        zkBookieRackMappingCache.registerListener(this);
        return zkBookieRackMappingCache;
    }

    private ZooKeeperDataCache<BookiesRackConfiguration> getZkBookieRackMappingCache(ZooKeeperCache zooKeeperCache) {
        return new ZooKeeperDataCache<BookiesRackConfiguration>(zooKeeperCache) { // from class: org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping.2
            @Override // org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer
            public BookiesRackConfiguration deserialize(String str, byte[] bArr) throws Exception {
                ZkBookieRackAffinityMapping.LOG.info("Reloading the bookie rack affinity mapping cache.");
                if (ZkBookieRackAffinityMapping.LOG.isDebugEnabled()) {
                    ZkBookieRackAffinityMapping.LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(bArr));
                }
                return (BookiesRackConfiguration) ZkBookieRackAffinityMapping.jsonMapper.readValue(bArr, BookiesRackConfiguration.class);
            }
        };
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.DNSToSwitchMapping
    public List<String> resolve(List<String> list) {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(getRack(it.next()));
        }
        return arrayList;
    }

    private String getRack(String str) {
        try {
            Optional<BookiesRackConfiguration> optional = this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
            updateRacksWithHost(optional.orElseGet(BookiesRackConfiguration::new));
            if (!optional.isPresent()) {
                return null;
            }
            BookieInfo bookieInfo = this.bookieInfoMap.get(str);
            if (bookieInfo == null) {
                Optional<BookieInfo> bookie = this.racksWithHost.getBookie(str);
                if (bookie.isPresent()) {
                    bookieInfo = bookie.get();
                } else {
                    updateRacksWithHost(this.racksWithHost);
                    bookieInfo = this.bookieInfoMap.get(str);
                }
            }
            if (bookieInfo == null) {
                return null;
            }
            String rack = bookieInfo.getRack();
            if (!rack.startsWith("/")) {
                rack = "/" + rack;
            }
            return rack;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String toString() {
        return "zk based bookie rack affinity mapping";
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.DNSToSwitchMapping
    public void reloadCachedMappings() {
    }

    @Override // org.apache.pulsar.zookeeper.ZooKeeperCacheListener
    public void onUpdate(String str, BookiesRackConfiguration bookiesRackConfiguration, Stat stat) {
        if (this.rackawarePolicy != null) {
            LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", bookiesRackConfiguration.toString());
            ArrayList arrayList = new ArrayList();
            Iterator<Map<String, BookieInfo>> it = bookiesRackConfiguration.values().iterator();
            while (it.hasNext()) {
                Iterator<String> it2 = it.next().keySet().iterator();
                while (it2.hasNext()) {
                    arrayList.add(BookieId.parse(it2.next()));
                }
            }
            this.rackawarePolicy.onBookieRackChange(arrayList);
        }
    }

    @Override // org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.RackChangeNotifier
    public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> iTopologyAwareEnsemblePlacementPolicy) {
        this.rackawarePolicy = iTopologyAwareEnsemblePlacementPolicy;
    }
}
