package org.apache.pulsar.shade.org.apache.bookkeeper.discover;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.bookie.BookieException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LayoutManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLayoutManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.DataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Version;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.Op;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZKUtil;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.ACL;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/discover/ZKRegistrationManager.class */
public class ZKRegistrationManager implements RegistrationManager {
    private static final Logger log = LoggerFactory.getLogger(ZKRegistrationManager.class);
    private static final Function<Throwable, BKException> EXCEPTION_FUNC = th -> {
        if (th instanceof BKException) {
            log.error("Failed to get bookie list : ", th);
            return (BKException) th;
        }
        if (!(th instanceof InterruptedException)) {
            return new BKException.MetaStoreException();
        }
        log.error("Interrupted reading bookie list : ", th);
        return new BKException.BKInterruptedException();
    };
    private final ServerConfiguration conf;
    private final ZooKeeper zk;
    private final List<ACL> zkAcls;
    private final LayoutManager layoutManager;
    private volatile boolean zkRegManagerInitialized;
    private final String ledgersRootPath;
    private final String cookiePath;
    protected final String bookieRegistrationPath;
    protected final String bookieReadonlyRegistrationPath;
    private final int zkTimeoutMs;

    public ZKRegistrationManager(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, RegistrationManager.RegistrationListener registrationListener) {
        this(serverConfiguration, zooKeeper, ZKMetadataDriverBase.resolveZkLedgersRootPath(serverConfiguration), registrationListener);
    }

    public ZKRegistrationManager(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, String str, RegistrationManager.RegistrationListener registrationListener) {
        this.zkRegManagerInitialized = false;
        this.conf = serverConfiguration;
        this.zk = zooKeeper;
        this.zkAcls = ZkUtils.getACLs(serverConfiguration);
        this.ledgersRootPath = str;
        this.cookiePath = str + "/" + BookKeeperConstants.COOKIE_NODE;
        this.bookieRegistrationPath = str + "/" + BookKeeperConstants.AVAILABLE_NODE;
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + BookKeeperConstants.READONLY;
        this.zkTimeoutMs = serverConfiguration.getZkTimeout();
        this.layoutManager = new ZkLayoutManager(zooKeeper, str, this.zkAcls);
        this.zk.register(watchedEvent -> {
            if (this.zkRegManagerInitialized && watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.Expired)) {
                registrationListener.onRegistrationExpired();
            }
        });
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager, java.lang.AutoCloseable
    public void close() {
    }

    public String getCookiePath(BookieId bookieId) {
        return this.cookiePath + "/" + bookieId;
    }

    protected boolean checkRegNodeAndWaitExpired(String str) throws IOException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            Stat exists = this.zk.exists(str, new Watcher() { // from class: org.apache.pulsar.shade.org.apache.bookkeeper.discover.ZKRegistrationManager.1
                @Override // org.apache.pulsar.shade.org.apache.zookeeper.Watcher
                public void process(WatchedEvent watchedEvent) {
                    if (Watcher.Event.EventType.NodeDeleted == watchedEvent.getType()) {
                        countDownLatch.countDown();
                    }
                }
            });
            if (null == exists) {
                return false;
            }
            if (exists.getEphemeralOwner() == this.zk.getSessionId()) {
                return true;
            }
            log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout: {} ms for znode deletion", str, Integer.valueOf(this.zkTimeoutMs));
            if (countDownLatch.await(this.zkTimeoutMs, TimeUnit.MILLISECONDS)) {
                return false;
            }
            throw new KeeperException.NodeExistsException(str);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted checking and wait ephemeral znode {} expired : ", str, e);
            throw new IOException("Interrupted checking and wait ephemeral znode " + str + " expired", e);
        } catch (KeeperException e2) {
            log.error("ZK exception checking and wait ephemeral znode {} expired : ", str, e2);
            throw new IOException("ZK exception checking and wait ephemeral znode " + str + " expired", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public void registerBookie(BookieId bookieId, boolean z, BookieServiceInfo bookieServiceInfo) throws BookieException {
        if (z) {
            doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
        } else {
            doRegisterBookie(this.bookieRegistrationPath + "/" + bookieId, bookieServiceInfo);
        }
    }

    @VisibleForTesting
    static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
        if (log.isDebugEnabled()) {
            log.debug("serialize BookieServiceInfo {}", bookieServiceInfo);
        }
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                DataFormats.BookieServiceInfoFormat.Builder newBuilder = DataFormats.BookieServiceInfoFormat.newBuilder();
                newBuilder.addAllEndpoints((List) bookieServiceInfo.getEndpoints().stream().map(endpoint -> {
                    return DataFormats.BookieServiceInfoFormat.Endpoint.newBuilder().setId(endpoint.getId()).setPort(endpoint.getPort()).setHost(endpoint.getHost()).setProtocol(endpoint.getProtocol()).addAllAuth(endpoint.getAuth()).addAllExtensions(endpoint.getExtensions()).build();
                }).collect(Collectors.toList()));
                newBuilder.putAllProperties(bookieServiceInfo.getProperties());
                newBuilder.build().writeTo(byteArrayOutputStream);
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                byteArrayOutputStream.close();
                return byteArray;
            } finally {
            }
        } catch (IOException e) {
            log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo);
            throw new RuntimeException(e);
        }
    }

    private void doRegisterBookie(String str, BookieServiceInfo bookieServiceInfo) throws BookieException {
        try {
            if (!checkRegNodeAndWaitExpired(str)) {
                this.zk.create(str, serializeBookieServiceInfo(bookieServiceInfo), this.zkAcls, CreateMode.EPHEMERAL);
                this.zkRegManagerInitialized = true;
            }
        } catch (IOException e) {
            throw new BookieException.MetadataStoreException(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            log.error("Interrupted exception registering ephemeral Znode for Bookie!", e2);
            throw new BookieException.MetadataStoreException(e2);
        } catch (KeeperException e3) {
            log.error("ZK exception registering ephemeral Znode for Bookie!", e3);
            throw new BookieException.MetadataStoreException(e3);
        }
    }

    private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo bookieServiceInfo) throws BookieException {
        try {
            if (null == this.zk.exists(this.bookieReadonlyRegistrationPath, false)) {
                try {
                    this.zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo), this.zkAcls, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException e) {
                }
            }
            doRegisterBookie(this.bookieReadonlyRegistrationPath + "/" + bookieId, bookieServiceInfo);
            String str = this.bookieRegistrationPath + "/" + bookieId;
            try {
                this.zk.delete(str, -1);
            } catch (KeeperException.NoNodeException e2) {
                log.warn("No writable bookie registered node {} when transitioning to readonly", str, e2);
            }
        } catch (InterruptedException | KeeperException e3) {
            throw new BookieException.MetadataStoreException(e3);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public void unregisterBookie(BookieId bookieId, boolean z) throws BookieException {
        doUnregisterBookie(!z ? this.bookieRegistrationPath + "/" + bookieId : this.bookieReadonlyRegistrationPath + "/" + bookieId);
    }

    private void doUnregisterBookie(String str) throws BookieException {
        try {
            this.zk.delete(str, -1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BookieException.MetadataStoreException(e);
        } catch (KeeperException e2) {
            throw new BookieException.MetadataStoreException(e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public void writeCookie(BookieId bookieId, Versioned<byte[]> versioned) throws BookieException {
        String cookiePath = getCookiePath(bookieId);
        try {
            if (Version.NEW == versioned.getVersion()) {
                if (this.zk.exists(this.cookiePath, false) == null) {
                    try {
                        this.zk.create(this.cookiePath, new byte[0], this.zkAcls, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException e) {
                        log.info("More than one bookie tried to create {} at once. Safe to ignore.", this.cookiePath);
                    }
                }
                this.zk.create(cookiePath, versioned.getValue(), this.zkAcls, CreateMode.PERSISTENT);
            } else {
                if (!(versioned.getVersion() instanceof LongVersion)) {
                    throw new BookieException.BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
                }
                this.zk.setData(cookiePath, versioned.getValue(), (int) ((LongVersion) versioned.getVersion()).getLongVersion());
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            throw new BookieException.MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, e2);
        } catch (KeeperException.NoNodeException e3) {
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        } catch (KeeperException.NodeExistsException e4) {
            throw new BookieException.CookieExistException(bookieId.toString());
        } catch (KeeperException e5) {
            throw new BookieException.MetadataStoreException("Failed to write cookie for bookie " + bookieId);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException {
        String cookiePath = getCookiePath(bookieId);
        try {
            return new Versioned<>(this.zk.getData(cookiePath, false, this.zk.exists(cookiePath, false)), new LongVersion(r0.getVersion()));
        } catch (InterruptedException | KeeperException e) {
            throw new BookieException.MetadataStoreException("Failed to read cookie for bookie " + bookieId);
        } catch (KeeperException.NoNodeException e2) {
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public void removeCookie(BookieId bookieId, Version version) throws BookieException {
        try {
            this.zk.delete(getCookiePath(bookieId), (int) ((LongVersion) version).getLongVersion());
            log.info("Removed cookie from {} for bookie {}.", this.cookiePath, bookieId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BookieException.MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e);
        } catch (KeeperException.NoNodeException e2) {
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        } catch (KeeperException e3) {
            throw new BookieException.MetadataStoreException("Failed to delete cookie for bookie " + bookieId);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public String getClusterInstanceId() throws BookieException {
        String str = null;
        try {
            if (this.zk.exists(this.ledgersRootPath, (Watcher) null) == null) {
                log.error("BookKeeper metadata doesn't exist in zookeeper. Has the cluster been initialized? Try running bin/bookkeeper shell metaformat");
                throw new KeeperException.NoNodeException("BookKeeper metadata");
            }
            try {
                str = new String(this.zk.getData(this.ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, false, (Stat) null), StandardCharsets.UTF_8);
            } catch (KeeperException.NoNodeException e) {
                log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
            }
            return str;
        } catch (InterruptedException | KeeperException e2) {
            throw new BookieException.MetadataStoreException("Failed to get cluster instance id", e2);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public boolean prepareFormat() throws Exception {
        boolean z = null != this.zk.exists(this.ledgersRootPath, false);
        boolean z2 = null != this.zk.exists(this.bookieRegistrationPath, false);
        if (!z) {
            ZkUtils.createFullPathOptimistic(this.zk, this.ledgersRootPath, "".getBytes(StandardCharsets.UTF_8), this.zkAcls, CreateMode.PERSISTENT);
        }
        if (!z2) {
            this.zk.create(this.bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), this.zkAcls, CreateMode.PERSISTENT);
        }
        if (null == this.zk.exists(this.bookieReadonlyRegistrationPath, false)) {
            this.zk.create(this.bookieReadonlyRegistrationPath, new byte[0], this.zkAcls, CreateMode.PERSISTENT);
        }
        return z;
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public boolean initNewCluster() throws Exception {
        String resolveZkServers = ZKMetadataDriverBase.resolveZkServers(this.conf);
        String str = this.ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID;
        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", resolveZkServers, this.ledgersRootPath);
        if (null != this.zk.exists(this.ledgersRootPath, false)) {
            log.error("Ledger root path: {} already exists", this.ledgersRootPath);
            return false;
        }
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(4);
        newArrayListWithExpectedSize.add(Op.create(this.ledgersRootPath, BookKeeperConstants.EMPTY_BYTE_ARRAY, this.zkAcls, CreateMode.PERSISTENT));
        newArrayListWithExpectedSize.add(Op.create(this.bookieRegistrationPath, BookKeeperConstants.EMPTY_BYTE_ARRAY, this.zkAcls, CreateMode.PERSISTENT));
        newArrayListWithExpectedSize.add(Op.create(this.bookieReadonlyRegistrationPath, BookKeeperConstants.EMPTY_BYTE_ARRAY, this.zkAcls, CreateMode.PERSISTENT));
        String uuid = UUID.randomUUID().toString();
        newArrayListWithExpectedSize.add(Op.create(str, uuid.getBytes(StandardCharsets.UTF_8), this.zkAcls, CreateMode.PERSISTENT));
        this.zk.multi(newArrayListWithExpectedSize);
        AbstractZkLedgerManagerFactory.newLedgerManagerFactory(this.conf, this.layoutManager);
        log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", new Object[]{resolveZkServers, this.ledgersRootPath, uuid});
        return true;
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public boolean nukeExistingCluster() throws Exception {
        Collection collection;
        String resolveZkServers = ZKMetadataDriverBase.resolveZkServers(this.conf);
        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}", resolveZkServers, this.ledgersRootPath);
        if (!(null != this.zk.exists(this.ledgersRootPath, false))) {
            log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, so exiting nuke operation", this.ledgersRootPath, resolveZkServers);
            return true;
        }
        boolean z = null != this.zk.exists(this.bookieRegistrationPath, false);
        ZKRegistrationClient zKRegistrationClient = new ZKRegistrationClient(this.zk, this.ledgersRootPath, null, false);
        if (z) {
            try {
                Collection collection2 = (Collection) ((Versioned) FutureUtils.result(zKRegistrationClient.getWritableBookies(), EXCEPTION_FUNC)).getValue();
                if (collection2 != null && !collection2.isEmpty()) {
                    log.error("Bookies are still up and connected to this cluster, stop all bookies before nuking the cluster");
                    zKRegistrationClient.close();
                    return false;
                }
                if ((null != this.zk.exists(this.bookieReadonlyRegistrationPath, false)) && (collection = (Collection) ((Versioned) FutureUtils.result(zKRegistrationClient.getReadOnlyBookies(), EXCEPTION_FUNC)).getValue()) != null && !collection.isEmpty()) {
                    log.error("Readonly Bookies are still up and connected to this cluster, stop all bookies before nuking the cluster");
                    zKRegistrationClient.close();
                    return false;
                }
            } catch (Throwable th) {
                try {
                    zKRegistrationClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        zKRegistrationClient.close();
        return AbstractZkLedgerManagerFactory.newLedgerManagerFactory(this.conf, this.layoutManager).validateAndNukeExistingCluster(this.conf, this.layoutManager);
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public boolean format() throws Exception {
        try {
            ZKUtil.deleteRecursive(this.zk, ZkLedgerUnderreplicationManager.getBasePath(this.ledgersRootPath) + "/ledgers");
        } catch (KeeperException.NoNodeException e) {
            if (log.isDebugEnabled()) {
                log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
            }
        }
        try {
            ZKUtil.deleteRecursive(this.zk, ZkLedgerUnderreplicationManager.getBasePath(this.ledgersRootPath) + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK);
        } catch (KeeperException.NoNodeException e2) {
            if (log.isDebugEnabled()) {
                log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
            }
        }
        try {
            ZKUtil.deleteRecursive(this.zk, this.cookiePath);
        } catch (KeeperException.NoNodeException e3) {
            if (log.isDebugEnabled()) {
                log.debug("cookies node not exists in zookeeper to delete");
            }
        }
        try {
            this.zk.delete(this.ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1);
        } catch (KeeperException.NoNodeException e4) {
            if (log.isDebugEnabled()) {
                log.debug("INSTANCEID not exists in zookeeper to delete");
            }
        }
        this.zk.create(this.ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8), this.zkAcls, CreateMode.PERSISTENT);
        log.info("Successfully formatted BookKeeper metadata");
        return true;
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.discover.RegistrationManager
    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
        String str = this.bookieRegistrationPath + "/" + bookieId;
        String str2 = this.bookieReadonlyRegistrationPath + "/" + bookieId;
        try {
            if (null == this.zk.exists(str, false)) {
                if (null == this.zk.exists(str2, false)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}", bookieId, e);
            throw new BookieException.MetadataStoreException(e);
        } catch (KeeperException e2) {
            log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e2);
            throw new BookieException.MetadataStoreException(e2);
        }
    }
}
