package org.apache.bookkeeper.metadata.etcd.helpers;

import com.google.common.primitives.UnsignedBytes;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.options.GetOption;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/metadata/etcd/helpers/KeyStream.class */
public class KeyStream<T> {
    private static final Logger log = LoggerFactory.getLogger(KeyStream.class);
    private final KV kvClient;
    private final ByteSequence startKey;
    private final ByteSequence endKey;
    private final Function<ByteSequence, T> encoder;
    private final int batchSize;
    private ByteSequence nextKey;
    private ByteSequence lastKey;
    private boolean hasMore;

    public KeyStream(KV kv, ByteSequence byteSequence, ByteSequence byteSequence2, Function<ByteSequence, T> function) {
        this(kv, byteSequence, byteSequence2, function, 100);
    }

    public KeyStream(KV kv, ByteSequence byteSequence, ByteSequence byteSequence2, Function<ByteSequence, T> function, int i) {
        this.lastKey = null;
        this.hasMore = true;
        this.kvClient = kv;
        this.startKey = byteSequence;
        this.endKey = byteSequence2;
        this.nextKey = byteSequence;
        this.encoder = function;
        this.batchSize = i;
    }

    public CompletableFuture<List<T>> readNext() {
        int i = this.batchSize;
        synchronized (this) {
            if (!this.hasMore) {
                return FutureUtils.value(Collections.emptyList());
            }
            ByteSequence byteSequence = this.nextKey;
            if (null != this.lastKey) {
                i++;
            }
            if (log.isTraceEnabled()) {
                log.trace("Read keys between {} and {}", byteSequence.toString(StandardCharsets.UTF_8), this.endKey.toString(StandardCharsets.UTF_8));
            }
            return this.kvClient.get(byteSequence, GetOption.newBuilder().withRange(this.endKey).withKeysOnly(true).withLimit(i).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build()).thenApply(getResponse -> {
                ByteSequence byteSequence2;
                List kvs = getResponse.getKvs();
                synchronized (this) {
                    this.hasMore = getResponse.isMore();
                    byteSequence2 = this.lastKey;
                    if (kvs.size() > 0) {
                        ByteSequence key = ((KeyValue) kvs.get(kvs.size() - 1)).getKey();
                        this.nextKey = key;
                        this.lastKey = key;
                    }
                }
                if (null != byteSequence2 && kvs.size() > 0 && UnsignedBytes.lexicographicalComparator().compare(byteSequence2.getBytes(), ((KeyValue) kvs.get(0)).getKey().getBytes()) == 0) {
                    kvs.remove(0);
                }
                return (List) kvs.stream().map(keyValue -> {
                    return this.encoder.apply(keyValue.getKey());
                }).collect(Collectors.toList());
            });
        }
    }
}
