package org.apache.kafka.connect.util;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.antlr.v4.runtime.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.10.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/util/TopicAdmin.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/util/TopicAdmin.class */
public class TopicAdmin implements AutoCloseable {
    /**
     * Sentinel partition count ({@code -1}).
     * NOTE(review): presumably means "let the broker choose the partition count" — confirm against NewTopic.
     */
    public static final int NO_PARTITIONS = -1;
    /**
     * Sentinel replication factor ({@code -1}).
     * NOTE(review): presumably means "let the broker choose the replication factor" — confirm against NewTopic.
     */
    public static final short NO_REPLICATION_FACTOR = -1;
    /** Name of the topic-level cleanup policy setting. */
    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    /** Cleanup policy value that the compaction check in this class expects. */
    private static final String CLEANUP_POLICY_COMPACT = "compact";
    /** Name of the topic-level minimum in-sync replicas setting. */
    private static final String MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas";
    /** Name of the topic-level unclean leader election setting. */
    private static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TopicAdmin.class);
    /** Admin client configuration; read for {@code bootstrap.servers} when composing log and error messages. */
    private final Map<String, Object> adminConfig;
    /** Admin client that every topic operation in this class delegates to. */
    private final Admin admin;
    /** Whether a successful topic creation is logged at INFO level. */
    private final boolean logCreation;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.10.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/util/TopicAdmin$NewTopicBuilder.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/util/TopicAdmin$NewTopicBuilder.class */
    /**
     * Fluent builder for {@link NewTopic} definitions.
     *
     * <p>The partition count and replication factor both start at {@code -1}; topic-level settings
     * accumulate in an internal map until {@link #build()} is called. Instances are mutable and
     * every setter returns {@code this} so calls can be chained.
     */
    public static class NewTopicBuilder {
        private final String name;
        private int numPartitions = -1;
        private short replicationFactor = -1;
        private final Map<String, String> configs = new HashMap<>();

        /**
         * Creates a builder for the topic with the given name.
         *
         * @param str the name of the topic to define
         */
        public NewTopicBuilder(String str) {
            this.name = str;
        }

        /**
         * Sets the number of partitions.
         *
         * @param i the partition count
         * @return this builder
         */
        public NewTopicBuilder partitions(int i) {
            this.numPartitions = i;
            return this;
        }

        /**
         * Resets the partition count to {@code -1}, the value the builder starts with.
         *
         * @return this builder
         */
        public NewTopicBuilder defaultPartitions() {
            this.numPartitions = -1;
            return this;
        }

        /**
         * Sets the replication factor.
         *
         * @param s the replication factor
         * @return this builder
         */
        public NewTopicBuilder replicationFactor(short s) {
            this.replicationFactor = s;
            return this;
        }

        /**
         * Resets the replication factor to {@code -1}, the value the builder starts with.
         *
         * @return this builder
         */
        public NewTopicBuilder defaultReplicationFactor() {
            this.replicationFactor = (short) -1;
            return this;
        }

        /**
         * Sets {@code cleanup.policy=compact} on the topic.
         *
         * @return this builder
         */
        public NewTopicBuilder compacted() {
            this.configs.put("cleanup.policy", "compact");
            return this;
        }

        /**
         * Sets {@code min.insync.replicas} on the topic.
         *
         * @param s the minimum number of in-sync replicas
         * @return this builder
         */
        public NewTopicBuilder minInSyncReplicas(short s) {
            this.configs.put("min.insync.replicas", Short.toString(s));
            return this;
        }

        /**
         * Sets {@code unclean.leader.election.enable} on the topic.
         *
         * @param z the value to store for the setting
         * @return this builder
         */
        public NewTopicBuilder uncleanLeaderElection(boolean z) {
            this.configs.put("unclean.leader.election.enable", Boolean.toString(z));
            return this;
        }

        /**
         * Merges the given settings into the topic configuration, or clears every setting
         * accumulated so far when {@code map} is {@code null}.
         *
         * @param map settings to add; values are stored via {@code toString()}, and a
         *            {@code null} value is stored as {@code null}
         * @return this builder
         */
        public NewTopicBuilder config(Map<String, Object> map) {
            if (map == null) {
                // A null argument is the caller's way of discarding all previously set configs.
                this.configs.clear();
                return this;
            }
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                Object value = entry.getValue();
                this.configs.put(entry.getKey(), value != null ? value.toString() : null);
            }
            return this;
        }

        /**
         * Builds the topic definition from the current builder state.
         *
         * <p>The returned {@link NewTopic} receives a snapshot of the settings, so calling further
         * methods on this builder afterwards does not alter a definition that was already built.
         *
         * @return a new topic definition
         */
        public NewTopic build() {
            return new NewTopic(this.name, Optional.of(this.numPartitions), Optional.of(this.replicationFactor))
                    .configs(new HashMap<>(this.configs));
        }
    }

    /**
     * Starts the definition of a topic with the given name.
     *
     * @param str the topic name
     * @return a fresh builder for that topic
     */
    public static NewTopicBuilder defineTopic(String str) {
        NewTopicBuilder builder = new NewTopicBuilder(str);
        return builder;
    }

    /**
     * Creates a topic admin backed by a new admin client built from the given configuration.
     *
     * @param map the admin client configuration
     */
    public TopicAdmin(Map<String, Object> map) {
        this(map, Admin.create(map));
    }

    /**
     * Creates a topic admin around an existing admin client, with creation logging enabled.
     *
     * @param map   the admin client configuration
     * @param admin the admin client to delegate to
     */
    TopicAdmin(Map<String, Object> map, Admin admin) {
        this(map, admin, true);
    }

    /**
     * Creates a topic admin around an existing admin client.
     *
     * @param map   the admin client configuration; {@code null} is replaced by an empty map
     * @param admin the admin client to delegate to
     * @param z     whether successful topic creation is logged at INFO level
     */
    TopicAdmin(Map<String, Object> map, Admin admin, boolean z) {
        if (map == null) {
            this.adminConfig = Collections.emptyMap();
        } else {
            this.adminConfig = map;
        }
        this.admin = admin;
        this.logCreation = z;
    }

    /**
     * Attempts to create a single topic.
     *
     * @param newTopic the topic definition; may be {@code null}
     * @return {@code true} if {@code createTopics} reports the topic's name among those it
     *         created, {@code false} otherwise or when {@code newTopic} is {@code null}
     */
    public boolean createTopic(NewTopic newTopic) {
        return newTopic != null && createTopics(newTopic).contains(newTopic.name());
    }

    /**
     * Attempts to create the given topics and reports which ones were newly created.
     *
     * <p>{@code null} entries are ignored, and when several definitions share a name the last one wins.
     * A topic that already exists is not an error; it is simply left out of the result.
     *
     * @param newTopicArr the topic definitions; may be {@code null} or contain {@code null} entries
     * @return the names of the topics created by this call; empty when nothing was requested, or
     *         when the brokers do not support the CreateTopics API or the caller is not authorized
     *         to create topics
     * @throws ConnectException if the thread is interrupted (the interrupt status is restored
     *         first), a topic configuration is invalid, the request times out, or any other
     *         error occurs
     */
    public Set<String> createTopics(NewTopic... newTopicArr) {
        Map<String, NewTopic> topicsByName = new HashMap<>();
        if (newTopicArr != null) {
            for (NewTopic topic : newTopicArr) {
                if (topic != null) {
                    topicsByName.put(topic.name(), topic);
                }
            }
        }
        if (topicsByName.isEmpty()) {
            return Collections.emptySet();
        }
        String bootstrapServers = bootstrapServers();
        String topicNameList = Utils.join(topicsByName.keySet(), "', '");
        Map<String, KafkaFuture<Void>> futures =
                this.admin.createTopics(topicsByName.values(), new CreateTopicsOptions().validateOnly(false)).values();
        Set<String> createdTopicNames = new HashSet<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
            String topic = entry.getKey();
            try {
                entry.getValue().get();
                if (this.logCreation) {
                    log.info("Created topic {} on brokers at {}", topicsByName.get(topic), bootstrapServers);
                }
                createdTopicNames.add(topic);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof TopicExistsException) {
                    log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
                    continue;
                }
                if (cause instanceof UnsupportedVersionException) {
                    log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API. Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof ClusterAuthorizationException || cause instanceof TopicAuthorizationException) {
                    log.debug("Not authorized to create topic(s) '{}' upon the brokers {}. Falling back to assume topic(s) exist or will be auto-created by the broker.", topicNameList, bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof InvalidConfigurationException) {
                    throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(), cause);
                }
                if (cause instanceof TimeoutException) {
                    throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.", cause);
                }
                throw new ConnectException("Error while attempting to create/find topic(s) '" + topicNameList + "'", e);
            }
        }
        return createdTopicNames;
    }

    /**
     * Describes the given topics, omitting any that do not exist.
     *
     * @param strArr the topic names; {@code null} yields an empty result
     * @return a map from topic name to description containing only the topics that exist
     * @throws RetriableException if the thread is interrupted (the interrupt status is restored
     *         first) or the request times out
     * @throws ConnectException if the caller is not authorized, the brokers do not support the
     *         DescribeTopics API, or any other error occurs
     */
    public Map<String, TopicDescription> describeTopics(String... strArr) {
        if (strArr == null) {
            return Collections.emptyMap();
        }
        String bootstrapServers = bootstrapServers();
        String topicNameList = String.join(", ", strArr);
        Map<String, KafkaFuture<TopicDescription>> futures =
                this.admin.describeTopics(Arrays.asList(strArr), new DescribeTopicsOptions()).values();
        Map<String, TopicDescription> existingTopics = new HashMap<>();
        futures.forEach((topic, future) -> {
            try {
                existingTopics.put(topic, future.get());
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RetriableException("Interrupted while attempting to describe topics '" + topicNameList + "'", e);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    // A missing topic is expected here; it is simply left out of the result.
                    log.debug("Topic '{}' does not exist on the brokers at {}", topic, bootstrapServers);
                    return;
                }
                if ((cause instanceof ClusterAuthorizationException) || (cause instanceof TopicAuthorizationException)) {
                    throw new ConnectException(String.format("Not authorized to describe topic(s) '%s' on the brokers %s", topicNameList, bootstrapServers), cause);
                }
                if (cause instanceof UnsupportedVersionException) {
                    throw new ConnectException(String.format("Unable to describe topic(s) '%s' since the brokers at %s do not support the DescribeTopics API.", topicNameList, bootstrapServers), cause);
                }
                if (cause instanceof TimeoutException) {
                    throw new RetriableException("Timed out while describing topics '" + topicNameList + "'", cause);
                }
                throw new ConnectException("Error while attempting to describe topics '" + topicNameList + "'", e);
            }
        });
        return existingTopics;
    }

    /**
     * Verifies that the given topic's cleanup policy is exactly {@code compact}.
     *
     * @param str  the name of the topic to check
     * @param str2 the name of the worker property that supplied the topic; used only in the
     *             error message
     * @param str3 a description of what the topic stores; used only in the error message
     * @return {@code true} if the policy is exactly {@code compact}; {@code false} if the policy
     *         could not be determined
     * @throws ConfigException if the topic has any cleanup policy other than exactly {@code compact}
     */
    public boolean verifyTopicCleanupPolicyOnlyCompact(String str, String str2, String str3) {
        Set<String> actualPolicies = topicCleanupPolicy(str);
        if (actualPolicies.isEmpty()) {
            log.info("Unable to use admin client to verify the cleanup policy of '{}' topic is '{}', either because the broker is an older version or because the Kafka principal used for Connect internal topics does not have the required permission to describe topic configurations.", str, "compact");
            return false;
        }
        Set<String> expectedPolicies = Collections.singleton("compact");
        if (actualPolicies.equals(expectedPolicies)) {
            return true;
        }
        String expectedPolicyStr = String.join(",", expectedPolicies);
        String actualPolicyStr = String.join(",", actualPolicies);
        throw new ConfigException(String.format("Topic '%s' supplied via the '%s' property is required to have '%s=%s' to guarantee consistency and durability of %s, but found the topic currently has '%s=%s'. Continuing would likely result in eventually losing %s and problems restarting this Connect cluster in the future. Change the '%s' property in the Connect worker configurations to use a topic with '%s=%s'.", str, str2, "cleanup.policy", expectedPolicyStr, str3, "cleanup.policy", actualPolicyStr, str3, str2, "cleanup.policy", expectedPolicyStr));
    }

    /**
     * Looks up the cleanup policies configured on the given topic.
     *
     * <p>The {@code cleanup.policy} value is split on commas; each token is trimmed, blank tokens
     * are dropped, and the rest are lower-cased using {@link Locale#ROOT} so the result does not
     * depend on the JVM's default locale.
     *
     * @param str the topic name
     * @return the set of cleanup policies, or an empty set if the topic configuration or its
     *         {@code cleanup.policy} entry is not available
     */
    public Set<String> topicCleanupPolicy(String str) {
        Config topicConfig = describeTopicConfig(str);
        if (topicConfig == null) {
            log.debug("Unable to find topic '{}' when getting cleanup policy", str);
            return Collections.emptySet();
        }
        ConfigEntry policyEntry = topicConfig.get("cleanup.policy");
        if (policyEntry == null || policyEntry.value() == null) {
            log.debug("Found no cleanup.policy for topic '{}'", str);
            return Collections.emptySet();
        }
        String policyStr = policyEntry.value();
        log.debug("Found cleanup.policy={} for topic '{}'", policyStr, str);
        return Arrays.stream(policyStr.split(","))
                .map(String::trim)
                .filter(policy -> !policy.isEmpty())
                .map(policy -> policy.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
    }

    /**
     * Fetches the configuration of a single topic.
     *
     * @param str the topic name; the result is looked up under this exact string
     * @return the topic's configuration, or {@code null} if {@code describeTopicConfigs} has no
     *         non-null entry for it
     */
    public Config describeTopicConfig(String str) {
        Map<String, Config> configsByTopic = describeTopicConfigs(str);
        return configsByTopic.get(str);
    }

    /**
     * Fetches the configurations of the given topics.
     *
     * <p>Names are trimmed, and {@code null} or blank names are ignored. In the result, a topic
     * that does not exist is mapped to {@code null}. A topic whose configuration cannot be read,
     * because the caller is not authorized or the brokers do not support the API, is left out.
     *
     * @param strArr the topic names; {@code null} yields an empty result
     * @return a map from trimmed topic name to configuration, as described above
     * @throws RetriableException if the thread is interrupted (the interrupt status is restored
     *         first) or the request times out
     * @throws ConnectException if any other error occurs
     */
    public Map<String, Config> describeTopicConfigs(String... strArr) {
        if (strArr == null) {
            return Collections.emptyMap();
        }
        Collection<String> topicNames = Arrays.stream(strArr)
                .filter(Objects::nonNull)
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
        if (topicNames.isEmpty()) {
            return Collections.emptyMap();
        }
        String bootstrapServers = bootstrapServers();
        String topicNameList = String.join(", ", topicNames);
        Collection<ConfigResource> resources = topicNames.stream()
                .map(topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic))
                .collect(Collectors.toList());
        Map<ConfigResource, KafkaFuture<Config>> futures =
                this.admin.describeConfigs(resources, new DescribeConfigsOptions()).values();
        Map<String, Config> configsByTopic = new HashMap<>();
        futures.forEach((resource, future) -> {
            String topic = resource.name();
            try {
                configsByTopic.put(topic, future.get());
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RetriableException(String.format("Interrupted while attempting to describe topic configs '%s'", topicNameList), e);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    log.debug("Topic '{}' does not exist on the brokers at {}", topic, bootstrapServers);
                    // An explicit null entry marks a topic that is known to be missing.
                    configsByTopic.put(topic, null);
                } else if ((cause instanceof ClusterAuthorizationException) || (cause instanceof TopicAuthorizationException)) {
                    log.debug("Not authorized to describe topic config for topic '{}' on brokers at {}", topic, bootstrapServers);
                } else if (cause instanceof UnsupportedVersionException) {
                    log.debug("API to describe topic config for topic '{}' is unsupported on brokers at {}", topic, bootstrapServers);
                } else if (cause instanceof TimeoutException) {
                    throw new RetriableException(String.format("Timed out while waiting to describe topic config for topic '%s' on brokers at %s", topic, bootstrapServers), e);
                } else {
                    throw new ConnectException(String.format("Error while attempting to describe topic config for topic '%s' on brokers at %s", topic, bootstrapServers), e);
                }
            }
        });
        return configsByTopic;
    }

    /**
     * Closes the underlying admin client.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        admin.close();
    }

    /**
     * Closes the underlying admin client, forwarding the given duration to it.
     *
     * @param duration the duration handed to the admin client's {@code close(Duration)}
     */
    public void close(Duration duration) {
        admin.close(duration);
    }

    /**
     * Returns the {@code bootstrap.servers} value from the admin configuration for use in log
     * and error messages.
     *
     * @return the configured value rendered via {@code toString()}, or {@code "<unknown>"} when
     *         the setting is absent
     */
    private String bootstrapServers() {
        Object servers = this.adminConfig.get("bootstrap.servers");
        // Plain literal instead of a constant borrowed from an unrelated parser library.
        return servers != null ? servers.toString() : "<unknown>";
    }
}
