package org.apache.kafka.streams.integration.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster extends ExternalResource {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final int DEFAULT_BROKER_PORT = 0;
    public static final int TOPIC_CREATION_TIMEOUT = 30000;
    private EmbeddedZookeeper zookeeper;
    private final KafkaEmbedded[] brokers;
    private final Properties brokerConfig;
    public final MockTime time;

    public EmbeddedKafkaCluster(int i) {
        this(i, new Properties());
    }

    public EmbeddedKafkaCluster(int i, Properties properties) {
        this.zookeeper = null;
        this.time = new MockTime();
        this.brokers = new KafkaEmbedded[i];
        this.brokerConfig = properties;
    }

    public void start() throws IOException, InterruptedException {
        log.debug("Initiating embedded Kafka cluster startup");
        log.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        log.debug("ZooKeeper instance is running at {}", zKConnectString());
        this.brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
        this.brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), Integer.valueOf(DEFAULT_BROKER_PORT));
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2097152L);
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), Integer.valueOf(DEFAULT_BROKER_PORT));
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        for (int i = DEFAULT_BROKER_PORT; i < this.brokers.length; i++) {
            this.brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.valueOf(i));
            log.debug("Starting a Kafka instance on port {} ...", this.brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
            this.brokers[i] = new KafkaEmbedded(this.brokerConfig, this.time);
            log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", this.brokers[i].brokerList(), this.brokers[i].zookeeperConnect());
        }
    }

    private void putIfAbsent(Properties properties, String str, Object obj) {
        if (properties.containsKey(str)) {
            return;
        }
        this.brokerConfig.put(str, obj);
    }

    public void stop() {
        KafkaEmbedded[] kafkaEmbeddedArr = this.brokers;
        int length = kafkaEmbeddedArr.length;
        for (int i = DEFAULT_BROKER_PORT; i < length; i++) {
            kafkaEmbeddedArr[i].stop();
        }
        this.zookeeper.shutdown();
    }

    public String zKConnectString() {
        return "localhost:" + this.zookeeper.port();
    }

    public String bootstrapServers() {
        return this.brokers[DEFAULT_BROKER_PORT].brokerList();
    }

    protected void before() throws Throwable {
        start();
    }

    protected void after() {
        stop();
    }

    public void createTopic(String str) throws InterruptedException {
        createTopic(str, 1, 1, new Properties());
    }

    public void createTopic(String str, int i, int i2) throws InterruptedException {
        createTopic(str, i, i2, new Properties());
    }

    public void createTopic(String str, int i, int i2, Properties properties) throws InterruptedException {
        this.brokers[DEFAULT_BROKER_PORT].createTopic(str, i, i2, properties);
        ArrayList arrayList = new ArrayList();
        for (int i3 = DEFAULT_BROKER_PORT; i3 < i; i3++) {
            arrayList.add(new TopicPartition(str, i3));
        }
        IntegrationTestUtils.waitForTopicPartitions(brokers(), arrayList, IntegrationTestUtils.DEFAULT_TIMEOUT);
    }

    public void deleteTopic(String str) {
        this.brokers[DEFAULT_BROKER_PORT].deleteTopic(str);
    }

    public List<KafkaServer> brokers() {
        ArrayList arrayList = new ArrayList();
        KafkaEmbedded[] kafkaEmbeddedArr = this.brokers;
        int length = kafkaEmbeddedArr.length;
        for (int i = DEFAULT_BROKER_PORT; i < length; i++) {
            arrayList.add(kafkaEmbeddedArr[i].kafkaServer());
        }
        return arrayList;
    }
}
