package com.linkedin.venice.controller.kafka.consumer;

import com.linkedin.venice.controller.AdminTopicMetadataAccessor;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.ZkAdminTopicMetadataAccessor;
import com.linkedin.venice.controller.stats.AdminConsumptionStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ThreadFactory;

/* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.class */
public class AdminConsumerService extends AbstractVeniceService {
    private static final long WAITING_TIME_FOR_STOP_IN_MS = 5000;
    private final VeniceControllerConfig config;
    private final VeniceHelixAdmin admin;
    private final ZkAdminTopicMetadataAccessor adminTopicMetadataAccessor;
    private final PubSubConsumerAdapterFactory consumerFactory;
    private final MetricsRepository metricsRepository;
    private final boolean remoteConsumptionEnabled;
    private final Optional<String> remoteKafkaServerUrl;
    private AdminConsumptionTask consumerTask;
    private final ThreadFactory threadFactory = new DaemonThreadFactory("AdminTopicConsumer");
    private Thread consumerThread;
    private final PubSubTopicRepository pubSubTopicRepository;
    private final String localKafkaServerUrl;
    private final KafkaPubSubMessageDeserializer pubSubMessageDeserializer;

    public AdminConsumerService(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerConfig veniceControllerConfig, MetricsRepository metricsRepository, PubSubTopicRepository pubSubTopicRepository, KafkaPubSubMessageDeserializer kafkaPubSubMessageDeserializer) {
        this.config = veniceControllerConfig;
        this.admin = veniceHelixAdmin;
        this.adminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor(veniceHelixAdmin.getZkClient(), veniceHelixAdmin.getAdapterSerializer());
        this.metricsRepository = metricsRepository;
        this.remoteConsumptionEnabled = veniceControllerConfig.isAdminTopicRemoteConsumptionEnabled();
        this.pubSubTopicRepository = pubSubTopicRepository;
        this.pubSubMessageDeserializer = kafkaPubSubMessageDeserializer;
        if (this.remoteConsumptionEnabled) {
            this.remoteKafkaServerUrl = Optional.of(veniceControllerConfig.getChildDataCenterKafkaUrlMap().get(veniceControllerConfig.getAdminTopicSourceRegion()));
        } else {
            this.remoteKafkaServerUrl = Optional.empty();
        }
        this.localKafkaServerUrl = veniceHelixAdmin.getKafkaBootstrapServers(veniceHelixAdmin.isSslToKafka());
        this.consumerFactory = veniceHelixAdmin.getVeniceConsumerFactory();
    }

    public boolean startInner() throws Exception {
        this.consumerTask = getAdminConsumptionTaskForCluster(this.config.getClusterName());
        this.consumerThread = this.threadFactory.newThread(this.consumerTask);
        this.consumerThread.start();
        return true;
    }

    public void stopInner() throws Exception {
        if (this.consumerTask != null) {
            this.consumerTask.close();
        }
        if (this.consumerThread != null) {
            this.consumerThread.join(WAITING_TIME_FOR_STOP_IN_MS);
            if (this.consumerThread.isAlive()) {
                this.consumerThread.interrupt();
            }
        }
    }

    private AdminConsumptionTask getAdminConsumptionTaskForCluster(String str) {
        return new AdminConsumptionTask(str, createKafkaConsumer(str), this.remoteConsumptionEnabled, this.remoteKafkaServerUrl, this.admin, this.adminTopicMetadataAccessor, this.admin.getExecutionIdAccessor(), this.config.isParent(), new AdminConsumptionStats(this.metricsRepository, str + "-admin_consumption_task"), this.config.getAdminTopicReplicationFactor(), this.config.getMinInSyncReplicasAdminTopics(), this.config.getAdminConsumptionCycleTimeoutMs(), this.config.getAdminConsumptionMaxWorkerThreadPoolSize(), this.pubSubTopicRepository, this.pubSubMessageDeserializer, this.config.getRegionName());
    }

    public void setOffsetToSkip(String str, long j, boolean z) {
        if (!str.equals(this.config.getClusterName())) {
            throw new VeniceException("This AdminConsumptionService is for cluster " + this.config.getClusterName() + ".  Cannot skip admin message with offset " + j + " for cluster " + str);
        }
        if (z) {
            this.consumerTask.skipMessageDIVWithOffset(j);
        } else {
            this.consumerTask.skipMessageWithOffset(j);
        }
    }

    public Long getLastSucceededExecutionIdInCluster(String str) {
        if (str.equals(this.config.getClusterName())) {
            return this.consumerTask.getLastSucceededExecutionId();
        }
        throw new VeniceException("This AdminConsumptionService is for cluster: " + this.config.getClusterName() + ".  Cannot get the last succeed execution Id for cluster: " + str);
    }

    public Long getLastSucceededExecutionId(String str) {
        if (this.consumerTask == null) {
            return null;
        }
        return this.consumerTask.getLastSucceededExecutionId(str);
    }

    public Exception getLastExceptionForStore(String str) {
        if (this.consumerTask == null) {
            return null;
        }
        return this.consumerTask.getLastExceptionForStore(str);
    }

    public long getFailingOffset() {
        return this.consumerTask.getFailingOffset();
    }

    public Map<String, Long> getAdminTopicMetadata(String str) {
        if (str.equals(this.config.getClusterName())) {
            return this.adminTopicMetadataAccessor.getMetadata(str);
        }
        throw new VeniceException("This AdminConsumptionService is for cluster: " + this.config.getClusterName() + ".  Cannot get the last succeed execution Id for cluster: " + str);
    }

    public void updateAdminTopicMetadata(String str, long j, long j2, long j3) {
        if (!str.equals(this.config.getClusterName())) {
            throw new VeniceException("This AdminConsumptionService is for cluster: " + this.config.getClusterName() + ".  Cannot get the last succeed execution Id for cluster: " + str);
        }
        this.adminTopicMetadataAccessor.updateMetadata(str, AdminTopicMetadataAccessor.generateMetadataMap(j2, j3, j));
    }

    private PubSubConsumerAdapter createKafkaConsumer(String str) {
        Properties properties = this.admin.getPubSubSSLProperties(this.remoteConsumptionEnabled ? this.remoteKafkaServerUrl.get() : this.localKafkaServerUrl).toProperties();
        properties.setProperty("kafka.client.id", str);
        properties.setProperty("kafka.auto.offset.reset", "earliest");
        properties.setProperty("kafka.enable.auto.commit", "false");
        return this.consumerFactory.create(new VeniceProperties(properties), false, this.pubSubMessageDeserializer, str);
    }
}
