package org.apache.pulsar.broker.authorization;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Joiner;
import org.apache.pulsar.functions.runtime.shaded.javax.ws.rs.core.Response;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.class */
public class PulsarAuthorizationProvider implements AuthorizationProvider {
    /** Logger shared by every instance of this provider. */
    private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);

    /** Broker configuration; assigned by {@link #initialize(ServiceConfiguration, PulsarResources)}. */
    public ServiceConfiguration conf;

    /** Accessor for namespace, tenant and cluster resources; assigned together with {@link #conf}. */
    private PulsarResources pulsarResources;

    /**
     * Creates a provider without configuration or resources.
     *
     * <p>{@code conf} and {@code pulsarResources} stay {@code null} until
     * {@link #initialize(ServiceConfiguration, PulsarResources)} is called.
     */
    public PulsarAuthorizationProvider() {
    }

    /**
     * Creates a provider and immediately initializes it with the given configuration and resources.
     *
     * @param serviceConfiguration broker configuration; must not be {@code null}
     * @param pulsarResources      resource accessor; must not be {@code null}
     * @throws IOException if initialization fails
     */
    public PulsarAuthorizationProvider(ServiceConfiguration serviceConfiguration, PulsarResources pulsarResources) throws IOException {
        this.initialize(serviceConfiguration, pulsarResources);
    }

    /**
     * Stores the configuration and resources used by all later authorization checks, then
     * invokes the {@code ConfigurationCacheService}-based {@code initialize} overload with a
     * {@code null} cache.
     *
     * @param serviceConfiguration broker configuration; must not be {@code null}
     * @param pulsarResources      resource accessor; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     * @throws IOException          if the delegated initialization fails
     */
    @Override
    public void initialize(ServiceConfiguration serviceConfiguration, PulsarResources pulsarResources) throws IOException {
        // Validate both arguments before touching any field so a failed call leaves state untouched.
        final ServiceConfiguration checkedConf =
                Objects.requireNonNull(serviceConfiguration, "ServiceConfiguration can't be null");
        final PulsarResources checkedResources =
                Objects.requireNonNull(pulsarResources, "PulsarResources can't be null");
        this.conf = checkedConf;
        this.pulsarResources = checkedResources;

        // The typed null selects the ConfigurationCacheService overload.
        final ConfigurationCacheService noConfigurationCache = null;
        initialize(checkedConf, noConfigurationCache);
    }

    /**
     * Checks whether role {@code str} holds the {@code produce} action on {@code topicName}.
     *
     * @param topicName                topic being accessed
     * @param str                      role of the client
     * @param authenticationDataSource not used by this implementation
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String str, AuthenticationDataSource authenticationDataSource) {
        final AuthAction requiredAction = AuthAction.produce;
        return checkAuthorization(topicName, str, requiredAction);
    }

    /**
     * Checks whether role {@code str} may consume from {@code topicName} using subscription {@code str2}.
     *
     * <p>When namespace policies exist and {@code str2} is not blank, two subscription-level
     * rules are applied before the regular {@code consume} permission check:
     * <ol>
     *   <li>if the subscription has a non-empty role list that does not contain {@code str},
     *       the result is {@code false};</li>
     *   <li>if the namespace uses the {@code Prefix} subscription auth mode, {@code str2} must
     *       start with {@code str}, otherwise the future fails with a
     *       {@link PulsarServerException}.</li>
     * </ol>
     *
     * @param topicName                topic being accessed
     * @param str                      role of the client
     * @param authenticationDataSource not used by this implementation
     * @param str2                     subscription name; may be {@code null} or blank
     * @return future holding the authorization decision, or failed if policies could not be read
     */
    @Override
    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String str, AuthenticationDataSource authenticationDataSource, String str2) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            this.pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenAccept(maybePolicies -> {
                if (!maybePolicies.isPresent()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Policies node couldn't be found for topic : {}", topicName);
                    }
                } else if (StringUtils.isNotBlank(str2)) {
                    final Policies policies = (Policies) maybePolicies.get();
                    final Set<String> allowedRoles = policies.auth_policies.getSubscriptionAuthentication().get(str2);
                    final boolean roleRejected = allowedRoles != null && !allowedRoles.isEmpty() && !allowedRoles.contains(str);
                    if (roleRejected) {
                        log.warn("[{}] is not authorized to subscribe on {}-{}", str, topicName, str2);
                        result.complete(false);
                        return;
                    }
                    // Only the Prefix mode adds a constraint on the subscription name.
                    switch (policies.subscription_auth_mode) {
                        case Prefix:
                            if (!str2.startsWith(str)) {
                                result.completeExceptionally(new PulsarServerException(String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for topic: %s", str, topicName)));
                                return;
                            }
                            break;
                    }
                }
                // Subscription-level rules passed (or did not apply): run the regular consume check.
                checkAuthorization(topicName, str, AuthAction.consume)
                        .thenAccept(result::complete)
                        .exceptionally(checkFailure -> {
                            log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", str, topicName, checkFailure.getMessage());
                            result.completeExceptionally(checkFailure);
                            return null;
                        });
            }).exceptionally(policiesFailure -> {
                log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", str, topicName, policiesFailure.getMessage());
                result.completeExceptionally(policiesFailure);
                return null;
            });
        } catch (Exception syncFailure) {
            log.warn("Client  with Role - {} failed to get permissions for topic - {}. {}", str, topicName, syncFailure.getMessage());
            result.completeExceptionally(syncFailure);
        }
        return result;
    }

    /**
     * Checks whether role {@code str} may look up {@code topicName}: lookup is allowed when
     * the role may either produce to or consume from the topic.
     *
     * <p>The produce check runs first. If it grants access the result is {@code true};
     * if it denies access or fails, the consume check (with a {@code null} subscription)
     * decides the outcome, including a failure of the returned future.
     *
     * @param topicName                topic being looked up
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the produce and consume checks
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String str, AuthenticationDataSource authenticationDataSource) {
        final CompletableFuture<Boolean> lookupResult = new CompletableFuture<>();
        canProduceAsync(topicName, str, authenticationDataSource).whenComplete((canProduce, produceError) -> {
            if (produceError == null) {
                if (canProduce.booleanValue()) {
                    lookupResult.complete(canProduce);
                    return;
                }
            } else if (log.isDebugEnabled()) {
                log.debug("Topic [{}] Role [{}] exception occurred while trying to check Produce permissions. {}", topicName.toString(), str, produceError.getMessage());
            }
            // Produce was denied or failed: fall back to the consume permission.
            // The inner lambda parameters must not reuse the outer names; Java rejects
            // a lambda parameter that redeclares a variable of the enclosing scope.
            canConsumeAsync(topicName, str, authenticationDataSource, null).whenComplete((canConsume, consumeError) -> {
                if (consumeError == null) {
                    lookupResult.complete(canConsume);
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Topic [{}] Role [{}] exception occurred while trying to check Consume permissions. {}", topicName.toString(), str, consumeError.getMessage());
                }
                lookupResult.completeExceptionally(consumeError);
            });
        });
        return lookupResult;
    }

    /**
     * Checks whether role {@code str} holds the {@code functions} action on {@code namespaceName}.
     *
     * @param namespaceName            namespace being accessed
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the namespace-level check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String str, AuthenticationDataSource authenticationDataSource) {
        final AuthAction requiredAction = AuthAction.functions;
        return allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, requiredAction);
    }

    /**
     * Checks whether role {@code str} holds the {@code sources} action on {@code namespaceName}.
     *
     * @param namespaceName            namespace being accessed
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the namespace-level check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String str, AuthenticationDataSource authenticationDataSource) {
        final AuthAction requiredAction = AuthAction.sources;
        return allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, requiredAction);
    }

    /**
     * Checks whether role {@code str} holds the {@code sinks} action on {@code namespaceName}.
     *
     * @param namespaceName            namespace being accessed
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the namespace-level check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String str, AuthenticationDataSource authenticationDataSource) {
        final AuthAction requiredAction = AuthAction.sinks;
        return allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, requiredAction);
    }

    /**
     * Checks whether role {@code str} holds either the {@code consume} or the {@code produce}
     * action on {@code namespaceName}.
     *
     * <p>The consume check runs first. If it grants access the result is {@code true};
     * if it denies access or fails, the produce check decides the outcome. When the produce
     * check fails, the returned future fails with that error's cause, or with the error
     * itself when it has no cause.
     *
     * @param namespaceName            namespace being accessed
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the namespace-level checks
     * @return future holding the authorization decision
     */
    private CompletableFuture<Boolean> allowConsumeOrProduceOpsAsync(NamespaceName namespaceName, String str, AuthenticationDataSource authenticationDataSource) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, AuthAction.consume).whenComplete((canConsume, consumeError) -> {
            if (consumeError == null) {
                if (canConsume.booleanValue()) {
                    result.complete(canConsume);
                    return;
                }
            } else if (log.isDebugEnabled()) {
                log.debug("Namespace [{}] Role [{}] exception occurred while trying to check Consume permission. {}", namespaceName, str, consumeError.getCause());
            }
            // The inner lambda parameters must not reuse the outer names; Java rejects
            // a lambda parameter that redeclares a variable of the enclosing scope.
            allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, AuthAction.produce).whenComplete((canProduce, produceError) -> {
                if (produceError == null) {
                    result.complete(canProduce);
                    return;
                }
                // The error may be a plain exception without a cause; completeExceptionally(null)
                // would throw and leave the returned future incomplete forever.
                final Throwable rootError = produceError.getCause() != null ? produceError.getCause() : produceError;
                if (log.isDebugEnabled()) {
                    log.debug("Namespace [{}] Role [{}] exception occurred while trying to check Produce permission. {}", namespaceName, str, rootError);
                }
                result.completeExceptionally(rootError);
            });
        });
        return result;
    }

    /**
     * Checks whether role {@code str} holds {@code authAction} at namespace level.
     *
     * <p>Access is granted when the namespace policies list the action for the exact role, or,
     * if wildcard matching is enabled in the configuration, for a wildcard entry matching the
     * role. Missing policies yield {@code false}.
     *
     * @param namespaceName            namespace whose policies are consulted
     * @param str                      role of the client
     * @param authenticationDataSource not used by this implementation
     * @param authAction               action that must be granted
     * @return future holding the authorization decision, or failed if policies could not be read
     */
    private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String str, AuthenticationDataSource authenticationDataSource, AuthAction authAction) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            this.pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(maybePolicies -> {
                boolean permitted = false;
                if (maybePolicies.isPresent()) {
                    final Map<String, Set<AuthAction>> rolesToActions = ((Policies) maybePolicies.get()).auth_policies.getNamespaceAuthentication();
                    final Set<AuthAction> grantedActions = rolesToActions.get(str);
                    // Exact role match first; wildcard entries are consulted only when enabled.
                    permitted = (grantedActions != null && grantedActions.contains(authAction))
                            || (this.conf.isAuthorizationAllowWildcardsMatching() && checkWildcardPermission(str, authAction, rolesToActions));
                } else if (log.isDebugEnabled()) {
                    log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
                }
                result.complete(permitted);
            }).exceptionally(asyncFailure -> {
                log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", str, namespaceName, asyncFailure.getMessage());
                result.completeExceptionally(asyncFailure);
                return null;
            });
        } catch (Exception syncFailure) {
            log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", str, namespaceName, syncFailure.getMessage());
            result.completeExceptionally(syncFailure);
        }
        return result;
    }

    /**
     * Grants role {@code str} the actions in {@code set} on a single topic by updating the
     * topic-level entries of the owning namespace's policies.
     *
     * @param topicName topic to grant access to
     * @param set       actions to grant; replaces any actions previously stored for the role
     * @param str       role receiving the permissions
     * @param str2      not used by this implementation
     * @return future completed when the policies update finishes; failed if policies are
     *         read-only or the update fails
     */
    @Override
    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> set, String str, String str2) {
        try {
            validatePoliciesReadOnlyAccess();
            final String topicKey = topicName.toString();
            return this.pulsarResources.getNamespaceResources()
                    .setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
                        // Create the per-topic role map on first grant, then overwrite the role's actions.
                        final Map<String, Set<AuthAction>> rolesForTopic =
                                policies.auth_policies.getTopicAuthentication().computeIfAbsent(topicKey, unused -> new HashMap<>());
                        rolesForTopic.put(str, set);
                        return policies;
                    })
                    .whenComplete((ignored, failure) -> {
                        if (failure == null) {
                            log.info("[{}] Successfully granted access for role {}: {} - topic {}", str, str, set, topicKey);
                        } else {
                            log.error("[{}] Failed to set permissions for role {} on topic {}", str, str, topicName, failure);
                        }
                    });
        } catch (Exception syncFailure) {
            return FutureUtil.failedFuture(syncFailure);
        }
    }

    /**
     * Grants role {@code str} the actions in {@code set} on a whole namespace by updating the
     * namespace-level entries of its policies.
     *
     * @param namespaceName namespace to grant access to
     * @param set           actions to grant; replaces any actions previously stored for the role
     * @param str           role receiving the permissions
     * @param str2          not used by this implementation
     * @return future completed when the policies update finishes; failed if policies are
     *         read-only or the update fails
     */
    @Override
    public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName, Set<AuthAction> set, String str, String str2) {
        try {
            validatePoliciesReadOnlyAccess();
            return this.pulsarResources.getNamespaceResources()
                    .setPoliciesAsync(namespaceName, policies -> {
                        policies.auth_policies.getNamespaceAuthentication().put(str, set);
                        return policies;
                    })
                    .whenComplete((ignored, failure) -> {
                        if (failure == null) {
                            log.info("[{}] Successfully granted access for role {}: {} - namespace {}", str, str, set, namespaceName);
                        } else {
                            log.error("[{}] Failed to set permissions for role {} namespace {}", str, str, namespaceName, failure);
                        }
                    });
        } catch (Exception syncFailure) {
            return FutureUtil.failedFuture(syncFailure);
        }
    }

    /**
     * Sets the roles allowed to use subscription {@code str} in {@code namespaceName}.
     *
     * @param namespaceName namespace owning the subscription policy
     * @param str           subscription name
     * @param set           roles to store for the subscription
     * @param str2          not used by this implementation
     * @return future completed when the policies update finishes
     */
    @Override
    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespaceName, String str, Set<String> set, String str2) {
        final boolean revoke = false;
        return updateSubscriptionPermissionAsync(namespaceName, str, set, revoke);
    }

    /**
     * Removes role {@code str2} from the roles allowed to use subscription {@code str}
     * in {@code namespaceName}.
     *
     * @param namespaceName namespace owning the subscription policy
     * @param str           subscription name
     * @param str2          role to remove
     * @param str3          not used by this implementation
     * @return future completed when the policies update finishes
     */
    @Override
    public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespaceName, String str, String str2, String str3) {
        final Set<String> roleToRemove = Collections.singleton(str2);
        final boolean revoke = true;
        return updateSubscriptionPermissionAsync(namespaceName, str, roleToRemove, revoke);
    }

    /**
     * Grants or revokes subscription-level access in the policies of {@code namespaceName}.
     *
     * <p>On grant, {@code set} replaces the roles stored for subscription {@code str}. On
     * revoke, the roles in {@code set} are removed from the stored roles; if the subscription
     * has no stored roles the update fails with an {@link IllegalArgumentException}.
     *
     * @param namespaceName namespace owning the subscription policy
     * @param str           subscription name
     * @param set           roles to grant or revoke
     * @param z             {@code true} to revoke, {@code false} to grant
     * @return future completed when the policies update finishes; failed if policies are
     *         read-only or the update fails
     */
    private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespaceName, String str, Set<String> set, boolean z) {
        try {
            validatePoliciesReadOnlyAccess();
            return this.pulsarResources.getNamespaceResources().setPoliciesAsync(namespaceName, policies -> {
                if (z) {
                    Set<String> storedRoles = policies.auth_policies.getSubscriptionAuthentication().get(str);
                    if (storedRoles == null) {
                        log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespaceName, set, str);
                        throw new IllegalArgumentException("couldn't find subscription");
                    }
                    storedRoles.removeAll(set);
                } else {
                    policies.auth_policies.getSubscriptionAuthentication().put(str, set);
                }
                return policies;
            }).whenComplete((ignored, failure) -> {
                // Report the operation that actually ran; this path serves both grant and revoke.
                if (failure != null) {
                    log.error("[{}] Failed to {} subscription permission for role {} for sub = {}", namespaceName, z ? "revoke" : "grant", set, str, failure);
                } else {
                    log.info("[{}] Successfully {} access for role {} for sub = {}", namespaceName, z ? "revoked" : "granted", set, str);
                }
            });
        } catch (Exception e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Checks that role {@code str} holds {@code authAction} on {@code topicName} and, if so,
     * that the topic's cluster passes {@link #checkCluster(TopicName)}.
     *
     * @param topicName  topic being accessed
     * @param str        role of the client
     * @param authAction action that must be granted
     * @return future holding {@code true} only when both the permission and the cluster
     *         check succeed
     */
    private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String str, AuthAction authAction) {
        // No explicit Function cast: the target type is inferred, and the previous cast named
        // a type variable that is not declared anywhere in this class.
        return checkPermission(topicName, str, authAction).thenCompose(permitted ->
                permitted.booleanValue() ? checkCluster(topicName) : CompletableFuture.completedFuture(false));
    }

    /**
     * Checks whether the cluster of {@code topicName} is acceptable for this broker.
     *
     * <p>Global topics and topics of the configured local cluster pass immediately. For any
     * other cluster the result is whether that cluster appears in the cluster listing.
     *
     * @param topicName topic whose cluster is verified
     * @return future holding the outcome of the cluster check
     */
    private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
        if (topicName.isGlobal()) {
            return CompletableFuture.completedFuture(true);
        }
        final String localCluster = this.conf.getClusterName();
        if (localCluster.equals(topicName.getCluster())) {
            return CompletableFuture.completedFuture(true);
        }
        if (log.isDebugEnabled()) {
            log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), this.conf.getClusterName());
        }
        // Not local: accept the topic only if its cluster is one of the listed clusters.
        return this.pulsarResources.getClusterResources().listAsync()
                .thenApply(knownClusters -> Boolean.valueOf(knownClusters.contains(topicName.getCluster())));
    }

    /**
     * Checks whether role {@code str} holds {@code authAction} for {@code topicName} according
     * to the policies of the topic's namespace.
     *
     * <p>Checks run in this order and the first match grants access:
     * <ol>
     *   <li>namespace-level entry for the exact role;</li>
     *   <li>topic-level entry for the exact role (skipped when the role is {@code null});</li>
     *   <li>if wildcard matching is enabled: namespace-level, then topic-level wildcard entries;</li>
     *   <li>for partitioned topics: entry for the exact role under the partitioned topic name.</li>
     * </ol>
     * Missing policies, or no match, yield {@code false}.
     *
     * @param topicName  topic being accessed
     * @param str        role of the client
     * @param authAction action that must be granted
     * @return future holding the permission decision, or failed if policies could not be read
     */
    public CompletableFuture<Boolean> checkPermission(TopicName topicName, String str, AuthAction authAction) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            this.pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenAccept(maybePolicies -> {
                if (!maybePolicies.isPresent()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Policies node couldn't be found for topic : {}", topicName);
                    }
                    result.complete(false);
                    return;
                }
                final Policies policies = (Policies) maybePolicies.get();

                // 1. Namespace-level grant for the exact role.
                final Map<String, Set<AuthAction>> namespaceRoles = policies.auth_policies.getNamespaceAuthentication();
                final Set<AuthAction> namespaceActions = namespaceRoles.get(str);
                if (namespaceActions != null && namespaceActions.contains(authAction)) {
                    result.complete(true);
                    return;
                }

                // 2. Topic-level grant for the exact role.
                final Map<String, Set<AuthAction>> topicRoles = policies.auth_policies.getTopicAuthentication().get(topicName.toString());
                if (topicRoles != null && str != null) {
                    final Set<AuthAction> topicActions = topicRoles.get(str);
                    if (topicActions != null && topicActions.contains(authAction)) {
                        result.complete(true);
                        return;
                    }
                }

                // 3. Wildcard entries, namespace level before topic level.
                if (this.conf.isAuthorizationAllowWildcardsMatching()) {
                    final boolean wildcardGranted = checkWildcardPermission(str, authAction, namespaceRoles)
                            || (topicRoles != null && checkWildcardPermission(str, authAction, topicRoles));
                    if (wildcardGranted) {
                        result.complete(true);
                        return;
                    }
                }

                // 4. Grants stored under the partitioned topic name.
                if (topicName.isPartitioned()) {
                    final Map<String, Set<AuthAction>> partitionedRoles = policies.auth_policies.getTopicAuthentication().get(topicName.getPartitionedTopicName());
                    if (partitionedRoles != null) {
                        final Set<AuthAction> partitionedActions = partitionedRoles.get(str);
                        if (partitionedActions != null && partitionedActions.contains(authAction)) {
                            result.complete(true);
                            return;
                        }
                    }
                }
                result.complete(false);
            }).exceptionally(asyncFailure -> {
                log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", str, topicName, asyncFailure.getMessage());
                result.completeExceptionally(asyncFailure);
                return null;
            });
        } catch (Exception syncFailure) {
            log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", str, topicName, syncFailure.getMessage());
            result.completeExceptionally(syncFailure);
        }
        return result;
    }

    /**
     * Checks whether any wildcard entry in {@code map} grants {@code authAction} to role {@code str}.
     *
     * <p>An entry key ending in {@code '*'} matches roles starting with the text before the
     * star; a key starting with {@code '*'} matches roles ending with the text after the star.
     * Entries with a {@code null} or empty key, or a {@code null} action set, never match.
     *
     * @param str        role of the client; {@code null} never matches
     * @param authAction action that must be granted
     * @param map        permitted-role patterns mapped to the actions they grant
     * @return {@code true} if a matching wildcard entry contains {@code authAction}
     */
    private boolean checkWildcardPermission(String str, AuthAction authAction, Map<String, Set<AuthAction>> map) {
        if (str == null) {
            return false;
        }
        for (Map.Entry<String, Set<AuthAction>> entry : map.entrySet()) {
            final String permittedRole = entry.getKey();
            final Set<AuthAction> grantedActions = entry.getValue();
            // Skip entries that cannot match; charAt on an empty key would throw.
            if (permittedRole == null || permittedRole.isEmpty() || grantedActions == null
                    || !grantedActions.contains(authAction)) {
                continue;
            }
            final int lastIndex = permittedRole.length() - 1;
            // Suffix wildcard, e.g. "team-*" matches roles starting with "team-".
            if (permittedRole.charAt(lastIndex) == '*' && str.startsWith(permittedRole.substring(0, lastIndex))) {
                return true;
            }
            // Prefix wildcard, e.g. "*-admin" matches roles ending with "-admin".
            if (permittedRole.charAt(0) == '*' && str.endsWith(permittedRole.substring(1))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Does nothing: this method has an empty body and releases no resources.
     *
     * @throws IOException never thrown by this implementation
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    /**
     * Ensures that policies may currently be modified.
     *
     * @throws IllegalStateException with message {@code "policies are in readonly mode"} when
     *         the read-only flag is set, or with message
     *         {@code "Unable to fetch content from configuration metadata store"} when the flag
     *         cannot be read (the original failure is attached as the cause)
     */
    private void validatePoliciesReadOnlyAccess() {
        final boolean policiesReadOnly;
        try {
            policiesReadOnly = this.pulsarResources.getNamespaceResources().getPoliciesReadOnly();
        } catch (Exception e) {
            log.warn("Unable to check if policies are read-only", e);
            throw new IllegalStateException("Unable to fetch content from configuration metadata store", e);
        }

        // Evaluated outside the try block so the read-only rejection is not caught above
        // and re-reported as a metadata store failure.
        if (policiesReadOnly) {
            if (log.isDebugEnabled()) {
                log.debug("Policies are read-only. Broker cannot do read-write operations");
            }
            throw new IllegalStateException("policies are in readonly mode");
        }
    }

    /**
     * Authorizes a tenant operation; every operation requires superuser or tenant-admin access.
     *
     * @param str                      tenant name
     * @param str2                     role of the client
     * @param tenantOperation          requested operation; does not influence the decision
     * @param authenticationDataSource passed through to the admin check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowTenantOperationAsync(String str, String str2, TenantOperation tenantOperation, AuthenticationDataSource authenticationDataSource) {
        final CompletableFuture<Boolean> adminCheck = validateTenantAdminAccess(str, str2, authenticationDataSource);
        return adminCheck;
    }

    /**
     * Authorizes a namespace operation for role {@code str}.
     *
     * <p>Superusers and tenant admins are always allowed. Other roles are allowed depending on
     * the operation: {@code PACKAGES} needs the {@code packages} action, topic/bundle reads
     * need {@code consume} or {@code produce}, {@code UNSUBSCRIBE}/{@code CLEAR_BACKLOG} need
     * {@code consume}, and the remaining listed operations are denied. An operation not listed
     * here fails the future with an {@link IllegalStateException}.
     *
     * @param namespaceName            namespace being operated on
     * @param str                      role of the client
     * @param namespaceOperation       requested operation
     * @param authenticationDataSource passed through to the underlying checks
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, String str, NamespaceOperation namespaceOperation, AuthenticationDataSource authenticationDataSource) {
        if (log.isDebugEnabled()) {
            log.debug("Check allowNamespaceOperationAsync [{}] on [{}].", namespaceOperation.name(), namespaceName);
        }
        return validateTenantAdminAccess(namespaceName.getTenant(), str, authenticationDataSource).thenCompose(isSuperUserOrAdmin -> {
            if (log.isDebugEnabled()) {
                log.debug("Verify if role {} is allowed to {} to namespace {}: isSuperUserOrAdmin={}", str, namespaceOperation, namespaceName, isSuperUserOrAdmin);
            }
            if (isSuperUserOrAdmin.booleanValue()) {
                return CompletableFuture.completedFuture(true);
            }

            // Regular role: the decision depends on the kind of operation.
            final CompletableFuture<Boolean> decision;
            switch (namespaceOperation) {
                case PACKAGES:
                    decision = allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, AuthAction.packages);
                    break;
                case GET_TOPIC:
                case GET_TOPICS:
                case GET_BUNDLE:
                    decision = allowConsumeOrProduceOpsAsync(namespaceName, str, authenticationDataSource);
                    break;
                case UNSUBSCRIBE:
                case CLEAR_BACKLOG:
                    decision = allowTheSpecifiedActionOpsAsync(namespaceName, str, authenticationDataSource, AuthAction.consume);
                    break;
                case CREATE_TOPIC:
                case DELETE_TOPIC:
                case ADD_BUNDLE:
                case DELETE_BUNDLE:
                case GRANT_PERMISSION:
                case GET_PERMISSION:
                case REVOKE_PERMISSION:
                    // Admin-only operations.
                    decision = CompletableFuture.completedFuture(false);
                    break;
                default:
                    decision = FutureUtil.failedFuture(new IllegalStateException("NamespaceOperation [" + namespaceOperation.name() + "] is not supported."));
                    break;
            }
            return decision;
        });
    }

    /**
     * Authorizes a namespace policy operation; every policy operation requires superuser or
     * tenant-admin access on the namespace's tenant.
     *
     * @param namespaceName            namespace whose policy is accessed
     * @param policyName               policy being accessed; does not influence the decision
     * @param policyOperation          requested operation; does not influence the decision
     * @param str                      role of the client
     * @param authenticationDataSource passed through to the admin check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policyName, PolicyOperation policyOperation, String str, AuthenticationDataSource authenticationDataSource) {
        final String tenant = namespaceName.getTenant();
        return validateTenantAdminAccess(tenant, str, authenticationDataSource);
    }

    /**
     * Authorizes a topic operation for role {@code str}.
     *
     * <p>Superusers and tenant admins are always allowed. Other roles are allowed depending on
     * the operation: lookup-style operations use {@link #canLookupAsync}, {@code PRODUCE} uses
     * {@link #canProduceAsync}, subscription-related operations use {@link #canConsumeAsync}
     * with the subscription taken from {@code authenticationDataSource}, and the remaining
     * listed operations are denied. An operation not listed here fails the future with an
     * {@link IllegalStateException}.
     *
     * @param topicName                topic being operated on
     * @param str                      role of the client
     * @param topicOperation           requested operation
     * @param authenticationDataSource authentication data; supplies the subscription name for
     *                                 consume-style operations
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName, String str, TopicOperation topicOperation, AuthenticationDataSource authenticationDataSource) {
        if (log.isDebugEnabled()) {
            log.debug("Check allowTopicOperationAsync [{}] on [{}].", topicOperation.name(), topicName);
        }
        return validateTenantAdminAccess(topicName.getTenant(), str, authenticationDataSource).thenCompose(isSuperUserOrAdmin -> {
            if (log.isDebugEnabled()) {
                log.debug("Verify if role {} is allowed to {} to topic {}: isSuperUserOrAdmin={}", str, topicOperation, topicName, isSuperUserOrAdmin);
            }
            if (isSuperUserOrAdmin.booleanValue()) {
                return CompletableFuture.completedFuture(true);
            }

            // Regular role: the decision depends on the kind of operation.
            final CompletableFuture<Boolean> decision;
            switch (topicOperation) {
                case LOOKUP:
                case GET_STATS:
                case GET_METADATA:
                    decision = canLookupAsync(topicName, str, authenticationDataSource);
                    break;
                case PRODUCE:
                    decision = canProduceAsync(topicName, str, authenticationDataSource);
                    break;
                case GET_SUBSCRIPTIONS:
                case CONSUME:
                case SUBSCRIBE:
                case UNSUBSCRIBE:
                case SKIP:
                case EXPIRE_MESSAGES:
                case PEEK_MESSAGES:
                case RESET_CURSOR:
                case GET_BACKLOG_SIZE:
                case SET_REPLICATED_SUBSCRIPTION_STATUS:
                case GET_REPLICATED_SUBSCRIPTION_STATUS:
                    decision = canConsumeAsync(topicName, str, authenticationDataSource, authenticationDataSource.getSubscription());
                    break;
                case TERMINATE:
                case COMPACT:
                case OFFLOAD:
                case UNLOAD:
                case ADD_BUNDLE_RANGE:
                case GET_BUNDLE_RANGE:
                case DELETE_BUNDLE_RANGE:
                    // Admin-only operations.
                    decision = CompletableFuture.completedFuture(false);
                    break;
                default:
                    decision = FutureUtil.failedFuture(new IllegalStateException("TopicOperation [" + topicOperation.name() + "] is not supported."));
                    break;
            }
            return decision;
        });
    }

    /**
     * Authorizes a topic policy operation; every policy operation requires superuser or
     * tenant-admin access on the topic's tenant.
     *
     * @param topicName                topic whose policy is accessed
     * @param str                      role of the client
     * @param policyName               policy being accessed; does not influence the decision
     * @param policyOperation          requested operation; does not influence the decision
     * @param authenticationDataSource passed through to the admin check
     * @return future holding the authorization decision
     */
    @Override
    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topicName, String str, PolicyName policyName, PolicyOperation policyOperation, AuthenticationDataSource authenticationDataSource) {
        final String tenant = topicName.getTenant();
        return validateTenantAdminAccess(tenant, str, authenticationDataSource);
    }

    /**
     * Builds a path of the form {@code /admin/<part>/<part>/...}.
     *
     * @param strArr path segments, joined with {@code '/'}
     * @return the segments joined and prefixed with {@code "/admin/"}
     */
    private static String path(String... strArr) {
        final String joinedSegments = Joiner.on('/').join((Object[]) strArr);
        return "/admin/" + joinedSegments;
    }

    /**
     * Checks whether role {@code str2} is a superuser or an admin of tenant {@code str}.
     *
     * <p>The superuser check runs first; only when it reports {@code false} is the tenant
     * loaded and passed to {@code isTenantAdmin}.
     *
     * @param str                      tenant name
     * @param str2                     role of the client
     * @param authenticationDataSource passed through to the superuser and tenant-admin checks
     * @return future holding {@code true} for superusers, otherwise the result of the
     *         tenant-admin check; fails with a {@link RestException} (NOT_FOUND status) when
     *         the tenant does not exist, or a {@link RestException} wrapping the failure when
     *         the tenant cannot be loaded
     */
    public CompletableFuture<Boolean> validateTenantAdminAccess(String str, String str2, AuthenticationDataSource authenticationDataSource) {
        return isSuperUser(str2, authenticationDataSource, this.conf).thenCompose(isSuperUser -> {
            if (isSuperUser.booleanValue()) {
                return CompletableFuture.completedFuture(true);
            }
            try {
                return isTenantAdmin(str, str2, this.pulsarResources.getTenantResources().getTenant(str).orElseThrow(() -> {
                    return new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
                }), authenticationDataSource);
            } catch (RestException e) {
                // Already carries the intended status (e.g. NOT_FOUND from orElseThrow above);
                // rethrow as-is so it is not logged as a fetch failure and re-wrapped below.
                throw e;
            } catch (MetadataStoreException.NotFoundException e) {
                log.warn("Failed to get tenant info data for non existing tenant {}", str);
                throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
            } catch (Exception e) {
                log.error("Failed to get tenant {}", str, e);
                throw new RestException(e);
            }
        });
    }
}
