package org.apache.bookkeeper.metadata.etcd;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerLayout;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/EtcdRegistrationManager.class */
class EtcdRegistrationManager implements RegistrationManager {
    private static final Logger log = LoggerFactory.getLogger(EtcdRegistrationManager.class);
    private final String scope;
    private final Client client;
    private final boolean ownClient;
    private final KV kvClient;
    private final EtcdBookieRegister bkRegister;

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String str) {
        this(client, str, 60L);
    }

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String str, long j) {
        this(client, str, j, () -> {
        });
    }

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String str, long j, RegistrationManager.RegistrationListener registrationListener) {
        this(client, str, new EtcdBookieRegister(client.getLeaseClient(), j, registrationListener).start(), true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EtcdRegistrationManager(Client client, String str, EtcdBookieRegister etcdBookieRegister) {
        this(client, str, etcdBookieRegister, false);
    }

    private EtcdRegistrationManager(Client client, String str, EtcdBookieRegister etcdBookieRegister, boolean z) {
        this.scope = str;
        this.client = client;
        this.kvClient = client.getKVClient();
        this.bkRegister = etcdBookieRegister;
        this.ownClient = z;
    }

    public void close() {
        if (this.ownClient) {
            log.info("Closing registration manager under scope '{}'", this.scope);
            this.bkRegister.close();
            this.client.close();
            log.info("Successfully closed registration manager under scope '{}'", this.scope);
        }
    }

    public void registerBookie(BookieId bookieId, boolean z, BookieServiceInfo bookieServiceInfo) throws BookieException {
        if (z) {
            doRegisterReadonlyBookie(bookieId, this.bkRegister.get().longValue());
        } else {
            doRegisterBookie(EtcdUtils.getWritableBookiePath(this.scope, bookieId), this.bkRegister.get().longValue());
        }
    }

    private boolean checkRegNodeAndWaitExpired(String str, long j) throws BookieException.MetadataStoreException {
        if (((GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(str, StandardCharsets.UTF_8)))).getCount() <= 0) {
            return false;
        }
        return waitUntilRegNodeExpired(str, j);
    }

    private boolean waitUntilRegNodeExpired(String str, long j) throws BookieException.MetadataStoreException {
        ByteSequence from = ByteSequence.from(str, StandardCharsets.UTF_8);
        GetResponse getResponse = (GetResponse) EtcdUtils.msResult(this.kvClient.get(from));
        if (getResponse.getCount() <= 0) {
            return false;
        }
        KeyValue keyValue = (KeyValue) getResponse.getKvs().get(0);
        if (keyValue.getLease() == j) {
            return true;
        }
        Watch watchClient = this.client.getWatchClient();
        CompletableFuture completableFuture = new CompletableFuture();
        Watch.Watcher watch = watchClient.watch(from, WatchOption.newBuilder().withRevision(getResponse.getHeader().getRevision() + 1).build(), watchResponse -> {
            for (WatchEvent watchEvent : watchResponse.getEvents()) {
                log.info("Received watch event on '{}' : EventType = {}, lease {}", new Object[]{str, watchEvent.getEventType(), Long.valueOf(j)});
                if (WatchEvent.EventType.DELETE == watchEvent.getEventType()) {
                    completableFuture.complete(null);
                    return;
                }
            }
        }, th -> {
            log.warn("Exception in keepAlive for watch event on {}, lease {}", new Object[]{str, Long.valueOf(j), th});
            completableFuture.completeExceptionally(new UncheckedExecutionException("Interrupted at waiting previous registration under " + str + " (lease = " + keyValue.getLease() + ") to be expired", th));
        });
        log.info("Previous bookie registration (lease = {}) still exists at {}, so new lease '{}' will be waiting previous lease for {} seconds to be expired", new Object[]{Long.valueOf(keyValue.getLease()), str, Long.valueOf(j), Long.valueOf(this.bkRegister.getTtlSeconds())});
        try {
            try {
                try {
                    EtcdUtils.msResult(completableFuture, 2 * this.bkRegister.getTtlSeconds(), TimeUnit.SECONDS);
                    watch.close();
                    return false;
                } catch (UncheckedExecutionException e) {
                    throw new BookieException.MetadataStoreException(e.getMessage(), e.getCause());
                }
            } catch (TimeoutException e2) {
                completableFuture.cancel(true);
                throw new BookieException.MetadataStoreException("Previous bookie registration still exists at " + str + " (lease = " + keyValue.getLease() + ") after " + (2 * this.bkRegister.getTtlSeconds()) + " seconds elapsed");
            }
        } catch (Throwable th2) {
            watch.close();
            throw th2;
        }
    }

    private void doRegisterBookie(String str, long j) throws BookieException.MetadataStoreException {
        if (checkRegNodeAndWaitExpired(str, j)) {
            return;
        }
        ByteSequence from = ByteSequence.from(str, StandardCharsets.UTF_8);
        TxnResponse txnResponse = (TxnResponse) EtcdUtils.msResult(this.kvClient.txn().If(new Cmp[]{new Cmp(from, Cmp.Op.GREATER, CmpTarget.createRevision(0L))}).Then(new Op[]{Op.get(from, GetOption.DEFAULT)}).Else(new Op[]{Op.put(from, ByteSequence.from(new byte[0]), PutOption.newBuilder().withLeaseId(this.bkRegister.get().longValue()).build())}).commit());
        if (!txnResponse.isSucceeded()) {
            log.info("Successfully registered bookie at {}", str);
            return;
        }
        GetResponse getResponse = (GetResponse) txnResponse.getGetResponses().get(0);
        if (getResponse.getCount() > 0) {
            throw new BookieException.MetadataStoreException("Another bookie already registered under '" + str + "': lease = " + ((KeyValue) getResponse.getKvs().get(0)).getLease());
        }
        throw new BookieException.MetadataStoreException("Failed to register bookie under '" + str + "', but no bookie is registered there.");
    }

    private void doRegisterReadonlyBookie(BookieId bookieId, long j) throws BookieException.MetadataStoreException {
        doRegisterBookie(EtcdUtils.getReadonlyBookiePath(this.scope, bookieId), j);
        EtcdUtils.msResult(this.kvClient.delete(ByteSequence.from(EtcdUtils.getWritableBookiePath(this.scope, bookieId), StandardCharsets.UTF_8)));
    }

    public void unregisterBookie(BookieId bookieId, boolean z) throws BookieException {
        String readonlyBookiePath = z ? EtcdUtils.getReadonlyBookiePath(this.scope, bookieId) : EtcdUtils.getWritableBookiePath(this.scope, bookieId);
        if (((DeleteResponse) EtcdUtils.msResult(this.kvClient.delete(ByteSequence.from(readonlyBookiePath, StandardCharsets.UTF_8)))).getDeleted() > 0) {
            log.info("Successfully unregistered bookie {} from {}", bookieId, readonlyBookiePath);
        } else {
            log.info("Bookie disappeared from {} before unregistering", readonlyBookiePath);
        }
    }

    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
        return ((GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(EtcdUtils.getWritableBookiePath(this.scope, bookieId), StandardCharsets.UTF_8), GetOption.newBuilder().withCountOnly(true).build()))).getCount() > 0 || ((GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(EtcdUtils.getReadonlyBookiePath(this.scope, bookieId), StandardCharsets.UTF_8), GetOption.newBuilder().withCountOnly(true).build()))).getCount() > 0;
    }

    public void writeCookie(BookieId bookieId, Versioned<byte[]> versioned) throws BookieException {
        ByteSequence from = ByteSequence.from(EtcdUtils.getCookiePath(this.scope, bookieId), StandardCharsets.UTF_8);
        Txn txn = this.kvClient.txn();
        if (Version.NEW == versioned.getVersion()) {
            txn.If(new Cmp[]{new Cmp(from, Cmp.Op.GREATER, CmpTarget.createRevision(0L))}).Else(new Op[]{Op.put(from, ByteSequence.from((byte[]) versioned.getValue()), PutOption.DEFAULT)});
        } else {
            if (!(versioned.getVersion() instanceof LongVersion)) {
                throw new BookieException.BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
            }
            txn.If(new Cmp[]{new Cmp(from, Cmp.Op.EQUAL, CmpTarget.modRevision(versioned.getVersion().getLongVersion()))}).Then(new Op[]{Op.put(from, ByteSequence.from((byte[]) versioned.getValue()), PutOption.DEFAULT)});
        }
        if (((TxnResponse) EtcdUtils.msResult(txn.commit())).isSucceeded() != (Version.NEW != versioned.getVersion())) {
            throw new BookieException.MetadataStoreException("Conflict on writing cookie for bookie " + bookieId);
        }
    }

    public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException {
        GetResponse getResponse = (GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(EtcdUtils.getCookiePath(this.scope, bookieId), StandardCharsets.UTF_8)));
        if (getResponse.getCount() <= 0) {
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        }
        KeyValue keyValue = (KeyValue) getResponse.getKvs().get(0);
        return new Versioned<>(keyValue.getValue().getBytes(), new LongVersion(keyValue.getModRevision()));
    }

    public void removeCookie(BookieId bookieId, Version version) throws BookieException {
        ByteSequence from = ByteSequence.from(EtcdUtils.getCookiePath(this.scope, bookieId), StandardCharsets.UTF_8);
        TxnResponse txnResponse = (TxnResponse) EtcdUtils.msResult(this.kvClient.txn().If(new Cmp[]{new Cmp(from, Cmp.Op.EQUAL, CmpTarget.modRevision(((LongVersion) version).getLongVersion()))}).Then(new Op[]{Op.delete(from, DeleteOption.DEFAULT)}).Else(new Op[]{Op.get(from, GetOption.newBuilder().withCountOnly(true).build())}).commit());
        if (txnResponse.isSucceeded()) {
            log.info("Removed cookie from {} for bookie {}", from.toString(StandardCharsets.UTF_8), bookieId);
        } else {
            if (((GetResponse) txnResponse.getGetResponses().get(0)).getCount() <= 0) {
                throw new BookieException.CookieNotFoundException(bookieId.toString());
            }
            throw new BookieException.MetadataStoreException("Failed to remove cookie from " + from.toString(StandardCharsets.UTF_8) + " for bookie " + bookieId + " : bad version '" + version + "'");
        }
    }

    public String getClusterInstanceId() throws BookieException {
        GetResponse getResponse = (GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(EtcdUtils.getClusterInstanceIdPath(this.scope), StandardCharsets.UTF_8)));
        if (getResponse.getCount() > 0) {
            return new String(((KeyValue) getResponse.getKvs().get(0)).getValue().getBytes(), StandardCharsets.UTF_8);
        }
        log.error("BookKeeper metadata doesn't exist in Etcd. Has the cluster been initialized? Try running bin/bookkeeper shell initNewCluster");
        throw new BookieException.MetadataStoreException("BookKeeper is not initialized under '" + this.scope + "' yet");
    }

    public boolean prepareFormat() throws Exception {
        return ((GetResponse) EtcdUtils.msResult(this.kvClient.get(ByteSequence.from(this.scope, StandardCharsets.UTF_8)))).getCount() > 0;
    }

    public boolean initNewCluster() throws Exception {
        return initNewCluster(this.kvClient, this.scope);
    }

    static boolean initNewCluster(KV kv, String str) throws Exception {
        ByteSequence from = ByteSequence.from(str, StandardCharsets.UTF_8);
        return !((TxnResponse) EtcdUtils.msResult(kv.txn().If(new Cmp[]{new Cmp(from, Cmp.Op.GREATER, CmpTarget.createRevision(0L))}).Else(new Op[]{Op.put(from, EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getLayoutKey(str), StandardCharsets.UTF_8), ByteSequence.from(new LedgerLayout(EtcdLedgerManagerFactory.class.getName(), 0).serialize()), PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getClusterInstanceIdPath(str), StandardCharsets.UTF_8), ByteSequence.from(UUID.randomUUID().toString(), StandardCharsets.UTF_8), PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getCookiesPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getBookiesPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getWritableBookiesPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getReadonlyBookiesPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getLedgersPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getBucketsPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT), Op.put(ByteSequence.from(EtcdUtils.getUnderreplicationPath(str), StandardCharsets.UTF_8), EtcdConstants.EMPTY_BS, PutOption.DEFAULT)}).commit())).isSucceeded();
    }

    public boolean format() throws Exception {
        return format(this.kvClient, this.scope);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean format(KV kv, String str) throws Exception {
        if (((GetResponse) EtcdUtils.msResult(kv.get(ByteSequence.from(str, StandardCharsets.UTF_8)))).getCount() > 0 && !nukeExistingCluster(kv, str)) {
            return false;
        }
        return initNewCluster(kv, str);
    }

    public boolean nukeExistingCluster() throws Exception {
        return nukeExistingCluster(this.kvClient, this.scope);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean nukeExistingCluster(KV kv, String str) throws Exception {
        ByteSequence from = ByteSequence.from(str, StandardCharsets.UTF_8);
        if (((GetResponse) EtcdUtils.msResult(kv.get(from))).getCount() <= 0) {
            log.info("There is no existing cluster with under scope '{}' in Etcd, so exiting nuke operation", str);
            return true;
        }
        String bookiesPath = EtcdUtils.getBookiesPath(str);
        GetResponse getResponse = (GetResponse) EtcdUtils.msResult(kv.get(ByteSequence.from(bookiesPath, StandardCharsets.UTF_8), GetOption.newBuilder().withRange(ByteSequence.from(EtcdUtils.getBookiesEndPath(str), StandardCharsets.UTF_8)).withKeysOnly(true).build()));
        String writableBookiesPath = EtcdUtils.getWritableBookiesPath(str);
        String readonlyBookiesPath = EtcdUtils.getReadonlyBookiesPath(str);
        boolean z = false;
        Iterator it = getResponse.getKvs().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String str2 = new String(((KeyValue) it.next()).getKey().getBytes(), StandardCharsets.UTF_8);
            if (!str2.equals(bookiesPath) && !str2.equals(writableBookiesPath) && !str2.equals(readonlyBookiesPath)) {
                z = true;
                break;
            }
        }
        if (z) {
            log.error("Bookies are still up and connected to this cluster, stop all bookies before nuking the cluster");
            return false;
        }
        log.info("Successfully nuked cluster under scope '{}' : {} kv pairs deleted", str, Long.valueOf(((DeleteResponse) EtcdUtils.msResult(kv.delete(from, DeleteOption.newBuilder().withRange(ByteSequence.from(EtcdUtils.getScopeEndKey(str), StandardCharsets.UTF_8)).build()))).getDeleted()));
        return true;
    }

    Client getClient() {
        return this.client;
    }

    EtcdBookieRegister getBkRegister() {
        return this.bkRegister;
    }
}
