package org.apache.bookkeeper.metadata.etcd.helpers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.metadata.etcd.EtcdWatchClient;
import org.apache.bookkeeper.metadata.etcd.EtcdWatcher;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/helpers/KeySetReader.class */
public class KeySetReader<T> implements BiConsumer<WatchResponse, Throwable>, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KeySetReader.class);
    private final Client client;
    private final boolean ownWatchClient;
    private final EtcdWatchClient watchClient;
    private final Function<ByteSequence, T> encoder;
    private final ByteSequence beginKey;
    private final ByteSequence endKey;
    private final Set<ByteSequence> keys;
    private final CopyOnWriteArraySet<Consumer<Versioned<Set<T>>>> consumers;
    private volatile long revision;
    private CompletableFuture<EtcdWatcher> watchFuture;
    private CompletableFuture<Void> closeFuture;

    /* renamed from: org.apache.bookkeeper.metadata.etcd.helpers.KeySetReader$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/helpers/KeySetReader$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$etcd$jetcd$watch$WatchEvent$EventType = new int[WatchEvent.EventType.values().length];

        static {
            try {
                $SwitchMap$io$etcd$jetcd$watch$WatchEvent$EventType[WatchEvent.EventType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$etcd$jetcd$watch$WatchEvent$EventType[WatchEvent.EventType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public KeySetReader(Client client, Function<ByteSequence, T> function, ByteSequence byteSequence, ByteSequence byteSequence2) {
        this(client, new EtcdWatchClient(client), function, byteSequence, byteSequence2);
    }

    public KeySetReader(Client client, EtcdWatchClient etcdWatchClient, Function<ByteSequence, T> function, ByteSequence byteSequence, ByteSequence byteSequence2) {
        this.consumers = new CopyOnWriteArraySet<>();
        this.revision = -1L;
        this.watchFuture = null;
        this.closeFuture = null;
        this.client = client;
        this.watchClient = etcdWatchClient;
        this.ownWatchClient = false;
        this.encoder = function;
        this.beginKey = byteSequence;
        this.endKey = byteSequence2;
        this.keys = Collections.synchronizedSet(Sets.newHashSet());
    }

    public CompletableFuture<Versioned<Set<T>>> read() {
        GetOption.Builder withKeysOnly = GetOption.newBuilder().withKeysOnly(true);
        if (null != this.endKey) {
            withKeysOnly.withRange(this.endKey);
        }
        return this.client.getKVClient().get(this.beginKey, withKeysOnly.build()).thenApply(getResponse -> {
            boolean updateLocalValue = updateLocalValue(getResponse);
            Versioned<Set<T>> localValue = getLocalValue();
            if (updateLocalValue) {
                notifyConsumers(localValue);
            }
            return localValue;
        });
    }

    @VisibleForTesting
    long getRevision() {
        return this.revision;
    }

    private void notifyConsumers(Versioned<Set<T>> versioned) {
        this.consumers.forEach(consumer -> {
            consumer.accept(versioned);
        });
    }

    private synchronized boolean updateLocalValue(GetResponse getResponse) {
        if (this.revision >= getResponse.getHeader().getRevision()) {
            return false;
        }
        this.revision = getResponse.getHeader().getRevision();
        this.keys.clear();
        Iterator it = getResponse.getKvs().iterator();
        while (it.hasNext()) {
            this.keys.add(((KeyValue) it.next()).getKey());
        }
        return true;
    }

    private synchronized Versioned<Set<T>> processWatchResponse(WatchResponse watchResponse) {
        if (null != this.closeFuture) {
            return null;
        }
        if (log.isDebugEnabled()) {
            log.debug("Received watch response : revision = {}, {} events = {}", new Object[]{Long.valueOf(watchResponse.getHeader().getRevision()), Integer.valueOf(watchResponse.getEvents().size()), watchResponse.getEvents()});
        }
        if (watchResponse.getHeader().getRevision() <= this.revision) {
            return null;
        }
        this.revision = watchResponse.getHeader().getRevision();
        watchResponse.getEvents().forEach(watchEvent -> {
            switch (AnonymousClass1.$SwitchMap$io$etcd$jetcd$watch$WatchEvent$EventType[watchEvent.getEventType().ordinal()]) {
                case 1:
                    this.keys.add(watchEvent.getKeyValue().getKey());
                    return;
                case 2:
                    this.keys.remove(watchEvent.getKeyValue().getKey());
                    return;
                default:
                    return;
            }
        });
        return getLocalValue();
    }

    @VisibleForTesting
    synchronized Versioned<Set<T>> getLocalValue() {
        return new Versioned<>(this.keys.stream().map(this.encoder).collect(Collectors.toSet()), new LongVersion(this.revision));
    }

    private CompletableFuture<Versioned<Set<T>>> getOrRead() {
        boolean z = false;
        synchronized (this) {
            if (this.revision < 0) {
                z = true;
            }
        }
        return z ? read() : FutureUtils.value(getLocalValue());
    }

    @VisibleForTesting
    synchronized boolean isWatcherSet() {
        return null != this.watchFuture;
    }

    public CompletableFuture<Versioned<Set<T>>> readAndWatch(Consumer<Versioned<Set<T>>> consumer) {
        return (!this.consumers.add(consumer) || isWatcherSet()) ? getOrRead() : (CompletableFuture<Versioned<Set<T>>>) getOrRead().thenCompose(versioned -> {
            return watch(versioned.getVersion().getLongVersion()).thenApply(etcdWatcher -> {
                return versioned;
            });
        });
    }

    public CompletableFuture<Void> unwatch(Consumer<Versioned<Set<T>>> consumer) {
        return (this.consumers.remove(consumer) && this.consumers.isEmpty()) ? closeOrRewatch(false) : FutureUtils.Void();
    }

    private synchronized CompletableFuture<EtcdWatcher> watch(long j) {
        if (null != this.watchFuture) {
            return this.watchFuture;
        }
        WatchOption.Builder withRevision = WatchOption.newBuilder().withRevision(j);
        if (null != this.endKey) {
            withRevision.withRange(this.endKey);
        }
        this.watchFuture = this.watchClient.watch(this.beginKey, withRevision.build(), this);
        return this.watchFuture.whenComplete((etcdWatcher, th) -> {
            if (null != th) {
                synchronized (this) {
                    this.watchFuture = null;
                }
            }
        });
    }

    private CompletableFuture<Void> closeOrRewatch(boolean z) {
        CompletableFuture<EtcdWatcher> completableFuture;
        synchronized (this) {
            completableFuture = this.watchFuture;
            if (z && null == this.closeFuture) {
                this.watchFuture = watch(this.revision);
            } else {
                this.watchFuture = null;
            }
        }
        return null != completableFuture ? completableFuture.thenCompose((v0) -> {
            return v0.closeAsync();
        }) : FutureUtils.Void();
    }

    @Override // java.util.function.BiConsumer
    public void accept(WatchResponse watchResponse, Throwable th) {
        if (null != th) {
            closeOrRewatch(true);
            return;
        }
        Versioned<Set<T>> processWatchResponse = processWatchResponse(watchResponse);
        if (null != processWatchResponse) {
            notifyConsumers(processWatchResponse);
        }
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this) {
            if (null == this.closeFuture) {
                this.closeFuture = closeOrRewatch(false).thenCompose(r3 -> {
                    return this.ownWatchClient ? this.watchClient.closeAsync() : FutureUtils.Void();
                });
            }
            completableFuture = this.closeFuture;
        }
        return completableFuture;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            FutureUtils.result(closeAsync());
        } catch (Exception e) {
            log.warn("Encountered exceptions on closing key reader : {}", e.getMessage());
        }
    }
}
