package com.linkedin.venice.controller;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.lingeringjob.DefaultLingeringStoreVersionChecker;
import com.linkedin.venice.controller.lingeringjob.HeartbeatBasedCheckerStats;
import com.linkedin.venice.controller.lingeringjob.HeartbeatBasedLingeringStoreVersionChecker;
import com.linkedin.venice.controller.lingeringjob.LingeringStoreVersionChecker;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import io.tehuti.metrics.MetricsRepository;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/VeniceControllerService.class */
public class VeniceControllerService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(VeniceControllerService.class);
    private final Admin admin;
    private final VeniceControllerMultiClusterConfig multiClusterConfigs;
    private final Map<String, AdminConsumerService> consumerServicesByClusters;

    public VeniceControllerService(VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, MetricsRepository metricsRepository, boolean z, Optional<SSLConfig> optional, Optional<DynamicAccessController> optional2, Optional<AuthorizerService> optional3, D2Client d2Client, Optional<ClientConfig> optional4, Optional<ICProvider> optional5, Optional<SupersetSchemaGenerator> optional6, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory) {
        this.multiClusterConfigs = veniceControllerMultiClusterConfig;
        VeniceHelixAdmin veniceHelixAdmin = new VeniceHelixAdmin(veniceControllerMultiClusterConfig, metricsRepository, z, d2Client, optional, optional2, optional5, pubSubTopicRepository, pubSubClientsFactory);
        if (veniceControllerMultiClusterConfig.isParent()) {
            this.admin = new VeniceParentHelixAdmin(veniceHelixAdmin, veniceControllerMultiClusterConfig, z, optional, optional2, optional3, createLingeringStoreVersionChecker(veniceControllerMultiClusterConfig, metricsRepository), WriteComputeSchemaConverter.getInstance(), optional6, pubSubTopicRepository);
            LOGGER.info("Controller works as a parent controller.");
        } else {
            this.admin = veniceHelixAdmin;
            LOGGER.info("Controller works as a child controller.");
        }
        Optional empty = Optional.empty();
        try {
            empty = optional4.isPresent() ? Optional.of(ClientFactory.getSchemaReader(ClientConfig.cloneConfig(optional4.get()).setStoreName(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName()), (ICProvider) null)) : Optional.empty();
        } catch (Exception e) {
            LOGGER.error("Exception in initializing KME schema reader", e);
        }
        this.consumerServicesByClusters = new HashMap(veniceControllerMultiClusterConfig.getClusters().size());
        OptimizedKafkaValueSerializer optimizedKafkaValueSerializer = new OptimizedKafkaValueSerializer();
        Objects.requireNonNull(optimizedKafkaValueSerializer);
        empty.ifPresent(optimizedKafkaValueSerializer::setSchemaReader);
        KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer = new KafkaPubSubMessageDeserializer(optimizedKafkaValueSerializer, new LandFillObjectPool(KafkaMessageEnvelope::new), new LandFillObjectPool(KafkaMessageEnvelope::new));
        for (String str : veniceControllerMultiClusterConfig.getClusters()) {
            AdminConsumerService adminConsumerService = new AdminConsumerService(veniceHelixAdmin, veniceControllerMultiClusterConfig.getControllerConfig(str), metricsRepository, pubSubTopicRepository, kafkaPubSubMessageDeserializer);
            this.consumerServicesByClusters.put(str, adminConsumerService);
            this.admin.setAdminConsumerService(str, adminConsumerService);
        }
    }

    private LingeringStoreVersionChecker createLingeringStoreVersionChecker(VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, MetricsRepository metricsRepository) {
        if (veniceControllerMultiClusterConfig.getBatchJobHeartbeatEnabled()) {
            LOGGER.info("Batch job heartbeat is enabled. Hence use the heartbeat-based batch job liveness checker.");
            return new HeartbeatBasedLingeringStoreVersionChecker(veniceControllerMultiClusterConfig.getBatchJobHeartbeatTimeout(), veniceControllerMultiClusterConfig.getBatchJobHeartbeatInitialBufferTime(), new DefaultLingeringStoreVersionChecker(), new HeartbeatBasedCheckerStats(metricsRepository));
        }
        LOGGER.info("Batch job heartbeat is NOT enabled. Hence use the default batch job liveness checker.");
        return new DefaultLingeringStoreVersionChecker();
    }

    public boolean startInner() {
        for (String str : this.multiClusterConfigs.getClusters()) {
            this.admin.initStorageCluster(str);
            this.consumerServicesByClusters.get(str).start();
            LOGGER.info("started cluster: {}", str);
        }
        LOGGER.info("Started Venice controller.");
        return true;
    }

    public void stopInner() {
        for (String str : this.multiClusterConfigs.getClusters()) {
            this.admin.stop(str);
            try {
                this.consumerServicesByClusters.get(str).stop();
            } catch (Exception e) {
                LOGGER.error("Got exception when stop AdminConsumerService", e);
            }
            LOGGER.info("Stopped cluster: {}", str);
        }
        this.admin.stopVeniceController();
        this.admin.close();
        LOGGER.info("Stopped Venice controller.");
    }

    public Admin getVeniceHelixAdmin() {
        return this.admin;
    }

    public AdminConsumerService getAdminConsumerServiceByCluster(String str) {
        return this.consumerServicesByClusters.get(str);
    }
}
