package org.apache.pulsar.broker.admin.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiParam;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.io.swagger.annotations.Example;
import org.apache.pulsar.shade.io.swagger.annotations.ExampleProperty;
import org.apache.pulsar.shade.javax.ws.rs.DELETE;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.PUT;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/ClustersBase.class */
public class ClustersBase extends PulsarWebResource {
    private static final Logger log = LoggerFactory.getLogger(ClustersBase.class);

    /**
     * Lists the names of all clusters, leaving out the entry equal to
     * {@code Constants.GLOBAL_CLUSTER}.
     *
     * @return the cluster names without the global cluster entry
     * @throws RestException if reading the cluster list fails for any reason
     */
    @GET
    @ApiOperation(value = "Get the list of all the Pulsar clusters.", response = String.class, responseContainer = "Set")
    @ApiResponses({@ApiResponse(code = 200, message = "Return a list of clusters."), @ApiResponse(code = 500, message = "Internal server error.")})
    public Set<String> getClusters() throws Exception {
        try {
            Set<String> clusterNames = clusterResources().list().stream()
                    .filter(name -> !Constants.GLOBAL_CLUSTER.equals(name))
                    .collect(Collectors.toSet());
            return clusterNames;
        } catch (Exception e) {
            log.error("[{}] Failed to get clusters list", clientAppId(), e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the stored configuration of one cluster.
     *
     * @param str the cluster name taken from the request path
     * @return the cluster data
     * @throws RestException with 404 when the cluster is not found; any other
     *         failure is wrapped in a {@code RestException}
     */
    @Path("/{cluster}")
    @GET
    @ApiOperation(value = "Get the configuration for the specified cluster.", response = ClusterDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterDataImpl.class), @ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public ClusterData getCluster(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) {
        validateSuperUserAccess();
        try {
            return clusterResources().getCluster(str)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"));
        } catch (Exception e) {
            log.error("[{}] Failed to get cluster {}", clientAppId(), str, e);
            // A RestException already carries its HTTP status, so it is rethrown as-is;
            // everything else is wrapped.
            throw e instanceof RestException ? (RestException) e : new RestException(e);
        }
    }

    /**
     * Creates a new cluster entry.
     *
     * <p>The request is rejected with 400 when no body is supplied, with 412 when
     * {@code NamedEntity.checkName} refuses the name, and with 409 when a cluster
     * with that name is already present.
     *
     * @param str the name of the cluster to create
     * @param clusterDataImpl the configuration to store for the new cluster
     * @throws RestException for the conditions above, or wrapping any other failure
     */
    @PUT
    @Path("/{cluster}")
    @ApiOperation(value = "Create a new cluster.", notes = "This operation requires Pulsar superuser privileges, and the name cannot contain the '/' characters.")
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been created."), @ApiResponse(code = 403, message = "You don't have admin permission to create the cluster."), @ApiResponse(code = 409, message = "Cluster already exists."), @ApiResponse(code = 412, message = "Cluster name is not valid."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void createCluster(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   'serviceUrl': 'http://pulsar.example.com:8080',\n   'brokerServiceUrl': 'pulsar://pulsar.example.com:6651',\n}")})) ClusterDataImpl clusterDataImpl) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();
        if (clusterDataImpl == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "cluster data is required");
        }
        try {
            NamedEntity.checkName(str);
            if (clusterResources().getCluster(str).isPresent()) {
                log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), str);
                throw new RestException(Response.Status.CONFLICT, "Cluster already exists");
            }
            clusterResources().createCluster(str, clusterDataImpl);
            log.info("[{}] Created cluster {}", clientAppId(), str);
        } catch (IllegalArgumentException e) {
            log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), str, e);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster name is not valid");
        } catch (RestException e) {
            // The 409 above has already been logged at WARN; let it reach the client
            // unchanged instead of being logged as an error and re-wrapped below.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to create cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Replaces the stored configuration of an existing cluster with the supplied data.
     *
     * @param str the name of the cluster to update
     * @param clusterDataImpl the new configuration; must not be {@code null}
     * @throws RestException with 400 when no body is supplied, 404 when the metadata
     *         store reports the cluster as missing, or wrapping any other failure
     */
    @Path("/{cluster}")
    @POST
    @ApiOperation(value = "Update the configuration for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been updated."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void updateCluster(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The cluster data", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{\n   'serviceUrl': 'http://pulsar.example.com:8080',\n   'brokerServiceUrl': 'pulsar://pulsar.example.com:6651'\n}")})) ClusterDataImpl clusterDataImpl) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();
        // Same guard as createCluster: without it the update function below would
        // hand null to the metadata store as the new cluster value.
        if (clusterDataImpl == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "cluster data is required");
        }
        try {
            clusterResources().updateCluster(str, existing -> clusterDataImpl);
            log.info("[{}] Updated cluster {}", clientAppId(), str);
        } catch (MetadataStoreException.NotFoundException e) {
            log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Replaces the peer-cluster list of a cluster.
     *
     * <p>When a non-empty list is supplied, every entry is checked first: the cluster
     * itself (compared case-insensitively) may not appear in it, and each named peer
     * must exist. Either violation is answered with 412. The list is then written
     * onto the cluster's stored data.
     *
     * @param str the cluster whose peers are being set
     * @param linkedHashSet the peer cluster names; a {@code null} or empty set skips validation
     * @throws RestException with 412 for an invalid peer, 404 when the cluster itself
     *         is missing, or wrapping any other failure
     */
    @Path("/{cluster}/peers")
    @POST
    @ApiOperation(value = "Update peer-cluster-list for a cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been updated."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Peer cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void setPeerClusterNames(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The list of peer cluster names", required = true, examples = @Example({@ExampleProperty(mediaType = "application/json", value = "[\n   'cluster-a',\n   'cluster-b'\n]")})) LinkedHashSet<String> linkedHashSet) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();

        boolean hasPeersToValidate = linkedHashSet != null && !linkedHashSet.isEmpty();
        if (hasPeersToValidate) {
            for (String peer : linkedHashSet) {
                try {
                    if (str.equalsIgnoreCase(peer)) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, str + " itself can't be part of peer-list");
                    }
                    clusterResources().getCluster(peer).orElseThrow(
                            () -> new RestException(Response.Status.PRECONDITION_FAILED, "Peer cluster " + peer + " does not exist"));
                } catch (RestException e) {
                    log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), linkedHashSet, e.getMessage());
                    throw e;
                } catch (Exception e) {
                    log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), linkedHashSet, e.getMessage());
                    throw new RestException(e);
                }
            }
        }

        try {
            // NOTE(review): "m4711clone" looks like a decompiler-renamed clone() call
            // that yields a builder - confirm against the real ClusterData API.
            clusterResources().updateCluster(str,
                    current -> current.m4711clone().peerClusterNames(linkedHashSet).build());
            log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), linkedHashSet, str);
        } catch (MetadataStoreException.NotFoundException e) {
            log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns the peer-cluster names stored for a cluster.
     *
     * @param str the cluster name taken from the request path
     * @return the value of {@code getPeerClusterNames()} on the stored cluster data
     * @throws RestException with 404 when the cluster is not found; any other
     *         failure is wrapped in a {@code RestException}
     */
    @Path("/{cluster}/peers")
    @GET
    @ApiOperation(value = "Get the peer-cluster data for the specified cluster.", response = String.class, responseContainer = "Set", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public Set<String> getPeerCluster(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) {
        validateSuperUserAccess();
        try {
            return clusterResources().getCluster(str)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Cluster does not exist"))
                    .getPeerClusterNames();
        } catch (RestException e) {
            // As in getCluster: a RestException already carries its status, so it is
            // rethrown as-is rather than re-wrapped and logged as a server error.
            log.warn("[{}] Failed to get cluster {}: {}", clientAppId(), str, e.getMessage());
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Deletes a cluster that is no longer in use.
     *
     * <p>The method runs in three separate phases so that each failure is reported
     * by its own handler:
     * <ol>
     *   <li>work out whether the cluster is still used, either according to
     *       {@code isClusterUsed} or because it still has namespace isolation
     *       policies (an empty isolation-policy entry is removed on the way);</li>
     *   <li>refuse with 412 when it is still used;</li>
     *   <li>delete the cluster's failure domains and then the cluster itself.</li>
     * </ol>
     *
     * @param str the name of the cluster to delete
     * @throws RestException with 412 when the cluster is not empty, 404 when the
     *         metadata store reports it as missing, or wrapping any other failure
     */
    @DELETE
    @Path("/{cluster}")
    @ApiOperation(value = "Delete an existing cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 204, message = "Cluster has been deleted."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 412, message = "Cluster is not empty."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void deleteCluster(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) {
        validateSuperUserAccess();
        validatePoliciesReadOnlyAccess();

        // Phase 1: usage check. Only failures of the lookups themselves are reported
        // as "Failed to get cluster usage".
        boolean isClusterUsed;
        try {
            isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(str);
            Optional<NamespaceIsolationPolicies> isolationDataPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str);
            if (isolationDataPolicies.isPresent()) {
                if (isolationDataPolicies.get().getPolicies().isEmpty()) {
                    namespaceIsolationPolicies().deleteIsolationData(str);
                } else {
                    isClusterUsed = true;
                }
            }
        } catch (Exception e) {
            log.error("[{}] Failed to get cluster usage {}", clientAppId(), str, e);
            throw new RestException(e);
        }

        // Phase 2: thrown outside any try block so the 412 is not caught and re-wrapped.
        if (isClusterUsed) {
            log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), str);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster not empty");
        }

        // Phase 3: the actual deletion.
        try {
            clusterResources().getFailureDomainResources().deleteFailureDomains(str);
            clusterResources().deleteCluster(str);
            log.info("[{}] Deleted cluster {}", clientAppId(), str);
        } catch (MetadataStoreException.NotFoundException e) {
            log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "Cluster does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to delete cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns all namespace isolation policies stored for a cluster, keyed by policy name.
     *
     * @param str the cluster name taken from the request path
     * @return the policy map of the cluster
     * @throws RestException with 404 when the cluster or its isolation-policy entry
     *         does not exist; any other failure is wrapped in a {@code RestException}
     */
    @Path("/{cluster}/namespaceIsolationPolicies")
    @GET
    @ApiOperation(value = "Get the namespace isolation policies assigned to the cluster.", response = NamespaceIsolationDataImpl.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public Map<String, ? extends NamespaceIsolationData> getNamespaceIsolationPolicies(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) throws Exception {
        validateSuperUserAccess();
        if (!clusterResources().clusterExists(str)) {
            throw new RestException(Response.Status.NOT_FOUND, "Cluster " + str + " does not exist.");
        }
        try {
            return namespaceIsolationPolicies().getIsolationDataPolicies(str)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"))
                    .getPolicies();
        } catch (RestException e) {
            // Same handling as getNamespaceIsolationPolicy: the 404 goes out unchanged
            // instead of being logged as an error and re-wrapped.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Returns a single namespace isolation policy of a cluster.
     *
     * @param str the cluster name taken from the request path
     * @param str2 the name of the isolation policy to look up
     * @return the policy stored under {@code str2}
     * @throws RestException with 404 when the cluster has no isolation-policy entry
     *         or no policy with that name; any other failure is wrapped
     */
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @GET
    @ApiOperation(value = "Get the single namespace isolation policy assigned to the cluster.", response = NamespaceIsolationDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public NamespaceIsolationData getNamespaceIsolationPolicy(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The name of the namespace isolation policy", required = true) @PathParam("policyName") String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            NamespaceIsolationPolicies clusterPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
            if (clusterPolicies.getPolicies().containsKey(str2)) {
                return clusterPolicies.getPolicies().get(str2);
            }
            log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Cannot find NamespaceIsolationPolicy " + str2 + " for cluster " + str);
        } catch (RestException e) {
            throw e;
        } catch (Exception e) {
            // The message has three placeholders; the policy name was previously
            // missing, which left the last "{}" unfilled in the log line.
            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * Describes, for every available broker, which namespace isolation policies of
     * the cluster apply to it.
     *
     * <p>For each broker the namespace patterns of every policy that lists the broker
     * as primary or secondary are collected, and the broker is flagged as primary
     * when at least one policy lists it as primary.
     *
     * @param str the cluster name taken from the request path
     * @return one entry per available broker
     * @throws RestException with 404 when the cluster has no isolation-policy entry;
     *         any other failure is wrapped in a {@code RestException}
     */
    @Path("/{cluster}/namespaceIsolationPolicies/brokers")
    @GET
    @ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them.", response = BrokerNamespaceIsolationDataImpl.class, responseContainer = "set", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) {
        validateSuperUserAccess();
        validateClusterExists(str);

        // Broker lookup has its own handler so that a policy failure below is not
        // reported a second time as "Failed to get list of brokers".
        Set<String> availableBrokers;
        try {
            availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
        } catch (Exception e) {
            log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), str, e);
            throw new RestException(e);
        }

        try {
            Optional<NamespaceIsolationPolicies> isolationDataPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str);
            if (!isolationDataPolicies.isPresent()) {
                throw new RestException(Response.Status.NOT_FOUND, "namespace-isolation policies not found for " + str);
            }
            Map<String, NamespaceIsolationDataImpl> policies = isolationDataPolicies.get().getPolicies();

            List<BrokerNamespaceIsolationData> brokerEntries = new ArrayList<>(availableBrokers.size());
            for (String broker : availableBrokers) {
                BrokerNamespaceIsolationData.Builder brokerData = BrokerNamespaceIsolationData.builder().brokerName(broker);
                if (policies != null) {
                    List<String> namespaceRegexes = new ArrayList<>();
                    // The policy name is not needed here; the membership tests must
                    // use the broker name, not the map key.
                    policies.forEach((policyName, policyData) -> {
                        NamespaceIsolationPolicyImpl policy = new NamespaceIsolationPolicyImpl(policyData);
                        boolean primary = policy.isPrimaryBroker(broker);
                        if (primary || policy.isSecondaryBroker(broker)) {
                            namespaceRegexes.addAll(policyData.getNamespaces());
                            if (primary) {
                                brokerData.primary(true);
                            }
                        }
                    });
                    brokerData.namespaceRegex(namespaceRegexes);
                }
                brokerEntries.add(brokerData.build());
            }
            return brokerEntries;
        } catch (RestException e) {
            // Let the 404 above reach the client unchanged.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Describes which namespace isolation policies of the cluster apply to one broker.
     *
     * <p>The namespace patterns of every policy that lists the broker as primary or
     * secondary are collected. The {@code primary} flag and {@code policyName} are
     * overwritten on each match, so they reflect the last matching policy in the
     * map's iteration order.
     *
     * @param str the cluster name taken from the request path
     * @param str2 the broker name taken from the request path
     * @return the isolation data built for the broker
     * @throws RestException with 404 when the cluster has no isolation-policy entry;
     *         any other failure is wrapped in a {@code RestException}
     */
    @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
    @GET
    @ApiOperation(value = "Get a broker with namespace-isolation policies attached to it.", response = BrokerNamespaceIsolationDataImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The broker name (<broker-hostname>:<web-service-port>)", required = true, example = "broker1:8080") @PathParam("broker") String str2) {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            Optional<NamespaceIsolationPolicies> isolationDataPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str);
            if (!isolationDataPolicies.isPresent()) {
                throw new RestException(Response.Status.NOT_FOUND, "namespace-isolation policies not found for " + str);
            }
            Map<String, NamespaceIsolationDataImpl> policies = isolationDataPolicies.get().getPolicies();
            BrokerNamespaceIsolationData.Builder brokerData = BrokerNamespaceIsolationData.builder().brokerName(str2);
            if (policies != null) {
                List<String> namespaceRegexes = new ArrayList<>();
                policies.forEach((policyName, policyData) -> {
                    NamespaceIsolationPolicyImpl policy = new NamespaceIsolationPolicyImpl(policyData);
                    boolean primary = policy.isPrimaryBroker(str2);
                    if (primary || policy.isSecondaryBroker(str2)) {
                        namespaceRegexes.addAll(policyData.getNamespaces());
                        brokerData.primary(primary);
                        brokerData.policyName(policyName);
                    }
                });
                brokerData.namespaceRegex(namespaceRegexes);
            }
            return brokerData.build();
        } catch (RestException e) {
            // Let the 404 above reach the client unchanged instead of logging it as
            // an error and re-wrapping it.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), str, e);
            throw new RestException(e);
        }
    }

    /**
     * Creates or replaces one namespace isolation policy of a cluster and completes
     * the suspended response asynchronously.
     *
     * <p>The policy is serialized and validated, stored under {@code str2} (creating
     * an empty isolation-policy entry for the cluster first when none exists), and,
     * when {@code isEnableNamespaceIsolationUpdateOnTime()} is set, the namespaces
     * matching the policy are unloaded before the response is resumed.
     *
     * @param asyncResponse the suspended response; resumed with 204 on success or
     *        with a {@code RestException} on failure
     * @param str the cluster name taken from the request path
     * @param str2 the name of the policy to set
     * @param namespaceIsolationDataImpl the policy data from the request body
     */
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @POST
    @ApiOperation(value = "Set namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 400, message = "Namespace isolation policy data is invalid."), @ApiResponse(code = 403, message = "Don't have admin permission or policies are read-only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void setNamespaceIsolationPolicy(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String str2, @ApiParam(value = "The namespace isolation policy data", required = true) NamespaceIsolationDataImpl namespaceIsolationDataImpl) {
        validateSuperUserAccess();
        validateClusterExists(str);
        validatePoliciesReadOnlyAccess();
        // Holds the serialized request body for the 400 message below.
        String jsonInput = null;
        try {
            // Serialize before validating so a validation failure can echo the
            // rejected input instead of "null".
            jsonInput = ObjectMapperFactory.create().writeValueAsString(namespaceIsolationDataImpl);
            namespaceIsolationDataImpl.validate();
            NamespaceIsolationPolicies clusterPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str).orElseGet(() -> {
                try {
                    namespaceIsolationPolicies().setIsolationDataWithCreate(str, existing -> Collections.emptyMap());
                    return new NamespaceIsolationPolicies();
                } catch (Exception e) {
                    throw new RestException(e);
                }
            });
            clusterPolicies.setPolicy(str2, namespaceIsolationDataImpl);
            namespaceIsolationPolicies().setIsolationData(str, old -> clusterPolicies.getPolicies());
            if (pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
                filterAndUnloadMatchedNameSpaces(asyncResponse, namespaceIsolationDataImpl);
            } else {
                asyncResponse.resume(Response.noContent().build());
            }
        } catch (IllegalArgumentException e) {
            log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid", clientAppId(), str, str2, e);
            asyncResponse.resume(new RestException(Response.Status.BAD_REQUEST, "Invalid format of input policy data. policy: " + str2 + "; data: " + jsonInput));
        } catch (MetadataStoreException.NotFoundException e) {
            log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist"));
        } catch (Exception e) {
            log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, e);
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Finds every namespace whose name matches one of the policy's namespace
     * patterns and hands the matches to {@code unloadMatchedNamespacesList}, which
     * completes the response.
     *
     * <p>Tenants are listed first, then the namespaces of each tenant are fetched
     * asynchronously. The response is always completed exactly once:
     * <ul>
     *   <li>a failed tenant lookup resumes the response with the error;</li>
     *   <li>an empty tenant list goes straight to the unload step with no matches;</li>
     *   <li>otherwise the callback that finishes last (tracked by a countdown that
     *       is decremented only after a tenant's namespaces have been processed)
     *       triggers the unload step.</li>
     * </ul>
     * A failed namespace lookup for a single tenant is logged and that tenant is skipped.
     *
     * @param asyncResponse the suspended response to complete
     * @param namespaceIsolationDataImpl the policy whose namespace patterns are matched
     * @throws Exception if the admin client cannot be obtained
     */
    private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse, NamespaceIsolationDataImpl namespaceIsolationDataImpl) throws Exception {
        Namespaces namespaces = pulsar().getAdminClient().namespaces();
        // Callbacks of different tenants may complete on different threads.
        List<String> matchedNamespaces = Collections.synchronizedList(new ArrayList<>());
        pulsar().getAdminClient().tenants().getTenantsAsync().whenComplete((tenants, tenantsError) -> {
            if (tenantsError != null) {
                log.error("[{}] Failed to get tenants when setNamespaceIsolationPolicy.", clientAppId(), tenantsError);
                // Without this the request would never receive a response.
                asyncResponse.resume(new RestException(tenantsError));
                return;
            }
            if (tenants.isEmpty()) {
                unloadMatchedNamespacesList(asyncResponse, matchedNamespaces, namespaces);
                return;
            }
            AtomicInteger pendingTenants = new AtomicInteger(tenants.size());
            tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant).whenComplete((tenantNamespaces, namespacesError) -> {
                if (namespacesError != null) {
                    log.error("[{}] Failed to get namespaces for tenant {} when setNamespaceIsolationPolicy.", clientAppId(), tenant, namespacesError);
                } else {
                    for (String namespace : tenantNamespaces) {
                        boolean matchesPolicy = namespaceIsolationDataImpl.getNamespaces().stream()
                                .anyMatch(pattern -> namespace.matches(pattern));
                        if (matchesPolicy) {
                            matchedNamespaces.add(namespace);
                        }
                    }
                }
                // Decrement only after this tenant's matches are recorded, so the
                // callback that reaches zero sees the complete list - also when
                // this tenant had no namespaces at all.
                if (pendingTenants.decrementAndGet() == 0) {
                    unloadMatchedNamespacesList(asyncResponse, matchedNamespaces, namespaces);
                }
            }));
        });
    }

    /**
     * Unloads the given namespaces and then completes the suspended response.
     *
     * <p>With nothing to unload the response is resumed with 204 immediately.
     * Otherwise all unloads are started and awaited together: on failure the
     * response is resumed with the error; on success a load-report write is
     * attempted (a failure there is only logged) and the response is resumed with 204.
     *
     * @param asyncResponse the suspended response to complete
     * @param list the namespaces to unload
     * @param namespaces the admin client handle used to issue the unloads
     */
    private void unloadMatchedNamespacesList(AsyncResponse asyncResponse, List<String> list, Namespaces namespaces) {
        if (list.isEmpty()) {
            asyncResponse.resume(Response.noContent().build());
            return;
        }
        List<CompletableFuture<Void>> pendingUnloads = list.stream()
                .map(namespace -> namespaces.unloadAsync(namespace))
                .collect(Collectors.toList());
        FutureUtil.waitForAll(pendingUnloads).whenComplete((ignored, failure) -> {
            if (failure == null) {
                try {
                    pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
                } catch (Exception e) {
                    log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), e);
                }
                asyncResponse.resume(Response.noContent().build());
            } else {
                log.error("[{}] Failed to unload namespace while setNamespaceIsolationPolicy.", clientAppId(), failure);
                asyncResponse.resume(new RestException(failure));
            }
        });
    }

    /**
     * Removes one namespace isolation policy from a cluster.
     *
     * <p>When the cluster has no isolation-policy entry yet, an empty one is created
     * first; the named policy is then removed from the in-memory copy and the
     * resulting policy map is written back.
     *
     * @param str the cluster name taken from the request path
     * @param str2 the name of the policy to delete
     * @throws RestException with 404 when the metadata store reports the entry as
     *         missing, or wrapping any other failure
     */
    @DELETE
    @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
    @ApiOperation(value = "Delete namespace isolation policy.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission or policies are read only."), @ApiResponse(code = 404, message = "Namespace isolation policy doesn't exist."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void deleteNamespaceIsolationPolicy(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        validatePoliciesReadOnlyAccess();
        try {
            NamespaceIsolationPolicies clusterPolicies = namespaceIsolationPolicies().getIsolationDataPolicies(str).orElseGet(() -> {
                try {
                    namespaceIsolationPolicies().setIsolationDataWithCreate(str, existing -> Collections.emptyMap());
                    return new NamespaceIsolationPolicies();
                } catch (Exception e) {
                    throw new RestException(e);
                }
            });
            clusterPolicies.deletePolicy(str2);
            namespaceIsolationPolicies().setIsolationData(str, old -> clusterPolicies.getPolicies());
        } catch (MetadataStoreException.NotFoundException e) {
            // The resource lives under clusters/, as in setNamespaceIsolationPolicy's
            // log messages; the earlier "brokers/" prefix pointed at the wrong path.
            log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(), str);
            throw new RestException(Response.Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + str + " does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * Creates or replaces the configuration of one failure domain in a cluster.
     *
     * <p>Superuser access and cluster existence are validated first, then
     * {@code validateBrokerExistsInOtherDomain} is called to check the supplied brokers against
     * the cluster's other failure domains. The supplied configuration is then stored as-is,
     * regardless of any previously stored value.
     *
     * @param str the cluster name
     * @param str2 the failure domain name
     * @param failureDomainImpl the failure domain configuration to store
     * @throws RestException with {@code NOT_FOUND} when the store reports
     *     {@code MetadataStoreException.NotFoundException}; wrapping the cause for any other failure
     */
    @Path("/{cluster}/failureDomains/{domainName}")
    @POST
    @ApiOperation(value = "Set the failure domain of the cluster.", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission."), @ApiResponse(code = 404, message = "Failure domain doesn't exist."), @ApiResponse(code = 409, message = "Broker already exists in another domain."), @ApiResponse(code = 412, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.")})
    public void setFailureDomain(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The failure domain name", required = true) @PathParam("domainName") String str2, @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl failureDomainImpl) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        validateBrokerExistsInOtherDomain(str, str2, failureDomainImpl);
        try {
            clusterResources().getFailureDomainResources().setFailureDomainWithCreate(str, str2, existing -> failureDomainImpl);
        } catch (MetadataStoreException.NotFoundException e) {
            // Argument order follows the placeholders: first the domain, then the cluster.
            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), str, str2, e2);
            throw new RestException(e2);
        }
    }

    /**
     * Returns every failure domain configured for a cluster, keyed by domain name.
     *
     * <p>Domains are read one at a time. A domain that cannot be read is logged and left out of
     * the result instead of failing the whole request; a listed domain with no stored data is
     * left out as well.
     *
     * @param str the cluster name
     * @return a map from failure domain name to its configuration; an empty map when listing
     *     the domains fails with {@code MetadataStoreException.NotFoundException}
     * @throws RestException wrapping the cause when listing the domains fails for any other reason
     */
    @Path("/{cluster}/failureDomains")
    @GET
    @ApiOperation(value = "Get the cluster failure domains.", response = FailureDomainImpl.class, responseContainer = "Map", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 500, message = "Internal server error")})
    public Map<String, FailureDomainImpl> getFailureDomains(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str) throws Exception {
        validateSuperUserAccess();
        // Typed map: avoids the raw-type / unchecked conversion on return.
        Map<String, FailureDomainImpl> domains = new HashMap<>();
        try {
            ClusterResources.FailureDomainResources failureDomainResources = clusterResources().getFailureDomainResources();
            for (String domainName : failureDomainResources.listFailureDomains(str)) {
                try {
                    failureDomainResources.getFailureDomain(str, domainName)
                            .ifPresent(domain -> domains.put(domainName, domain));
                } catch (Exception e) {
                    // Best effort: one unreadable domain must not hide the others.
                    log.warn("Failed to get domain {}", domainName, e);
                }
            }
            return domains;
        } catch (MetadataStoreException.NotFoundException e2) {
            log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), str, e2);
            return Collections.emptyMap();
        } catch (Exception e3) {
            log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), str, e3);
            throw new RestException(e3);
        }
    }

    /**
     * Looks up the configuration of one failure domain in a cluster.
     *
     * <p>Superuser access and cluster existence are validated before the lookup.
     *
     * @param str the cluster name
     * @param str2 the failure domain name
     * @return the stored failure domain configuration
     * @throws RestException with {@code NOT_FOUND} when no data is stored for the domain;
     *     wrapping the cause when the lookup itself fails
     */
    @Path("/{cluster}/failureDomains/{domainName}")
    @GET
    @ApiOperation(value = "Get a domain in a cluster", response = FailureDomainImpl.class, notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    public FailureDomainImpl getDomain(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The failure domain name", required = true) @PathParam("domainName") String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            Optional<FailureDomainImpl> stored = clusterResources().getFailureDomainResources().getFailureDomain(str, str2);
            if (stored.isPresent()) {
                return stored.get();
            }
            throw new RestException(Response.Status.NOT_FOUND, "Domain " + str2 + " for cluster " + str + " does not exist");
        } catch (RestException restException) {
            // Already a REST-level error (e.g. the NOT_FOUND above): pass it through untouched.
            throw restException;
        } catch (Exception failure) {
            log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), str2, str, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Deletes one failure domain from a cluster.
     *
     * <p>Superuser access and cluster existence are validated before the deletion.
     *
     * @param str the cluster name
     * @param str2 the failure domain name
     * @throws RestException with {@code NOT_FOUND} when the store reports
     *     {@code MetadataStoreException.NotFoundException}; wrapping the cause for any other failure
     */
    @DELETE
    @Path("/{cluster}/failureDomains/{domainName}")
    @ApiOperation(value = "Delete the failure domain of the cluster", notes = "This operation requires Pulsar superuser privileges.")
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission or policy is read only"), @ApiResponse(code = 404, message = "FailureDomain doesn't exist"), @ApiResponse(code = 412, message = "Cluster doesn't exist"), @ApiResponse(code = 500, message = "Internal server error")})
    public void deleteFailureDomain(@ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String str, @ApiParam(value = "The failure domain name", required = true) @PathParam("domainName") String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterExists(str);
        try {
            ClusterResources.FailureDomainResources domainResources = clusterResources().getFailureDomainResources();
            domainResources.deleteFailureDomain(str, str2);
        } catch (MetadataStoreException.NotFoundException missing) {
            log.warn("[{}] Domain {} does not exist in {}", clientAppId(), str2, str);
            throw new RestException(Response.Status.NOT_FOUND, "Domain-name " + str2 + " or cluster " + str + " does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), str2, str, failure);
            throw new RestException(failure);
        }
    }

    /**
     * Checks that none of the brokers in the supplied failure domain already belong to a
     * different failure domain of the same cluster.
     *
     * <p>Nothing is checked when the supplied domain or its broker set is {@code null}. The
     * domain named {@code str2} itself is skipped. A domain that cannot be read is logged and
     * skipped; when listing the domains fails with {@code MetadataStoreException.NotFoundException}
     * the check passes.
     *
     * @param str the cluster name
     * @param str2 the name of the failure domain being set (excluded from the comparison)
     * @param failureDomainImpl the failure domain configuration whose brokers are checked
     * @throws RestException with {@code CONFLICT} when at least one broker is already present in
     *     another domain; wrapping the cause when listing the domains fails for another reason
     */
    private void validateBrokerExistsInOtherDomain(String str, String str2, FailureDomainImpl failureDomainImpl) {
        if (failureDomainImpl == null || failureDomainImpl.brokers == null) {
            return;
        }
        Set<String> requestedBrokers = failureDomainImpl.brokers;
        try {
            ClusterResources.FailureDomainResources failureDomainResources = clusterResources().getFailureDomainResources();
            for (String otherDomainName : failureDomainResources.listFailureDomains(str)) {
                if (str2.equals(otherDomainName)) {
                    continue;
                }
                try {
                    Optional<FailureDomainImpl> otherDomain = failureDomainResources.getFailureDomain(str, otherDomainName);
                    if (otherDomain.isPresent() && otherDomain.get().brokers != null) {
                        // A sequential stream is used here: parallelism is not needed for this filter.
                        List<String> duplicateBrokers = otherDomain.get().brokers.stream()
                                .filter(requestedBrokers::contains)
                                .collect(Collectors.toList());
                        if (!duplicateBrokers.isEmpty()) {
                            throw new RestException(Response.Status.CONFLICT, duplicateBrokers + " already exists in " + otherDomainName);
                        }
                    }
                } catch (RestException conflict) {
                    throw conflict;
                } catch (Exception e) {
                    // Best effort: an unreadable domain is skipped rather than failing the request.
                    log.warn("Failed to get domain {}", otherDomainName, e);
                }
            }
        } catch (RestException conflict) {
            // Let the CONFLICT propagate as-is instead of logging it as a listing failure
            // and re-wrapping it in the generic handler below.
            throw conflict;
        } catch (MetadataStoreException.NotFoundException e2) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Domain is not configured for cluster", clientAppId(), e2);
            }
        } catch (Exception e3) {
            log.error("[{}] Failed to get domains for cluster {}", clientAppId(), str, e3);
            throw new RestException(e3);
        }
    }
}
