package org.apache.bookkeeper.discover;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.5.jar:org/apache/bookkeeper/discover/ZKRegistrationClient.class */
public class ZKRegistrationClient implements RegistrationClient {
    // Logger used by this class and by its inner classes.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ZKRegistrationClient.class);
    // Delay in milliseconds; judging by the name, the backoff applied before retrying ZooKeeper reads.
    static final int ZK_CONNECT_BACKOFF_MS = 200;
    // ZooKeeper handle supplied to the constructor; all getData/getChildren calls go through it.
    private final ZooKeeper zk;
    // Executor on which watch tasks are scheduled and listener callbacks are run.
    private final ScheduledExecutorService scheduler;
    // Created by watchWritableBookies and reset to null once its last listener is removed.
    private WatchTask watchWritableBookiesTask = null;
    // Created by watchReadOnlyBookies and reset to null once its last listener is removed.
    private WatchTask watchReadOnlyBookiesTask = null;
    // Bookie id -> last service info read from ZooKeeper; filled by readBookieServiceInfoAsync
    // and consulted by getBookieServiceInfo.
    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache = new ConcurrentHashMap<>();
    // Watcher passed to zk.getData when reading service info; null when bookieAddressTracking is false.
    private final Watcher bookieServiceInfoCacheInvalidation;
    // Flag given to the constructor; decides whether the cache invalidation watcher is created.
    private final boolean bookieAddressTracking;
    // <root>/<AVAILABLE_NODE>: path listed by getWritableBookies / watchWritableBookies.
    private final String bookieRegistrationPath;
    // <root>/<COOKIE_NODE>: path listed by getAllBookies.
    private final String bookieAllRegistrationPath;
    // <bookieRegistrationPath>/<READONLY>: path listed by getReadOnlyBookies / watchReadOnlyBookies.
    private final String bookieReadonlyRegistrationPath;

    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.5.jar:org/apache/bookkeeper/discover/ZKRegistrationClient$BookieServiceInfoCacheInvalidationWatcher.class */
    /**
     * ZooKeeper watcher that reacts to events on bookie registration znodes by
     * updating {@code bookieServiceInfoCache}.
     *
     * <ul>
     *   <li>session state {@code Expired}: the whole cache is cleared;</li>
     *   <li>{@code NodeDeleted}: the entry of the bookie named by the last path segment is removed;</li>
     *   <li>{@code NodeDataChanged}: the bookie's service info is read again;</li>
     *   <li>anything else is ignored (and logged at debug level).</li>
     * </ul>
     */
    private class BookieServiceInfoCacheInvalidationWatcher implements Watcher {

        /**
         * Handles one ZooKeeper event.
         *
         * @param watchedEvent the event delivered by ZooKeeper
         */
        @Override
        public void process(WatchedEvent watchedEvent) {
            final boolean debug = log.isDebugEnabled();
            if (debug) {
                log.debug("zk event {} for {} state {}", watchedEvent.getType(), watchedEvent.getPath(), watchedEvent.getState());
            }

            // A session expiry is handled before looking at the path: everything is dropped.
            if (Watcher.Event.KeeperState.Expired == watchedEvent.getState()) {
                log.info("zk session expired, invalidating cache");
                bookieServiceInfoCache.clear();
                return;
            }

            final BookieId bookieId = stripBookieIdFromPath(watchedEvent.getPath());
            if (bookieId == null) {
                // No path, or a path whose last segment is not a bookie id: nothing to update.
                return;
            }

            switch (watchedEvent.getType()) {
                case NodeDeleted:
                    log.info("Invalidate cache for {}", bookieId);
                    bookieServiceInfoCache.remove(bookieId);
                    break;
                case NodeDataChanged:
                    log.info("refresh cache for {}", bookieId);
                    // The returned future is not needed here; a successful read updates the cache itself.
                    readBookieServiceInfoAsync(bookieId);
                    break;
                default:
                    if (log.isDebugEnabled()) {
                        log.debug("ignore cache event {} for {}", watchedEvent.getType(), bookieId);
                    }
                    break;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.16.5.jar:org/apache/bookkeeper/discover/ZKRegistrationClient$WatchTask.class */
    /**
     * Tracks the bookies registered under one znode and notifies registered listeners.
     *
     * <p>The task plays three roles: as a {@link Runnable} it issues the {@code getChildren}
     * read, as a {@link BiConsumer} it receives the outcome of that read, and as a
     * {@link Watcher} it is handed to ZooKeeper so that a later event schedules another read.
     */
    public class WatchTask implements Runnable, Watcher, BiConsumer<Versioned<Set<BookieId>>, Throwable>, AutoCloseable {
        private final String regPath;
        private final CompletableFuture<Void> firstRunFuture;
        private volatile boolean closed = false;
        // Last bookie set delivered by a read; null until the first successful read.
        private Set<BookieId> bookies = null;
        // Version of the last accepted bookie set; starts at Version.NEW.
        private Version version = Version.NEW;
        private final Set<RegistrationClient.RegistrationListener> listeners = new CopyOnWriteArraySet<>();

        /**
         * @param str               znode path whose children are watched
         * @param completableFuture completed on the first successful read, or failed if the first read fails
         */
        WatchTask(String str, CompletableFuture<Void> completableFuture) {
            this.regPath = str;
            this.firstRunFuture = completableFuture;
        }

        /** @return the number of listeners currently registered */
        public int getNumListeners() {
            return this.listeners.size();
        }

        /**
         * Registers a listener. If the listener is new and a bookie set is already known,
         * the listener is notified with that set on the scheduler.
         *
         * @param registrationListener listener to add
         * @return always {@code true}
         */
        public boolean addListener(RegistrationClient.RegistrationListener registrationListener) {
            if (this.listeners.add(registrationListener) && null != this.bookies) {
                ZKRegistrationClient.this.scheduler.execute(() ->
                        registrationListener.onBookiesChanged(new Versioned<>(this.bookies, this.version)));
            }
            return true;
        }

        /**
         * @param registrationListener listener to remove
         * @return {@code true} if the listener was registered
         */
        public boolean removeListener(RegistrationClient.RegistrationListener registrationListener) {
            return this.listeners.remove(registrationListener);
        }

        /** Schedules the first read without delay. */
        void watch() {
            scheduleWatchTask(0L);
        }

        /**
         * Schedules {@link #run()} on the scheduler; a rejected submission is logged and dropped.
         *
         * @param j delay in milliseconds
         */
        private void scheduleWatchTask(long j) {
            try {
                ZKRegistrationClient.this.scheduler.schedule(this, j, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                ZKRegistrationClient.log.warn("Failed to schedule watch bookies task", (Throwable) e);
            }
        }

        /** Reads the children of {@code regPath}, re-arming this task as the watcher; no-op once closed. */
        @Override // java.lang.Runnable
        public void run() {
            if (isClosed()) {
                return;
            }
            ZKRegistrationClient.this.getChildren(this.regPath, this)
                    .whenCompleteAsync(this, ZKRegistrationClient.this.scheduler);
        }

        /**
         * Receives the outcome of a read.
         *
         * <p>A failure of the very first read fails {@code firstRunFuture}; a later failure
         * schedules a retry after {@code ZK_CONNECT_BACKOFF_MS}. On success, listeners are
         * notified only when the received version is newer than the stored one.
         *
         * @param versioned the bookie set read, or {@code null} on failure
         * @param th        the failure, or {@code null} on success
         */
        @Override // java.util.function.BiConsumer
        public void accept(Versioned<Set<BookieId>> versioned, Throwable th) {
            if (th != null) {
                if (this.firstRunFuture.isDone()) {
                    scheduleWatchTask(ZKRegistrationClient.ZK_CONNECT_BACKOFF_MS);
                } else {
                    this.firstRunFuture.completeExceptionally(th);
                }
                return;
            }
            if (this.version.compare(versioned.getVersion()) == Version.Occurred.BEFORE) {
                this.version = versioned.getVersion();
                this.bookies = versioned.getValue();
                for (RegistrationClient.RegistrationListener listener : this.listeners) {
                    listener.onBookiesChanged(versioned);
                }
            }
            FutureUtils.complete(this.firstRunFuture, null);
        }

        /**
         * Handles ZooKeeper events: any event with a type other than {@code None} triggers an
         * immediate re-read; a {@code None} event whose state is {@code Expired} triggers a
         * re-read after {@code ZK_CONNECT_BACKOFF_MS}; other state events are ignored.
         */
        @Override // org.apache.zookeeper.Watcher
        public void process(WatchedEvent watchedEvent) {
            if (Watcher.Event.EventType.None != watchedEvent.getType()) {
                scheduleWatchTask(0L);
            } else if (Watcher.Event.KeeperState.Expired == watchedEvent.getState()) {
                scheduleWatchTask(ZKRegistrationClient.ZK_CONNECT_BACKOFF_MS);
            }
        }

        /** @return {@code true} once {@link #close()} has been called */
        boolean isClosed() {
            return this.closed;
        }

        /** Marks the task closed so that subsequent {@link #run()} invocations do nothing. */
        @Override // java.lang.AutoCloseable
        public void close() {
            this.closed = true;
        }
    }

    public ZKRegistrationClient(ZooKeeper zooKeeper, String str, ScheduledExecutorService scheduledExecutorService, boolean z) {
        this.zk = zooKeeper;
        this.scheduler = scheduledExecutorService;
        this.bookieAddressTracking = z;
        this.bookieServiceInfoCacheInvalidation = z ? new BookieServiceInfoCacheInvalidationWatcher() : null;
        this.bookieRegistrationPath = str + "/" + BookKeeperConstants.AVAILABLE_NODE;
        this.bookieAllRegistrationPath = str + "/" + BookKeeperConstants.COOKIE_NODE;
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + BookKeeperConstants.READONLY;
    }

    /**
     * Does nothing: this implementation releases no resources here. In particular it
     * does not close the ZooKeeper handle or the scheduler passed to the constructor.
     */
    @Override // org.apache.bookkeeper.discover.RegistrationClient, java.lang.AutoCloseable
    public void close() {
    }

    /**
     * @return the bookie-address-tracking flag this client was constructed with
     */
    public boolean isBookieAddressTracking() {
        return bookieAddressTracking;
    }

    /**
     * @return the ZooKeeper handle this client was constructed with
     */
    public ZooKeeper getZk() {
        return zk;
    }

    /**
     * Lists the bookies registered under {@code bookieRegistrationPath}, without setting a watch.
     *
     * @return future holding the bookie ids together with the version of the listing
     */
    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        final Watcher noWatcher = null;
        return getChildren(bookieRegistrationPath, noWatcher);
    }

    /**
     * Lists the bookies registered under {@code bookieAllRegistrationPath}, without setting a watch.
     *
     * @return future holding the bookie ids together with the version of the listing
     */
    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        final Watcher noWatcher = null;
        return getChildren(bookieAllRegistrationPath, noWatcher);
    }

    /**
     * Lists the bookies registered under {@code bookieReadonlyRegistrationPath}, without setting a watch.
     *
     * @return future holding the bookie ids together with the version of the listing
     */
    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        final Watcher noWatcher = null;
        return getChildren(bookieReadonlyRegistrationPath, noWatcher);
    }

    /**
     * Looks up a bookie's service info in the local cache only; ZooKeeper is not contacted.
     *
     * @param bookieId id of the bookie to look up
     * @return a completed future with the cached entry, or a future failed with
     *         {@code BKBookieHandleNotAvailableException} when the cache has no entry
     */
    @Override
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        final Versioned<BookieServiceInfo> cached = bookieServiceInfoCache.get(bookieId);
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", bookieId, cached);
        }
        if (cached == null) {
            return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
        }
        return CompletableFuture.completedFuture(cached);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads a bookie's service info from ZooKeeper and stores it in {@code bookieServiceInfoCache}.
     *
     * <p>The writable registration znode is tried first; if it does not exist
     * ({@code NONODE}) the read-only registration znode is tried. Both reads register
     * {@code bookieServiceInfoCacheInvalidation} as the watcher.
     *
     * @param bookieId id of the bookie to read
     * @return future completed with the info read; failed with a {@code KeeperException} when
     *         the first read fails for a reason other than {@code NONODE}, with the
     *         {@code NoBookieAvailableException} BookKeeper error when the second read fails,
     *         or with the {@link IOException} when the znode payload cannot be parsed
     */
    public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
        final String writablePath = this.bookieRegistrationPath + "/" + bookieId;
        final String readonlyPath = this.bookieReadonlyRegistrationPath + "/" + bookieId;
        final CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
        this.zk.getData(writablePath, this.bookieServiceInfoCacheInvalidation, (rc, path, ctx, data, stat) -> {
            if (KeeperException.Code.OK.intValue() == rc) {
                cacheBookieServiceInfo(bookieId, data, stat.getCversion(),
                        "Update BookieInfoCache (writable bookie) {} -> {}", promise);
            } else if (KeeperException.Code.NONODE.intValue() == rc) {
                // Not registered as writable: look for a read-only registration instead.
                this.zk.getData(readonlyPath, this.bookieServiceInfoCacheInvalidation, (roRc, roPath, roCtx, roData, roStat) -> {
                    if (KeeperException.Code.OK.intValue() == roRc) {
                        cacheBookieServiceInfo(bookieId, roData, roStat.getCversion(),
                                "Update BookieInfoCache (readonly bookie) {} -> {}", promise);
                    } else {
                        promise.completeExceptionally(BKException.create(BKException.Code.NoBookieAvailableException));
                    }
                }, null);
            } else {
                promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
            }
        }, null);
        return promise;
    }

    /** Parses a znode payload, caches the result under {@code bookieId} and completes {@code promise}. */
    private void cacheBookieServiceInfo(BookieId bookieId, byte[] data, int cversion, String logFormat,
            CompletableFuture<Versioned<BookieServiceInfo>> promise) {
        try {
            final Versioned<BookieServiceInfo> info =
                    new Versioned<>(deserializeBookieServiceInfo(bookieId, data), new LongVersion(cversion));
            log.info(logFormat, bookieId, info.getValue());
            this.bookieServiceInfoCache.put(bookieId, info);
            promise.complete(info);
        } catch (IOException e) {
            log.error("Cannot update BookieInfo for {}", bookieId, e);
            // The ZooKeeper result code is OK on this path and KeeperException.create rejects
            // OK with an IllegalArgumentException, which would leave the promise pending;
            // fail it with the parse error itself.
            promise.completeExceptionally(e);
        }
    }

    /**
     * Converts the payload of a bookie registration znode into a {@link BookieServiceInfo}.
     *
     * <p>A {@code null} or empty payload yields the info built by
     * {@code BookieServiceInfoUtils.buildLegacyBookieServiceInfo} from the bookie id.
     * Otherwise the payload is parsed as a {@code BookieServiceInfoFormat} message and its
     * endpoints and properties are copied over.
     *
     * @param bookieId id of the bookie the payload belongs to
     * @param bArr     raw znode payload, may be {@code null} or empty
     * @return the decoded service info
     * @throws IOException if the payload cannot be parsed
     */
    @VisibleForTesting
    static BookieServiceInfo deserializeBookieServiceInfo(BookieId bookieId, byte[] bArr) throws IOException {
        if (bArr == null || bArr.length == 0) {
            return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId.toString());
        }
        DataFormats.BookieServiceInfoFormat parseFrom = DataFormats.BookieServiceInfoFormat.parseFrom(bArr);
        BookieServiceInfo bookieServiceInfo = new BookieServiceInfo();
        // Copy every field from the parsed message's endpoint ("source") into a fresh
        // BookieServiceInfo.Endpoint; the two must be distinct variables.
        List<BookieServiceInfo.Endpoint> endpoints = parseFrom.getEndpointsList().stream().map(source -> {
            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
            endpoint.setId(source.getId());
            endpoint.setPort(source.getPort());
            endpoint.setHost(source.getHost());
            endpoint.setProtocol(source.getProtocol());
            endpoint.setAuth(source.getAuthList());
            endpoint.setExtensions(source.getExtensionsList());
            return endpoint;
        }).collect(Collectors.toList());
        bookieServiceInfo.setEndpoints(endpoints);
        bookieServiceInfo.setProperties(parseFrom.getPropertiesMap());
        return bookieServiceInfo;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Lists the bookies registered under a znode and makes sure their service info has been
     * requested before the result is published.
     *
     * <p>For every listed bookie that has no entry in {@code bookieServiceInfoCache}, a read is
     * started via {@code readBookieServiceInfoAsync}. The returned future completes once all of
     * those reads have finished, whether they succeeded or failed; read failures do not fail
     * the listing.
     *
     * @param str     znode path whose children are listed
     * @param watcher watcher to register on the znode, or {@code null} for none
     * @return future holding the bookie ids and the znode's child version; failed with a
     *         {@code BKException.ZKException} when ZooKeeper reports an error
     */
    public CompletableFuture<Versioned<Set<BookieId>>> getChildren(String str, Watcher watcher) {
        final CompletableFuture<Versioned<Set<BookieId>>> result = FutureUtils.createFuture();
        this.zk.getChildren(str, watcher, (rc, path, ctx, children, stat) -> {
            if (KeeperException.Code.OK.intValue() != rc) {
                result.completeExceptionally(new BKException.ZKException(KeeperException.create(KeeperException.Code.get(rc), path)).fillInStackTrace());
                return;
            }
            final LongVersion version = new LongVersion(stat.getCversion());
            final Set<BookieId> bookies = convertToBookieAddresses(children);
            final List<CompletableFuture<Versioned<BookieServiceInfo>>> pendingReads = new ArrayList<>(bookies.size());
            for (BookieId bookieId : bookies) {
                if (!this.bookieServiceInfoCache.containsKey(bookieId)) {
                    pendingReads.add(readBookieServiceInfoAsync(bookieId));
                }
            }
            if (pendingReads.isEmpty()) {
                result.complete(new Versioned<>(bookies, version));
            } else {
                // The outcome of the info reads is deliberately ignored: the listing is
                // published as soon as they have all finished, successfully or not.
                FutureUtils.collect(pendingReads).whenComplete((infos, cause) -> {
                    result.complete(new Versioned<>(bookies, version));
                });
            }
        }, null);
        return result;
    }

    /**
     * Registers a listener for changes to the writable bookie set.
     *
     * <p>The first registration creates the watch task; if that task's first read fails, the
     * listener passed to this first call is unregistered again. Later registrations join the
     * existing task and receive its first-run future.
     *
     * @param registrationListener listener to notify when the bookie set changes
     * @return future tied to the outcome of the watch task's first read
     */
    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public synchronized CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        final CompletableFuture<Void> result;
        if (null == this.watchWritableBookiesTask) {
            final CompletableFuture<Void> firstRun = new CompletableFuture<>();
            this.watchWritableBookiesTask = new WatchTask(this.bookieRegistrationPath, firstRun);
            result = firstRun.whenComplete((ignored, cause) -> {
                if (null != cause) {
                    unwatchWritableBookies(registrationListener);
                }
            });
        } else {
            result = this.watchWritableBookiesTask.firstRunFuture;
        }
        this.watchWritableBookiesTask.addListener(registrationListener);
        // Only the registration that brings the listener count to one starts the read.
        if (this.watchWritableBookiesTask.getNumListeners() == 1) {
            this.watchWritableBookiesTask.watch();
        }
        return result;
    }

    /**
     * Unregisters a listener from the writable-bookies watch task. When the last listener
     * is gone the task is closed and discarded. Does nothing if no task exists.
     *
     * @param registrationListener listener to remove
     */
    @Override
    public synchronized void unwatchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        final WatchTask task = watchWritableBookiesTask;
        if (task != null) {
            task.removeListener(registrationListener);
            if (task.getNumListeners() == 0) {
                task.close();
                watchWritableBookiesTask = null;
            }
        }
    }

    /**
     * Registers a listener for changes to the read-only bookie set.
     *
     * <p>The first registration creates the watch task; if that task's first read fails, the
     * listener passed to this first call is unregistered again. Later registrations join the
     * existing task and receive its first-run future.
     *
     * @param registrationListener listener to notify when the bookie set changes
     * @return future tied to the outcome of the watch task's first read
     */
    @Override // org.apache.bookkeeper.discover.RegistrationClient
    public synchronized CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        final CompletableFuture<Void> result;
        if (null == this.watchReadOnlyBookiesTask) {
            final CompletableFuture<Void> firstRun = new CompletableFuture<>();
            this.watchReadOnlyBookiesTask = new WatchTask(this.bookieReadonlyRegistrationPath, firstRun);
            result = firstRun.whenComplete((ignored, cause) -> {
                if (null != cause) {
                    unwatchReadOnlyBookies(registrationListener);
                }
            });
        } else {
            result = this.watchReadOnlyBookiesTask.firstRunFuture;
        }
        this.watchReadOnlyBookiesTask.addListener(registrationListener);
        // Only the registration that brings the listener count to one starts the read.
        if (this.watchReadOnlyBookiesTask.getNumListeners() == 1) {
            this.watchReadOnlyBookiesTask.watch();
        }
        return result;
    }

    /**
     * Unregisters a listener from the read-only-bookies watch task. When the last listener
     * is gone the task is closed and discarded. Does nothing if no task exists.
     *
     * @param registrationListener listener to remove
     */
    @Override
    public synchronized void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        final WatchTask task = watchReadOnlyBookiesTask;
        if (task != null) {
            task.removeListener(registrationListener);
            if (task.getNumListeners() == 0) {
                task.close();
                watchReadOnlyBookiesTask = null;
            }
        }
    }

    /**
     * Parses znode child names into bookie ids, skipping the child named
     * {@code BookKeeperConstants.READONLY}.
     *
     * @param list child names returned by ZooKeeper
     * @return a new mutable set with one id per remaining child name
     */
    private static HashSet<BookieId> convertToBookieAddresses(List<String> list) {
        return list.stream()
                .filter(child -> !BookKeeperConstants.READONLY.equals(child))
                .map(child -> BookieId.parse(child))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the bookie id from the last segment of a znode path.
     *
     * @param str znode path, may be {@code null}
     * @return the id parsed from the text after the last {@code '/'}, or {@code null} when the
     *         path is {@code null}, contains no {@code '/'}, or its last segment is rejected
     *         by {@code BookieId.parse}
     */
    public static BookieId stripBookieIdFromPath(String str) {
        if (str == null) {
            return null;
        }
        final int slash = str.lastIndexOf('/');
        if (slash < 0) {
            return null;
        }
        final String lastSegment = str.substring(slash + 1);
        try {
            return BookieId.parse(lastSegment);
        } catch (IllegalArgumentException e) {
            log.warn("Cannot decode bookieId from {}", str, e);
            return null;
        }
    }

    /**
     * @return the current writable-bookies watch task, or {@code null} if none exists
     */
    WatchTask getWatchWritableBookiesTask() {
        return watchWritableBookiesTask;
    }

    /**
     * @return the current read-only-bookies watch task, or {@code null} if none exists
     */
    WatchTask getWatchReadOnlyBookiesTask() {
        return watchReadOnlyBookiesTask;
    }
}
