package org.apache.bookkeeper.metadata.etcd;

import com.google.common.collect.Sets;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.common.exception.ClosedClientException;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
import org.apache.bookkeeper.metadata.etcd.helpers.KeyStream;
import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.class */
class EtcdLedgerManager implements LedgerManager {
    private static final Logger log = LoggerFactory.getLogger(EtcdLedgerManager.class);
    private final String scope;
    private final Client client;
    private final KV kvClient;
    private final EtcdWatchClient watchClient;
    private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
    private final ConcurrentLongHashMap<ValueStream<LedgerMetadata>> watchers = new ConcurrentLongHashMap<>();
    private final ConcurrentMap<BookkeeperInternalCallbacks.LedgerMetadataListener, LedgerMetadataConsumer> listeners = new ConcurrentHashMap();
    private volatile boolean closed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EtcdLedgerManager(Client client, String str) {
        this.client = client;
        this.kvClient = client.getKVClient();
        this.scope = str;
        this.watchClient = new EtcdWatchClient(client);
    }

    private boolean isClosed() {
        return this.closed;
    }

    ValueStream<LedgerMetadata> getLedgerMetadataStream(long j) {
        return (ValueStream) this.watchers.get(j);
    }

    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long j, LedgerMetadata ledgerMetadata) {
        CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, j);
        log.info("Create ledger metadata under key {}", ledgerKey);
        ByteSequence from = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8);
        try {
            this.kvClient.txn().If(new Cmp[]{new Cmp(from, Cmp.Op.GREATER, CmpTarget.createRevision(0L))}).Then(new Op[]{Op.get(from, GetOption.newBuilder().withCountOnly(true).build())}).Else(new Op[]{Op.put(from, ByteSequence.from(this.serDe.serialize(ledgerMetadata)), PutOption.DEFAULT)}).commit().thenAccept(txnResponse -> {
                if (!txnResponse.isSucceeded()) {
                    completableFuture.complete(new Versioned(ledgerMetadata, new LongVersion(txnResponse.getHeader().getRevision())));
                } else if (((GetResponse) txnResponse.getGetResponses().get(0)).getCount() <= 0) {
                    completableFuture.completeExceptionally(new BKException.BKUnexpectedConditionException());
                } else {
                    completableFuture.completeExceptionally(new BKException.BKLedgerExistException());
                }
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
            return completableFuture;
        } catch (IOException e) {
            completableFuture.completeExceptionally(new BKException.BKMetadataSerializationException(e));
            return completableFuture;
        }
    }

    public CompletableFuture<Void> removeLedgerMetadata(long j, Version version) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        long j2 = -43981;
        if (Version.NEW == version) {
            log.error("Request to delete ledger {} metadata with version set to the initial one", Long.valueOf(j));
            completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
            return completableFuture;
        }
        if (Version.ANY != version) {
            if (!(version instanceof LongVersion)) {
                log.info("Not an instance of LongVersion : {}", Long.valueOf(j));
                completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
                return completableFuture;
            }
            j2 = ((LongVersion) version).getLongVersion();
        }
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, j);
        ByteSequence from = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8);
        Txn txn = this.kvClient.txn();
        (j2 == -43981 ? txn.If(new Cmp[]{new Cmp(from, Cmp.Op.GREATER, CmpTarget.createRevision(0L))}) : txn.If(new Cmp[]{new Cmp(from, Cmp.Op.EQUAL, CmpTarget.modRevision(j2))})).Then(new Op[]{Op.delete(from, DeleteOption.DEFAULT)}).Else(new Op[]{Op.get(from, GetOption.DEFAULT)}).commit().thenAccept(txnResponse -> {
            if (txnResponse.isSucceeded()) {
                completableFuture.complete(null);
            } else if (((GetResponse) txnResponse.getGetResponses().get(0)).getCount() > 0) {
                completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
            } else {
                log.warn("Deleting ledger {} failed due to : ledger key {} doesn't exist", Long.valueOf(j), ledgerKey);
                completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
            }
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(new BKException.MetaStoreException());
            return null;
        });
        return completableFuture;
    }

    public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long j) {
        CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, j);
        ByteSequence from = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8);
        log.info("read ledger metadata under key {}", ledgerKey);
        this.kvClient.get(from).thenAccept(getResponse -> {
            if (getResponse.getCount() <= 0) {
                completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
                return;
            }
            KeyValue keyValue = (KeyValue) getResponse.getKvs().get(0);
            try {
                completableFuture.complete(new Versioned(this.serDe.parseConfig(keyValue.getValue().getBytes(), j, Optional.empty()), new LongVersion(keyValue.getModRevision())));
            } catch (IOException e) {
                log.error("Could not parse ledger metadata for ledger : {}", Long.valueOf(j), e);
                completableFuture.completeExceptionally(new BKException.MetaStoreException());
            }
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(new BKException.MetaStoreException());
            return null;
        });
        return completableFuture;
    }

    public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long j, LedgerMetadata ledgerMetadata, Version version) {
        CompletableFuture<Versioned<LedgerMetadata>> completableFuture = new CompletableFuture<>();
        if (Version.NEW == version || !(version instanceof LongVersion)) {
            completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
            return completableFuture;
        }
        LongVersion longVersion = (LongVersion) version;
        ByteSequence from = ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, j), StandardCharsets.UTF_8);
        try {
            this.kvClient.txn().If(new Cmp[]{new Cmp(from, Cmp.Op.EQUAL, CmpTarget.modRevision(longVersion.getLongVersion()))}).Then(new Op[]{Op.put(from, ByteSequence.from(this.serDe.serialize(ledgerMetadata)), PutOption.DEFAULT)}).Else(new Op[]{Op.get(from, GetOption.DEFAULT)}).commit().thenAccept(txnResponse -> {
                if (txnResponse.isSucceeded()) {
                    completableFuture.complete(new Versioned(ledgerMetadata, new LongVersion(txnResponse.getHeader().getRevision())));
                    return;
                }
                GetResponse getResponse = (GetResponse) txnResponse.getGetResponses().get(0);
                if (getResponse.getCount() <= 0) {
                    completableFuture.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
                } else {
                    log.warn("Conditional update ledger metadata failed : expected version = {}, actual version = {}", Long.valueOf(((KeyValue) getResponse.getKvs().get(0)).getModRevision()), longVersion);
                    completableFuture.completeExceptionally(new BKException.BKMetadataVersionException());
                }
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
            return completableFuture;
        } catch (IOException e) {
            completableFuture.completeExceptionally(new BKException.BKMetadataSerializationException(e));
            return completableFuture;
        }
    }

    private LedgerMetadataConsumer listenerToConsumer(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener, Consumer<Long> consumer) {
        return new LedgerMetadataConsumer(j, ledgerMetadataListener, consumer);
    }

    public void registerLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
        if (this.listeners.containsKey(ledgerMetadataListener)) {
            return;
        }
        ValueStream valueStream = (ValueStream) this.watchers.computeIfAbsent(j, j2 -> {
            return new ValueStream(this.client, this.watchClient, byteSequence -> {
                try {
                    return this.serDe.parseConfig(byteSequence.getBytes(), j2, Optional.empty());
                } catch (IOException e) {
                    log.error("Could not parse ledger metadata : {}", byteSequence.toString(StandardCharsets.UTF_8), e);
                    throw new RuntimeException("Could not parse ledger metadata : " + byteSequence.toString(StandardCharsets.UTF_8), e);
                }
            }, ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, j), StandardCharsets.UTF_8));
        });
        LedgerMetadataConsumer listenerToConsumer = listenerToConsumer(j, ledgerMetadataListener, l -> {
            if (this.watchers.remove(l.longValue(), valueStream)) {
                log.info("Closed ledger metadata watcher on ledger {} deletion.", l);
                valueStream.closeAsync();
            }
        });
        if (null != this.listeners.putIfAbsent(ledgerMetadataListener, listenerToConsumer)) {
            return;
        }
        valueStream.readAndWatch(listenerToConsumer).whenComplete((versioned, th) -> {
            if (null == th || (th instanceof ClosedClientException)) {
                return;
            }
            registerLedgerMetadataListener(j, ledgerMetadataListener);
        });
    }

    public void unregisterLedgerMetadataListener(long j, BookkeeperInternalCallbacks.LedgerMetadataListener ledgerMetadataListener) {
        unregisterLedgerMetadataListener(j, this.listeners.remove(ledgerMetadataListener));
    }

    private void unregisterLedgerMetadataListener(long j, LedgerMetadataConsumer ledgerMetadataConsumer) {
        ValueStream valueStream = (ValueStream) this.watchers.get(j);
        if (null == valueStream) {
            return;
        }
        valueStream.unwatch(ledgerMetadataConsumer).thenAccept(bool -> {
            if (bool.booleanValue() && this.watchers.remove(j, valueStream)) {
                log.info("Closed ledger metadata watcher on ledger {} since there are no listeners any more.", Long.valueOf(j));
                valueStream.closeAsync();
            }
        }).exceptionally(th -> {
            if (!(th instanceof ClosedClientException)) {
                return null;
            }
            unregisterLedgerMetadataListener(j, ledgerMetadataConsumer);
            return null;
        });
    }

    public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback voidCallback, Object obj, int i, int i2) {
        processLedgers(new KeyStream<>(this.kvClient, ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, 0L), StandardCharsets.UTF_8), ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, Long.MAX_VALUE), StandardCharsets.UTF_8), byteSequence -> {
            return Long.valueOf(EtcdUtils.parseLedgerKey(byteSequence.toString(StandardCharsets.UTF_8)).getLeastSignificantBits());
        }), processor, voidCallback, obj, i, i2);
    }

    private void processLedgers(KeyStream<Long> keyStream, BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback voidCallback, Object obj, int i, int i2) {
        keyStream.readNext().whenCompleteAsync((list, th) -> {
            if (null != th) {
                voidCallback.processResult(i2, (String) null, obj);
            } else if (list.isEmpty()) {
                voidCallback.processResult(i, (String) null, obj);
            } else {
                list.forEach(l -> {
                    processor.process(l, voidCallback);
                });
                processLedgers(keyStream, processor, voidCallback, obj, i, i2);
            }
        });
    }

    public LedgerManager.LedgerRangeIterator getLedgerRanges(long j) {
        final KeyIterator keyIterator = new KeyIterator(new KeyStream(this.kvClient, ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, 0L), StandardCharsets.UTF_8), ByteSequence.from(EtcdUtils.getLedgerKey(this.scope, Long.MAX_VALUE), StandardCharsets.UTF_8), byteSequence -> {
            return Long.valueOf(EtcdUtils.parseLedgerKey(byteSequence.toString(StandardCharsets.UTF_8)).getLeastSignificantBits());
        }));
        return new LedgerManager.LedgerRangeIterator() { // from class: org.apache.bookkeeper.metadata.etcd.EtcdLedgerManager.1
            public boolean hasNext() throws IOException {
                try {
                    return keyIterator.hasNext();
                } catch (Exception e) {
                    if (e instanceof IOException) {
                        throw ((IOException) e);
                    }
                    throw new IOException(e);
                }
            }

            public LedgerManager.LedgerRange next() throws IOException {
                try {
                    List next = keyIterator.next();
                    TreeSet newTreeSet = Sets.newTreeSet();
                    newTreeSet.addAll(next);
                    return new LedgerManager.LedgerRange(newTreeSet);
                } catch (Exception e) {
                    if (e instanceof IOException) {
                        throw ((IOException) e);
                    }
                    throw new IOException(e);
                }
            }
        };
    }

    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.watchClient.close();
        }
    }
}
