package org.apache.pulsar.metadata.bookkeeper;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.5.4.jar:org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.class */
public class PulsarRegistrationClient implements RegistrationClient {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarRegistrationClient.class);
    private final MetadataStore store;
    private final String ledgersRootPath;
    private final String bookieRegistrationPath;
    private final String bookieAllRegistrationPath;
    private final String bookieReadonlyRegistrationPath;
    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
    private final Set<RegistrationClient.RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet();
    private final Set<RegistrationClient.RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet();
    private final FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
    private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo = new ConcurrentHashMap();
    private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo = new ConcurrentHashMap();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

    public PulsarRegistrationClient(MetadataStore metadataStore, String str) {
        this.store = metadataStore;
        this.ledgersRootPath = str;
        this.bookieServiceInfoMetadataCache = metadataStore.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
        this.bookieRegistrationPath = str + NodeBase.PATH_SEPARATOR_STR + BookKeeperConstants.AVAILABLE_NODE;
        this.bookieAllRegistrationPath = str + NodeBase.PATH_SEPARATOR_STR + BookKeeperConstants.COOKIE_NODE;
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + NodeBase.PATH_SEPARATOR_STR + BookKeeperConstants.READONLY;
        metadataStore.registerListener(this::updatedBookies);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient, java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        return getBookiesThenFreshCache(this.bookieRegistrationPath);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        return getBookiesThenFreshCache(this.bookieAllRegistrationPath);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        return getBookiesThenFreshCache(this.bookieReadonlyRegistrationPath);
    }

    private CompletableFuture<Versioned<Set<BookieId>>> getBookiesThenFreshCache(String str) {
        return (str == null || str.isEmpty()) ? FutureUtil.failedFuture(new IllegalArgumentException("parameter [path] can not be null or empty.")) : this.store.getChildren(str).thenComposeAsync(list -> {
            Set<BookieId> convertToBookieAddresses = convertToBookieAddresses(list);
            ArrayList arrayList = new ArrayList(convertToBookieAddresses.size());
            for (BookieId bookieId : convertToBookieAddresses) {
                if (str.equals(this.bookieReadonlyRegistrationPath) && this.readOnlyBookieInfo.get(bookieId) == null) {
                    arrayList.add(readBookieInfoAsReadonlyBookie(bookieId));
                } else if (str.equals(this.bookieRegistrationPath) && this.writableBookieInfo.get(bookieId) == null) {
                    arrayList.add(readBookieInfoAsWritableBookie(bookieId));
                } else if (str.equals(this.bookieAllRegistrationPath) && this.writableBookieInfo.get(bookieId) == null && this.readOnlyBookieInfo.get(bookieId) == null) {
                    arrayList.add(readBookieInfoAsWritableBookie(bookieId).thenCompose(optional -> {
                        return (CompletionStage) optional.map(cacheGetResult -> {
                            return CompletableFuture.completedFuture(null);
                        }).orElseGet(() -> {
                            return readBookieInfoAsReadonlyBookie(bookieId);
                        });
                    }));
                }
            }
            return arrayList.isEmpty() ? CompletableFuture.completedFuture(convertToBookieAddresses) : FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) arrayList).thenApply(r3 -> {
                return convertToBookieAddresses;
            });
        }).thenApply((Function<? super U, ? extends U>) set -> {
            return new Versioned(set, Version.NEW);
        });
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.add(registrationListener);
        CompletableFuture<Versioned<Set<BookieId>>> writableBookies = getWritableBookies();
        Objects.requireNonNull(registrationListener);
        return writableBookies.thenAcceptAsync(registrationListener::onBookiesChanged, (Executor) this.executor);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public void unwatchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.remove(registrationListener);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.add(registrationListener);
        CompletableFuture<Versioned<Set<BookieId>>> readOnlyBookies = getReadOnlyBookies();
        Objects.requireNonNull(registrationListener);
        return readOnlyBookies.thenAcceptAsync(registrationListener::onBookiesChanged, (Executor) this.executor);
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.remove(registrationListener);
    }

    private void updatedBookies(Notification notification) {
        String path = notification.getPath();
        if ((!path.startsWith(this.bookieReadonlyRegistrationPath) && !path.startsWith(this.bookieRegistrationPath)) || path.equals(this.bookieReadonlyRegistrationPath) || path.equals(this.bookieRegistrationPath)) {
            return;
        }
        BookieId stripBookieIdFromPath = stripBookieIdFromPath(notification.getPath());
        this.sequencer.sequential(() -> {
            switch (notification.getType()) {
                case Created:
                    log.info("Bookie {} created. path: {}", stripBookieIdFromPath, notification.getPath());
                    return path.startsWith(this.bookieReadonlyRegistrationPath) ? getReadOnlyBookies().thenAccept(versioned -> {
                        this.readOnlyBookiesWatchers.forEach(registrationListener -> {
                            this.executor.execute(() -> {
                                registrationListener.onBookiesChanged(versioned);
                            });
                        });
                    }) : getWritableBookies().thenAccept(versioned2 -> {
                        this.writableBookiesWatchers.forEach(registrationListener -> {
                            this.executor.execute(() -> {
                                registrationListener.onBookiesChanged(versioned2);
                            });
                        });
                    });
                case Modified:
                    if (stripBookieIdFromPath == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Bookie {} modified. path: {}", stripBookieIdFromPath, notification.getPath());
                    return path.startsWith(this.bookieReadonlyRegistrationPath) ? readBookieInfoAsReadonlyBookie(stripBookieIdFromPath).thenApply(optional -> {
                        return null;
                    }) : readBookieInfoAsWritableBookie(stripBookieIdFromPath).thenApply(optional2 -> {
                        return null;
                    });
                case Deleted:
                    if (stripBookieIdFromPath == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Bookie {} deleted. path: {}", stripBookieIdFromPath, notification.getPath());
                    if (path.startsWith(this.bookieReadonlyRegistrationPath)) {
                        this.readOnlyBookieInfo.remove(stripBookieIdFromPath);
                        return getReadOnlyBookies().thenAccept(versioned3 -> {
                            this.readOnlyBookiesWatchers.forEach(registrationListener -> {
                                this.executor.execute(() -> {
                                    registrationListener.onBookiesChanged(versioned3);
                                });
                            });
                        });
                    }
                    if (!path.startsWith(this.bookieRegistrationPath)) {
                        return CompletableFuture.completedFuture(null);
                    }
                    this.writableBookieInfo.remove(stripBookieIdFromPath);
                    return getWritableBookies().thenAccept(versioned4 -> {
                        this.writableBookiesWatchers.forEach(registrationListener -> {
                            this.executor.execute(() -> {
                                registrationListener.onBookiesChanged(versioned4);
                            });
                        });
                    });
                default:
                    return CompletableFuture.completedFuture(null);
            }
        });
    }

    private static BookieId stripBookieIdFromPath(String str) {
        int lastIndexOf;
        if (str == null || (lastIndexOf = str.lastIndexOf(47)) < 0) {
            return null;
        }
        try {
            return BookieId.parse(str.substring(lastIndexOf + 1));
        } catch (IllegalArgumentException e) {
            log.warn("Cannot decode bookieId from {}, error: {}", str, e.getMessage());
            return null;
        }
    }

    private static Set<BookieId> convertToBookieAddresses(List<String> list) {
        HashSet hashSet = new HashSet();
        for (String str : list) {
            if (!BookKeeperConstants.READONLY.equals(str)) {
                hashSet.add(BookieId.parse(str));
            }
        }
        return hashSet;
    }

    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        Versioned<BookieServiceInfo> versioned = this.writableBookieInfo.get(bookieId);
        Versioned<BookieServiceInfo> versioned2 = versioned;
        if (versioned == null) {
            versioned2 = this.readOnlyBookieInfo.get(bookieId);
        }
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", bookieId, versioned2);
        }
        return versioned2 != null ? CompletableFuture.completedFuture(versioned2) : FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
    }

    public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsWritableBookie(BookieId bookieId) {
        return this.bookieServiceInfoMetadataCache.getWithStats(this.bookieRegistrationPath + NodeBase.PATH_SEPARATOR_STR + bookieId).thenApply(optional -> {
            if (optional.isPresent()) {
                CacheGetResult cacheGetResult = (CacheGetResult) optional.get();
                log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, cacheGetResult.getValue());
                this.writableBookieInfo.put(bookieId, new Versioned<>((BookieServiceInfo) cacheGetResult.getValue(), new LongVersion(cacheGetResult.getStat().getVersion())));
            }
            return optional;
        });
    }

    final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
        return this.bookieServiceInfoMetadataCache.getWithStats(this.bookieReadonlyRegistrationPath + NodeBase.PATH_SEPARATOR_STR + bookieId).thenApply(optional -> {
            if (optional.isPresent()) {
                CacheGetResult cacheGetResult = (CacheGetResult) optional.get();
                log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, cacheGetResult.getValue());
                this.readOnlyBookieInfo.put(bookieId, new Versioned<>((BookieServiceInfo) cacheGetResult.getValue(), new LongVersion(cacheGetResult.getStat().getVersion())));
            }
            return optional;
        });
    }
}
