package io.smallrye.reactive.messaging.kafka.companion.test;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.tools.StorageTool;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.jboss.logging.Logger;
import scala.Option;
import scala.jdk.CollectionConverters;
import scala.jdk.javaapi.StreamConverters;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.class */
public class EmbeddedKafkaBroker implements Closeable {
    private static final Logger LOGGER = Logger.getLogger(EmbeddedKafkaBroker.class.getName());
    // Passed to the KafkaRaftServer constructor by createServer.
    private static final String COMPANION_BROKER_PREFIX = "companion-embedded-kafka";
    // Non-null only between a successful start() and the following close(); see isRunning().
    private KafkaRaftServer kafkaServer;
    // Assigned by start(); close() does not clear it.
    private KafkaConfig config;
    private int nodeId = 1;
    private String host = "localhost";
    // A port of 0 is replaced by a free port (getUnusedPort) when the endpoint is built in start().
    private int kafkaPort = 0;
    private int controllerPort = 0;
    private boolean deleteDirsOnClose = true;
    private String clusterId = Uuid.randomUuid().toString();
    // Diamond instead of the raw ArrayList type so the element type is checked by the compiler.
    private final List<Endpoint> advertisedListeners = new ArrayList<>();
    // Optional hook applied to the default broker properties in start(); null when unset.
    private Consumer<Properties> brokerConfigModifier;

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker$LoggingOutputStream.class */
    /**
     * {@link OutputStream} that buffers the bytes written to it and forwards every completed line to a
     * {@link Logger} at INFO level.
     *
     * <p>A line ends at {@code '\n'} or {@code '\r'}. Empty lines are not logged, so a {@code "\r\n"}
     * terminator produces a single log entry.
     */
    public static class LoggingOutputStream extends OutputStream {
        private final ByteArrayOutputStream os = new ByteArrayOutputStream(1000);
        private final Logger logger;

        /**
         * Wraps a new {@code LoggingOutputStream} for {@code logger} in a {@link PrintStream}.
         *
         * @param logger the logger receiving each completed line
         * @return a print stream whose output ends up in {@code logger}
         */
        public static PrintStream loggerPrintStream(Logger logger) {
            return new PrintStream(new LoggingOutputStream(logger));
        }

        LoggingOutputStream(Logger logger) {
            this.logger = logger;
        }

        /**
         * Buffers {@code i}, or, when {@code i} is a line terminator, logs the buffered line.
         *
         * @param i the byte to write
         */
        @Override
        public void write(int i) throws IOException {
            if (i != '\n' && i != '\r') {
                this.os.write(i);
                return;
            }
            if (this.os.size() == 0) {
                // Blank line, or the second half of "\r\n": nothing to emit.
                return;
            }
            String line = this.os.toString();
            // Reset before logging; without this every later line would repeat all earlier output.
            this.os.reset();
            this.logger.info(line);
        }
    }

    /**
     * Sets the node id that {@link #start()} passes to the default broker configuration.
     *
     * @param i the node id
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withNodeId(int i) {
        assertNotRunning();
        nodeId = i;
        return this;
    }

    /**
     * Registers a callback that {@link #start()} invokes with the default broker properties, so the
     * callback can adjust them. A later call replaces the previously registered callback.
     *
     * @param consumer the callback receiving the broker properties; {@code null} disables the hook
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withAdditionalProperties(Consumer<Properties> consumer) {
        assertNotRunning();
        brokerConfigModifier = consumer;
        return this;
    }

    /**
     * Sets the port of the PLAINTEXT broker endpoint built by {@link #start()}.
     *
     * @param i the port; {@code 0} lets {@code start()} pick a free port
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withKafkaPort(int i) {
        assertNotRunning();
        kafkaPort = i;
        return this;
    }

    /**
     * Sets the port of the controller endpoint built by {@link #start()}.
     *
     * @param i the port; {@code 0} lets {@code start()} pick a free port
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withControllerPort(int i) {
        assertNotRunning();
        controllerPort = i;
        return this;
    }

    /**
     * Sets the host used for both the broker endpoint and the controller endpoint in {@link #start()}.
     *
     * @param str the host name
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withKafkaHost(String str) {
        assertNotRunning();
        host = str;
        return this;
    }

    /**
     * Sets the cluster id that {@link #start()} uses when formatting the storage directories.
     *
     * @param str the cluster id
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withClusterId(String str) {
        assertNotRunning();
        clusterId = str;
        return this;
    }

    /**
     * Chooses whether {@link #close()} deletes the broker's log directories.
     *
     * @param z {@code true} to delete the log directories on close
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withDeleteLogDirsOnClose(boolean z) {
        assertNotRunning();
        deleteDirsOnClose = z;
        return this;
    }

    /**
     * Appends the given endpoints to the advertised listeners used by {@link #start()}.
     *
     * @param endpointArr the endpoints to advertise
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     */
    public EmbeddedKafkaBroker withAdvertisedListeners(Endpoint... endpointArr) {
        assertNotRunning();
        for (Endpoint listener : endpointArr) {
            advertisedListeners.add(listener);
        }
        return this;
    }

    /**
     * Parses a comma-separated list of listeners and appends them to the advertised listeners used by
     * {@link #start()}.
     *
     * <p>Each entry is trimmed before parsing and blank entries are skipped, so values such as
     * {@code "PLAINTEXT://a:9092, SSL://b:9093"} or a trailing comma are accepted. All entries are
     * parsed before any is added, so a malformed entry leaves the configured listeners untouched.
     *
     * @param str comma-separated listeners, each in a form accepted by {@link #parseEndpoint(String)}
     * @return this broker, for chaining
     * @throws IllegalStateException if the broker is already running
     * @throws IllegalArgumentException if a non-blank entry cannot be parsed
     */
    public EmbeddedKafkaBroker withAdvertisedListeners(String str) {
        assertNotRunning();
        List<Endpoint> parsed = new ArrayList<>();
        for (String entry : str.split(",")) {
            String listener = entry.trim();
            if (!listener.isEmpty()) {
                parsed.add(parseEndpoint(listener));
            }
        }
        this.advertisedListeners.addAll(parsed);
        return this;
    }

    /**
     * Starts the broker if it is not already running.
     *
     * <p>Builds the default broker properties, applies the optional properties callback, creates a
     * temporary log directory when none is configured, formats the storage and starts the server.
     *
     * @return this broker, for chaining
     */
    public synchronized EmbeddedKafkaBroker start() {
        if (isRunning()) {
            return this;
        }
        // The broker endpoint is resolved before the controller endpoint.
        Endpoint brokerEndpoint = endpoint(SecurityProtocol.PLAINTEXT, host, kafkaPort);
        Endpoint controllerEndpoint = controller(host, controllerPort);
        Properties brokerProps = createDefaultBrokerConfig(nodeId, controllerEndpoint, brokerEndpoint, advertisedListeners);
        if (brokerConfigModifier != null) {
            brokerConfigModifier.accept(brokerProps);
        }
        if (brokerProps.get(KafkaConfig.LogDirProp()) == null) {
            createAndSetlogDir(brokerProps);
        }
        long startedAt = System.currentTimeMillis();
        config = formatStorageFromConfig(brokerProps, clusterId, true);
        kafkaServer = createServer(config);
        long elapsedMs = System.currentTimeMillis() - startedAt;
        LOGGER.infof("Kafka broker started in %d ms with advertised listeners: %s", Long.valueOf(elapsedMs), getAdvertisedListeners());
        return this;
    }

    /**
     * Shuts the broker down if it is running and, when enabled, deletes its log directories.
     *
     * <p>Exceptions raised during shutdown or directory deletion are logged, not propagated. The
     * cleanup runs on every exit path, after which the broker reports itself as not running.
     */
    @Override
    public synchronized void close() {
        try {
            if (isRunning()) {
                this.kafkaServer.shutdown();
                this.kafkaServer.awaitShutdown();
            }
        } catch (Exception e) {
            LOGGER.error("Error shutting down broker", e);
        } finally {
            deleteLogDirsIfRequested();
            this.kafkaServer = null;
        }
    }

    /** Deletes the configured log directories when deletion is enabled; failures are only logged. */
    private void deleteLogDirsIfRequested() {
        // config is still null when close() is called before start(): there is nothing to delete,
        // and reading the log dirs would only produce a misleading error log.
        if (!this.deleteDirsOnClose || this.config == null) {
            return;
        }
        try {
            for (String logDir : getLogDirs()) {
                Utils.delete(new File(logDir));
            }
        } catch (Exception e) {
            LOGGER.error("Error deleting logdirs", e);
        }
    }

    /**
     * Tells whether a server instance is currently held by this broker.
     *
     * @return {@code true} between a successful {@link #start()} and the next {@link #close()}
     */
    public boolean isRunning() {
        return kafkaServer != null;
    }

    /**
     * Guards configuration methods.
     *
     * @throws IllegalStateException if the broker is running
     */
    private void assertNotRunning() {
        if (!isRunning()) {
            return;
        }
        throw new IllegalStateException("Configuration of the running broker is not permitted.");
    }

    /**
     * Returns the configuration built by {@link #start()}.
     *
     * @return the broker configuration, or {@code null} if {@code start()} has not built one yet
     */
    public KafkaConfig getKafkaConfig() {
        return config;
    }

    /**
     * Returns the advertised listeners of the current broker configuration as a comma-separated
     * string. Reads the configuration built by {@link #start()}.
     *
     * @return the comma-joined listener connection strings
     */
    public String getAdvertisedListeners() {
        KafkaConfig current = config;
        return getAdvertisedListeners(current);
    }

    /**
     * Returns the log directories of the current broker configuration. Reads the configuration built
     * by {@link #start()}.
     *
     * @return the log directory paths
     */
    public List<String> getLogDirs() {
        KafkaConfig current = config;
        return getLogDirs(current);
    }

    /**
     * @return the configured node id
     */
    public int getNodeId() {
        return nodeId;
    }

    /**
     * @return the configured cluster id
     */
    public String getClusterId() {
        return clusterId;
    }

    /**
     * Creates an endpoint with an empty host, named after the security protocol.
     *
     * @param securityProtocol the protocol, whose name is also used as listener name
     * @param i the port; {@code 0} is replaced by a free port
     * @return the new endpoint
     */
    public static Endpoint endpoint(SecurityProtocol securityProtocol, int i) {
        // Delegates through the host-taking overload, which supplies the protocol name as listener name.
        return endpoint(securityProtocol, "", i);
    }

    /**
     * Creates an endpoint named after the security protocol.
     *
     * @param securityProtocol the protocol, whose name is also used as listener name
     * @param str the host
     * @param i the port; {@code 0} is replaced by a free port
     * @return the new endpoint
     */
    public static Endpoint endpoint(SecurityProtocol securityProtocol, String str, int i) {
        String listener = securityProtocol.name;
        return endpoint(listener, securityProtocol, str, i);
    }

    /**
     * Creates a named endpoint with an empty host.
     *
     * @param str the listener name
     * @param securityProtocol the protocol
     * @param i the port; {@code 0} is replaced by a free port
     * @return the new endpoint
     */
    public static Endpoint endpoint(String str, SecurityProtocol securityProtocol, int i) {
        final String emptyHost = "";
        return endpoint(str, securityProtocol, emptyHost, i);
    }

    /**
     * Creates an endpoint from its parts, resolving the port first.
     *
     * @param str the listener name
     * @param securityProtocol the protocol
     * @param str2 the host
     * @param i the port; {@code 0} is replaced by a free port
     * @return the new endpoint
     */
    public static Endpoint endpoint(String str, SecurityProtocol securityProtocol, String str2, int i) {
        int resolvedPort = getUnusedPort(i);
        return new Endpoint(str, securityProtocol, str2, resolvedPort);
    }

    /**
     * Parses a listener string and forces the given security protocol onto the result.
     *
     * @param securityProtocol the protocol of the returned endpoint; its name is the listener name when
     *        the parsed listener has none
     * @param str the listener string, in a form accepted by {@link #parseEndpoint(String)}
     * @return an endpoint with the parsed host and port and the given protocol
     */
    public static Endpoint parseEndpoint(SecurityProtocol securityProtocol, String str) {
        Endpoint parsed = parseEndpoint(str);
        String listener = parsed.listenerName().orElse(securityProtocol.name);
        return new Endpoint(listener, securityProtocol, parsed.host(), parsed.port());
    }

    /**
     * Parses a listener string by splitting it on {@code ':'}.
     *
     * <p>Two parts are read as {@code host:port} and give a PLAINTEXT endpoint without listener name.
     * Three parts are read as {@code name://host:port}, where the name is also resolved as the
     * security protocol.
     *
     * @param str the listener string
     * @return the parsed endpoint
     * @throws IllegalArgumentException if the string does not split into two or three parts
     */
    public static Endpoint parseEndpoint(String str) {
        String[] parts = str.split(":");
        switch (parts.length) {
            case 2:
                return new Endpoint((String) null, SecurityProtocol.PLAINTEXT, parts[0], Integer.parseInt(parts[1]));
            case 3: {
                String listener = parts[0];
                SecurityProtocol protocol = SecurityProtocol.forName(listener);
                // Strips the "//" that follows the listener name in "name://host:port".
                String hostPart = parts[1].replace("//", "");
                return new Endpoint(listener, protocol, hostPart, Integer.parseInt(parts[2]));
            }
            default:
                throw new IllegalArgumentException("Cannot parse listener: " + str);
        }
    }

    /**
     * Builds the default properties for a single node acting as both broker and controller.
     *
     * <p>The listeners are the controller endpoint, the internal endpoint, and one empty-host listener
     * per advertised listener (same name, protocol and port). The inter-broker listener is the first
     * PLAINTEXT advertised listener, or the internal endpoint when there is none.
     *
     * @param i the node id, also used in the quorum voters entry
     * @param endpoint the controller endpoint
     * @param endpoint2 the internal broker endpoint
     * @param list the advertised listeners; may be empty, in which case no advertised listeners
     *        property is set
     * @return the broker properties
     */
    public static Properties createDefaultBrokerConfig(int i, Endpoint endpoint, Endpoint endpoint2, List<Endpoint> list) {
        Properties properties = new Properties();
        properties.put(KafkaConfig.BrokerIdProp(), Integer.toString(i));
        properties.put(KafkaConfig.ProcessRolesProp(), "broker,controller");
        properties.put(KafkaConfig.ControllerListenerNamesProp(), listenerName(endpoint));
        properties.put(KafkaConfig.QuorumVotersProp(), i + "@" + endpoint.host() + ":" + endpoint.port());

        // Typed map (previously a raw Map with unchecked casts), keyed by listener name.
        Map<String, Endpoint> listeners = list.stream()
                .map(advertised -> new Endpoint(advertised.listenerName().orElse(null), advertised.securityProtocol(), "", advertised.port()))
                .collect(Collectors.toMap(EmbeddedKafkaBroker::listenerName, Function.identity()));
        // Controller and internal endpoints override same-named entries derived from the advertised list.
        listeners.put(listenerName(endpoint), endpoint);
        listeners.put(listenerName(endpoint2), endpoint2);
        properties.put(KafkaConfig.ListenersProp(), listeners.values().stream()
                .map(EmbeddedKafkaBroker::toListenerString)
                .distinct()
                .collect(Collectors.joining(",")));

        Endpoint interBroker = list.stream()
                .filter(advertised -> advertised.securityProtocol() == SecurityProtocol.PLAINTEXT)
                .findFirst()
                .orElse(endpoint2);
        String advertisedListeners = list.stream()
                .map(EmbeddedKafkaBroker::toListenerString)
                .distinct()
                .collect(Collectors.joining(","));
        if (!Utils.isBlank(advertisedListeners)) {
            properties.put(KafkaConfig.AdvertisedListenersProp(), advertisedListeners);
        }
        properties.put(KafkaConfig.ListenerSecurityProtocolMapProp(), listeners.values().stream()
                .map(EmbeddedKafkaBroker::toProtocolMap)
                .distinct()
                .collect(Collectors.joining(",")));
        properties.put(KafkaConfig.InterBrokerListenerNameProp(), listenerName(interBroker));

        properties.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
        properties.put(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
        properties.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
        properties.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(false));
        properties.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100");
        properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(true));
        properties.put(KafkaConfig.LogDeleteDelayMsProp(), "1000");
        properties.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
        properties.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), String.valueOf(Long.MAX_VALUE));
        properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
        properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5");
        properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
        properties.put(KafkaConfig.NumPartitionsProp(), "1");
        properties.put(KafkaConfig.DefaultReplicationFactorProp(), "1");
        return properties;
    }

    /**
     * Creates a {@link KafkaConfig} from the properties and formats its log directories through
     * {@link StorageTool}, sending the tool's output to this class's logger.
     *
     * @param properties the broker properties
     * @param str the cluster id used to build the metadata properties
     * @param z forwarded as the last argument of {@code StorageTool.formatCommand}
     * @return the configuration created from {@code properties}
     */
    public static KafkaConfig formatStorageFromConfig(Properties properties, String str, boolean z) {
        KafkaConfig kafkaConfig = KafkaConfig.fromProps(properties, false);
        PrintStream toolOutput = LoggingOutputStream.loggerPrintStream(LOGGER);
        StorageTool.formatCommand(
                toolOutput,
                StorageTool.configToLogDirectories(kafkaConfig),
                StorageTool.buildMetadataProperties(str, kafkaConfig),
                z);
        return kafkaConfig;
    }

    /**
     * Formats the given log directories through {@link StorageTool}, sending the tool's output to this
     * class's logger.
     *
     * @param list the log directories to format
     * @param str the cluster id
     * @param i the node id
     * @param z forwarded as the last argument of {@code StorageTool.formatCommand}
     */
    public static void formatStorage(List<String> list, String str, int i, boolean z) {
        MetaProperties meta = new MetaProperties(str, i);
        PrintStream toolOutput = LoggingOutputStream.loggerPrintStream(LOGGER);
        StorageTool.formatCommand(
                toolOutput,
                CollectionConverters.ListHasAsScala(list).asScala().toSeq(),
                meta,
                z);
    }

    /**
     * Creates a {@link KafkaRaftServer} for the configuration and starts it.
     *
     * @param kafkaConfig the broker configuration
     * @return the started server
     */
    public static KafkaRaftServer createServer(KafkaConfig kafkaConfig) {
        KafkaRaftServer server = new KafkaRaftServer(
                kafkaConfig,
                Time.SYSTEM,
                Option.apply(COMPANION_BROKER_PREFIX));
        server.startup();
        return server;
    }

    /**
     * Joins the connection strings of the configuration's effective advertised listeners with commas.
     *
     * @param kafkaConfig the broker configuration
     * @return the comma-separated connection strings
     */
    private static String getAdvertisedListeners(KafkaConfig kafkaConfig) {
        return StreamConverters.asJavaParStream(kafkaConfig.effectiveAdvertisedListeners())
                .map(listener -> listener.connectionString())
                .collect(Collectors.joining(","));
    }

    /**
     * Copies the configuration's log directories into a Java list.
     *
     * @param kafkaConfig the broker configuration
     * @return the log directory paths
     */
    private static List<String> getLogDirs(KafkaConfig kafkaConfig) {
        return StreamConverters.asJavaParStream(kafkaConfig.logDirs())
                .collect(Collectors.toList());
    }

    /**
     * Returns the given port, or a currently free port when the given port is {@code 0}.
     *
     * <p>A free port is found by briefly binding a {@link ServerSocket} to port 0. The socket is
     * closed again before returning, so the port is not reserved for the caller.
     *
     * @param i the requested port, or {@code 0} to pick one
     * @return {@code i} if non-zero, otherwise a port that was free at the time of the call
     * @throws RuntimeException wrapping the {@link IOException} if the probe socket cannot be opened
     *         or closed
     */
    private static int getUnusedPort(int i) {
        if (i != 0) {
            return i;
        }
        // try-with-resources closes the probe socket on every path, not only on success.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a fresh temporary directory and stores its path as the log directory property.
     *
     * @param properties the broker properties to update
     * @throws UncheckedIOException if the temporary directory cannot be created
     */
    private static void createAndSetlogDir(Properties properties) {
        String prefix = COMPANION_BROKER_PREFIX + "-" + UUID.randomUUID();
        String logDir;
        try {
            logDir = Files.createTempDirectory(prefix).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        properties.put(KafkaConfig.LogDirProp(), logDir);
    }

    /**
     * Creates the PLAINTEXT endpoint named {@code CONTROLLER}.
     *
     * @param str the host
     * @param i the port; {@code 0} is replaced by a free port
     * @return the controller endpoint
     */
    private static Endpoint controller(String str, int i) {
        final String controllerListener = "CONTROLLER";
        return endpoint(controllerListener, SecurityProtocol.PLAINTEXT, str, i);
    }

    /**
     * Renders an endpoint as {@code name://host:port}.
     *
     * <p>The string is built by concatenation rather than {@code String.format("%d")}, because
     * {@code %d} localizes digits for the default locale and could emit non-ASCII digits in the port.
     *
     * @param endpoint the endpoint to render
     * @return the listener string
     */
    public static String toListenerString(Endpoint endpoint) {
        return listenerName(endpoint) + "://" + endpoint.host() + ":" + endpoint.port();
    }

    /**
     * Renders an endpoint as a {@code listenerName:securityProtocolName} entry.
     *
     * @param endpoint the endpoint to render
     * @return the protocol map entry
     */
    private static String toProtocolMap(Endpoint endpoint) {
        String listener = listenerName(endpoint);
        String protocol = endpoint.securityProtocol().name;
        return listener + ":" + protocol;
    }

    /**
     * Returns the endpoint's listener name, falling back to the name of its security protocol.
     *
     * @param endpoint the endpoint
     * @return the listener name to use for the endpoint
     */
    private static String listenerName(Endpoint endpoint) {
        return endpoint.listenerName()
                .orElse(endpoint.securityProtocol().name);
    }
}
