package com.linkedin.venice.pubsub.adapter.kafka.admin;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceRetriableException;
import com.linkedin.venice.kafka.TopicDoesNotExistException;
import com.linkedin.venice.pubsub.PubSubTopicConfiguration;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Utils;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapter.class */
public class ApacheKafkaAdminAdapter implements PubSubAdminAdapter {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) ApacheKafkaAdminAdapter.class);
    private AdminClient kafkaAdminClient;
    private Long maxRetryInMs;
    private PubSubTopicRepository pubSubTopicRepository;

    public ApacheKafkaAdminAdapter(Properties properties, PubSubTopicRepository pubSubTopicRepository) {
        if (properties == null) {
            throw new IllegalArgumentException("properties cannot be null!");
        }
        this.kafkaAdminClient = AdminClient.create(properties);
        this.maxRetryInMs = Long.valueOf(((Long) properties.get(ConfigKeys.KAFKA_ADMIN_GET_TOPIC_CONFIG_MAX_RETRY_TIME_SEC)).longValue() * 1000);
        this.pubSubTopicRepository = pubSubTopicRepository;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public void createTopic(PubSubTopic pubSubTopic, int i, int i2, PubSubTopicConfiguration pubSubTopicConfiguration) {
        if (i2 > 32767) {
            throw new IllegalArgumentException("Replication factor cannot be > 32767");
        }
        Properties unmarshallProperties = unmarshallProperties(pubSubTopicConfiguration);
        HashMap hashMap = new HashMap();
        unmarshallProperties.stringPropertyNames().forEach(str -> {
            hashMap.put(str, unmarshallProperties.getProperty(str));
        });
        try {
            getKafkaAdminClient().createTopics(Collections.singleton(new NewTopic(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()), i, (short) i2).configs(hashMap))).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof InvalidReplicationFactorException) {
                throw ((InvalidReplicationFactorException) e.getCause());
            }
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new VeniceException("Failed to create topic: " + pubSubTopic + " due to ExecutionException", e);
            }
            throw ((TopicExistsException) e.getCause());
        } catch (Exception e2) {
            throw new VeniceException("Failed to create topic: " + pubSubTopic + "due to Exception", e2);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public Set<PubSubTopic> listAllTopics() {
        try {
            return (Set) getKafkaAdminClient().listTopics().names().get().stream().map(str -> {
                return this.pubSubTopicRepository.getTopic(str);
            }).collect(Collectors.toSet());
        } catch (Exception e) {
            throw new VeniceException("Failed to list all topics due to exception: ", e);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public KafkaFuture<Void> deleteTopic(PubSubTopic pubSubTopic) {
        return getKafkaAdminClient().deleteTopics(Collections.singleton(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()))).values().get(pubSubTopic.getName());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public void setTopicConfig(PubSubTopic pubSubTopic, PubSubTopicConfiguration pubSubTopicConfiguration) throws TopicDoesNotExistException {
        Properties unmarshallProperties = unmarshallProperties(pubSubTopicConfiguration);
        ArrayList arrayList = new ArrayList(unmarshallProperties.stringPropertyNames().size());
        unmarshallProperties.stringPropertyNames().forEach(str -> {
            arrayList.add(new ConfigEntry(str, unmarshallProperties.getProperty(str)));
        });
        try {
            getKafkaAdminClient().alterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName())), new Config(arrayList))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            if (!containsTopicWithExpectationAndRetry(pubSubTopic, 3, true)) {
                throw new TopicDoesNotExistException("Topic " + pubSubTopic + " does not exist");
            }
            throw new VeniceException(e);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public Map<PubSubTopic, Long> getAllTopicRetentions() {
        return getSomethingForAllTopics(config -> {
            return (Long) Optional.ofNullable(config.get(TopicConfig.RETENTION_MS_CONFIG)).map(configEntry -> {
                return Long.valueOf(Long.parseLong(configEntry.value()));
            }).orElse(Long.MIN_VALUE);
        }, "retention");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public PubSubTopicConfiguration getTopicConfig(PubSubTopic pubSubTopic) throws TopicDoesNotExistException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()));
        try {
            return marshallProperties(getKafkaAdminClient().describeConfigs(Collections.singleton(configResource)).all().get().get(configResource));
        } catch (Exception e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                throw new TopicDoesNotExistException("Topic: " + pubSubTopic + " doesn't exist");
            }
            throw new VeniceException("Failed to get topic configs for: " + pubSubTopic, e);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic pubSubTopic) {
        long j = 0;
        long j2 = 100;
        VeniceException veniceException = null;
        while (j < this.maxRetryInMs.longValue()) {
            try {
                return getTopicConfig(pubSubTopic);
            } catch (VeniceException e) {
                veniceException = e;
                Utils.sleep(j2);
                j += j2;
                j2 = Math.min(5000L, j2 * 2);
            }
        }
        throw new VeniceException("After retrying for " + j + "ms, failed to get topic configs for: " + pubSubTopic, veniceException);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public boolean containsTopic(PubSubTopic pubSubTopic) {
        try {
            if (getKafkaAdminClient().describeTopics(Collections.singleton(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()))).values().get(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName())).get() != null) {
                return true;
            }
            LOGGER.warn("Unexpected: kafkaAdminClient.describeTopics returned null (rather than throwing an InvalidTopicException). Will carry on assuming the topic doesn't exist.");
            return false;
        } catch (ExecutionException e) {
            if ((e.getCause() instanceof UnknownTopicOrPartitionException) || (e.getCause() instanceof InvalidTopicException)) {
                return false;
            }
            throw new VeniceException("Failed to check if '" + pubSubTopic + " exists!", e);
        } catch (Exception e2) {
            throw new VeniceException("Failed to check if '" + pubSubTopic + " exists!", e2);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public boolean containsTopicWithPartitionCheck(PubSubTopicPartition pubSubTopicPartition) {
        PubSubTopic pubSubTopic = pubSubTopicPartition.getPubSubTopic();
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        try {
            TopicDescription topicDescription = getKafkaAdminClient().describeTopics(Collections.singleton(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()))).values().get(ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName())).get();
            if (topicDescription == null) {
                LOGGER.warn("Unexpected: kafkaAdminClient.describeTopics returned null (rather than throwing an InvalidTopicException). Will carry on assuming the topic doesn't exist.");
                return false;
            }
            if (topicDescription.partitions().size() > partitionNumber) {
                return true;
            }
            LOGGER.warn("{} is trying to check partitionID {}, but total partitions count is {}. Will carry on assuming the topic doesn't exist.", pubSubTopic, Integer.valueOf(partitionNumber), Integer.valueOf(topicDescription.partitions().size()));
            return false;
        } catch (Exception e) {
            if ((e.getCause() instanceof UnknownTopicOrPartitionException) || (e.getCause() instanceof InvalidTopicException)) {
                return false;
            }
            throw new VeniceException("Failed to check if '" + pubSubTopic + " exists!", e);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public List<Class<? extends Throwable>> getRetriableExceptions() {
        return Collections.unmodifiableList(Arrays.asList(VeniceRetriableException.class, TimeoutException.class));
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public Map<PubSubTopic, PubSubTopicConfiguration> getSomeTopicConfigs(Set<PubSubTopic> set) {
        return getSomethingForSomeTopics(set, config -> {
            return marshallProperties(config);
        }, "configs");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public boolean isTopicDeletionUnderway() {
        return false;
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubAdminAdapter
    public String getClassName() {
        return ApacheKafkaAdminAdapter.class.getName();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.kafkaAdminClient != null) {
            try {
                this.kafkaAdminClient.close(Duration.ofSeconds(60L));
            } catch (Exception e) {
                LOGGER.warn("Exception (suppressed) during kafkaAdminClient.close()", (Throwable) e);
            }
        }
    }

    private PubSubTopicConfiguration marshallProperties(Config config) {
        Properties properties = new Properties();
        config.entries().forEach(configEntry -> {
            properties.setProperty(configEntry.name(), configEntry.value());
        });
        return new PubSubTopicConfiguration(Optional.ofNullable(properties.getProperty(TopicConfig.RETENTION_MS_CONFIG)).map(Long::parseLong), Boolean.valueOf(TopicConfig.CLEANUP_POLICY_COMPACT.equals(properties.getProperty(TopicConfig.CLEANUP_POLICY_CONFIG))).booleanValue(), Optional.ofNullable(properties.getProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)).map(Integer::parseInt), Long.valueOf(properties.containsKey(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG) ? Long.parseLong((String) properties.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)) : 0L));
    }

    private Properties unmarshallProperties(PubSubTopicConfiguration pubSubTopicConfiguration) {
        Properties properties = new Properties();
        Optional<Long> retentionInMs = pubSubTopicConfiguration.retentionInMs();
        if (retentionInMs.isPresent()) {
            properties.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(retentionInMs.get().longValue()));
        }
        if (pubSubTopicConfiguration.isLogCompacted()) {
            properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
            properties.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(pubSubTopicConfiguration.minLogCompactionLagMs().longValue()));
        } else {
            properties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
        }
        pubSubTopicConfiguration.minInSyncReplicas().ifPresent(num -> {
            properties.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Integer.toString(num.intValue()));
        });
        properties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime");
        return properties;
    }

    private AdminClient getKafkaAdminClient() {
        if (this.kafkaAdminClient == null) {
            throw new IllegalStateException("initialize(properties) has not been called!");
        }
        return this.kafkaAdminClient;
    }

    private <T> Map<PubSubTopic, T> getSomethingForAllTopics(Function<Config, T> function, String str) {
        try {
            return getSomethingForSomeTopics((Set) getKafkaAdminClient().listTopics().names().get().stream().map(str2 -> {
                return this.pubSubTopicRepository.getTopic(str2);
            }).collect(Collectors.toSet()), function, str);
        } catch (Exception e) {
            throw new VeniceException("Failed to get " + str + " for all topics", e);
        }
    }

    private <T> Map<PubSubTopic, T> getSomethingForSomeTopics(Set<PubSubTopic> set, Function<Config, T> function, String str) {
        HashMap hashMap = new HashMap();
        try {
            getKafkaAdminClient().describeConfigs((Collection) set.stream().map(pubSubTopic -> {
                return new ConfigResource(ConfigResource.Type.TOPIC, ApacheKafkaProducerAdapter.mapToPulsar(pubSubTopic.getName()));
            }).collect(Collectors.toCollection(ArrayList::new))).all().get().forEach((configResource, config) -> {
                hashMap.put(this.pubSubTopicRepository.getTopic(ApacheKafkaProducerAdapter.mapToPulsar(configResource.name())), function.apply(config));
            });
            return hashMap;
        } catch (Exception e) {
            int size = set.size();
            throw new VeniceException("Failed to get " + str + " for " + (size + " topic" + (size > 1 ? "s" : "")), e);
        }
    }
}
