package org.apache.distributedlog.impl.acl;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/acl/ZKAccessControlManager.class */
public class ZKAccessControlManager implements AccessControlManager, Watcher {
    private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
    private static final int ZK_RETRY_BACKOFF_MS = 500;
    protected final DistributedLogConfiguration conf;
    protected final ZooKeeperClient zkc;
    protected final String zkRootPath;
    protected final ScheduledExecutorService scheduledExecutorService;
    protected ZKAccessControl defaultAccessControl;
    protected volatile boolean closed = false;
    protected final ConcurrentMap<String, ZKAccessControl> streamEntries = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.distributedlog.impl.acl.ZKAccessControlManager$6, reason: invalid class name */
    /* loaded from: input_file:org/apache/distributedlog/impl/acl/ZKAccessControlManager$6.class */
    public class AnonymousClass6 implements Runnable {
        AnonymousClass6() {
        }

        @Override // java.lang.Runnable
        public void run() {
            ZKAccessControlManager.this.fetchDefaultAccessControlEntry().whenComplete((BiConsumer) new FutureEventListener<ZKAccessControl>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.6.1
                public void onSuccess(ZKAccessControl zKAccessControl) {
                    ZKAccessControlManager.this.fetchAccessControlEntries().whenComplete((BiConsumer) new FutureEventListener<Void>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.6.1.1
                        public void onSuccess(Void r2) {
                        }

                        public void onFailure(Throwable th) {
                            ZKAccessControlManager.logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ", Integer.valueOf(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS), th);
                            ZKAccessControlManager.this.refetchAccessControlEntries(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS);
                        }
                    });
                }

                public void onFailure(Throwable th) {
                    ZKAccessControlManager.logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ", Integer.valueOf(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS), th);
                    ZKAccessControlManager.this.refetchAllAccessControlEntries(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS);
                }
            });
        }
    }

    public ZKAccessControlManager(DistributedLogConfiguration distributedLogConfiguration, ZooKeeperClient zooKeeperClient, String str, ScheduledExecutorService scheduledExecutorService) throws IOException {
        this.conf = distributedLogConfiguration;
        this.zkc = zooKeeperClient;
        this.zkRootPath = str;
        this.scheduledExecutorService = scheduledExecutorService;
        try {
            FutureUtils.result(fetchDefaultAccessControlEntry());
            try {
                FutureUtils.result(fetchAccessControlEntries());
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    throw new DLInterruptedException("Interrupted on getting access control entries for " + str, th);
                }
                if (th instanceof KeeperException) {
                    throw new IOException("Encountered zookeeper exception on getting access control entries for " + str, th);
                }
                if (!(th instanceof IOException)) {
                    throw new IOException("Encountered unknown exception on getting access control entries for " + str, th);
                }
                throw ((IOException) th);
            }
        } catch (Throwable th2) {
            if (th2 instanceof InterruptedException) {
                throw new DLInterruptedException("Interrupted on getting default access control entry for " + str, th2);
            }
            if (th2 instanceof KeeperException) {
                throw new IOException("Encountered zookeeper exception on getting default access control entry for " + str, th2);
            }
            if (!(th2 instanceof IOException)) {
                throw new IOException("Encountered unknown exception on getting access control entries for " + str, th2);
            }
            throw ((IOException) th2);
        }
    }

    protected AccessControlEntry getAccessControlEntry(String str) {
        ZKAccessControl zKAccessControl = this.streamEntries.get(str);
        return (null == zKAccessControl ? this.defaultAccessControl : zKAccessControl).getAccessControlEntry();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public boolean allowWrite(String str) {
        return !getAccessControlEntry(str).isDenyWrite();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public boolean allowTruncate(String str) {
        return !getAccessControlEntry(str).isDenyTruncate();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public boolean allowDelete(String str) {
        return !getAccessControlEntry(str).isDenyDelete();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public boolean allowAcquire(String str) {
        return !getAccessControlEntry(str).isDenyAcquire();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public boolean allowRelease(String str) {
        return !getAccessControlEntry(str).isDenyRelease();
    }

    @Override // org.apache.distributedlog.acl.AccessControlManager
    public void close() {
        this.closed = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<Void> fetchAccessControlEntries() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        fetchAccessControlEntries(completableFuture);
        return completableFuture;
    }

    private void fetchAccessControlEntries(final CompletableFuture<Void> completableFuture) {
        try {
            this.zkc.get().getChildren(this.zkRootPath, this, new AsyncCallback.Children2Callback() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.1
                public void processResult(int i, String str, Object obj, List<String> list, Stat stat) {
                    if (KeeperException.Code.OK.intValue() != i) {
                        completableFuture.completeExceptionally(KeeperException.create(KeeperException.Code.get(i)));
                        return;
                    }
                    HashSet<String> hashSet = new HashSet();
                    hashSet.addAll(list);
                    for (String str2 : Sets.difference(ZKAccessControlManager.this.streamEntries.keySet(), hashSet).immutableCopy()) {
                        ZKAccessControl remove = ZKAccessControlManager.this.streamEntries.remove(str2);
                        if (null != remove) {
                            ZKAccessControlManager.logger.info("Removed Access Control Entry for stream {} : {}", str2, remove.getAccessControlEntry());
                        }
                    }
                    if (hashSet.isEmpty()) {
                        completableFuture.complete(null);
                        return;
                    }
                    final AtomicInteger atomicInteger = new AtomicInteger(hashSet.size());
                    final AtomicInteger atomicInteger2 = new AtomicInteger(0);
                    for (final String str3 : hashSet) {
                        ZKAccessControl.read(ZKAccessControlManager.this.zkc, ZKAccessControlManager.this.zkRootPath + "/" + str3, null).whenComplete((BiConsumer<? super ZKAccessControl, ? super Throwable>) new FutureEventListener<ZKAccessControl>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.1.1
                            public void onSuccess(ZKAccessControl zKAccessControl) {
                                ZKAccessControlManager.this.streamEntries.put(str3, zKAccessControl);
                                ZKAccessControlManager.logger.info("Added overrided access control for stream {} : {}", str3, zKAccessControl.getAccessControlEntry());
                                complete();
                            }

                            public void onFailure(Throwable th) {
                                if (th instanceof KeeperException.NoNodeException) {
                                    ZKAccessControlManager.this.streamEntries.remove(str3);
                                } else if (th instanceof ZKAccessControl.CorruptedAccessControlException) {
                                    ZKAccessControlManager.logger.warn("Access control is corrupted for stream {} @ {},skipped it ...", new Object[]{str3, ZKAccessControlManager.this.zkRootPath, th});
                                    ZKAccessControlManager.this.streamEntries.remove(str3);
                                } else if (1 == atomicInteger2.incrementAndGet()) {
                                    completableFuture.completeExceptionally(th);
                                }
                                complete();
                            }

                            private void complete() {
                                if (0 == atomicInteger.decrementAndGet() && atomicInteger2.get() == 0) {
                                    completableFuture.complete(null);
                                }
                            }
                        });
                    }
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completableFuture.completeExceptionally(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            completableFuture.completeExceptionally(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<ZKAccessControl> fetchDefaultAccessControlEntry() {
        CompletableFuture<ZKAccessControl> completableFuture = new CompletableFuture<>();
        fetchDefaultAccessControlEntry(completableFuture);
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fetchDefaultAccessControlEntry(final CompletableFuture<ZKAccessControl> completableFuture) {
        ZKAccessControl.read(this.zkc, this.zkRootPath, this).whenComplete((BiConsumer<? super ZKAccessControl, ? super Throwable>) new FutureEventListener<ZKAccessControl>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.2
            public void onSuccess(ZKAccessControl zKAccessControl) {
                ZKAccessControlManager.logger.info("Default Access Control will be changed from {} to {}", ZKAccessControlManager.this.defaultAccessControl, zKAccessControl);
                ZKAccessControlManager.this.defaultAccessControl = zKAccessControl;
                completableFuture.complete(zKAccessControl);
            }

            public void onFailure(Throwable th) {
                if (!(th instanceof KeeperException.NoNodeException)) {
                    completableFuture.completeExceptionally(th);
                } else {
                    ZKAccessControlManager.logger.info("Default Access Control is missing, creating one for {} ...", ZKAccessControlManager.this.zkRootPath);
                    ZKAccessControlManager.this.createDefaultAccessControlEntryIfNeeded(completableFuture);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createDefaultAccessControlEntryIfNeeded(final CompletableFuture<ZKAccessControl> completableFuture) {
        try {
            ZkUtils.asyncCreateFullPathOptimistic(this.zkc.get(), this.zkRootPath, new byte[0], this.zkc.getDefaultACL(), CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.3
                public void processResult(int i, String str, Object obj, String str2) {
                    if (KeeperException.Code.OK.intValue() != i) {
                        completableFuture.completeExceptionally(KeeperException.create(KeeperException.Code.get(i)));
                    } else {
                        ZKAccessControlManager.logger.info("Created zk path {} for default ACL.", ZKAccessControlManager.this.zkRootPath);
                        ZKAccessControlManager.this.fetchDefaultAccessControlEntry(completableFuture);
                    }
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completableFuture.completeExceptionally(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            completableFuture.completeExceptionally(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refetchDefaultAccessControlEntry(int i) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new Runnable() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.4
            @Override // java.lang.Runnable
            public void run() {
                ZKAccessControlManager.this.fetchDefaultAccessControlEntry().whenComplete((BiConsumer) new FutureEventListener<ZKAccessControl>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.4.1
                    public void onSuccess(ZKAccessControl zKAccessControl) {
                    }

                    public void onFailure(Throwable th) {
                        if (th instanceof ZKAccessControl.CorruptedAccessControlException) {
                            ZKAccessControlManager.logger.warn("Default access control entry is corrupted, ignore this update : ", th);
                        } else {
                            ZKAccessControlManager.logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ", Integer.valueOf(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS), th);
                            ZKAccessControlManager.this.refetchDefaultAccessControlEntry(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS);
                        }
                    }
                });
            }
        }, i, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refetchAccessControlEntries(int i) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new Runnable() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.5
            @Override // java.lang.Runnable
            public void run() {
                ZKAccessControlManager.this.fetchAccessControlEntries().whenComplete((BiConsumer) new FutureEventListener<Void>() { // from class: org.apache.distributedlog.impl.acl.ZKAccessControlManager.5.1
                    public void onSuccess(Void r2) {
                    }

                    public void onFailure(Throwable th) {
                        ZKAccessControlManager.logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ", Integer.valueOf(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS), th);
                        ZKAccessControlManager.this.refetchAccessControlEntries(ZKAccessControlManager.ZK_RETRY_BACKOFF_MS);
                    }
                });
            }
        }, i, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refetchAllAccessControlEntries(int i) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new AnonymousClass6(), i, TimeUnit.MILLISECONDS);
    }

    public void process(WatchedEvent watchedEvent) {
        if (Watcher.Event.EventType.None.equals(watchedEvent.getType())) {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                refetchAllAccessControlEntries(0);
            }
        } else if (Watcher.Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
            logger.info("Default ACL for {} is changed, refetching ...", this.zkRootPath);
            refetchDefaultAccessControlEntry(0);
        } else if (Watcher.Event.EventType.NodeChildrenChanged.equals(watchedEvent.getType())) {
            logger.info("List of ACLs for {} are changed, refetching ...", this.zkRootPath);
            refetchAccessControlEntries(0);
        }
    }
}
