package com.linkedin.venice.controller;

import com.linkedin.venice.VeniceResource;
import com.linkedin.venice.acl.AclCreationDeletionListener;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.stats.AggPartitionHealthStats;
import com.linkedin.venice.controller.stats.VeniceAdminStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.helix.HelixReadWriteSchemaRepository;
import com.linkedin.venice.helix.HelixReadWriteSchemaRepositoryAdapter;
import com.linkedin.venice.helix.HelixReadWriteStoreRepository;
import com.linkedin.venice.helix.HelixReadWriteStoreRepositoryAdapter;
import com.linkedin.venice.helix.HelixStatusMessageChannel;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.StoragePersonaRepository;
import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.helix.ZkStoreConfigAccessor;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.ReadWriteSchemaRepository;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pushmonitor.AggPushHealthStats;
import com.linkedin.venice.pushmonitor.AggPushStatusCleanUpStats;
import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService;
import com.linkedin.venice.pushmonitor.PushMonitorDelegator;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/HelixVeniceClusterResources.class */
public class HelixVeniceClusterResources implements VeniceResource {
    private static final Logger LOGGER = LogManager.getLogger(HelixVeniceClusterResources.class);
    private final String clusterName;
    private final SafeHelixManager helixManager;
    private final ClusterLockManager clusterLockManager;
    private final ReadWriteStoreRepository storeMetadataRepository;
    private final HelixExternalViewRepository routingDataRepository;
    private HelixCustomizedViewOfflinePushRepository customizedViewRepo;
    private final ReadWriteSchemaRepository schemaRepository;
    private final HelixStatusMessageChannel messageChannel;
    private final VeniceControllerClusterConfig config;
    private final PushMonitorDelegator pushMonitor;
    private final LeakedPushStatusCleanUpService leakedPushStatusCleanUpService;
    private final ZkRoutersClusterManager routersClusterManager;
    private final AggPartitionHealthStats aggPartitionHealthStats;
    private final ZkStoreConfigAccessor storeConfigAccessor;
    private final Optional<DynamicAccessController> accessController;
    private final ExecutorService errorPartitionResetExecutorService = Executors.newSingleThreadExecutor();
    private final StoragePersonaRepository storagePersonaRepository;
    private ErrorPartitionResetTask errorPartitionResetTask;
    private final Optional<MetaStoreWriter> metaStoreWriter;
    private final VeniceAdminStats veniceAdminStats;
    private final VeniceHelixAdmin admin;

    public HelixVeniceClusterResources(String str, ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, SafeHelixManager safeHelixManager, VeniceControllerConfig veniceControllerConfig, VeniceHelixAdmin veniceHelixAdmin, MetricsRepository metricsRepository, RealTimeTopicSwitcher realTimeTopicSwitcher, Optional<DynamicAccessController> optional, HelixAdminClient helixAdminClient) {
        this.errorPartitionResetTask = null;
        this.clusterName = str;
        this.config = veniceControllerConfig;
        this.helixManager = safeHelixManager;
        this.admin = veniceHelixAdmin;
        if (veniceControllerConfig.isParent()) {
            this.metaStoreWriter = Optional.empty();
        } else {
            this.metaStoreWriter = Optional.of(veniceHelixAdmin.getMetaStoreWriter());
        }
        this.clusterLockManager = new ClusterLockManager(str);
        HelixReadWriteStoreRepository helixReadWriteStoreRepository = new HelixReadWriteStoreRepository(zkClient, helixAdapterSerializer, str, this.metaStoreWriter, this.clusterLockManager);
        this.storeMetadataRepository = new HelixReadWriteStoreRepositoryAdapter(veniceHelixAdmin.getReadOnlyZKSharedSystemStoreRepository(), helixReadWriteStoreRepository, str);
        this.schemaRepository = new HelixReadWriteSchemaRepositoryAdapter(veniceHelixAdmin.getReadOnlyZKSharedSchemaRepository(), new HelixReadWriteSchemaRepository(helixReadWriteStoreRepository, zkClient, helixAdapterSerializer, str, this.metaStoreWriter));
        this.routingDataRepository = new HelixExternalViewRepository(this.helixManager.getInstanceType() == InstanceType.SPECTATOR ? this.helixManager : getSpectatorManager(str, zkClient.getServers()));
        this.customizedViewRepo = new HelixCustomizedViewOfflinePushRepository(this.helixManager, this.storeMetadataRepository);
        this.messageChannel = new HelixStatusMessageChannel(safeHelixManager, new HelixMessageChannelStats(metricsRepository, str), veniceControllerConfig.getHelixSendMessageTimeoutMs());
        VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor(str, zkClient, helixAdapterSerializer, veniceControllerConfig.getRefreshAttemptsForZkReconnect(), veniceControllerConfig.getRefreshIntervalForZkReconnectInMs());
        String str2 = veniceControllerConfig.getChildDataCenterKafkaUrlMap().get(veniceControllerConfig.getAggregateRealTimeSourceRegion());
        boolean isUnregisterMetricForDeletedStoreEnabled = veniceControllerConfig.isUnregisterMetricForDeletedStoreEnabled();
        this.pushMonitor = new PushMonitorDelegator(str, this.routingDataRepository, veniceOfflinePushMonitorAccessor, veniceHelixAdmin, this.storeMetadataRepository, new AggPushHealthStats(str, metricsRepository, this.storeMetadataRepository, isUnregisterMetricForDeletedStoreEnabled), realTimeTopicSwitcher, this.clusterLockManager, str2, getActiveActiveRealTimeSourceKafkaURLs(veniceControllerConfig), helixAdminClient, veniceControllerConfig, veniceHelixAdmin.getPushStatusStoreReader().orElse(null));
        this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService(str, veniceOfflinePushMonitorAccessor, this.storeMetadataRepository, veniceHelixAdmin, new AggPushStatusCleanUpStats(str, metricsRepository, this.storeMetadataRepository, isUnregisterMetricForDeletedStoreEnabled), this.config.getLeakedPushStatusCleanUpServiceSleepIntervalInMs(), this.config.getLeakedResourceAllowedLingerTimeInMs());
        this.routersClusterManager = new ZkRoutersClusterManager(zkClient, helixAdapterSerializer, str, veniceControllerConfig.getRefreshAttemptsForZkReconnect(), veniceControllerConfig.getRefreshIntervalForZkReconnectInMs());
        this.aggPartitionHealthStats = new AggPartitionHealthStats(str, metricsRepository, this.routingDataRepository, this.storeMetadataRepository, this.pushMonitor);
        this.storeConfigAccessor = new ZkStoreConfigAccessor(zkClient, helixAdapterSerializer, this.metaStoreWriter);
        this.accessController = optional;
        if (veniceControllerConfig.getErrorPartitionAutoResetLimit() > 0) {
            this.errorPartitionResetTask = new ErrorPartitionResetTask(str, helixAdminClient, this.storeMetadataRepository, this.routingDataRepository, this.pushMonitor, metricsRepository, veniceControllerConfig.getErrorPartitionAutoResetLimit(), veniceControllerConfig.getErrorPartitionProcessingCycleDelay());
        }
        this.veniceAdminStats = new VeniceAdminStats(metricsRepository, "venice-admin-" + str);
        this.storagePersonaRepository = new StoragePersonaRepository(str, this.storeMetadataRepository, helixAdapterSerializer, zkClient);
    }

    private List<String> getActiveActiveRealTimeSourceKafkaURLs(VeniceControllerConfig veniceControllerConfig) {
        ArrayList arrayList = new ArrayList(veniceControllerConfig.getActiveActiveRealTimeSourceFabrics().size());
        for (String str : veniceControllerConfig.getActiveActiveRealTimeSourceFabrics()) {
            String str2 = veniceControllerConfig.getChildDataCenterKafkaUrlMap().get(str);
            if (str2 == null) {
                throw new VeniceException(String.format("No A/A source Kafka URL found for fabric %s in %s", str, veniceControllerConfig.getChildDataCenterKafkaUrlMap()));
            }
            arrayList.add(str2);
        }
        return Collections.unmodifiableList(arrayList);
    }

    private void repairStoreReplicationFactor(ReadWriteStoreRepository readWriteStoreRepository) {
        for (Store store : readWriteStoreRepository.getAllStores()) {
            VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(store.getName());
            if (systemStoreType == null || !systemStoreType.isStoreZkShared()) {
                if (store.getReplicationFactor() <= 0) {
                    int replicationFactor = store.getReplicationFactor();
                    store.setReplicationFactor(this.config.getReplicationFactor());
                    readWriteStoreRepository.updateStore(store);
                    LOGGER.info("Updated replication factor from {} to {} for store: {}, in cluster: {}", Integer.valueOf(replicationFactor), Integer.valueOf(this.config.getReplicationFactor()), store.getName(), this.clusterName);
                }
            }
        }
    }

    public void refresh() {
        clear();
        this.storeMetadataRepository.refresh();
        repairStoreReplicationFactor(this.storeMetadataRepository);
        if (this.accessController.isPresent()) {
            DynamicAccessController dynamicAccessController = this.accessController.get();
            dynamicAccessController.init((List) this.storeMetadataRepository.getAllStores().stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toList()));
            this.storeMetadataRepository.registerStoreDataChangedListener(new AclCreationDeletionListener(dynamicAccessController));
        }
        this.schemaRepository.refresh();
        this.routingDataRepository.refresh();
        this.customizedViewRepo.refresh();
        this.pushMonitor.loadAllPushes();
        this.routersClusterManager.refresh();
        this.admin.startInstanceMonitor(this.clusterName);
    }

    public void clear() {
        this.pushMonitor.stopAllMonitoring();
        this.storeMetadataRepository.clear();
        this.schemaRepository.clear();
        this.routingDataRepository.clear();
        this.customizedViewRepo.clear();
        this.routersClusterManager.clear();
        this.admin.clearInstanceMonitor(this.clusterName);
    }

    public void startErrorPartitionResetTask() {
        if (this.errorPartitionResetTask != null) {
            this.errorPartitionResetExecutorService.submit(this.errorPartitionResetTask);
        }
    }

    public void stopErrorPartitionResetTask() {
        if (this.errorPartitionResetTask != null) {
            this.errorPartitionResetTask.close();
            this.errorPartitionResetExecutorService.shutdown();
            try {
                this.errorPartitionResetExecutorService.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void startLeakedPushStatusCleanUpService() {
        if (this.leakedPushStatusCleanUpService != null) {
            this.leakedPushStatusCleanUpService.start();
        }
    }

    public void stopLeakedPushStatusCleanUpService() {
        if (this.leakedPushStatusCleanUpService != null) {
            try {
                this.leakedPushStatusCleanUpService.stop();
            } catch (Exception e) {
                LOGGER.error("Error when stopping leaked push status clean-up service for cluster: {}", this.clusterName);
            }
        }
    }

    public ReadWriteStoreRepository getStoreMetadataRepository() {
        return this.storeMetadataRepository;
    }

    public ReadWriteSchemaRepository getSchemaRepository() {
        return this.schemaRepository;
    }

    public HelixExternalViewRepository getRoutingDataRepository() {
        return this.routingDataRepository;
    }

    public HelixCustomizedViewOfflinePushRepository getCustomizedViewRepository() {
        return this.customizedViewRepo;
    }

    void setCustomizedViewRepository(HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository) {
        this.customizedViewRepo = helixCustomizedViewOfflinePushRepository;
    }

    public HelixStatusMessageChannel getMessageChannel() {
        return this.messageChannel;
    }

    public SafeHelixManager getHelixManager() {
        return this.helixManager;
    }

    public VeniceControllerClusterConfig getConfig() {
        return this.config;
    }

    public PushMonitorDelegator getPushMonitor() {
        return this.pushMonitor;
    }

    public ZkRoutersClusterManager getRoutersClusterManager() {
        return this.routersClusterManager;
    }

    public Optional<MetaStoreWriter> getMetaStoreWriter() {
        return this.metaStoreWriter;
    }

    public ZkStoreConfigAccessor getStoreConfigAccessor() {
        return this.storeConfigAccessor;
    }

    public ClusterLockManager getClusterLockManager() {
        return this.clusterLockManager;
    }

    public VeniceAdminStats getVeniceAdminStats() {
        return this.veniceAdminStats;
    }

    public StoragePersonaRepository getStoragePersonaRepository() {
        return this.storagePersonaRepository;
    }

    public AutoCloseableLock lockForShutdown() {
        LOGGER.info("lockForShutdown() called. Will log the current stacktrace and then attempt to acquire the lock.", new VeniceException("Not thrown, for logging purposes only."));
        return this.clusterLockManager.createClusterWriteLock();
    }

    private SafeHelixManager getSpectatorManager(String str, String str2) {
        SafeHelixManager safeHelixManager = new SafeHelixManager(HelixManagerFactory.getZKHelixManager(str, "", InstanceType.SPECTATOR, str2));
        try {
            safeHelixManager.connect();
            return safeHelixManager;
        } catch (Exception e) {
            throw new VeniceException("Spectator manager could not connect to cluster: " + str, e);
        }
    }
}
