package com.linkedin.venice.integration.utils;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.TestMockTime;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import scala.Some$;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/linkedin/venice/integration/utils/KafkaBrokerFactory.class */
class KafkaBrokerFactory implements PubSubBrokerFactory {
    public static final String SERVICE_NAME = "Kafka";
    private static final int OFFSET_TOPIC_PARTITIONS = 1;
    private static final short OFFSET_TOPIC_REPLICATION_FACTOR = 1;
    private static final boolean LOG_CLEANER_ENABLE = false;
    private static final Logger LOGGER = LogManager.getLogger(ServiceFactory.class);
    private static final PubSubClientsFactory KAFKA_CLIENTS_FACTORY = new PubSubClientsFactory(new ApacheKafkaProducerAdapterFactory());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/integration/utils/KafkaBrokerFactory$KafkaBrokerWrapper.class */
    public static class KafkaBrokerWrapper extends PubSubBrokerWrapper {
        private static final Logger LOGGER = LogManager.getLogger(KafkaBrokerWrapper.class);
        private final int sslPort;
        private KafkaServer kafkaServer;
        private final KafkaConfig kafkaConfig;
        private final TestMockTime mockTime;
        private final ZkServerWrapper zkServerWrapper;
        private final boolean shouldCloseZkServer;
        private final PubSubClientsFactory pubSubClientsFactory;

        private KafkaBrokerWrapper(KafkaConfig kafkaConfig, KafkaServer kafkaServer, File file, ZkServerWrapper zkServerWrapper, boolean z, TestMockTime testMockTime, int i) {
            super(KafkaBrokerFactory.SERVICE_NAME, file);
            this.kafkaConfig = kafkaConfig;
            this.kafkaServer = kafkaServer;
            this.mockTime = testMockTime;
            this.sslPort = i;
            this.zkServerWrapper = zkServerWrapper;
            this.shouldCloseZkServer = z;
            this.pubSubClientsFactory = new PubSubClientsFactory(new ApacheKafkaProducerAdapterFactory());
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        public String getHost() {
            return this.kafkaServer.config().hostName();
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        public int getPort() {
            return this.kafkaServer.config().port().intValue();
        }

        @Override // com.linkedin.venice.integration.utils.PubSubBrokerWrapper
        public int getSslPort() {
            return this.sslPort;
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        public String getAddress() {
            return getHost() + ":" + getPort();
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        protected void internalStart() {
            Properties properties = System.getProperties();
            if (properties.contains("kafka_mx4jenable")) {
                throw new VeniceException("kafka_mx4jenable should not be set! kafka_mx4jenable = " + properties.getProperty("kafka_mx4jenable"));
            }
            this.kafkaServer.startup();
            LOGGER.info("Kafka broker listening on port: {} and ssl port: {}", Integer.valueOf(getPort()), Integer.valueOf(getSslPort()));
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        protected void internalStop() {
            this.kafkaServer.shutdown();
            this.kafkaServer.awaitShutdown();
            if (this.shouldCloseZkServer) {
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.zkServerWrapper});
            }
        }

        @Override // com.linkedin.venice.integration.utils.ProcessWrapper
        protected void newProcess() {
            this.kafkaServer = instantiateNewKafkaServer(this.kafkaConfig, this.mockTime);
        }

        @Override // com.linkedin.venice.integration.utils.PubSubBrokerWrapper
        public String toString() {
            return "KafkaBrokerWrapper{address: '" + getAddress() + "', sslAddress: '" + getSSLAddress() + "'}";
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static KafkaServer instantiateNewKafkaServer(KafkaConfig kafkaConfig, TestMockTime testMockTime) {
            return new KafkaServer(kafkaConfig, testMockTime == null ? SystemTime.SYSTEM : new KafkaMockTimeWrapper(testMockTime), Some$.MODULE$.apply("kafka-broker-port-" + kafkaConfig.getInt(KafkaConfig.PortProp()).intValue()), JavaConverters.asScalaBuffer(new ArrayList()));
        }

        @Override // com.linkedin.venice.integration.utils.PubSubBrokerWrapper
        public PubSubClientsFactory getPubSubClientsFactory() {
            return this.pubSubClientsFactory;
        }
    }

    KafkaBrokerFactory() {
    }

    @Override // com.linkedin.venice.integration.utils.PubSubBrokerFactory
    public StatefulServiceProvider<PubSubBrokerWrapper> generateService(PubSubBrokerConfigs pubSubBrokerConfigs) {
        return (str, file) -> {
            int freePort = Utils.getFreePort();
            int freePort2 = Utils.getFreePort();
            HashMap hashMap = new HashMap();
            boolean z = false;
            ZkServerWrapper zkWrapper = pubSubBrokerConfigs.getZkWrapper();
            if (zkWrapper == null) {
                LOGGER.info("Starting Zookeeper for Kafka...");
                z = true;
                zkWrapper = ServiceFactory.getZkServer();
            }
            hashMap.put(KafkaConfig.ZkConnectProp(), zkWrapper.getAddress());
            hashMap.put(KafkaConfig.PortProp(), Integer.valueOf(freePort));
            hashMap.put(KafkaConfig.HostNameProp(), "localhost");
            hashMap.put(KafkaConfig.LogDirProp(), file.getAbsolutePath());
            hashMap.put(KafkaConfig.AutoCreateTopicsEnableProp(), false);
            hashMap.put(KafkaConfig.DeleteTopicEnableProp(), true);
            hashMap.put(KafkaConfig.LogMessageTimestampTypeProp(), "LogAppendTime");
            hashMap.put(KafkaConfig.LogMessageFormatVersionProp(), "2.4");
            hashMap.put(KafkaConfig.OffsetsTopicPartitionsProp(), 1);
            hashMap.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);
            hashMap.put(KafkaConfig.LogCleanerEnableProp(), false);
            KafkaSSLUtils.getLocalKafkaBrokerSSlConfig("localhost", freePort, freePort2).entrySet().stream().forEach(entry -> {
                hashMap.put((String) entry.getKey(), entry.getValue());
            });
            KafkaConfig kafkaConfig = new KafkaConfig(hashMap, true);
            KafkaServer instantiateNewKafkaServer = KafkaBrokerWrapper.instantiateNewKafkaServer(kafkaConfig, pubSubBrokerConfigs.getMockTime());
            LOGGER.info("KafkaBroker URL: {}:{}", instantiateNewKafkaServer.config().hostName(), instantiateNewKafkaServer.config().port());
            return new KafkaBrokerWrapper(kafkaConfig, instantiateNewKafkaServer, file, zkWrapper, z, pubSubBrokerConfigs.getMockTime(), freePort2);
        };
    }

    @Override // com.linkedin.venice.integration.utils.PubSubBrokerFactory
    public String getServiceName() {
        return SERVICE_NAME;
    }

    @Override // com.linkedin.venice.integration.utils.PubSubBrokerFactory
    public PubSubClientsFactory getClientsFactory() {
        return KAFKA_CLIENTS_FACTORY;
    }
}
