package org.apache.bookkeeper.metadata.etcd;

import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.EtcdConnectionManager;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.common.exception.EtcdExceptionFactory;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponse;
import io.etcd.jetcd.watch.WatchResponseWithError;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/EtcdWatchClient.class */
/**
 * Watch client that multiplexes any number of {@link EtcdWatcher}s over a single
 * etcd gRPC watch stream.
 *
 * <p>Threading: the watch bookkeeping ({@link #pendingWatchers}, {@link #watchers},
 * {@link #cancelSet}) is only touched from tasks submitted to {@link #watchExecutor}.
 * {@code pendingWatchers} is a plain {@link LinkedList}, so this confinement is what keeps
 * it consistent. Callbacks arriving from gRPC are re-dispatched onto that executor before
 * they touch any state.
 *
 * <p>Watch create requests are sent one at a time: the head of {@code pendingWatchers} is the
 * watcher whose create request is in flight, and the next request is sent only once the
 * server has acknowledged the previous one (see {@link #processCreate}).
 */
public class EtcdWatchClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EtcdWatchClient.class);

    /** Delay before re-establishing the watch stream after a recoverable stream error. */
    private static final long RESUME_DELAY_MS = 500L;

    private final EtcdConnectionManager connMgr;
    private final WatchGrpc.WatchStub stub;

    /** Request side of the current watch stream; {@code null} while no stream is open. */
    private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;

    /** Watchers acknowledged by the server, keyed by watch id. */
    private final ConcurrentLongHashMap<EtcdWatcher> watchers = new ConcurrentLongHashMap<>();

    /** Watchers waiting for a create response, in the order their requests are (to be) sent. */
    private final LinkedList<EtcdWatcher> pendingWatchers = new LinkedList<>();

    /** Watch ids for which a cancel request was sent and no cancel response was seen yet. */
    private final ConcurrentLongHashSet cancelSet = new ConcurrentLongHashSet();

    /** Non-null once close has been requested; guarded by {@code this}. */
    private CompletableFuture<Void> closeFuture = null;

    private final OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().name("etcd-watcher-scheduler").numThreads(Runtime.getRuntime().availableProcessors()).build();

    /** The single executor on which all watch bookkeeping is performed. */
    private final ScheduledExecutorService watchExecutor = this.scheduler.chooseThread();

    /**
     * Creates a watch client on top of an existing etcd client.
     *
     * @param client the jetcd client used to obtain the watch stub
     */
    public EtcdWatchClient(Client client) {
        this.connMgr = new EtcdConnectionManager(client);
        this.stub = this.connMgr.newWatchStub();
    }

    /**
     * Tells whether close has been requested.
     *
     * @return {@code true} once {@link #closeAsync()} has been called at least once
     */
    public synchronized boolean isClosed() {
        return this.closeFuture != null;
    }

    /**
     * Watches a key with {@link WatchOption#DEFAULT}.
     *
     * @param byteSequence the key to watch
     * @param biConsumer   callback registered on the new watcher
     * @return a future holding the watcher once it has been queued for creation
     */
    public CompletableFuture<EtcdWatcher> watch(ByteSequence byteSequence, BiConsumer<WatchResponse, Throwable> biConsumer) {
        return watch(byteSequence, WatchOption.DEFAULT, biConsumer);
    }

    /**
     * Registers a new watcher and queues its create request.
     *
     * <p>The returned future completes as soon as the watcher is queued, not when the server
     * acknowledges it. It completes exceptionally with a closed-watch-client exception when
     * the client is already closed at the time the task runs.
     *
     * @param byteSequence the key to watch
     * @param watchOption  watch options (range end, filters, prevKV, progress notify)
     * @param biConsumer   callback registered on the new watcher
     * @return a future holding the queued watcher
     */
    public CompletableFuture<EtcdWatcher> watch(ByteSequence byteSequence, WatchOption watchOption, BiConsumer<WatchResponse, Throwable> biConsumer) {
        return CompletableFuture.supplyAsync(() -> {
            if (isClosed()) {
                throw EtcdExceptionFactory.newClosedWatchClientException();
            }
            EtcdWatcher etcdWatcher = new EtcdWatcher(byteSequence, watchOption, this.scheduler.chooseThread(), this);
            etcdWatcher.addConsumer(biConsumer);
            this.pendingWatchers.add(etcdWatcher);
            // Only one create request is in flight at a time. If the queue was empty,
            // nothing is in flight and this watcher's request has to be sent right now;
            // otherwise processCreate() sends it when its turn comes.
            if (this.pendingWatchers.size() == 1) {
                getGrpcWatchStreamObserver().onNext(toWatchCreateRequest(etcdWatcher));
            }
            return etcdWatcher;
        }, this.watchExecutor);
    }

    /**
     * Delivers the given error to every pending and every active watcher, then forgets them all.
     *
     * @param etcdException the error to report
     */
    private void notifyWatchers(EtcdException etcdException) {
        WatchResponseWithError errorResponse = new WatchResponseWithError(etcdException);
        this.pendingWatchers.forEach(pending -> pending.notifyWatchResponse(errorResponse));
        this.pendingWatchers.clear();
        this.watchers.values().forEach(active -> active.notifyWatchResponse(errorResponse));
        this.watchers.clear();
    }

    /**
     * Cancels the given watcher on the server.
     *
     * @param etcdWatcher the watcher to cancel
     * @return a future completed once the cancel request has been issued (or skipped because
     *         the client is closed or a cancel is already outstanding for that watch id)
     */
    public CompletableFuture<Void> unwatch(EtcdWatcher etcdWatcher) {
        return CompletableFuture.runAsync(() -> cancelWatcher(etcdWatcher.getWatchID()), this.watchExecutor);
    }

    /**
     * Drops the watcher registered under {@code j} and sends a cancel request for it, unless
     * the client is closed or a cancel for that id is already outstanding.
     *
     * @param j the watch id to cancel
     */
    private void cancelWatcher(long j) {
        if (isClosed() || this.cancelSet.contains(j)) {
            return;
        }
        this.watchers.remove(j);
        this.cancelSet.add(j);
        WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(j).build();
        getGrpcWatchStreamObserver().onNext(WatchRequest.newBuilder().setCancelRequest(cancelRequest).build());
    }

    /**
     * Closes the client asynchronously: all watchers are notified with a closed-watch-client
     * exception, the watch stream is completed, and the scheduler is shut down afterwards.
     * Calling this more than once reuses the first close operation.
     *
     * @return a future completed when the close work has run
     */
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this) {
            if (null == this.closeFuture) {
                log.info("Closing watch client");
                this.closeFuture = CompletableFuture.runAsync(() -> {
                    notifyWatchers(EtcdExceptionFactory.newClosedWatchClientException());
                    closeGrpcWatchStreamObserver();
                }, this.watchExecutor);
            }
            completableFuture = this.closeFuture;
        }
        return completableFuture.whenComplete((ignoredResult, ignoredError) -> {
            this.scheduler.shutdown();
        });
    }

    /**
     * Closes the client and waits for the close work to finish. Failures are logged, not
     * rethrown; the scheduler is force-shut-down in either case.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            FutureUtils.result(closeAsync());
        } catch (Exception e) {
            log.warn("Encountered exceptions on closing watch client", e);
        }
        this.scheduler.forceShutdown(10L, TimeUnit.SECONDS);
    }

    /**
     * Builds the response-side observer for a watch stream. It only hands responses and
     * errors over to {@link #watchExecutor}; everything is dropped once the client is closed.
     */
    private StreamObserver<io.etcd.jetcd.api.WatchResponse> createWatchStreamObserver() {
        return new StreamObserver<io.etcd.jetcd.api.WatchResponse>() {
            @Override
            public void onNext(io.etcd.jetcd.api.WatchResponse watchResponse) {
                if (EtcdWatchClient.this.isClosed()) {
                    return;
                }
                EtcdWatchClient.this.watchExecutor.submit(() -> {
                    EtcdWatchClient.this.processWatchResponse(watchResponse);
                });
            }

            @Override
            public void onError(Throwable th) {
                if (EtcdWatchClient.this.isClosed()) {
                    return;
                }
                EtcdWatchClient.this.watchExecutor.submit(() -> {
                    EtcdWatchClient.this.processError(th);
                });
            }

            @Override
            public void onCompleted() {
                // Nothing to do when the server completes the stream.
            }
        };
    }

    /**
     * Routes a raw watch response to the create / cancel / events handler.
     * Expected to run on {@link #watchExecutor}; ignored once the client is closed.
     *
     * @param watchResponse the response received on the watch stream
     */
    public void processWatchResponse(io.etcd.jetcd.api.WatchResponse watchResponse) {
        if (isClosed()) {
            return;
        }
        if (watchResponse.getCreated()) {
            processCreate(watchResponse);
        } else if (watchResponse.getCanceled()) {
            processCanceled(watchResponse);
        } else {
            processEvents(watchResponse);
        }
    }

    /**
     * Handles a watch stream failure. Halt errors and "no leader" errors are reported to all
     * watchers and the stream is torn down; any other error triggers a delayed resume.
     * Expected to run on {@link #watchExecutor}; ignored once the client is closed.
     *
     * @param th the error reported by the watch stream
     */
    public void processError(Throwable th) {
        if (isClosed()) {
            return;
        }
        Status status = Status.fromThrowable(th);
        if (isHaltError(status) || isNoLeaderError(status)) {
            notifyWatchers(EtcdExceptionFactory.toEtcdException(status));
            closeGrpcWatchStreamObserver();
            this.cancelSet.clear();
            return;
        }
        // Resume after a delay to avoid hammering the server during a longer outage.
        // This must run on watchExecutor: resume() mutates pendingWatchers and watchers,
        // which are confined to that executor.
        this.watchExecutor.schedule(this::resume, RESUME_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Re-establishes the watch stream: drops the old stream, forgets outstanding cancels and
     * re-queues all active watchers for creation. Does nothing once the client is closed, so
     * that a delayed resume cannot reopen a stream after close.
     */
    private void resume() {
        if (isClosed()) {
            return;
        }
        closeGrpcWatchStreamObserver();
        this.cancelSet.clear();
        resumeWatchers();
    }

    /**
     * Returns the request side of the watch stream, opening the stream on first use.
     */
    private synchronized StreamObserver<WatchRequest> getGrpcWatchStreamObserver() {
        if (this.grpcWatchStreamObserver == null) {
            this.grpcWatchStreamObserver = this.stub.watch(createWatchStreamObserver());
        }
        return this.grpcWatchStreamObserver;
    }

    /**
     * Completes and forgets the current watch stream, if any.
     *
     * <p>The observer is detached under the same monitor that
     * {@link #getGrpcWatchStreamObserver()} uses, so a concurrent getter either sees the old
     * stream or opens a fresh one. The gRPC call itself is made outside the lock.
     */
    private void closeGrpcWatchStreamObserver() {
        final StreamObserver<WatchRequest> observer;
        synchronized (this) {
            observer = this.grpcWatchStreamObserver;
            this.grpcWatchStreamObserver = null;
        }
        if (observer != null) {
            observer.onCompleted();
        }
    }

    /**
     * Handles a "created" response: it belongs to the head of {@link #pendingWatchers}.
     * The next pending create request is sent before the response is evaluated.
     */
    private void processCreate(io.etcd.jetcd.api.WatchResponse watchResponse) {
        EtcdWatcher created = this.pendingWatchers.poll();
        sendNextWatchCreateRequest();
        if (created == null) {
            log.warn("Watch client receives watch create response but find no corresponding watcher");
            return;
        }
        if (created.isClosed()) {
            return;
        }
        if (watchResponse.getWatchId() == -1) {
            created.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.INTERNAL, "etcd server failed to create watch id")));
            return;
        }
        // A watcher without a start revision adopts the revision at which it was created.
        if (created.getRevision() == 0) {
            created.setRevision(watchResponse.getHeader().getRevision());
        }
        created.setWatchID(watchResponse.getWatchId());
        this.watchers.put(created.getWatchID(), created);
    }

    /**
     * Builds the create request for the head of {@link #pendingWatchers} without removing it.
     *
     * @return the request, or empty when nothing is pending
     */
    private Optional<WatchRequest> nextResume() {
        EtcdWatcher head = this.pendingWatchers.peek();
        if (head == null) {
            return Optional.empty();
        }
        return Optional.of(toWatchCreateRequest(head));
    }

    /** Sends the create request for the head of {@link #pendingWatchers}, if there is one. */
    private void sendNextWatchCreateRequest() {
        nextResume().ifPresent(watchRequest -> getGrpcWatchStreamObserver().onNext(watchRequest));
    }

    /**
     * Handles an events (or progress) response for an active watcher. Responses for unknown
     * watch ids lead to a cancel request for that id.
     */
    private void processEvents(io.etcd.jetcd.api.WatchResponse watchResponse) {
        EtcdWatcher etcdWatcher = this.watchers.get(watchResponse.getWatchId());
        if (etcdWatcher == null) {
            cancelWatcher(watchResponse.getWatchId());
            return;
        }
        if (watchResponse.getCompactRevision() != 0) {
            etcdWatcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newCompactedException(watchResponse.getCompactRevision())));
        } else if (watchResponse.getEventsCount() == 0) {
            // No events: only advance the watcher to the revision in the response header.
            etcdWatcher.setRevision(watchResponse.getHeader().getRevision());
        } else {
            etcdWatcher.notifyWatchResponse(new WatchResponseWithError(new WatchResponse(watchResponse)));
            // Continue after the last delivered event if the watcher has to be re-created.
            long lastModRevision = watchResponse.getEvents(watchResponse.getEventsCount() - 1).getKv().getModRevision();
            etcdWatcher.setRevision(lastModRevision + 1);
        }
    }

    /**
     * Moves every non-closed active watcher back to the pending queue (resetting its watch id
     * to -1) and kicks off creation again with the head of the queue.
     */
    private void resumeWatchers() {
        this.watchers.values().forEach(etcdWatcher -> {
            if (etcdWatcher.isClosed()) {
                return;
            }
            etcdWatcher.setWatchID(-1L);
            this.pendingWatchers.add(etcdWatcher);
        });
        this.watchers.clear();
        sendNextWatchCreateRequest();
    }

    /**
     * Handles a "canceled" response: clears the outstanding-cancel marker and, if the watcher
     * is still registered, reports the cancellation to it as an error.
     */
    private void processCanceled(io.etcd.jetcd.api.WatchResponse watchResponse) {
        EtcdWatcher etcdWatcher = this.watchers.get(watchResponse.getWatchId());
        this.cancelSet.remove(watchResponse.getWatchId());
        if (etcdWatcher == null) {
            return;
        }
        String cancelReason = watchResponse.getCancelReason();
        if (Strings.isNullOrEmpty(cancelReason)) {
            etcdWatcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.OUT_OF_RANGE, "etcdserver: mvcc: required revision is a future revision")));
        } else {
            etcdWatcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException(ErrorCode.FAILED_PRECONDITION, cancelReason)));
        }
    }

    /** Tells whether the status is UNAVAILABLE with the description "etcdserver: no leader". */
    private static boolean isNoLeaderError(Status status) {
        return status.getCode() == Status.Code.UNAVAILABLE && "etcdserver: no leader".equals(status.getDescription());
    }

    /** Tells whether the status is anything other than UNAVAILABLE or INTERNAL. */
    private static boolean isHaltError(Status status) {
        Status.Code code = status.getCode();
        return code != Status.Code.UNAVAILABLE && code != Status.Code.INTERNAL;
    }

    /**
     * Translates a watcher (key, options and current revision) into a watch create request.
     *
     * @param etcdWatcher the watcher to create on the server
     * @return the create request wrapped in a {@link WatchRequest}
     */
    private static WatchRequest toWatchCreateRequest(EtcdWatcher etcdWatcher) {
        ByteString key = UnsafeByteOperations.unsafeWrap(etcdWatcher.getKey().getBytes());
        WatchOption watchOption = etcdWatcher.getWatchOption();
        WatchCreateRequest.Builder createRequest = WatchCreateRequest.newBuilder()
                .setKey(key)
                .setPrevKv(watchOption.isPrevKV())
                .setProgressNotify(watchOption.isProgressNotify())
                .setStartRevision(etcdWatcher.getRevision());
        watchOption.getEndKey().ifPresent(endKey -> {
            createRequest.setRangeEnd(UnsafeByteOperations.unsafeWrap(endKey.getBytes()));
        });
        if (watchOption.isNoDelete()) {
            createRequest.addFilters(WatchCreateRequest.FilterType.NODELETE);
        }
        if (watchOption.isNoPut()) {
            createRequest.addFilters(WatchCreateRequest.FilterType.NOPUT);
        }
        return WatchRequest.newBuilder().setCreateRequest(createRequest).build();
    }
}
