package org.apache.distributedlog.impl.subscription;

import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.commons.lang3.tuple.Pair;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.util.Utils;

/* loaded from: input_file:org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.class */
/**
 * {@link SubscriptionsStore} implementation that keeps one
 * {@link ZKSubscriptionStateStore} per subscriber id, each rooted at
 * {@code <zkPath>/<subscriberId>} in ZooKeeper.
 *
 * <p>State stores are created lazily on first use and cached in a concurrent map;
 * {@link #close()} closes every store that is still cached.
 */
public class ZKSubscriptionsStore implements SubscriptionsStore {
    private final ZooKeeperClient zkc;
    private final String zkPath;
    /** Cache of per-subscriber state stores, keyed by subscriber id. */
    private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers = new ConcurrentHashMap<>();

    /**
     * Creates a store whose subscribers live directly under the given ZooKeeper path.
     *
     * @param zooKeeperClient client used for all ZooKeeper operations
     * @param str parent ZooKeeper path under which each subscriber has a child node
     */
    public ZKSubscriptionsStore(ZooKeeperClient zooKeeperClient, String str) {
        this.zkc = zooKeeperClient;
        this.zkPath = str;
    }

    /**
     * Returns the cached state store for a subscriber, creating and caching it if absent.
     *
     * @param str subscriber id
     * @return the state store registered in the cache for that subscriber
     */
    private ZKSubscriptionStateStore getSubscriber(String str) {
        ZKSubscriptionStateStore cached = this.subscribers.get(str);
        if (cached != null) {
            return cached;
        }
        ZKSubscriptionStateStore created = new ZKSubscriptionStateStore(this.zkc, getSubscriberZKPath(str));
        ZKSubscriptionStateStore winner = this.subscribers.putIfAbsent(str, created);
        if (winner == null) {
            return created;
        }
        // Another thread registered a store first: discard ours and use theirs.
        try {
            created.close();
        } catch (IOException ignored) {
            // Best effort: the discarded instance was never handed out, so a failure
            // to close it must not fail the lookup.
        }
        return winner;
    }

    /**
     * Builds the ZooKeeper path of a subscriber node.
     *
     * @param str subscriber id
     * @return {@code zkPath + "/" + str}
     */
    private String getSubscriberZKPath(String str) {
        return String.format("%s/%s", this.zkPath, str);
    }

    /**
     * Delegates to the subscriber's state store to obtain its last commit position.
     *
     * @param str subscriber id
     * @return future produced by the subscriber's state store
     */
    @Override // org.apache.distributedlog.api.subscription.SubscriptionsStore
    public CompletableFuture<DLSN> getLastCommitPosition(String str) {
        return getSubscriber(str).getLastCommitPosition();
    }

    /**
     * Lists the children of the store's ZooKeeper path and resolves the last commit
     * position of each child, treating every child name as a subscriber id.
     *
     * <p>A missing parent node yields an empty map; any other non-OK ZooKeeper result
     * code fails the future with the corresponding {@link KeeperException}.
     *
     * @return future of a map from subscriber id to last commit position
     */
    @Override // org.apache.distributedlog.api.subscription.SubscriptionsStore
    public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
        final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<>();
        try {
            this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
                @Override // dlshade.org.apache.zookeeper.AsyncCallback.Children2Callback
                public void processResult(int i, String str, Object obj, List<String> list, Stat stat) {
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        // No parent node means no subscribers have been recorded.
                        result.complete(new HashMap<String, DLSN>());
                    } else if (KeeperException.Code.OK.intValue() != i) {
                        result.completeExceptionally(KeeperException.create(KeeperException.Code.get(i), str));
                    } else {
                        ZKSubscriptionsStore.this.getLastCommitPositions(result, list);
                    }
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure through the future.
            Thread.currentThread().interrupt();
            result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", e));
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            result.completeExceptionally(e2);
        }
        return result;
    }

    /**
     * Fetches the last commit position of every listed subscriber from ZooKeeper and
     * completes {@code completableFuture} with the combined map.
     *
     * <p>If any individual lookup fails, {@code completableFuture} is completed
     * exceptionally with that failure instead of being left pending forever.
     *
     * <p>Kept {@code public} to preserve the existing signature; it is invoked from the
     * children callback of {@link #getLastCommitPositions()}.
     *
     * @param completableFuture future to complete with the subscriber-to-position map
     * @param list subscriber ids to look up
     */
    public void getLastCommitPositions(CompletableFuture<Map<String, DLSN>> completableFuture, List<String> list) {
        List<CompletableFuture<Pair<String, DLSN>>> lookups = new ArrayList<>(list.size());
        for (String subscriberId : list) {
            CompletableFuture<Pair<String, DLSN>> lookup =
                    getSubscriber(subscriberId).getLastCommitPositionFromZK()
                            .thenApply(dlsn -> Pair.of(subscriberId, dlsn));
            lookups.add(lookup);
        }
        FutureUtils.collect(lookups).whenComplete((pairs, cause) -> {
            if (cause != null) {
                // Dependent stages may wrap the root failure in a CompletionException;
                // hand callers the underlying cause when one is available.
                Throwable root = cause;
                if (cause instanceof CompletionException && cause.getCause() != null) {
                    root = cause.getCause();
                }
                completableFuture.completeExceptionally(root);
                return;
            }
            Map<String, DLSN> positions = new HashMap<>();
            for (Pair<String, DLSN> pair : pairs) {
                positions.put(pair.getLeft(), pair.getRight());
            }
            completableFuture.complete(positions);
        });
    }

    /**
     * Delegates to the subscriber's state store to advance its commit position.
     *
     * @param str subscriber id
     * @param dlsn new commit position
     * @return future produced by the subscriber's state store
     */
    @Override // org.apache.distributedlog.api.subscription.SubscriptionsStore
    public CompletableFuture<Void> advanceCommitPosition(String str, DLSN dlsn) {
        return getSubscriber(str).advanceCommitPosition(dlsn);
    }

    /**
     * Drops the subscriber's cached state store (without closing it) and deletes the
     * subscriber's ZooKeeper node regardless of its version.
     *
     * @param str subscriber id
     * @return future returned by {@code Utils.zkDeleteIfNotExist} for the subscriber path
     */
    @Override // org.apache.distributedlog.api.subscription.SubscriptionsStore
    public CompletableFuture<Boolean> deleteSubscriber(String str) {
        this.subscribers.remove(str);
        return Utils.zkDeleteIfNotExist(this.zkc, getSubscriberZKPath(str), new LongVersion(-1L));
    }

    /**
     * Closes every cached subscriber state store.
     *
     * <p>All stores are attempted even if some fail; the first failure is rethrown with
     * any later failures attached as suppressed exceptions.
     *
     * @throws IOException if closing one or more state stores fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException firstFailure = null;
        for (ZKSubscriptionStateStore store : this.subscribers.values()) {
            try {
                store.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
