package io.smallrye.reactive.messaging.kafka.companion.test;

import io.strimzi.test.container.StrimziKafkaContainer;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionFactory;
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension.class */
public class KafkaBrokerExtension implements BeforeAllCallback, BeforeEachCallback, ParameterResolver, ExtensionContext.Store.CloseableResource {
    public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName());
    public static final String KAFKA_VERSION = "3.1.0";
    protected StrimziKafkaContainer kafka;

    @Target({ElementType.FIELD, ElementType.PARAMETER})
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/test/KafkaBrokerExtension$KafkaBootstrapServers.class */
    public @interface KafkaBootstrapServers {
    }

    public void beforeAll(ExtensionContext extensionContext) {
        ExtensionContext.Store store = extensionContext.getRoot().getStore(ExtensionContext.Namespace.GLOBAL);
        if (((KafkaBrokerExtension) store.get(KafkaBrokerExtension.class)) == null) {
            LOGGER.info("Starting Kafka broker");
            startKafkaBroker();
            store.put(KafkaBrokerExtension.class, this);
        }
    }

    public void close() {
        LOGGER.info("Stopping Kafka broker");
        stopKafkaBroker();
    }

    public static StrimziKafkaContainer createKafkaContainer() {
        return configureKafkaContainer(new StrimziKafkaContainer());
    }

    public static <T extends StrimziKafkaContainer> T configureKafkaContainer(T t) {
        t.withKafkaVersion(System.getProperty("kafka-container-version", KAFKA_VERSION));
        HashMap hashMap = new HashMap();
        hashMap.put("log.cleaner.enable", "false");
        t.withKafkaConfigurationMap(hashMap);
        return t;
    }

    public void startKafkaBroker() {
        this.kafka = createKafkaContainer();
        this.kafka.start();
        LOGGER.info("Kafka broker started: " + this.kafka.getBootstrapServers() + " (" + this.kafka.getMappedPort(ProxiedStrimziKafkaContainer.KAFKA_PORT) + ")");
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.kafka.isRunning());
        });
    }

    public static StrimziKafkaContainer restart(StrimziKafkaContainer strimziKafkaContainer, int i) {
        int intValue = strimziKafkaContainer.getMappedPort(ProxiedStrimziKafkaContainer.KAFKA_PORT).intValue();
        try {
            strimziKafkaContainer.close();
        } catch (Exception e) {
        }
        Awaitility.await().until(() -> {
            return Boolean.valueOf(!strimziKafkaContainer.isRunning());
        });
        sleep(Duration.ofSeconds(i));
        return startKafkaBroker(intValue);
    }

    public static StrimziKafkaContainer startKafkaBroker(int i) {
        StrimziKafkaContainer withPort = createKafkaContainer().withPort(i);
        withPort.start();
        ConditionFactory await = Awaitility.await();
        Objects.requireNonNull(withPort);
        await.until(withPort::isRunning);
        return withPort;
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stopKafkaBroker() {
        if (this.kafka != null) {
            try {
                this.kafka.stop();
            } catch (Exception e) {
            }
            Awaitility.await().until(() -> {
                return Boolean.valueOf(!this.kafka.isRunning());
            });
        }
    }

    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        return parameterContext.isAnnotated(KafkaBootstrapServers.class) && parameterContext.getParameter().getType().equals(String.class);
    }

    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        if (!parameterContext.isAnnotated(KafkaBootstrapServers.class)) {
            return null;
        }
        KafkaBrokerExtension kafkaBrokerExtension = (KafkaBrokerExtension) extensionContext.getRoot().getStore(ExtensionContext.Namespace.GLOBAL).get(KafkaBrokerExtension.class);
        if (kafkaBrokerExtension.kafka != null) {
            return kafkaBrokerExtension.kafka.getBootstrapServers();
        }
        return null;
    }

    public void beforeEach(ExtensionContext extensionContext) throws Exception {
        LOGGER.infof("Running test %s (%s#%s)", extensionContext.getDisplayName(), extensionContext.getTestClass().map((v0) -> {
            return v0.getName();
        }).orElse(""), extensionContext.getTestMethod().map((v0) -> {
            return v0.getName();
        }).orElse(""));
        if (this.kafka != null) {
            for (int i = 0; i < 3; i++) {
                try {
                    isBrokerHealthy();
                    return;
                } catch (ConditionTimeoutException e) {
                    LOGGER.warn("The Kafka broker is not healthy, restarting it");
                    restart(this.kafka, 0);
                }
            }
            throw new IllegalStateException("The Kafka broker is not unhealthy, despite 3 restarts");
        }
    }

    private void isBrokerHealthy() {
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.kafka.isRunning());
        });
        Awaitility.await().catchUncaughtExceptions().until(() -> {
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", this.kafka.getBootstrapServers());
            hashMap.put("client.id", "broker-healthy-admin");
            AdminClient create = AdminClient.create(hashMap);
            try {
                Collection collection = (Collection) create.describeCluster().nodes().get();
                Boolean valueOf = Boolean.valueOf(collection.size() == 1 && ((Node) collection.iterator().next()).id() >= 0);
                if (create != null) {
                    create.close();
                }
                return valueOf;
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }
}
