package com.linkedin.davinci.helix;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
import com.linkedin.davinci.ingestion.DefaultIngestionBackend;
import com.linkedin.davinci.ingestion.IsolatedIngestionBackend;
import com.linkedin.davinci.ingestion.VeniceIngestionBackend;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.notifier.PartitionPushStatusNotifier;
import com.linkedin.davinci.notifier.PushMonitorNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixInstanceConverter;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import com.linkedin.venice.helix.HelixStatusMessageChannel;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.pushmonitor.KillOfflinePushMessage;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.stats.ThreadPoolStats;
import com.linkedin.venice.status.StatusMessageHandler;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriterFactory;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/helix/HelixParticipationService.class */
public class HelixParticipationService extends AbstractVeniceService implements StatusMessageHandler<KillOfflinePushMessage> {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) HelixParticipationService.class);
    private static final int MAX_RETRY = 30;
    private static final int RETRY_INTERVAL_SEC = 1;
    private final Instance instance;
    private final String clusterName;
    private final String participantName;
    private final String zkAddress;
    private final StoreIngestionService ingestionService;
    private final StorageService storageService;
    private final VeniceConfigLoader veniceConfigLoader;
    private final ReadOnlyStoreRepository helixReadOnlyStoreRepository;
    private final MetricsRepository metricsRepository;
    private final VeniceIngestionBackend ingestionBackend;
    private final CompletableFuture<SafeHelixManager> managerFuture;
    private final CompletableFuture<HelixPartitionStatusAccessor> partitionPushStatusAccessorFuture = new CompletableFuture<>();
    private PushStatusStoreWriter statusStoreWriter;
    private ZkClient zkClient;
    private SafeHelixManager helixManager;
    private AbstractStateModelFactory leaderFollowerParticipantModelFactory;
    private HelixPartitionStatusAccessor partitionPushStatusAccessor;
    private ThreadPoolExecutor leaderFollowerHelixStateTransitionThreadPool;
    private VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor;

    public ThreadPoolExecutor getLeaderFollowerHelixStateTransitionThreadPool() {
        return this.leaderFollowerHelixStateTransitionThreadPool;
    }

    public HelixParticipationService(StoreIngestionService storeIngestionService, StorageService storageService, StorageMetadataService storageMetadataService, VeniceConfigLoader veniceConfigLoader, ReadOnlyStoreRepository readOnlyStoreRepository, MetricsRepository metricsRepository, String str, String str2, int i, String str3, CompletableFuture<SafeHelixManager> completableFuture) {
        this.ingestionService = storeIngestionService;
        this.storageService = storageService;
        this.clusterName = str2;
        this.participantName = Utils.getHelixNodeIdentifier(str3, i);
        this.zkAddress = str;
        this.veniceConfigLoader = veniceConfigLoader;
        this.helixReadOnlyStoreRepository = readOnlyStoreRepository;
        this.metricsRepository = metricsRepository;
        this.instance = new Instance(this.participantName, str3, i);
        this.managerFuture = completableFuture;
        if (!(storeIngestionService instanceof KafkaStoreIngestionService)) {
            throw new VeniceException("Expecting " + KafkaStoreIngestionService.class.getName() + " for ingestion backend!");
        }
        if (veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED)) {
            this.ingestionBackend = new IsolatedIngestionBackend(veniceConfigLoader, readOnlyStoreRepository, metricsRepository, storageMetadataService, (KafkaStoreIngestionService) storeIngestionService, storageService);
        } else {
            this.ingestionBackend = new DefaultIngestionBackend(storageMetadataService, (KafkaStoreIngestionService) storeIngestionService, storageService);
        }
    }

    private ThreadPoolExecutor initHelixStateTransitionThreadPool(int i, String str, MetricsRepository metricsRepository, String str2) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(i, i, 300L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DaemonThreadFactory(str));
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        new ThreadPoolStats(metricsRepository, threadPoolExecutor, str2);
        return threadPoolExecutor;
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public boolean startInner() {
        LOGGER.info("Attempting to start HelixParticipation service");
        this.helixManager = new SafeHelixManager(HelixManagerFactory.getZKHelixManager(this.clusterName, this.participantName, InstanceType.PARTICIPANT, this.zkAddress));
        VeniceServerConfig veniceServerConfig = this.veniceConfigLoader.getVeniceServerConfig();
        this.leaderFollowerHelixStateTransitionThreadPool = initHelixStateTransitionThreadPool(veniceServerConfig.getMaxLeaderFollowerStateTransitionThreadNumber(), "Venice-L/F-state-transition", this.metricsRepository, "Venice_L/F_ST_thread_pool");
        if (veniceServerConfig.getLeaderFollowerThreadPoolStrategy().equals(LeaderFollowerPartitionStateModelFactory.LeaderFollowerThreadPoolStrategy.DUAL_POOL_STRATEGY)) {
            this.leaderFollowerParticipantModelFactory = new LeaderFollowerPartitionStateModelDualPoolFactory(this.ingestionBackend, this.veniceConfigLoader, this.leaderFollowerHelixStateTransitionThreadPool, initHelixStateTransitionThreadPool(veniceServerConfig.getMaxFutureVersionLeaderFollowerStateTransitionThreadNumber(), "venice-L/F-state-transition-future-version", this.metricsRepository, "Venice_L/F_ST_thread_pool_future_version"), this.helixReadOnlyStoreRepository, this.partitionPushStatusAccessorFuture, this.instance.getNodeId());
        } else {
            this.leaderFollowerParticipantModelFactory = new LeaderFollowerPartitionStateModelFactory(this.ingestionBackend, this.veniceConfigLoader, this.leaderFollowerHelixStateTransitionThreadPool, this.helixReadOnlyStoreRepository, this.partitionPushStatusAccessorFuture, this.instance.getNodeId());
        }
        LOGGER.info("LeaderFollower threadPool info: strategy = {}, max future state transition thread = {}", veniceServerConfig.getLeaderFollowerThreadPoolStrategy(), Integer.valueOf(veniceServerConfig.getMaxFutureVersionLeaderFollowerStateTransitionThreadNumber()));
        this.helixManager.getStateMachineEngine().registerStateModelFactory(LeaderStandbySMD.name, this.leaderFollowerParticipantModelFactory);
        this.helixManager.setLiveInstanceInfoProvider(() -> {
            return HelixInstanceConverter.convertInstanceToZNRecord(this.instance);
        });
        new HelixStatusMessageChannel(this.helixManager, new HelixMessageChannelStats(this.metricsRepository, this.clusterName)).registerHandler(KillOfflinePushMessage.class, this);
        asyncStart();
        return false;
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public void stopInner() throws IOException {
        LOGGER.info("Attempting to stop HelixParticipation service.");
        this.ingestionBackend.prepareForShutdown();
        if (this.helixManager != null) {
            try {
                this.helixManager.disconnect();
                LOGGER.info("Disconnected Helix Manager.");
            } catch (Exception e) {
                LOGGER.error("Swallowed an exception while trying to disconnect the {}", this.helixManager.getClass().getSimpleName(), e);
            }
        } else {
            LOGGER.info("Helix Manager is null.");
        }
        this.ingestionBackend.close();
        LOGGER.info("Closed VeniceIngestionBackend.");
        this.leaderFollowerParticipantModelFactory.shutDownExecutor();
        try {
            this.leaderFollowerParticipantModelFactory.waitExecutorTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        if (this.zkClient != null) {
            LOGGER.info("Start closing ZkClient.");
            this.zkClient.close();
            LOGGER.info("Closed ZkClient.");
        }
        LOGGER.info("Finished stopping HelixParticipation service.");
    }

    private void checkBeforeJoinInCluster() {
        ZKHelixAdmin zKHelixAdmin = new ZKHelixAdmin(this.zkAddress);
        try {
            HelixUtils.checkClusterSetup(zKHelixAdmin, this.clusterName, 30, 1);
            if (zKHelixAdmin.getInstancesInCluster(this.clusterName).contains(this.instance.getNodeId())) {
                LOGGER.info("{} is not a new node to cluster: {}, skip the cleaning up.", this.instance.getNodeId(), this.clusterName);
                return;
            }
            LOGGER.info("{} is a new node or had been removed from cluster: {} start cleaning up local storage.", this.instance.getNodeId(), this.clusterName);
            this.storageService.cleanupAllStores(this.veniceConfigLoader);
            LOGGER.info("Cleaning up complete, {} can now join cluster: {}", this.instance.getNodeId(), this.clusterName);
        } finally {
            zKHelixAdmin.close();
        }
    }

    private void asyncStart() {
        this.zkClient = ZkClientFactory.newZkClient(this.zkAddress);
        VeniceProperties clusterProperties = this.veniceConfigLoader.getVeniceServerConfig().getClusterProperties();
        this.statusStoreWriter = new PushStatusStoreWriter(new VeniceWriterFactory(clusterProperties.toProperties()), this.instance.getNodeId(), clusterProperties.getInt(ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1));
        this.veniceOfflinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor(this.clusterName, this.zkClient, new HelixAdapterSerializer(), this.veniceConfigLoader.getVeniceClusterConfig().getRefreshAttemptsForZkReconnect(), this.veniceConfigLoader.getVeniceClusterConfig().getRefreshIntervalForZkReconnectInMs());
        this.ingestionBackend.addPushStatusNotifier(new PushMonitorNotifier(this.veniceOfflinePushMonitorAccessor, this.statusStoreWriter, this.helixReadOnlyStoreRepository, this.instance.getNodeId()));
        CompletableFuture.runAsync(() -> {
            try {
                checkBeforeJoinInCluster();
                this.helixManager.connect();
                this.managerFuture.complete(this.helixManager);
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), (Throwable) e);
                LOGGER.error("Venice server is about to close");
                Utils.exit("Failed to start HelixParticipationService");
            }
            this.partitionPushStatusAccessor = new HelixPartitionStatusAccessor(this.helixManager.getOriginalManager(), this.instance.getNodeId(), this.veniceConfigLoader.getVeniceServerConfig().isHelixHybridStoreQuotaEnabled());
            this.ingestionBackend.addPushStatusNotifier(new PartitionPushStatusNotifier(this.partitionPushStatusAccessor));
            this.partitionPushStatusAccessorFuture.complete(this.partitionPushStatusAccessor);
            LOGGER.info("Successfully started Helix partition status accessor.");
            this.serviceState.set(AbstractVeniceService.ServiceState.STARTED);
            LOGGER.info("Successfully started Helix Participation Service.");
        });
    }

    public void replaceAndAddTestIngestionNotifier(VeniceNotifier veniceNotifier) {
        this.ingestionBackend.replaceAndAddTestPushStatusNotifier(veniceNotifier);
    }

    public Instance getInstance() {
        return this.instance;
    }

    public VeniceOfflinePushMonitorAccessor getVeniceOfflinePushMonitorAccessor() {
        return this.veniceOfflinePushMonitorAccessor;
    }

    public PushStatusStoreWriter getStatusStoreWriter() {
        return this.statusStoreWriter;
    }

    public ReadOnlyStoreRepository getHelixReadOnlyStoreRepository() {
        return this.helixReadOnlyStoreRepository;
    }

    @Override // com.linkedin.venice.status.StatusMessageHandler
    public void handleMessage(KillOfflinePushMessage killOfflinePushMessage) {
        VeniceStoreVersionConfig storeConfig = this.veniceConfigLoader.getStoreConfig(killOfflinePushMessage.getKafkaTopic());
        if (!this.ingestionService.containsRunningConsumption(storeConfig)) {
            LOGGER.info("Ignore the kill message for topic: {}", killOfflinePushMessage.getKafkaTopic());
            return;
        }
        LOGGER.info("Receive the message to kill consumption for topic: {}, msgId: {}", killOfflinePushMessage.getKafkaTopic(), killOfflinePushMessage.getMessageId());
        this.ingestionService.killConsumptionTask(storeConfig.getStoreVersionName());
        LOGGER.info("Killed Consumption for topic: {}, msgId: {}", killOfflinePushMessage.getKafkaTopic(), killOfflinePushMessage.getMessageId());
    }
}
