package com.linkedin.venice.controller;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.acl.AclException;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.authorization.AceEntry;
import com.linkedin.venice.authorization.AclBinding;
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.authorization.IdentityParser;
import com.linkedin.venice.authorization.Method;
import com.linkedin.venice.authorization.Permission;
import com.linkedin.venice.authorization.Principal;
import com.linkedin.venice.authorization.Resource;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.authorization.SystemStoreAclSynchronizationTask;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.kafka.protocol.admin.AbortMigration;
import com.linkedin.venice.controller.kafka.protocol.admin.AddVersion;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.ConfigureActiveActiveReplicationForCluster;
import com.linkedin.venice.controller.kafka.protocol.admin.ConfigureNativeReplicationForCluster;
import com.linkedin.venice.controller.kafka.protocol.admin.CreateStoragePersona;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteAllVersions;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteOldVersion;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteStoragePersona;
import com.linkedin.venice.controller.kafka.protocol.admin.DeleteStore;
import com.linkedin.venice.controller.kafka.protocol.admin.DerivedSchemaCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.DisableStoreRead;
import com.linkedin.venice.controller.kafka.protocol.admin.ETLStoreConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.EnableStoreRead;
import com.linkedin.venice.controller.kafka.protocol.admin.HybridStoreConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.KillOfflinePushJob;
import com.linkedin.venice.controller.kafka.protocol.admin.MetaSystemStoreAutoCreationValidation;
import com.linkedin.venice.controller.kafka.protocol.admin.MetadataSchemaCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.MigrateStore;
import com.linkedin.venice.controller.kafka.protocol.admin.PartitionerConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.PauseStore;
import com.linkedin.venice.controller.kafka.protocol.admin.PushStatusSystemStoreAutoCreationValidation;
import com.linkedin.venice.controller.kafka.protocol.admin.ResumeStore;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.SetStoreCurrentVersion;
import com.linkedin.venice.controller.kafka.protocol.admin.SetStoreOwner;
import com.linkedin.venice.controller.kafka.protocol.admin.SetStorePartitionCount;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.SupersetSchemaCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStoragePersona;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
import com.linkedin.venice.controller.kafka.protocol.admin.ValueSchemaCreation;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
import com.linkedin.venice.controller.lingeringjob.DefaultLingeringStoreVersionChecker;
import com.linkedin.venice.controller.lingeringjob.LingeringStoreVersionChecker;
import com.linkedin.venice.controller.migration.MigrationPushStrategyZKAccessor;
import com.linkedin.venice.controller.supersetschema.DefaultSupersetSchemaGenerator;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
import com.linkedin.venice.controllerapi.AdminCommandExecution;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.MultiStoreStatusResponse;
import com.linkedin.venice.controllerapi.NodeReplicasReadinessState;
import com.linkedin.venice.controllerapi.ReadyForDataRecoveryResponse;
import com.linkedin.venice.controllerapi.RegionPushDetailsResponse;
import com.linkedin.venice.controllerapi.RepushInfo;
import com.linkedin.venice.controllerapi.StoreComparisonInfo;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateClusterConfigQueryParams;
import com.linkedin.venice.controllerapi.UpdateStoragePersonaQueryParams;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionResponse;
import com.linkedin.venice.exceptions.ConcurrentBatchPushException;
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.PartitionerSchemaMismatchException;
import com.linkedin.venice.exceptions.ResourceStillExistsException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceHttpException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSystemStoreRepository;
import com.linkedin.venice.helix.ParentHelixOfflinePushAccessor;
import com.linkedin.venice.helix.Replica;
import com.linkedin.venice.helix.StoragePersonaRepository;
import com.linkedin.venice.helix.ZkStoreConfigAccessor;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.ETLStoreConfig;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.RegionPushDetails;
import com.linkedin.venice.meta.RoutersClusterConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.StoreDataAudit;
import com.linkedin.venice.meta.StoreGraveyard;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.persona.StoragePersona;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobStatusRecordKey;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.AvroSchemaUtils;
import com.linkedin.venice.utils.CollectionUtils;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/VeniceParentHelixAdmin.class */
public class VeniceParentHelixAdmin implements Admin {
    // NOTE(review): not referenced in the visible part of this class; presumably the poll interval used while
    // waiting for admin-message consumption -- confirm against the rest of the file.
    private static final long SLEEP_INTERVAL_FOR_DATA_CONSUMPTION_IN_MS = 1000;
    // Pause between attempts of the asynchronous internal-store setup loop (asyncSetupForInternalRTStore).
    private static final long SLEEP_INTERVAL_FOR_ASYNC_SETUP_MS = 3000;
    // Upper bound on the number of attempts made by the asynchronous internal-store setup loop.
    private static final int MAX_ASYNC_SETUP_RETRY_COUNT = 10;
    // Owner assigned to internal stores created by createOrVerifyInternalStore.
    private static final String VENICE_INTERNAL_STORE_OWNER = "venice-internal";
    // Human-readable prefixes used in log and exception messages for the two internal stores set up by this class.
    private static final String PUSH_JOB_DETAILS_STORE_DESCRIPTOR = "push job details store: ";
    private static final String BATCH_JOB_HEARTBEAT_STORE_DESCRIPTOR = "batch job liveness heartbeat store: ";
    static final int STORE_VERSION_RETENTION_COUNT = 5;
    private static final long TOPIC_DELETION_DELAY_MS = 300000;
    // Per-cluster flag read by the async setup loop; set to true for a cluster in initStorageCluster.
    final Map<String, Boolean> asyncSetupEnabledMap;
    // The wrapped admin that most visible operations delegate to.
    private final VeniceHelixAdmin veniceHelixAdmin;
    // Cluster name -> writer for that cluster's admin topic; populated in initStorageCluster.
    private final Map<String, VeniceWriter<byte[], byte[], byte[]>> veniceWriterMap;
    private final AdminTopicMetadataAccessor adminTopicMetadataAccessor;
    // Zero-length key passed with every admin message produced by sendAdminMessageAndWaitForConsumed.
    private final byte[] emptyKeyByteArr;
    private final AdminOperationSerializer adminOperationSerializer;
    private final VeniceControllerMultiClusterConfig multiClusterConfigs;
    // Cluster name -> (presumably store name -> lock); the inner maps are created per cluster in the constructor.
    private final Map<String, Map<String, Lock>> perStoreAdminLocks;
    // Cluster name -> lock; one ReentrantLock is created per configured cluster in the constructor.
    private final Map<String, Lock> perClusterAdminLocks;
    // Cluster name -> execution tracker; one tracker is created per configured cluster in the constructor.
    private final Map<String, AdminCommandExecutionTracker> adminCommandExecutionTrackers;
    // Clusters whose execution id state has already been checked by checkAndRepairCorruptedExecutionId.
    // NOTE(review): this is a plain HashSet; confirm that every access happens under the admin execution-id lock.
    private final Set<String> executionIdValidatedClusters;
    // Runs the asynchronous internal-store setup tasks.
    private final ExecutorService asyncSetupExecutor;
    // Single thread that runs terminalStateTopicChecker.
    private final ExecutorService topicCheckerExecutor;
    private final TerminalStateTopicCheckerForParentController terminalStateTopicChecker;
    // Null when no AuthorizerService was supplied to the constructor.
    private final SystemStoreAclSynchronizationTask systemStoreAclSynchronizationTask;
    private final UserSystemStoreLifeCycleHelper systemStoreLifeCycleHelper;
    private final WriteComputeSchemaConverter writeComputeSchemaConverter;
    private Time timer;
    // Empty unless the constructor is called with SSL enabled.
    private Optional<SSLFactory> sslFactory;
    // Read from the multi-cluster config; passed to ControllerClient construction in createOrVerifyInternalStore.
    private String token;
    private final MigrationPushStrategyZKAccessor pushStrategyZKAccessor;
    private final PubSubTopicRepository pubSubTopicRepository;
    private ParentHelixOfflinePushAccessor offlinePushAccessor;
    private int maxErroredTopicNumToKeep;
    // Timeout, in milliseconds, used by waitingMessageToBeConsumed.
    private final int waitingTimeForConsumptionMs;
    private final boolean batchJobHeartbeatEnabled;
    private Optional<DynamicAccessController> accessController;
    private final Optional<AuthorizerService> authorizerService;
    // Null when no AuthorizerService was supplied to the constructor.
    private final ExecutorService systemStoreAclSynchronizationExecutor;
    private final LingeringStoreVersionChecker lingeringStoreVersionChecker;
    private final Optional<SupersetSchemaGenerator> externalSupersetSchemaGenerator;
    private final SupersetSchemaGenerator defaultSupersetSchemaGenerator;
    // Instantiated reflectively from the class name in the common controller config.
    private final IdentityParser identityParser;
    private final Map<String, Map<String, ControllerClient>> newFabricControllerClientMap;
    private static final Logger LOGGER = LogManager.getLogger(VeniceParentHelixAdmin.class);
    private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0];

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.venice.controller.VeniceParentHelixAdmin$2, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/venice/controller/VeniceParentHelixAdmin$2.class */
    /**
     * Synthetic holder for the lookup table backing a {@code switch} over {@link VeniceSystemStoreType}.
     * The table is indexed by enum ordinal and stores the case index: 1 for {@code META_STORE} and
     * 2 for {@code DAVINCI_PUSH_STATUS_STORE}; every other constant keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$common$VeniceSystemStoreType = new int[VeniceSystemStoreType.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime (NoSuchFieldError)
            // only leaves its own slot unset instead of failing class initialization.
            try {
                $SwitchMap$com$linkedin$venice$common$VeniceSystemStoreType[VeniceSystemStoreType.META_STORE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$common$VeniceSystemStoreType[VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Creates a parent admin with SSL disabled, no SSL config and no authorizer service.
     *
     * @param veniceHelixAdmin the admin this instance wraps
     * @param veniceControllerMultiClusterConfig configuration covering all clusters handled by this controller
     */
    public VeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig) {
        this(veniceHelixAdmin, veniceControllerMultiClusterConfig, false, Optional.empty(), Optional.empty());
    }

    /**
     * Creates a parent admin without a dynamic access controller and with a
     * {@link DefaultLingeringStoreVersionChecker}.
     *
     * @param veniceHelixAdmin the admin this instance wraps
     * @param veniceControllerMultiClusterConfig configuration covering all clusters handled by this controller
     * @param z whether SSL is enabled; when true an SSL factory is built from {@code optional}
     * @param optional SSL configuration, read only when {@code z} is true
     * @param optional2 optional authorizer service
     */
    public VeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, boolean z, Optional<SSLConfig> optional, Optional<AuthorizerService> optional2) {
        this(veniceHelixAdmin, veniceControllerMultiClusterConfig, z, optional, Optional.empty(), optional2, new DefaultLingeringStoreVersionChecker());
    }

    /**
     * Creates a parent admin using the shared {@link WriteComputeSchemaConverter} instance, no external
     * superset schema generator and a fresh {@link PubSubTopicRepository}.
     *
     * @param veniceHelixAdmin the admin this instance wraps
     * @param veniceControllerMultiClusterConfig configuration covering all clusters handled by this controller
     * @param z whether SSL is enabled; when true an SSL factory is built from {@code optional}
     * @param optional SSL configuration, read only when {@code z} is true
     * @param optional2 optional dynamic access controller
     * @param optional3 optional authorizer service
     * @param lingeringStoreVersionChecker checker for lingering store versions; must not be null
     */
    public VeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, boolean z, Optional<SSLConfig> optional, Optional<DynamicAccessController> optional2, Optional<AuthorizerService> optional3, LingeringStoreVersionChecker lingeringStoreVersionChecker) {
        this(veniceHelixAdmin, veniceControllerMultiClusterConfig, z, optional, optional2, optional3, lingeringStoreVersionChecker, WriteComputeSchemaConverter.getInstance(), Optional.empty(), new PubSubTopicRepository());
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * <p>Besides wiring fields, this constructor creates per-cluster trackers and locks for every configured
     * cluster and submits two background tasks: the terminal-state topic checker and, when an authorizer
     * service is present, the system-store ACL synchronization task.
     *
     * @param veniceHelixAdmin the admin this instance wraps
     * @param veniceControllerMultiClusterConfig configuration covering all clusters handled by this controller
     * @param z whether SSL is enabled; when true an SSL factory is built from {@code optional}
     * @param optional SSL configuration, read only when {@code z} is true
     * @param optional2 optional dynamic access controller
     * @param optional3 optional authorizer service
     * @param lingeringStoreVersionChecker checker for lingering store versions; must not be null
     * @param writeComputeSchemaConverter write-compute schema converter; must not be null
     * @param optional4 optional external superset schema generator
     * @param pubSubTopicRepository repository used to obtain {@link PubSubTopic} instances
     * @throws VeniceException if SSL is enabled and the SSL factory cannot be created
     */
    public VeniceParentHelixAdmin(VeniceHelixAdmin veniceHelixAdmin, VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig, boolean z, Optional<SSLConfig> optional, Optional<DynamicAccessController> optional2, Optional<AuthorizerService> optional3, LingeringStoreVersionChecker lingeringStoreVersionChecker, WriteComputeSchemaConverter writeComputeSchemaConverter, Optional<SupersetSchemaGenerator> optional4, PubSubTopicRepository pubSubTopicRepository) {
        // Field initializers (inlined here by the decompiler).
        this.emptyKeyByteArr = new byte[0];
        this.adminOperationSerializer = new AdminOperationSerializer();
        this.perStoreAdminLocks = new ConcurrentHashMap();
        this.perClusterAdminLocks = new ConcurrentHashMap();
        this.executionIdValidatedClusters = new HashSet();
        this.asyncSetupExecutor = Executors.newCachedThreadPool();
        this.topicCheckerExecutor = Executors.newSingleThreadExecutor();
        this.timer = new SystemTime();
        this.sslFactory = Optional.empty();
        this.defaultSupersetSchemaGenerator = new DefaultSupersetSchemaGenerator();
        this.newFabricControllerClientMap = new VeniceConcurrentHashMap();
        Validate.notNull(lingeringStoreVersionChecker);
        Validate.notNull(writeComputeSchemaConverter);
        this.veniceHelixAdmin = veniceHelixAdmin;
        this.multiClusterConfigs = veniceControllerMultiClusterConfig;
        this.waitingTimeForConsumptionMs = this.multiClusterConfigs.getParentControllerWaitingTimeForConsumptionMs();
        this.batchJobHeartbeatEnabled = this.multiClusterConfigs.getBatchJobHeartbeatEnabled();
        this.veniceWriterMap = new ConcurrentHashMap();
        this.adminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor(this.veniceHelixAdmin.getZkClient(), this.veniceHelixAdmin.getAdapterSerializer());
        this.adminCommandExecutionTrackers = new HashMap();
        this.asyncSetupEnabledMap = new VeniceConcurrentHashMap();
        this.accessController = optional2;
        this.authorizerService = optional3;
        this.externalSupersetSchemaGenerator = optional4;
        this.pubSubTopicRepository = pubSubTopicRepository;
        // The ACL synchronization executor exists only when an authorizer service was supplied; otherwise null.
        this.systemStoreAclSynchronizationExecutor = (ExecutorService) optional3.map(authorizerService -> {
            return Executors.newSingleThreadExecutor();
        }).orElse(null);
        if (z) {
            try {
                // An empty SSL config makes optional.get() throw; that is caught below and surfaces as a VeniceException.
                this.sslFactory = Optional.of(SslUtils.getSSLFactory(optional.get().getSslProperties(), this.multiClusterConfigs.getSslFactoryClassName()));
            } catch (Exception e) {
                LOGGER.error("Failed to create SSL engine", e);
                throw new VeniceException(e);
            }
        }
        this.token = this.multiClusterConfigs.getToken();
        // One execution tracker, one per-store lock map and one cluster lock for every configured cluster.
        for (String str : this.multiClusterConfigs.getClusters()) {
            VeniceControllerConfig controllerConfig = this.multiClusterConfigs.getControllerConfig(str);
            this.adminCommandExecutionTrackers.put(str, new AdminCommandExecutionTracker(controllerConfig.getClusterName(), this.veniceHelixAdmin.getExecutionIdAccessor(), this.veniceHelixAdmin.getControllerClientMap(controllerConfig.getClusterName())));
            this.perStoreAdminLocks.put(str, new ConcurrentHashMap());
            this.perClusterAdminLocks.put(str, new ReentrantLock());
        }
        this.pushStrategyZKAccessor = new MigrationPushStrategyZKAccessor(this.veniceHelixAdmin.getZkClient(), this.veniceHelixAdmin.getAdapterSerializer());
        this.maxErroredTopicNumToKeep = this.multiClusterConfigs.getParentControllerMaxErroredTopicNumToKeep();
        this.offlinePushAccessor = new ParentHelixOfflinePushAccessor(this.veniceHelixAdmin.getZkClient(), this.veniceHelixAdmin.getAdapterSerializer());
        // NOTE(review): "this" is handed to background tasks that are submitted before the remaining final
        // fields below are assigned; confirm those tasks cannot observe a partially constructed instance.
        this.terminalStateTopicChecker = new TerminalStateTopicCheckerForParentController(this, this.veniceHelixAdmin.getStoreConfigRepo(), this.multiClusterConfigs.getTerminalStateTopicCheckerDelayMs());
        this.topicCheckerExecutor.submit(this.terminalStateTopicChecker);
        this.systemStoreAclSynchronizationTask = (SystemStoreAclSynchronizationTask) optional3.map(authorizerService2 -> {
            return new SystemStoreAclSynchronizationTask(authorizerService2, this, this.multiClusterConfigs.getSystemStoreAclSynchronizationDelayMs());
        }).orElse(null);
        // The task and its executor are both derived from optional3, so they are either both null or both set.
        if (this.systemStoreAclSynchronizationTask != null) {
            this.systemStoreAclSynchronizationExecutor.submit(this.systemStoreAclSynchronizationTask);
        }
        this.lingeringStoreVersionChecker = lingeringStoreVersionChecker;
        this.systemStoreLifeCycleHelper = new UserSystemStoreLifeCycleHelper(this, optional3, veniceControllerMultiClusterConfig);
        this.writeComputeSchemaConverter = writeComputeSchemaConverter;
        // The identity parser implementation is chosen by configuration and built through its no-arg constructor.
        this.identityParser = (IdentityParser) ReflectUtils.callConstructor(ReflectUtils.loadClass(veniceControllerMultiClusterConfig.getCommonConfig().getIdentityParserClassName()), new Class[0], new Object[0]);
    }

    /**
     * Overrides the value read from configuration at construction time. Package-private.
     *
     * @param i the new maximum number of errored topics to keep
     */
    void setMaxErroredTopicNumToKeep(int i) {
        this.maxErroredTopicNumToKeep = i;
    }

    /**
     * Registers a writer for the given cluster unless one is already registered; an existing writer is
     * never replaced. Package-private.
     *
     * @param str cluster name
     * @param veniceWriter writer to register for that cluster
     */
    void setVeniceWriterForCluster(String str, VeniceWriter veniceWriter) {
        this.veniceWriterMap.putIfAbsent(str, veniceWriter);
    }

    /**
     * Initializes the given cluster on the wrapped admin, enables async setup for it, makes sure the
     * cluster's admin topic exists, registers a writer for that topic, and kicks off the asynchronous
     * setup of the internal stores that are designated to live in this cluster.
     *
     * @param str cluster name
     */
    @Override // com.linkedin.venice.controller.Admin
    public synchronized void initStorageCluster(String str) {
        getVeniceHelixAdmin().initStorageCluster(str);
        this.asyncSetupEnabledMap.put(str, true);

        String adminTopicName = AdminTopicUtils.getTopicNameFromClusterName(str);
        PubSubTopic adminTopic = this.pubSubTopicRepository.getTopic(adminTopicName);
        TopicManager manager = getTopicManager();
        boolean adminTopicMissing = !manager.containsTopicAndAllPartitionsAreOnline(adminTopic);
        if (adminTopicMissing) {
            // The admin topic always has exactly one partition.
            manager.createTopic(
                adminTopic,
                1,
                getMultiClusterConfigs().getControllerConfig(str).getAdminTopicReplicationFactor(),
                true,
                false,
                getMultiClusterConfigs().getControllerConfig(str).getMinInSyncReplicasAdminTopics());
            LOGGER.info("Created admin topic: {} for cluster: {}", adminTopic, str);
        } else {
            LOGGER.info("Admin topic: {} for cluster: {} already exists.", adminTopic, str);
        }

        // Create the admin-topic writer lazily; an already registered writer is kept as is.
        this.veniceWriterMap.computeIfAbsent(
            str,
            ignoredCluster -> getVeniceWriterFactory().createVeniceWriter(
                new VeniceWriterOptions.Builder(adminTopic.getName()).setTime(getTimer()).setPartitionCount(1).build()));

        // The push job details store is only set up in the cluster configured to host it.
        if (!getMultiClusterConfigs().getPushJobStatusStoreClusterName().isEmpty()) {
            if (str.equals(getMultiClusterConfigs().getPushJobStatusStoreClusterName())) {
                String hostingCluster = getMultiClusterConfigs().getPushJobStatusStoreClusterName();
                String detailsStoreName = VeniceSystemStoreUtils.getPushJobDetailsStoreName();
                String descriptor = PUSH_JOB_DETAILS_STORE_DESCRIPTOR + VeniceSystemStoreUtils.getPushJobDetailsStoreName();
                String keySchema = PushJobStatusRecordKey.getClassSchema().toString();
                String valueSchema = PushJobDetails.getClassSchema().toString();
                int partitions = getMultiClusterConfigs().getControllerConfig(str).getNumberOfPartition();
                UpdateStoreQueryParams hybridParams =
                    new UpdateStoreQueryParams().setHybridDataReplicationPolicy(DataReplicationPolicy.AGGREGATE);
                asyncSetupForInternalRTStore(hostingCluster, detailsStoreName, descriptor, keySchema, valueSchema, partitions, hybridParams);
            }
        }
        maybeSetupBatchJobLivenessHeartbeatStore(str);
    }

    /**
     * Starts the asynchronous setup of the batch job liveness heartbeat store when the given cluster is
     * the one configured to host it; otherwise only logs that the setup is skipped.
     *
     * @param str cluster name
     */
    private void maybeSetupBatchJobLivenessHeartbeatStore(String str) {
        String designatedCluster = getMultiClusterConfigs().getBatchJobHeartbeatStoreCluster();
        String heartbeatStoreName = AvroProtocolDefinition.BATCH_JOB_HEARTBEAT.getSystemStoreName();

        if (Objects.equals(str, designatedCluster)) {
            String descriptor = BATCH_JOB_HEARTBEAT_STORE_DESCRIPTOR + heartbeatStoreName;
            String keySchema = BatchJobHeartbeatKey.getClassSchema().toString();
            String valueSchema = BatchJobHeartbeatValue.getClassSchema().toString();
            int partitions = getMultiClusterConfigs().getControllerConfig(str).getNumberOfPartition();
            UpdateStoreQueryParams activeActiveParams = new UpdateStoreQueryParams()
                .setHybridDataReplicationPolicy(DataReplicationPolicy.ACTIVE_ACTIVE)
                .setActiveActiveReplicationEnabled(true);
            asyncSetupForInternalRTStore(str, heartbeatStoreName, descriptor, keySchema, valueSchema, partitions, activeActiveParams);
            return;
        }

        LOGGER.info("Skip creating the batch job liveness heartbeat store: {} in cluster: {} since the designated cluster is: {}", heartbeatStoreName, str, designatedCluster);
    }

    /**
     * Submits a background task that creates or verifies an internal real-time store, retrying up to
     * {@link #MAX_ASYNC_SETUP_RETRY_COUNT} times with a pause of
     * {@link #SLEEP_INTERVAL_FOR_ASYNC_SETUP_MS} before every attempt after the first.
     *
     * <p>The loop stops as soon as the store is ready, async setup is no longer enabled for the cluster,
     * the attempt budget is exhausted, or a non-{@link VeniceException} failure occurs. The outcome is
     * only logged; nothing is reported back to the caller.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 human-readable store descriptor used in log and exception messages
     * @param str4 key schema string
     * @param str5 value schema string
     * @param i partition count
     * @param updateStoreQueryParams store update applied when the store is not yet hybrid
     */
    private void asyncSetupForInternalRTStore(String str, String str2, String str3, String str4, String str5, int i, UpdateStoreQueryParams updateStoreQueryParams) {
        this.asyncSetupExecutor.submit(() -> {
            int attempts = 0;
            boolean storeReady = false;
            // Boolean.TRUE.equals treats a missing map entry as "disabled" instead of failing on unboxing null.
            while (!storeReady && Boolean.TRUE.equals(this.asyncSetupEnabledMap.get(str)) && attempts < MAX_ASYNC_SETUP_RETRY_COUNT) {
                try {
                    if (attempts > 0) {
                        this.timer.sleep(SLEEP_INTERVAL_FOR_ASYNC_SETUP_MS);
                    }
                    storeReady = createOrVerifyInternalStore(str, str2, str3, str4, str5, i, updateStoreQueryParams);
                } catch (VeniceException e) {
                    // Failures raised by the create/verify step are retried on the next iteration.
                    LOGGER.warn("VeniceException occurred during {} setup with store: {} in cluster: {}", str3, str2, str, e);
                    LOGGER.info("Async setup for {} attempts: {}/{}", str3, attempts, MAX_ASYNC_SETUP_RETRY_COUNT);
                } catch (Exception e) {
                    // The result of submit() is never inspected, so anything escaping this task would vanish
                    // silently: log it here and give up.
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOGGER.error("Exception occurred aborting {} setup with store: {} in cluster: {}", str3, str2, str, e);
                    break;
                } finally {
                    // Every attempt counts exactly once, whether it succeeded or failed.
                    attempts++;
                }
            }
            if (storeReady) {
                LOGGER.info("{} has been successfully created or it already exists in cluster: {}", str3, str);
            } else {
                LOGGER.error("Unable to create or verify the {} in cluster: {}", str3, str);
            }
        });
    }

    /**
     * Ensures the given internal store is usable.
     *
     * <p>On the leader controller of the cluster the store is created if missing, converted to hybrid if
     * it is not hybrid yet, given an initial version if it has none, and its real-time topic name is
     * checked. On any other controller the method only asks the leader controller whether the store is
     * hybrid, has a version with the expected partition count, and has an online real-time topic.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 human-readable store descriptor used in log and exception messages
     * @param str4 key schema string
     * @param str5 value schema string
     * @param i expected partition count
     * @param updateStoreQueryParams store update applied when the store is not yet hybrid; default hybrid
     *        lag threshold and rewind time are filled in when absent
     * @return {@code true} when the store is ready; {@code false} when a non-leader controller could not
     *         confirm readiness
     * @throws VeniceException on the leader controller when the store cannot be created, made hybrid or
     *         versioned, or when its real-time topic name is unexpected
     */
    private boolean createOrVerifyInternalStore(String str, String str2, String str3, String str4, String str5, int i, UpdateStoreQueryParams updateStoreQueryParams) {
        if (!isLeaderControllerFor(str)) {
            // Not the leader: verify through the leader controller. The client is always closed on exit.
            try (ControllerClient controllerClient = ControllerClient.constructClusterControllerClient(str, getLeaderController(str).getUrl(false), this.sslFactory, this.token)) {
                StoreResponse storeResponse = controllerClient.getStore(str2);
                if (storeResponse.isError()) {
                    LOGGER.warn("Failed to verify if {} exists from the controller with URL: {}", str3, controllerClient.getControllerDiscoveryUrls());
                    return false;
                }
                StoreInfo storeInfo = storeResponse.getStore();
                PubSubTopic realTimeTopic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(str2));
                if (storeInfo.getHybridStoreConfig() == null || storeInfo.getVersions().isEmpty()) {
                    return false;
                }
                // A missing largest-used version means "not ready yet" rather than a NoSuchElementException.
                Optional<Version> largestUsedVersion = storeInfo.getVersion(storeInfo.getLargestUsedVersionNumber());
                return largestUsedVersion.isPresent()
                    && largestUsedVersion.get().getPartitionCount() == i
                    && getTopicManager().containsTopicAndAllPartitionsAreOnline(realTimeTopic);
            }
        }

        Store store = getStore(str, str2);
        if (store == null) {
            createStore(str, str2, VENICE_INTERNAL_STORE_OWNER, str4, str5, true);
            store = getStore(str, str2);
            if (store == null) {
                throw new VeniceException("Unable to create or fetch the " + str3);
            }
        } else {
            LOGGER.info("Internal store: {} already exists in cluster: {}", str2, str);
        }

        if (!store.isHybrid()) {
            // Fill in hybrid defaults only where the caller did not specify them.
            if (!updateStoreQueryParams.getHybridOffsetLagThreshold().isPresent()) {
                updateStoreQueryParams.setHybridOffsetLagThreshold(100L);
            }
            if (!updateStoreQueryParams.getHybridRewindSeconds().isPresent()) {
                updateStoreQueryParams.setHybridRewindSeconds(TimeUnit.DAYS.toSeconds(7L));
            }
            updateStore(str, str2, updateStoreQueryParams);
            store = getStore(str, str2);
            // A store that disappeared after the update is reported as a failed update, not as an NPE.
            if (store == null || !store.isHybrid()) {
                throw new VeniceException("Unable to update the " + str3 + " to a hybrid store");
            }
            LOGGER.info("Enabled hybrid for internal store: {} in cluster: {}", str2, str);
        }

        if (store.getVersions().isEmpty()) {
            int versionNumber = incrementVersionIdempotent(str, str2, Version.guidBasedDummyPushId(), i, getReplicationFactor(str, str2)).getNumber();
            writeEndOfPush(str, str2, versionNumber, true);
            Store refreshedStore = getStore(str, str2);
            if (refreshedStore == null || refreshedStore.getVersions().isEmpty()) {
                throw new VeniceException("Unable to initialize a version for the " + str3);
            }
            LOGGER.info("Created a version for internal store: {} in cluster: {}", str2, str);
        }

        if (!getRealTimeTopic(str, str2).equals(Version.composeRealTimeTopic(str2))) {
            throw new VeniceException("Unexpected real time topic name for the " + str3);
        }
        return true;
    }

    /**
     * Delegates the cluster validity check to the wrapped admin.
     *
     * @param str cluster name
     * @return whatever the wrapped admin reports for this cluster
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isClusterValid(String str) {
        return getVeniceHelixAdmin().isClusterValid(str);
    }

    /**
     * @return the batch job heartbeat flag captured from the multi-cluster config at construction time
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isBatchJobHeartbeatEnabled() {
        return this.batchJobHeartbeatEnabled;
    }

    /**
     * Assigns a fresh execution id to the admin operation, produces it to the cluster's admin topic and
     * blocks until the message has been consumed.
     *
     * <p>The whole sequence runs under the admin-message execution-id lock for the cluster. Producing
     * the message additionally holds the cluster read lock, which is released before waiting for
     * consumption, including when producing fails.
     *
     * @param str cluster name
     * @param str2 store name passed on to the consumption wait
     * @param adminOperation the operation to send; its {@code executionId} is overwritten
     * @throws VeniceException if the cluster has no registered writer, if producing the message fails,
     *         or if waiting for consumption fails
     */
    private void sendAdminMessageAndWaitForConsumed(String str, String str2, AdminOperation adminOperation) {
        if (!this.veniceWriterMap.containsKey(str)) {
            throw new VeniceException("Cluster: " + str + " is not started yet!");
        }
        acquireAdminMessageExecutionIdLock(str);
        try {
            checkAndRepairCorruptedExecutionId(str);
            // try-with-resources guarantees the cluster read lock is released even when producing fails.
            try (AutoCloseableLock ignored = this.veniceHelixAdmin.getHelixVeniceClusterResources(str).getClusterLockManager().createClusterReadLock()) {
                AdminCommandExecutionTracker executionTracker = this.adminCommandExecutionTrackers.get(str);
                AdminCommandExecution execution = executionTracker.createExecution(AdminMessageType.valueOf(adminOperation).name());
                adminOperation.executionId = execution.getExecutionId();
                try {
                    PubSubProduceResult produceResult = (PubSubProduceResult) this.veniceWriterMap.get(str)
                        .put(this.emptyKeyByteArr, this.adminOperationSerializer.serialize(adminOperation), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
                        .get();
                    LOGGER.info("Sent message: {} to kafka, offset: {}", adminOperation, produceResult.getOffset());
                    executionTracker.startTrackingExecution(execution);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for callers further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    throw new VeniceException("Got exception during sending message to Kafka -- " + e.getMessage(), e);
                }
            }
            // Waiting happens outside the send try/catch so a consumption timeout is not misreported as a
            // Kafka send failure.
            waitingMessageToBeConsumed(str, str2, adminOperation.executionId);
        } finally {
            releaseAdminMessageExecutionIdLock(str);
        }
    }

    /**
     * One-time (per cluster, per instance) sanity check of the execution id counter.
     *
     * <p>If the last generated execution id is smaller than the execution id recorded in the admin
     * topic metadata, the generated id is moved forward to match. The cluster is then remembered in
     * {@code executionIdValidatedClusters} so later calls return immediately.
     *
     * @param str cluster name
     */
    private void checkAndRepairCorruptedExecutionId(String str) {
        if (!this.executionIdValidatedClusters.contains(str)) {
            ExecutionIdAccessor idAccessor = getVeniceHelixAdmin().getExecutionIdAccessor();
            long lastGeneratedId = idAccessor.getLastGeneratedExecutionId(str).longValue();
            long lastConsumedId = AdminTopicMetadataAccessor.getExecutionId(this.adminTopicMetadataAccessor.getMetadata(str));
            boolean generatedIdIsBehind = lastGeneratedId < lastConsumedId;
            if (generatedIdIsBehind) {
                LOGGER.warn("Invalid executionId state detected, last generated execution id: {}, last consumed execution id: {}. Resetting last generated execution id to: {}", Long.valueOf(lastGeneratedId), Long.valueOf(lastConsumedId), Long.valueOf(lastConsumedId));
                idAccessor.updateLastGeneratedExecutionId(str, Long.valueOf(lastConsumedId));
            }
            this.executionIdValidatedClusters.add(str);
        }
    }

    /**
     * Polls the last succeeded execution id until it reaches {@code j} or the configured
     * {@code waitingTimeForConsumptionMs} has elapsed.
     *
     * @param str  cluster name
     * @param str2 store name; may be {@code null}, in which case no per-store exception is looked up
     * @param j    execution id to wait for
     * @throws VeniceException on timeout; carries the store's last exception as cause when one is found
     */
    private void waitingMessageToBeConsumed(String str, String str2, long j) {
        final long startMs = SystemTime.INSTANCE.getMilliseconds();
        for (;;) {
            Long consumedId = getVeniceHelixAdmin().getLastSucceededExecutionId(str, str2);
            boolean caughtUp = consumedId != null && consumedId.longValue() >= j;
            if (caughtUp) {
                LOGGER.info("The message has been consumed, execution id: {}", Long.valueOf(j));
                return;
            }
            long elapsedMs = SystemTime.INSTANCE.getMilliseconds() - startMs;
            if (elapsedMs > this.waitingTimeForConsumptionMs) {
                Exception storeFailure = null;
                if (str2 != null) {
                    storeFailure = getVeniceHelixAdmin().getLastExceptionForStore(str, str2);
                }
                String timeoutPart = "Timed out after waiting for " + this.waitingTimeForConsumptionMs + "ms for admin consumption to catch up.";
                String progressPart = timeoutPart + " Consumed execution id: " + consumedId + ", waiting to be consumed id: " + j;
                String failurePart = "";
                if (storeFailure != null) {
                    failurePart = " Last exception: " + storeFailure.getMessage();
                }
                throw new VeniceException(progressPart + failurePart, storeFailure);
            }
            LOGGER.info("Waiting execution id: {} to be consumed, currently at: {}", Long.valueOf(j), consumedId);
            Utils.sleep(SLEEP_INTERVAL_FOR_DATA_CONSUMPTION_IN_MS);
        }
    }

    /**
     * Acquires the per-cluster lock from {@code perClusterAdminLocks}, waiting at most
     * {@code waitingTimeForConsumptionMs} milliseconds.
     *
     * @param str cluster name; must not be {@code null}
     * @throws VeniceException if {@code str} is {@code null}, if the lock cannot be obtained within
     *                         the timeout, or if the thread is interrupted while waiting (the
     *                         interrupt flag is restored before throwing)
     */
    private void acquireAdminMessageExecutionIdLock(String str) {
        if (str == null) {
            throw new VeniceException("Cannot acquire admin message execution id lock with a null cluster name");
        }
        try {
            if (!this.perClusterAdminLocks.get(str).tryLock(this.waitingTimeForConsumptionMs, TimeUnit.MILLISECONDS)) {
                throw new VeniceException("Failed to acquire cluster level admin message execution id lock after waiting for " + this.waitingTimeForConsumptionMs + "ms. Another ongoing admin operation might be holding up the lock for cluster:" + str);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new VeniceException("Got interrupted during acquiring lock", e);
        }
    }

    /**
     * Unlocks the per-cluster lock stored in {@code perClusterAdminLocks}.
     *
     * @param str cluster name; must not be {@code null}
     * @throws VeniceException if {@code str} is {@code null}
     */
    private void releaseAdminMessageExecutionIdLock(String str) {
        if (str != null) {
            this.perClusterAdminLocks.get(str).unlock();
            return;
        }
        throw new VeniceException("Cannot release admin message execution id lock with null cluster name");
    }

    /**
     * Acquires the per-store admin message lock, creating it on first use.
     *
     * <p>Before locking, the store's last recorded exception is checked; if one exists the call
     * fails without touching the lock. The wait is bounded by {@code waitingTimeForConsumptionMs}.
     *
     * @param str  cluster name; must not be {@code null}
     * @param str2 store name; must not be {@code null}
     * @throws VeniceException if an argument is {@code null}, if the store has an outstanding
     *                         exception, if the lock cannot be obtained within the timeout, or if
     *                         the thread is interrupted while waiting (the interrupt flag is
     *                         restored before throwing)
     */
    private void acquireAdminMessageLock(String str, String str2) {
        if (str == null) {
            throw new VeniceException("Cannot acquire admin message lock with a null cluster name");
        }
        if (str2 == null) {
            throw new VeniceException("Cannot acquire admin message lock with a null name");
        }
        Exception lastExceptionForStore = getVeniceHelixAdmin().getLastExceptionForStore(str, str2);
        if (lastExceptionForStore != null) {
            throw new VeniceException("Unable to start new admin operations for store: " + str2 + " in cluster: " + str + " due to existing exception: " + lastExceptionForStore.getMessage(), lastExceptionForStore);
        }
        try {
            if (!this.perStoreAdminLocks.get(str).computeIfAbsent(str2, str3 -> {
                return new ReentrantLock();
            }).tryLock(this.waitingTimeForConsumptionMs, TimeUnit.MILLISECONDS)) {
                throw new VeniceException("Failed to acquire store level admin message lock after waiting for " + this.waitingTimeForConsumptionMs + "ms. Another ongoing admin operation might be holding up the lock for store:" + str2);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new VeniceException("Got interrupted during acquiring lock", e);
        }
    }

    /**
     * Unlocks the per-store admin message lock if one has been created for the store.
     *
     * @param str  cluster name; must not be {@code null}
     * @param str2 store name; must not be {@code null}
     * @throws VeniceException if either argument is {@code null}
     */
    private void releaseAdminMessageLock(String str, String str2) {
        if (str == null) {
            throw new VeniceException("Cannot release admin message lock with null cluster name");
        } else if (str2 == null) {
            throw new VeniceException("Cannot release admin message lock with null store name");
        }
        Lock storeLock = this.perStoreAdminLocks.get(str).get(str2);
        if (storeLock == null) {
            // No lock was ever created for this store, so there is nothing to release.
            return;
        }
        storeLock.unlock();
    }

    /**
     * Creates a store: validates preconditions, provisions ACLs, sends the store-creation admin
     * message, and then sends a validation message for each system store returned by
     * {@code maybeMaterializeSystemStoresForUserStore}, unless the store config shows the store is
     * migrating into this cluster.
     *
     * <p>The whole sequence runs under the per-store admin message lock.
     *
     * @param str      cluster name
     * @param str2     store name
     * @param str3     owner, forwarded to the store-creation message
     * @param str4     key schema definition
     * @param str5     value schema definition
     * @param z        forwarded to {@code checkPreConditionForCreateStore}
     * @param optional forwarded to {@code provisionAclsForStore}
     */
    @Override // com.linkedin.venice.controller.Admin
    public void createStore(String str, String str2, String str3, String str4, String str5, boolean z, Optional<String> optional) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForCreateStore(str, str2, str4, str5, z, false);
            LOGGER.info("Adding store: {} to cluster: {}", str2, str);
            provisionAclsForStore(str2, optional, Collections.emptyList());
            sendStoreCreationAdminMessage(str, str2, str3, str4, str5);

            ZkStoreConfigAccessor configAccessor = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getStoreConfigAccessor();
            boolean migratingIntoThisCluster = false;
            if (configAccessor.containsConfig(str2)) {
                StoreConfig existingConfig = configAccessor.getStoreConfig(str2);
                migratingIntoThisCluster = !existingConfig.isDeleting() && str.equals(existingConfig.getMigrationDestCluster());
            }

            if (migratingIntoThisCluster) {
                LOGGER.info("Store: {} is migrating to cluster: {}, will skip system store auto-materialization", str2, str);
            } else {
                for (VeniceSystemStoreType systemStoreType : getSystemStoreLifeCycleHelper().maybeMaterializeSystemStoresForUserStore(str, str2)) {
                    LOGGER.info("Materializing system store: {} in cluster: {}", systemStoreType, str);
                    sendUserSystemStoreCreationValidationAdminMessage(str, str2, systemStoreType);
                }
            }

            boolean isHeartbeatStore = VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE.getPrefix().equals(str2);
            if (isHeartbeatStore) {
                setupResourceForBatchJobHeartbeatStore(str2);
            }
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Builds a {@code STORE_CREATION} admin operation and sends it, blocking until consumed.
     * Both schemas are tagged with the {@code AVRO_1_4} schema type.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param str3 store owner
     * @param str4 key schema definition
     * @param str5 value schema definition
     */
    private void sendStoreCreationAdminMessage(String str, String str2, String str3, String str4, String str5) {
        SchemaMeta keyMeta = new SchemaMeta();
        keyMeta.schemaType = SchemaType.AVRO_1_4.getValue();
        keyMeta.definition = str4;

        SchemaMeta valueMeta = new SchemaMeta();
        valueMeta.schemaType = SchemaType.AVRO_1_4.getValue();
        valueMeta.definition = str5;

        StoreCreation payload = (StoreCreation) AdminMessageType.STORE_CREATION.getNewInstance();
        payload.clusterName = str;
        payload.storeName = str2;
        payload.owner = str3;
        payload.keySchema = keyMeta;
        payload.valueSchema = valueMeta;

        AdminOperation operation = new AdminOperation();
        operation.operationType = AdminMessageType.STORE_CREATION.getValue();
        operation.payloadUnion = payload;
        sendAdminMessageAndWaitForConsumed(str, str2, operation);
    }

    /**
     * Sends a system-store auto-creation validation admin message for a user store and waits for it
     * to be consumed.
     *
     * <p>Two system store types produce a message (meta system store and push status system store
     * validation); any other type logs a warning and returns without sending anything.
     *
     * @param str                   cluster name, copied into the message payload
     * @param str2                  user store name, copied into the message payload
     * @param veniceSystemStoreType system store type that selects which validation message is built
     */
    private void sendUserSystemStoreCreationValidationAdminMessage(String str, String str2, VeniceSystemStoreType veniceSystemStoreType) {
        AdminOperation adminOperation = new AdminOperation();
        // NOTE(review): the switch goes through a decompiler-generated ordinal lookup table; the
        // enum constants behind case 1 and case 2 are not visible here - confirm against the enum.
        switch (AnonymousClass2.$SwitchMap$com$linkedin$venice$common$VeniceSystemStoreType[veniceSystemStoreType.ordinal()]) {
            // The label below is just the literal 1; the constant name looks like a decompiler
            // substitution and is presumably unrelated to admin topic partitions.
            case AdminTopicUtils.PARTITION_NUM_FOR_ADMIN_TOPIC /* 1 */:
                MetaSystemStoreAutoCreationValidation metaSystemStoreAutoCreationValidation = (MetaSystemStoreAutoCreationValidation) AdminMessageType.META_SYSTEM_STORE_AUTO_CREATION_VALIDATION.getNewInstance();
                metaSystemStoreAutoCreationValidation.clusterName = str;
                metaSystemStoreAutoCreationValidation.storeName = str2;
                adminOperation.operationType = AdminMessageType.META_SYSTEM_STORE_AUTO_CREATION_VALIDATION.getValue();
                adminOperation.payloadUnion = metaSystemStoreAutoCreationValidation;
                break;
            case 2:
                PushStatusSystemStoreAutoCreationValidation pushStatusSystemStoreAutoCreationValidation = (PushStatusSystemStoreAutoCreationValidation) AdminMessageType.PUSH_STATUS_SYSTEM_STORE_AUTO_CREATION_VALIDATION.getNewInstance();
                pushStatusSystemStoreAutoCreationValidation.clusterName = str;
                pushStatusSystemStoreAutoCreationValidation.storeName = str2;
                adminOperation.operationType = AdminMessageType.PUSH_STATUS_SYSTEM_STORE_AUTO_CREATION_VALIDATION.getValue();
                adminOperation.payloadUnion = pushStatusSystemStoreAutoCreationValidation;
                break;
            default:
                // Any other type: nothing is sent.
                LOGGER.warn("System store type: {} is not a user store level system store, will not send store creation validation message.", veniceSystemStoreType);
                return;
        }
        LOGGER.info("Sending system store creation validation message for user store: {}, system store type: {}", str2, veniceSystemStoreType);
        sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
    }

    /**
     * Registers a {@code Resource} for the given store with the authorizer service, when one is
     * configured; otherwise only logs a warning.
     *
     * @param str store name used to build the resource
     */
    private void setupResourceForBatchJobHeartbeatStore(String str) {
        if (this.authorizerService.isPresent()) {
            Resource heartbeatResource = new Resource(str);
            this.authorizerService.get().setupResource(heartbeatResource);
            LOGGER.info("Set up wildcard ACL regex for store: {}", str);
            return;
        }
        LOGGER.warn("Skip setting up wildcard ACL regex for store: {} since the authorizer service is not provided", str);
    }

    /**
     * Deletes a store by sending a {@code DELETE_STORE} admin message and, once it is consumed,
     * cleaning up the store's ACLs unless the store object is missing or the store is migrating.
     *
     * <p>A missing store object ({@code VeniceNoStoreException} from the precondition check) does
     * not abort the deletion; the message is still sent with a largest used version of {@code -1}.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param i    not read by this method
     * @param z    not read by this method
     */
    @Override // com.linkedin.venice.controller.Admin
    public void deleteStore(String str, String str2, int i, boolean z) {
        acquireAdminMessageLock(str, str2);
        try {
            LOGGER.info("Deleting store: {} from cluster: {}", str2, str);
            Store existingStore;
            try {
                existingStore = getVeniceHelixAdmin().checkPreConditionForDeletion(str, str2);
            } catch (VeniceNoStoreException e) {
                LOGGER.warn("Store object is missing for store: {} will proceed with the rest of store deletion", str2);
                existingStore = null;
            }

            DeleteStore payload = (DeleteStore) AdminMessageType.DELETE_STORE.getNewInstance();
            payload.clusterName = str;
            payload.storeName = str2;
            if (existingStore == null) {
                payload.largestUsedVersionNumber = -1;
            } else {
                payload.largestUsedVersionNumber = existingStore.getLargestUsedVersionNumber();
            }

            AdminOperation operation = new AdminOperation();
            operation.operationType = AdminMessageType.DELETE_STORE.getValue();
            operation.payloadUnion = payload;
            sendAdminMessageAndWaitForConsumed(str, str2, operation);

            if (existingStore == null) {
                LOGGER.warn("Store object for {} is missing! Skipping acl deletion!", str2);
                return;
            }
            if (existingStore.isMigrating()) {
                LOGGER.info("Store: {} is migrating! Skipping acl deletion!", str2);
                return;
            }
            cleanUpAclsForStore(str2, VeniceSystemStoreType.getEnabledSystemStoreTypes(existingStore));
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Adds a version through the Helix admin's {@code addVersionOnly} and then sends the
     * corresponding add-version admin message under the per-store admin message lock.
     *
     * <p>When the new version has active/active replication enabled, the replication metadata
     * schemas are updated for all value schemas before the message is sent.
     *
     * @param str      cluster name
     * @param str2     store name
     * @param str3     push job id
     * @param i        forwarded to {@code addVersionOnly}
     * @param i2       forwarded to {@code addVersionOnly} and used as the partition count of the message
     * @param pushType push type
     * @param str4     forwarded to {@code addVersionOnly}
     * @param j        forwarded to {@code addVersionOnly}
     * @param i3       not read by this method
     * @param z        not read by this method
     */
    @Override // com.linkedin.venice.controller.Admin
    public void addVersionAndStartIngestion(String str, String str2, String str3, int i, int i2, Version.PushType pushType, String str4, long j, int i3, boolean z) {
        int rmdVersionId = getRmdVersionID(str2, str);
        Version newVersion = getVeniceHelixAdmin().addVersionOnly(str, str2, str3, i, i2, pushType, str4, j, rmdVersionId);
        if (newVersion.isActiveActiveReplicationEnabled()) {
            updateReplicationMetadataSchemaForAllValueSchema(str, str2);
        }
        acquireAdminMessageLock(str, str2);
        try {
            sendAddVersionAdminMessage(str, str2, str3, newVersion, i2, pushType);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Resolves the RMD version id for a store: the store-level value when the store exists and its
     * RMD version is not {@code -1}, otherwise the value from the cluster's controller config.
     *
     * @param str  store name
     * @param str2 cluster name
     * @return the resolved RMD version id
     * @throws VeniceException if no controller config exists for the cluster
     */
    private int getRmdVersionID(String str, String str2) {
        Store store = getVeniceHelixAdmin().getStore(str2, str);
        if (store != null && store.getRmdVersion() != -1) {
            LOGGER.info("Found store-level RMD version ID {} for store {} in cluster {}", Integer.valueOf(store.getRmdVersion()), str, str2);
            return store.getRmdVersion();
        }
        if (store == null) {
            LOGGER.warn("No store found in the store repository. Will get store-level RMD version ID from cluster config. Store name: {}, cluster: {}", str, str2);
        } else {
            LOGGER.info("No store-level RMD version ID found for store {} in cluster {}", str, str2);
        }

        VeniceControllerConfig clusterConfig = getMultiClusterConfigs().getControllerConfig(str2);
        if (clusterConfig != null) {
            int clusterRmdVersion = clusterConfig.getReplicationMetadataVersion();
            LOGGER.info("Use RMD version ID {} for cluster {}", Integer.valueOf(clusterRmdVersion), str2);
            return clusterRmdVersion;
        }
        throw new VeniceException("No controller cluster config found for cluster " + str2);
    }

    /**
     * Trims the version list of a store down towards a retention count of 5.
     *
     * <p>Runs under the store write lock. When the store has more than 5 versions, the versions are
     * sorted by their natural order, versions whose number is currently in use according to
     * {@code getCurrentVersionsForMultiColos} are skipped, and at most
     * {@code versionCount - 5} of the remaining ones are deleted from the store object, which is
     * then written back to the repository.
     *
     * @param str  cluster name
     * @param str2 store name
     */
    void cleanupHistoricalVersions(String str, String str2) {
        final int retentionCount = 5;
        HelixVeniceClusterResources helixVeniceClusterResources = getVeniceHelixAdmin().getHelixVeniceClusterResources(str);
        // try-with-resources releases the store write lock on every exit path.
        try (AutoCloseableLock ignored = helixVeniceClusterResources.getClusterLockManager().createStoreWriteLock(str2)) {
            ReadWriteStoreRepository storeMetadataRepository = helixVeniceClusterResources.getStoreMetadataRepository();
            Store store = storeMetadataRepository.getStore(str2);
            if (store == null) {
                LOGGER.info("The store to clean up: {} doesn't exist", str2);
                return;
            }
            List<Version> versions = store.getVersions();
            final int versionCount = versions.size();
            if (versionCount <= retentionCount) {
                return;
            }
            Map<String, Integer> currentVersionsForMultiColos = getCurrentVersionsForMultiColos(str, str2);
            // Iterate over a copy: deleteVersion mutates the store while the stream is consumed.
            List<Version> snapshot = new ArrayList<>(versions);
            snapshot.stream()
                    .sorted()
                    .filter(version -> !currentVersionsForMultiColos.containsValue(Integer.valueOf(version.getNumber())))
                    .limit(versionCount - retentionCount)
                    .forEach(version -> store.deleteVersion(version.getNumber()));
            storeMetadataRepository.updateStore(store);
        }
    }

    /**
     * Lists the topics known to the topic manager that belong to the given store, excluding admin
     * topics, Kafka internal topics, real-time topics and view topics.
     *
     * <p>Topics whose name cannot be parsed into a store name are skipped (logged at debug level).
     *
     * @param str store name to match
     * @return a new mutable list of matching topics, in the order the topic manager returned them
     */
    List<PubSubTopic> existingVersionTopicsForStore(String str) {
        List<PubSubTopic> matches = new ArrayList<>();
        for (PubSubTopic candidate : getTopicManager().listTopics()) {
            boolean excluded = AdminTopicUtils.isAdminTopic(candidate.getName())
                    || AdminTopicUtils.isKafkaInternalTopic(candidate.getName())
                    || candidate.isRealTime()
                    || VeniceView.isViewTopic(candidate.getName());
            if (excluded) {
                continue;
            }
            try {
                String owningStore = Version.parseStoreFromKafkaTopicName(candidate.getName());
                if (owningStore.equals(str)) {
                    matches.add(candidate);
                }
            } catch (Exception e) {
                LOGGER.debug("Failed to parse StoreName from topic: {}, and error message: {}", candidate, e.getMessage());
            }
        }
        return matches;
    }

    /**
     * Returns the version topics of a store ordered by descending version number, so the topic
     * with the highest version comes first.
     *
     * @param str store name
     * @return the list produced by {@code existingVersionTopicsForStore}, sorted in place
     */
    List<PubSubTopic> getKafkaTopicsByAge(String str) {
        List<PubSubTopic> topics = existingVersionTopicsForStore(str);
        if (topics.isEmpty()) {
            return topics;
        }
        topics.sort((left, right) -> {
            int leftVersion = Version.parseVersionFromKafkaTopicName(left.getName());
            int rightVersion = Version.parseVersionFromKafkaTopicName(right.getName());
            return rightVersion - leftVersion;
        });
        return topics;
    }

    /**
     * Looks for an ongoing push for a store by inspecting its highest-version topic.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>No version topic, or the latest one is already truncated: returns empty.</li>
     *   <li>The version for the latest topic does not show up within 30 seconds: fails if the topic
     *       was created recently (within {@code TOPIC_DELETION_DELAY_MS}), otherwise kills the push
     *       and returns empty.</li>
     *   <li>Queries the offline push status up to 5 times, sleeping 10 seconds after each attempt
     *       whose extra info still contains {@code UNKNOWN}.</li>
     *   <li>Non-terminal status: returns the topic name.</li>
     *   <li>Terminal status: unless {@code z} is set, truncates older errored topics; returns empty.</li>
     * </ol>
     *
     * @param str  cluster name
     * @param str2 store name
     * @param z    when {@code true}, the truncation of errored topics is skipped
     * @param z2   forwarded to {@code truncateTopicsBasedOnMaxErroredTopicNumToKeep}
     * @return the name of the topic of the ongoing push, or empty when there is none
     * @throws VeniceException if the topic is recent but its version is not visible yet, or if the
     *                         thread is interrupted while sleeping between status checks (the
     *                         interrupt flag is restored and the cause is attached)
     */
    Optional<String> getTopicForCurrentPushJob(String str, String str2, boolean z, boolean z2) {
        List<PubSubTopic> kafkaTopicsByAge = getKafkaTopicsByAge(str2);
        if (kafkaTopicsByAge.isEmpty()) {
            return Optional.empty();
        }
        PubSubTopic latestTopic = kafkaTopicsByAge.get(0);
        LOGGER.debug("Latest kafka topic for store: {} is {}", str2, latestTopic);
        String name = latestTopic.getName();
        if (isTopicTruncated(name)) {
            return Optional.empty();
        }

        if (getVeniceHelixAdmin().waitVersion(str, str2, Version.parseVersionFromKafkaTopicName(name), Duration.ofSeconds(30L)).getSecond() == null) {
            Long inMemoryTopicCreationTime = getVeniceHelixAdmin().getInMemoryTopicCreationTime(name);
            if (inMemoryTopicCreationTime != null && SystemTime.INSTANCE.getMilliseconds() < inMemoryTopicCreationTime.longValue() + TOPIC_DELETION_DELAY_MS) {
                throw new VeniceException("Failed to get version information but the topic exists and has been created recently. Try again after some time.");
            }
            killOfflinePush(str, name, true);
            LOGGER.info("Found topic: {} without the corresponding version, will kill it", name);
            return Optional.empty();
        }

        final int maxStatusAttempts = 5;
        final long sleepBetweenAttemptsMs = TimeUnit.SECONDS.toMillis(10L);
        ExecutionStatus executionStatus = ExecutionStatus.PROGRESS;
        Map<String, String> extraInfo = new HashMap<>();
        for (int attempt = 0; attempt < maxStatusAttempts; attempt++) {
            Admin.OfflinePushStatusInfo offLinePushStatus = getOffLinePushStatus(str, name);
            executionStatus = offLinePushStatus.getExecutionStatus();
            extraInfo = offLinePushStatus.getExtraInfo();
            if (!extraInfo.containsValue(ExecutionStatus.UNKNOWN.toString())) {
                break;
            }
            try {
                this.timer.sleep(sleepBetweenAttemptsMs);
            } catch (InterruptedException e) {
                // Keep the interruption observable and keep the original exception as the cause.
                Thread.currentThread().interrupt();
                throw new VeniceException("Received InterruptedException during sleep between 'getOffLinePushStatus' calls", e);
            }
        }
        if (extraInfo.containsValue(ExecutionStatus.UNKNOWN.toString())) {
            LOGGER.error("Failed to get job status for topic: {} after retrying {} times, extra info: {}", name, Integer.valueOf(maxStatusAttempts), extraInfo);
        }

        if (!executionStatus.isTerminal()) {
            LOGGER.info("Job status: {} for Kafka topic: {} is not terminal, extra info: {}", executionStatus, name, extraInfo);
            return Optional.of(name);
        }
        if (!z) {
            List<String> topicNames = kafkaTopicsByAge.stream().map(pubSubTopic -> {
                return pubSubTopic.getName();
            }).collect(Collectors.toList());
            truncateTopicsBasedOnMaxErroredTopicNumToKeep(topicNames, z2, getCurrentVersionsForMultiColos(str, str2));
        }
        return Optional.empty();
    }

    /**
     * Truncates the lowest-versioned, not yet truncated version topics so that at most
     * {@code maxErroredTopicNumToKeep} of them remain.
     *
     * <p>Stream reprocessing topics are not counted; one is truncated only together with the version
     * topic it corresponds to. When {@code z} is set, topics whose version is a current version in
     * {@code map} are left alone and do not count towards the number of truncations.
     *
     * @param list topic names to consider
     * @param z    when {@code true}, topics of current versions are protected
     * @param map  current versions; only its values are consulted
     */
    void truncateTopicsBasedOnMaxErroredTopicNumToKeep(List<String> list, boolean z, Map<String, Integer> map) {
        List<String> liveTopicsAscending = list.stream()
                .filter(topic -> !isTopicTruncated(topic))
                .sorted((first, second) -> Version.parseVersionFromKafkaTopicName(first) - Version.parseVersionFromKafkaTopicName(second))
                .collect(Collectors.toList());
        Set<String> liveReprocessingTopics = liveTopicsAscending.stream()
                .filter(Version::isStreamReprocessingTopic)
                .collect(Collectors.toSet());
        List<String> liveVersionTopics = liveTopicsAscending.stream()
                .filter(topic -> !Version.isStreamReprocessingTopic(topic))
                .collect(Collectors.toList());

        int surplus = liveVersionTopics.size() - this.maxErroredTopicNumToKeep;
        if (surplus <= 0) {
            LOGGER.info("Non-truncated version topics size: {} isn't bigger than maxErroredTopicNumToKeep: {}, so no topic will be truncated this time", Integer.valueOf(liveTopicsAscending.size()), Integer.valueOf(this.maxErroredTopicNumToKeep));
            return;
        }

        int truncatedSoFar = 0;
        for (String versionTopic : liveVersionTopics) {
            boolean isProtectedCurrentVersion = z && map.containsValue(Integer.valueOf(Version.parseVersionFromVersionTopicName(versionTopic)));
            if (isProtectedCurrentVersion) {
                LOGGER.info("Do not delete the current version topic: {} since the incoming push is a Venice internal re-push.", versionTopic);
                continue;
            }
            if (truncatedSoFar >= surplus) {
                return;
            }
            truncatedSoFar++;
            truncateKafkaTopic(versionTopic);
            LOGGER.info("Errored topic: {} got truncated", versionTopic);
            String reprocessingTopic = Version.composeStreamReprocessingTopicFromVersionTopic(versionTopic);
            if (liveReprocessingTopics.contains(reprocessingTopic)) {
                truncateKafkaTopic(reprocessingTopic);
                LOGGER.info("Corresponding stream reprocessing topic: {} also got truncated.", reprocessingTopic);
            }
        }
    }

    /**
     * Asks the access controller whether the certificate holder has {@code Write} access to the
     * given store's topic, and logs the outcome together with the identity parsed from the cert.
     *
     * @param x509Certificate requester certificate
     * @param str             store name
     * @return {@code true} when write access is granted
     * @throws AclException    declared by this method; see the access controller for when it is raised
     * @throws VeniceException if no access controller is configured
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean hasWritePermissionToBatchJobHeartbeatStore(X509Certificate x509Certificate, String str) throws AclException {
        if (!this.accessController.isPresent()) {
            throw new VeniceException(String.format("Cannot check write permission on store %s since the access controller does not present for cert %s", str, x509Certificate));
        }
        final String writeMethod = Method.Write.name();
        final boolean granted = this.accessController.get().hasAccessToTopic(x509Certificate, str, writeMethod);
        String verdict;
        if (granted) {
            verdict = " has ";
        } else {
            verdict = " does not have ";
        }
        String auditLine = "Requester" + verdict + writeMethod + " access on " + str + " with identity: " + this.identityParser.parseIdentityFromCert(x509Certificate);
        LOGGER.info(auditLine);
        return granted;
    }

    /**
     * Checks whether active/active replication is enabled for a store in the parent and in every
     * region whose controller can be queried.
     *
     * <p>Regions whose store query returns an error are logged and skipped. When {@code z} is set,
     * each region's current version must also have active/active enabled.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param z    whether to additionally check the current version in each region
     * @return {@code false} if the parent store, or (with {@code z}) a region's current version, is
     *         not active/active enabled; {@code true} otherwise
     * @throws VeniceException if a region reports active/active disabled while the parent store
     *                         reports it enabled
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isActiveActiveReplicationEnabledInAllRegion(String str, String str2, boolean z) {
        Map<String, ControllerClient> regionClients = getVeniceHelixAdmin().getControllerClientMap(str);
        Store parentStore = getVeniceHelixAdmin().getStore(str, str2);
        if (!parentStore.isActiveActiveReplicationEnabled()) {
            LOGGER.info("isActiveActiveReplicationEnabledInAllRegion: {} store is not enabled for Active/Active in parent region", str2);
            return false;
        }
        for (Map.Entry<String, ControllerClient> regionEntry : regionClients.entrySet()) {
            String region = regionEntry.getKey();
            StoreResponse regionResponse = regionEntry.getValue().retryableRequest(MAX_ASYNC_SETUP_RETRY_COUNT, client -> {
                return client.getStore(str2);
            });
            if (regionResponse.isError()) {
                LOGGER.warn("isActiveActiveReplicationEnabledInAllRegion: Could not query store from region: {} for cluster: {}. {}. Default child AA config to true, since AA is already enabled in parent.", region, str, regionResponse.getError());
                continue;
            }
            if (!regionResponse.getStore().isActiveActiveReplicationEnabled()) {
                if (parentStore.isActiveActiveReplicationEnabled()) {
                    throw new VeniceException(String.format("Store %s doesn't have Active/Active enabled in region %s, but A/A is enabled in parent which indicates A/A is fully ramped", str2, region));
                }
                LOGGER.info("isActiveActiveReplicationEnabledInAllRegion: store: {} is not enabled for Active/Active in region: {}", str2, region);
                return false;
            }
            if (!z) {
                continue;
            }
            int regionCurrentVersion = regionResponse.getStore().getCurrentVersion();
            for (Version regionVersion : regionResponse.getStore().getVersions()) {
                boolean isCurrent = regionCurrentVersion == regionVersion.getNumber();
                if (isCurrent && !regionVersion.isActiveActiveReplicationEnabled()) {
                    LOGGER.info("isActiveActiveReplicationEnabledInAllRegion: store: {} current version: {} is not enabled for Active/Active in region: {}", str2, Integer.valueOf(regionVersion.getNumber()), region);
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Starts a push for a store, or returns the version of an already running push with the same
     * push job id.
     *
     * <p>If {@code getTopicForCurrentPushJob} reports an ongoing push:
     * <ul>
     *   <li>same push job id: the existing version is returned as-is;</li>
     *   <li>the existing version is judged lingering: it is killed, unless the incoming push is
     *       incremental, in which case the call fails;</li>
     *   <li>the existing push is a re-push and the incoming push is neither incremental nor a
     *       re-push: the existing push is killed;</li>
     *   <li>otherwise a non-incremental incoming push fails with
     *       {@code ConcurrentBatchPushException}, while an incremental one is allowed to proceed.</li>
     * </ul>
     * Afterwards the version is obtained (incremental push version lookup, or
     * {@code addVersionAndTopicOnly}), historical versions are cleaned up, and a stats counter is
     * recorded for stores that are not system stores.
     *
     * @param str      cluster name
     * @param str2     store name
     * @param str3     push job id of the incoming push
     * @param pushType type of the incoming push
     * @param optional2 passed to the lingering-version check
     * @return the version for the push
     * @throws VeniceException if the ongoing topic has no matching version, or an incremental push
     *                         meets a lingering version
     */
    @Override // com.linkedin.venice.controller.Admin
    public Version incrementVersionIdempotent(String str, String str2, String str3, int i, int i2, Version.PushType pushType, boolean z, boolean z2, String str4, Optional<String> optional, Optional<X509Certificate> optional2, long j, Optional<String> optional3, boolean z3) {
        Optional<String> topicForCurrentPushJob = getTopicForCurrentPushJob(str, str2, pushType.isIncremental(), Version.isPushIdRePush(str3));
        if (topicForCurrentPushJob.isPresent()) {
            int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(topicForCurrentPushJob.get());
            Store store = getStore(str, str2);
            Optional version = store.getVersion(parseVersionFromKafkaTopicName);
            if (!version.isPresent()) {
                // NOTE(review): this concatenates the Optional itself, not its value, so the
                // message will presumably read "Optional[<topic>]" - confirm whether intended.
                throw new VeniceException("A corresponding version should exist with the ongoing push with topic " + topicForCurrentPushJob);
            }
            String pushJobId = ((Version) version.get()).getPushJobId();
            // Same push job id: treat the request as a retry and hand back the existing version.
            if (pushJobId.equals(str3)) {
                return (Version) version.get();
            }
            boolean isPushIdRePush = Version.isPushIdRePush(pushJobId);
            boolean isPushIdRePush2 = Version.isPushIdRePush(str3);
            if (getLingeringStoreVersionChecker().isStoreVersionLingering(store, (Version) version.get(), this.timer, this, optional2, this.identityParser)) {
                if (pushType.isIncremental()) {
                    throw new VeniceException("Version " + ((Version) version.get()).getNumber() + " is not healthy in Venice backend; please consider running a full batch push for your store: " + str2 + " before running incremental push, or reach out to Venice team.");
                }
                LOGGER.info("Found lingering topic: {} with push id: {}. Killing the lingering version that was created at: {}", topicForCurrentPushJob.get(), pushJobId, Long.valueOf(((Version) version.get()).getCreatedTime()));
                killOfflinePush(str, topicForCurrentPushJob.get(), true);
            } else if (isPushIdRePush && !pushType.isIncremental() && !isPushIdRePush2) {
                // A running re-push yields to an incoming non-incremental, non-re-push job.
                LOGGER.info("Found running repush job with push id: {} and incoming push is a batch job or stream reprocessing job with push id: {}. Killing the repush job for store: {}", pushJobId, str3, str2);
                killOfflinePush(str, topicForCurrentPushJob.get(), true);
            } else {
                if (!pushType.isIncremental()) {
                    ConcurrentBatchPushException concurrentBatchPushException = new ConcurrentBatchPushException("Unable to start the push with pushJobId " + str3 + " for store " + str2 + ". An ongoing push with pushJobId " + pushJobId + " and topic " + topicForCurrentPushJob.get() + " is found and it must be terminated before another push can be started.");
                    // The stack trace is replaced with EMPTY_STACK_TRACE before throwing.
                    concurrentBatchPushException.setStackTrace(EMPTY_STACK_TRACE);
                    throw concurrentBatchPushException;
                }
                LOGGER.info("Found a running batch push job: {} and incoming push: {} is an incremental push. Letting the push continue for the store: {}", pushJobId, str3, str2);
            }
        }
        // Incremental pushes look up their version; everything else creates a new version and topic.
        Version incrementalPushVersion = pushType.isIncremental() ? getVeniceHelixAdmin().getIncrementalPushVersion(str, str2) : addVersionAndTopicOnly(str, str2, str3, -1, i, i2, pushType, z, z2, str4, optional, j, optional3, z3);
        cleanupHistoricalVersions(str, str2);
        // Stats are recorded only when the store name does not map to a system store type.
        if (VeniceSystemStoreType.getSystemStoreType(str2) == null) {
            if (pushType.isBatch()) {
                getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getVeniceAdminStats().recordSuccessfullyStartedUserBatchPushParentAdminCount();
            } else if (pushType.isIncremental()) {
                getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getVeniceAdminStats().recordSuccessfullyStartedUserIncrementalPushParentAdminCount();
            }
        }
        return incrementalPushVersion;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a version and its topic through the Helix admin and, when the delegate reports that
     * a new version was actually added, sends the add-version admin message.
     *
     * <p>The admin message is sent under the per-store admin message lock, which is released exactly
     * once in a {@code finally} block. The system store wildcard ACL step runs only after the
     * message was sent successfully and after the lock has been released, so a failure there can
     * no longer trigger a second release of a lock this thread does not hold.
     *
     * @param str      cluster name
     * @param str2     store name
     * @param str3     push job id
     * @param i        forwarded to the delegate
     * @param i2       forwarded to the delegate and used as the partition count of the message
     * @param i3       forwarded to the delegate
     * @param pushType push type
     * @param z        forwarded to the delegate
     * @param z2       forwarded to the delegate
     * @param str4     forwarded to the delegate
     * @param optional forwarded to the delegate
     * @param j        forwarded to the delegate
     * @param optional2 forwarded to the delegate
     * @param z3       forwarded to the delegate
     * @return the version returned by the delegate
     */
    public Version addVersionAndTopicOnly(String str, String str2, String str3, int i, int i2, int i3, Version.PushType pushType, boolean z, boolean z2, String str4, Optional<String> optional, long j, Optional<String> optional2, boolean z3) {
        Pair<Boolean, Version> addVersionAndTopicOnly = getVeniceHelixAdmin().addVersionAndTopicOnly(str, str2, str3, i, i2, i3, z, z2, pushType, str4, null, optional, j, getRmdVersionID(str2, str), optional2, z3);
        Version version = (Version) addVersionAndTopicOnly.getSecond();
        if (((Boolean) addVersionAndTopicOnly.getFirst()).booleanValue()) {
            if (version.isActiveActiveReplicationEnabled()) {
                updateReplicationMetadataSchemaForAllValueSchema(str, str2);
            }
            acquireAdminMessageLock(str, str2);
            try {
                sendAddVersionAdminMessage(str, str2, str3, version, i2, pushType);
            } finally {
                releaseAdminMessageLock(str, str2);
            }
            getSystemStoreLifeCycleHelper().maybeCreateSystemStoreWildcardAcl(str2);
        }
        return version;
    }

    /**
     * Builds an ADD_VERSION admin operation for the given version and hands it to
     * {@code sendAdminMessageAndWaitForConsumed}.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 push job id
     * @param version version being announced
     * @param i number of partitions
     * @param pushType push type of the job that created the version
     */
    void sendAddVersionAdminMessage(String str, String str2, String str3, Version version, int i, Version.PushType pushType) {
        AddVersion payload = getAddVersionMessage(str, str2, str3, version, i, pushType);
        AdminOperation message = new AdminOperation();
        message.payloadUnion = payload;
        message.operationType = AdminMessageType.ADD_VERSION.getValue();
        sendAdminMessageAndWaitForConsumed(str, str2, message);
    }

    /**
     * Populates a fresh {@link AddVersion} record from the given version and push parameters.
     *
     * <p>The push stream source address is only copied when native replication is enabled on the version.
     * The rewind-time override is taken from the version's hybrid config, or set to -1 when the version
     * has no hybrid config.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 push job id
     * @param version version whose settings are copied into the record
     * @param i number of partitions
     * @param pushType push type, stored by its numeric value
     * @return the populated record
     */
    private AddVersion getAddVersionMessage(String str, String str2, String str3, Version version, int i, Version.PushType pushType) {
        AddVersion message = (AddVersion) AdminMessageType.ADD_VERSION.getNewInstance();

        // Identity of the version being added.
        message.clusterName = str;
        message.storeName = str2;
        message.pushJobId = str3;
        message.versionNum = version.getNumber();
        message.numberOfPartitions = i;
        message.pushType = pushType.getValue();

        if (version.isNativeReplicationEnabled()) {
            message.pushStreamSourceAddress = version.getPushStreamSourceAddress();
        }

        HybridStoreConfig hybridConfig = version.getHybridStoreConfig();
        message.rewindTimeInSecondsOverride = hybridConfig == null ? -1L : hybridConfig.getRewindTimeInSeconds();

        message.timestampMetadataVersionId = version.getRmdVersionId();
        message.versionSwapDeferred = version.isVersionSwapDeferred();
        return message;
    }

    /**
     * Delegates to the wrapped Helix admin.
     *
     * @param str forwarded as the first argument (the cluster name, by the convention of the other
     *     methods in this class)
     * @param str2 forwarded as the second argument (the store name, by the same convention)
     * @return whatever the Helix admin returns as the real-time topic
     */
    @Override // com.linkedin.venice.controller.Admin
    public String getRealTimeTopic(String str, String str2) {
        final VeniceHelixAdmin helixAdmin = getVeniceHelixAdmin();
        return helixAdmin.getRealTimeTopic(str, str2);
    }

    /**
     * Looks up the incremental push version through the Helix admin, then validates it against the
     * offline push status of that version's Kafka topic.
     *
     * @param str cluster name
     * @param str2 store name
     * @return the incremental push version, if validation passes
     * @throws VeniceException if the validation overload rejects the current execution status
     */
    @Override // com.linkedin.venice.controller.Admin
    public Version getIncrementalPushVersion(String str, String str2) {
        Version candidate = getVeniceHelixAdmin().getIncrementalPushVersion(str, str2);
        ExecutionStatus pushStatus = getOffLinePushStatus(str, candidate.kafkaTopicName()).getExecutionStatus();
        return getIncrementalPushVersion(candidate, pushStatus);
    }

    /**
     * Validates that an incremental push may start against the given version.
     *
     * @param version candidate version
     * @param executionStatus execution status of the push that produced {@code version}
     * @return {@code version} unchanged when the checks pass
     * @throws VeniceException if the status is not terminal, if it is {@code ERROR}, or if the store's
     *     real-time topic is reported as truncated
     */
    Version getIncrementalPushVersion(Version version, ExecutionStatus executionStatus) {
        final String store = version.getStoreName();
        if (!executionStatus.isTerminal()) {
            throw new VeniceException("Cannot start incremental push since batch push is on going. store: " + store);
        }
        final String realTimeTopic = Version.composeRealTimeTopic(store);
        // Short-circuit: the truncation lookup is only made when the status is not ERROR.
        boolean previousPushUnusable = executionStatus == ExecutionStatus.ERROR;
        if (!previousPushUnusable) {
            previousPushUnusable = getVeniceHelixAdmin().isTopicTruncated(realTimeTopic);
        }
        if (previousPushUnusable) {
            throw new VeniceException("Cannot start incremental push since previous batch push has failed. Please run another bash job. store: " + store);
        }
        return version;
    }

    /**
     * Not supported here; always throws.
     *
     * @throws VeniceUnsupportedOperationException always, pointing callers to
     *     {@code getCurrentVersionsForMultiColos}
     */
    @Override // com.linkedin.venice.controller.Admin
    public int getCurrentVersion(String str, String str2) {
        final String operation = "getCurrentVersion";
        final String hint = "Please use getCurrentVersionsForMultiColos in Parent controller.";
        throw new VeniceUnsupportedOperationException(operation, hint);
    }

    /**
     * Returns the store's current version per region, queried through the cluster's controller clients.
     *
     * @param str cluster name
     * @param str2 store name
     * @return map from region name to current version, as built by
     *     {@code getCurrentVersionForMultiRegions}
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, Integer> getCurrentVersionsForMultiColos(String str, String str2) {
        Map<String, ControllerClient> regionClients = getVeniceHelixAdmin().getControllerClientMap(str);
        return getCurrentVersionForMultiRegions(str, str2, regionClients);
    }

    /**
     * Builds the {@link RepushInfo} for a store from one region's view of it.
     *
     * <p>When {@code optional} names a region, that region is queried. Otherwise the region reporting the
     * largest current version is used; on a tie the first such region in map iteration order wins.
     *
     * @param str cluster name
     * @param str2 store name
     * @param optional region to read from, or empty to pick the region with the largest current version
     * @return repush info built from the chosen region's current version and Kafka broker URL
     * @throws VeniceException if the requested region has no controller client, if no region is available
     *     at all, or if the store query against the chosen region returns an error
     */
    @Override // com.linkedin.venice.controller.Admin
    public RepushInfo getRepushInfo(String str, String str2, Optional<String> optional) {
        Map<String, ControllerClient> controllerClientMap = getVeniceHelixAdmin().getControllerClientMap(str);
        if (optional.isPresent()) {
            String requestedRegion = optional.get();
            ControllerClient requestedClient = controllerClientMap.get(requestedRegion);
            if (requestedClient == null) {
                // Previously surfaced as a bare NullPointerException.
                throw new VeniceException("No controller client found for colo: " + requestedRegion + " in cluster: " + str);
            }
            StoreResponse store = requestedClient.getStore(str2);
            if (store.isError()) {
                throw new VeniceException("Could not query store from colo: " + requestedRegion + " for cluster: " + str + ". " + store.getError());
            }
            return RepushInfo.createRepushInfo((Version) store.getStore().getVersion(store.getStore().getCurrentVersion()).get(), store.getStore().getKafkaBrokerUrl());
        }
        int largestVersion = Integer.MIN_VALUE;
        String largestVersionRegion = null;
        for (Map.Entry<String, Integer> entry : getCurrentVersionForMultiRegions(str, str2, controllerClientMap).entrySet()) {
            int regionVersion = entry.getValue().intValue();
            if (regionVersion > largestVersion) {
                largestVersion = regionVersion;
                largestVersionRegion = entry.getKey();
            }
        }
        if (largestVersionRegion == null) {
            // Only reachable when there are no regions to query at all.
            throw new VeniceException("Could not find any colo to query store: " + str2 + " for cluster: " + str);
        }
        StoreResponse store2 = controllerClientMap.get(largestVersionRegion).getStore(str2);
        if (store2.isError()) {
            // 'optional' is empty on this path, so the selected region must be reported instead of
            // optional.get(), which would throw NoSuchElementException and hide the real error.
            throw new VeniceException("Could not query store from largest version colo: " + largestVersionRegion + " for cluster: " + str + ". " + store2.getError());
        }
        return RepushInfo.createRepushInfo((Version) store2.getStore().getVersion(store2.getStore().getCurrentVersion()).get(), store2.getStore().getKafkaBrokerUrl());
    }

    /**
     * Collects, per region, the future-version status string reported for the store.
     *
     * <p>A region whose query fails is logged and recorded as {@code "-1"} rather than aborting the call.
     *
     * @param str cluster name
     * @param str2 store name
     * @return map from region name to the reported future version, or {@code "-1"} on error
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getFutureVersionsForMultiColos(String str, String str2) {
        Map<String, ControllerClient> regionClients = getVeniceHelixAdmin().getControllerClientMap(str);
        Map<String, String> futureVersionByRegion = new HashMap<>();
        for (Map.Entry<String, ControllerClient> regionEntry : regionClients.entrySet()) {
            String region = regionEntry.getKey();
            MultiStoreStatusResponse response = regionEntry.getValue().getFutureVersions(str, str2);
            if (!response.isError()) {
                futureVersionByRegion.put(region, (String) response.getStoreStatusMap().get(str2));
                continue;
            }
            LOGGER.error("Could not query store from region: {} for cluster: {}. Error: {}", region, str, response.getError());
            futureVersionByRegion.put(region, String.valueOf(-1));
        }
        return futureVersionByRegion;
    }

    /**
     * Always reports 0; neither argument is consulted.
     *
     * @return 0
     */
    @Override // com.linkedin.venice.controller.Admin
    public int getFutureVersion(String str, String str2) {
        final int fixedFutureVersion = 0;
        return fixedFutureVersion;
    }

    /**
     * Queries every given controller client for the store and records its current version per region.
     *
     * <p>A region whose query fails is logged and recorded as -1 rather than aborting the call.
     *
     * @param str cluster name (used only in the error log)
     * @param str2 store name
     * @param map controller clients keyed by region name
     * @return map from region name to current version, or -1 on error
     */
    Map<String, Integer> getCurrentVersionForMultiRegions(String str, String str2, Map<String, ControllerClient> map) {
        Map<String, Integer> currentVersionByRegion = new HashMap<>();
        for (Map.Entry<String, ControllerClient> regionEntry : map.entrySet()) {
            String region = regionEntry.getKey();
            StoreResponse response = regionEntry.getValue().getStore(str2);
            if (!response.isError()) {
                currentVersionByRegion.put(region, Integer.valueOf(response.getStore().getCurrentVersion()));
                continue;
            }
            LOGGER.error("Could not query store from region: {} for cluster: {}. Error: {}", region, str, response.getError());
            currentVersionByRegion.put(region, -1);
        }
        return currentVersionByRegion;
    }

    /**
     * Not supported here; always throws.
     *
     * @throws VeniceUnsupportedOperationException always
     */
    @Override // com.linkedin.venice.controller.Admin
    public Version peekNextVersion(String str, String str2) {
        final String operation = "peekNextVersion";
        throw new VeniceUnsupportedOperationException(operation);
    }

    /**
     * Publishes a DELETE_ALL_VERSIONS admin message for the store while holding the admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @return always an empty list; the deleted versions are not reported by this implementation
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<Version> deleteAllVersionsInStore(String str, String str2) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForDeletion(str, str2);
            DeleteAllVersions deleteAllVersions = (DeleteAllVersions) AdminMessageType.DELETE_ALL_VERSIONS.getNewInstance();
            deleteAllVersions.clusterName = str;
            deleteAllVersions.storeName = str2;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.DELETE_ALL_VERSIONS.getValue();
            adminOperation.payloadUnion = deleteAllVersions;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
            return Collections.emptyList();
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Publishes a DELETE_OLD_VERSION admin message for one version of the store while holding the
     * admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @param i version number to delete
     */
    @Override // com.linkedin.venice.controller.Admin
    public void deleteOldVersionInStore(String str, String str2, int i) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForSingleVersionDeletion(str, str2, i);
            DeleteOldVersion deleteOldVersion = (DeleteOldVersion) AdminMessageType.DELETE_OLD_VERSION.getNewInstance();
            deleteOldVersion.clusterName = str;
            deleteOldVersion.storeName = str2;
            deleteOldVersion.versionNum = i;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.DELETE_OLD_VERSION.getValue();
            adminOperation.payloadUnion = deleteOldVersion;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Delegates to the wrapped Helix admin.
     *
     * @param str cluster name
     * @param str2 store name
     * @return the versions the Helix admin reports for the store
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<Version> versionsForStore(String str, String str2) {
        final VeniceHelixAdmin helixAdmin = getVeniceHelixAdmin();
        return helixAdmin.versionsForStore(str, str2);
    }

    /**
     * Delegates to the wrapped Helix admin.
     *
     * @param str cluster name
     * @return the stores the Helix admin reports for the cluster
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<Store> getAllStores(String str) {
        final VeniceHelixAdmin helixAdmin = getVeniceHelixAdmin();
        return helixAdmin.getAllStores(str);
    }

    /**
     * Not supported here; always throws.
     *
     * @throws VeniceUnsupportedOperationException always
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getAllStoreStatuses(String str) {
        final String operation = "getAllStoreStatuses";
        throw new VeniceUnsupportedOperationException(operation);
    }

    /**
     * Delegates to the wrapped Helix admin.
     *
     * @param str cluster name
     * @param str2 store name
     * @return the store as reported by the Helix admin (other code in this class treats {@code null} as
     *     "store does not exist")
     */
    @Override // com.linkedin.venice.controller.Admin
    public Store getStore(String str, String str2) {
        final VeniceHelixAdmin helixAdmin = getVeniceHelixAdmin();
        return helixAdmin.getStore(str, str2);
    }

    /**
     * Delegates to the wrapped Helix admin.
     *
     * @param str cluster name
     * @param str2 store name
     * @return the Helix admin's answer for whether the store exists
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean hasStore(String str, String str2) {
        final VeniceHelixAdmin helixAdmin = getVeniceHelixAdmin();
        return helixAdmin.hasStore(str, str2);
    }

    /**
     * Not supported here; always throws.
     *
     * @throws VeniceUnsupportedOperationException always, explaining that set-version must be issued
     *     against child controllers
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreCurrentVersion(String str, String str2, int i) {
        final String operation = "setStoreCurrentVersion";
        final String hint = "Please use set-version only on child controllers, setting version on parent is not supported, since the version list could be different fabric by fabric";
        throw new VeniceUnsupportedOperationException(operation, hint);
    }

    /**
     * Rolls the store back to its backup version in every region.
     *
     * <p>Each region is queried in parallel for its backup version number. The rollback proceeds only if
     * every region has writes enabled, has a backup version, and all regions agree on the backup version
     * number. It is then issued as a SET_STORE_CURRENT_VERSION admin message.
     *
     * <p>The worker pool created for the parallel queries is always shut down, and the admin-message lock
     * is released exactly once in a {@code finally} block.
     *
     * @param str cluster name
     * @param str2 store name
     * @throws VeniceException if there are no regions to query, a region query fails, a region has writes
     *     disabled or no backup version, regions disagree on the backup version, or the calling thread is
     *     interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override // com.linkedin.venice.controller.Admin
    public void rollbackToBackupVersion(String str, String str2) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(str, str2);
            Map<String, ControllerClient> controllerClientMap = getVeniceHelixAdmin().getControllerClientMap(str);
            if (controllerClientMap.isEmpty()) {
                // Executors.newFixedThreadPool(0) would otherwise fail with an IllegalArgumentException.
                throw new VeniceException("Unable to rollback since no region is available for cluster " + str);
            }
            List<java.util.concurrent.Callable<Integer>> tasks = new ArrayList<>(controllerClientMap.size());
            controllerClientMap.forEach((region, controllerClient) -> tasks.add(() -> {
                StoreResponse response = controllerClient.getStore(str2, this.waitingTimeForConsumptionMs);
                if (response.isError()) {
                    throw new VeniceException(response.getError() + " in region " + region);
                }
                StoreInfo storeInfo = response.getStore();
                if (!storeInfo.isEnableStoreWrites()) {
                    throw new VeniceException("Unable to rollback since store does not enable write in region " + region);
                }
                int backupVersionNumber = getVeniceHelixAdmin().getBackupVersionNumber(storeInfo.getVersions(), storeInfo.getCurrentVersion());
                if (backupVersionNumber == 0) {
                    throw new VeniceException("Unable to rollback since backup version does not exist in region " + region);
                }
                return Integer.valueOf(backupVersionNumber);
            }));

            int backupVersion = 0;
            java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
            try {
                for (Future<Integer> future : executor.invokeAll(tasks)) {
                    int regionBackupVersion = future.get().intValue();
                    // 0 means "no region seen yet"; a task never returns 0 because it throws instead.
                    if (backupVersion != 0 && backupVersion != regionBackupVersion) {
                        throw new VeniceException("Unable to rollback since backup version number is inconsistent across regions");
                    }
                    backupVersion = regionBackupVersion;
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new VeniceException("Unable to rollback since thread is interrupted", e);
            } catch (ExecutionException e) {
                throw new VeniceException(e.getMessage(), e);
            } finally {
                // The pool is single-use; without this its threads would outlive every call.
                executor.shutdownNow();
            }

            SetStoreCurrentVersion setStoreCurrentVersion = (SetStoreCurrentVersion) AdminMessageType.SET_STORE_CURRENT_VERSION.getNewInstance();
            setStoreCurrentVersion.clusterName = str;
            setStoreCurrentVersion.storeName = str2;
            setStoreCurrentVersion.currentVersion = backupVersion;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.SET_STORE_CURRENT_VERSION.getValue();
            adminOperation.payloadUnion = setStoreCurrentVersion;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Not supported here; always throws.
     *
     * @throws VeniceUnsupportedOperationException always, stating the operation is child-controller only
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreLargestUsedVersion(String str, String str2, int i) {
        final String operation = "setStoreLargestUsedVersion";
        final String hint = "This is only supported in the Child Controller.";
        throw new VeniceUnsupportedOperationException(operation, hint);
    }

    /**
     * Publishes a SET_STORE_OWNER admin message for the store while holding the admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 new owner
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreOwner(String str, String str2, String str3) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(str, str2);
            SetStoreOwner setStoreOwner = (SetStoreOwner) AdminMessageType.SET_STORE_OWNER.getNewInstance();
            setStoreOwner.clusterName = str;
            setStoreOwner.storeName = str2;
            setStoreOwner.owner = str3;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.SET_STORE_OWNER.getValue();
            adminOperation.payloadUnion = setStoreOwner;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Validates a new partition count and publishes a SET_STORE_PARTITION admin message for the store
     * while holding the admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @param i new partition count; must be between 0 and the cluster's configured maximum, inclusive
     * @throws ConfigurationException if {@code i} exceeds the configured maximum or is negative
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStorePartitionCount(String str, String str2, int i) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(str, str2);
            int maxNumberOfPartition = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getConfig().getMaxNumberOfPartition();
            // NOTE: a count equal to the maximum is accepted, although the message reads "less than max".
            if (i > maxNumberOfPartition) {
                throw new ConfigurationException("Partition count: " + i + " should be less than max: " + maxNumberOfPartition);
            }
            if (i < 0) {
                throw new ConfigurationException("Partition count: " + i + " should NOT be negative");
            }
            SetStorePartitionCount setStorePartitionCount = (SetStorePartitionCount) AdminMessageType.SET_STORE_PARTITION.getNewInstance();
            setStorePartitionCount.clusterName = str;
            setStorePartitionCount.storeName = str2;
            setStorePartitionCount.partitionNum = i;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.SET_STORE_PARTITION.getValue();
            adminOperation.payloadUnion = setStorePartitionCount;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Publishes an ENABLE_STORE_READ or DISABLE_STORE_READ admin message for the store while holding the
     * admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @param z {@code true} to enable reads, {@code false} to disable them
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreReadability(String str, String str2, boolean z) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(str, str2);
            AdminOperation adminOperation = new AdminOperation();
            if (z) {
                adminOperation.operationType = AdminMessageType.ENABLE_STORE_READ.getValue();
                EnableStoreRead enableStoreRead = (EnableStoreRead) AdminMessageType.ENABLE_STORE_READ.getNewInstance();
                enableStoreRead.clusterName = str;
                enableStoreRead.storeName = str2;
                adminOperation.payloadUnion = enableStoreRead;
            } else {
                adminOperation.operationType = AdminMessageType.DISABLE_STORE_READ.getValue();
                DisableStoreRead disableStoreRead = (DisableStoreRead) AdminMessageType.DISABLE_STORE_READ.getNewInstance();
                disableStoreRead.clusterName = str;
                disableStoreRead.storeName = str2;
                adminOperation.payloadUnion = disableStoreRead;
            }
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Publishes an ENABLE_STORE_WRITE or DISABLE_STORE_WRITE admin message for the store while holding
     * the admin-message lock.
     *
     * <p>The lock is released exactly once in a {@code finally} block, on both success and failure.
     *
     * @param str cluster name
     * @param str2 store name
     * @param z {@code true} to enable writes (a {@link ResumeStore} payload), {@code false} to disable
     *     them (a {@link PauseStore} payload)
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreWriteability(String str, String str2, boolean z) {
        acquireAdminMessageLock(str, str2);
        try {
            getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(str, str2);
            AdminOperation adminOperation = new AdminOperation();
            if (z) {
                adminOperation.operationType = AdminMessageType.ENABLE_STORE_WRITE.getValue();
                ResumeStore resumeStore = (ResumeStore) AdminMessageType.ENABLE_STORE_WRITE.getNewInstance();
                resumeStore.clusterName = str;
                resumeStore.storeName = str2;
                adminOperation.payloadUnion = resumeStore;
            } else {
                adminOperation.operationType = AdminMessageType.DISABLE_STORE_WRITE.getValue();
                PauseStore pauseStore = (PauseStore) AdminMessageType.DISABLE_STORE_WRITE.getNewInstance();
                pauseStore.clusterName = str;
                pauseStore.storeName = str2;
                adminOperation.payloadUnion = pauseStore;
            }
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Sets reads and writes on the store to the same state, reads first and then writes.
     *
     * <p>These are two separate calls, each taking the admin-message lock on its own, so a failure in
     * the second leaves the first applied.
     *
     * @param str cluster name
     * @param str2 store name
     * @param z {@code true} to enable both reads and writes, {@code false} to disable both
     */
    @Override // com.linkedin.venice.controller.Admin
    public void setStoreReadWriteability(String str, String str2, boolean z) {
        final boolean accessEnabled = z;
        setStoreReadability(str, str2, accessEnabled);
        setStoreWriteability(str, str2, accessEnabled);
    }

    @Override // com.linkedin.venice.controller.Admin
    public void updateStore(String str, String str2, UpdateStoreQueryParams updateStoreQueryParams) {
        acquireAdminMessageLock(str, str2);
        try {
            Optional owner = updateStoreQueryParams.getOwner();
            Optional enableReads = updateStoreQueryParams.getEnableReads();
            Optional enableWrites = updateStoreQueryParams.getEnableWrites();
            Optional partitionCount = updateStoreQueryParams.getPartitionCount();
            Optional partitionerClass = updateStoreQueryParams.getPartitionerClass();
            Optional partitionerParams = updateStoreQueryParams.getPartitionerParams();
            Optional amplificationFactor = updateStoreQueryParams.getAmplificationFactor();
            Optional storageQuotaInByte = updateStoreQueryParams.getStorageQuotaInByte();
            Optional readQuotaInCU = updateStoreQueryParams.getReadQuotaInCU();
            Optional currentVersion = updateStoreQueryParams.getCurrentVersion();
            Optional largestUsedVersionNumber = updateStoreQueryParams.getLargestUsedVersionNumber();
            Optional hybridRewindSeconds = updateStoreQueryParams.getHybridRewindSeconds();
            Optional hybridOffsetLagThreshold = updateStoreQueryParams.getHybridOffsetLagThreshold();
            Optional hybridTimeLagThreshold = updateStoreQueryParams.getHybridTimeLagThreshold();
            Optional hybridDataReplicationPolicy = updateStoreQueryParams.getHybridDataReplicationPolicy();
            Optional hybridBufferReplayPolicy = updateStoreQueryParams.getHybridBufferReplayPolicy();
            Optional accessControlled = updateStoreQueryParams.getAccessControlled();
            Optional compressionStrategy = updateStoreQueryParams.getCompressionStrategy();
            Optional clientDecompressionEnabled = updateStoreQueryParams.getClientDecompressionEnabled();
            Optional chunkingEnabled = updateStoreQueryParams.getChunkingEnabled();
            Optional rmdChunkingEnabled = updateStoreQueryParams.getRmdChunkingEnabled();
            Optional batchGetLimit = updateStoreQueryParams.getBatchGetLimit();
            Optional numVersionsToPreserve = updateStoreQueryParams.getNumVersionsToPreserve();
            Optional incrementalPushEnabled = updateStoreQueryParams.getIncrementalPushEnabled();
            Optional storeMigration = updateStoreQueryParams.getStoreMigration();
            Optional writeComputationEnabled = updateStoreQueryParams.getWriteComputationEnabled();
            Optional replicationMetadataVersionID = updateStoreQueryParams.getReplicationMetadataVersionID();
            Optional readComputationEnabled = updateStoreQueryParams.getReadComputationEnabled();
            Optional bootstrapToOnlineTimeoutInHours = updateStoreQueryParams.getBootstrapToOnlineTimeoutInHours();
            Optional backupStrategy = updateStoreQueryParams.getBackupStrategy();
            Optional autoSchemaRegisterPushJobEnabled = updateStoreQueryParams.getAutoSchemaRegisterPushJobEnabled();
            Optional hybridStoreDiskQuotaEnabled = updateStoreQueryParams.getHybridStoreDiskQuotaEnabled();
            Optional<Boolean> regularVersionETLEnabled = updateStoreQueryParams.getRegularVersionETLEnabled();
            Optional<Boolean> futureVersionETLEnabled = updateStoreQueryParams.getFutureVersionETLEnabled();
            Optional<String> eTLedProxyUserAccount = updateStoreQueryParams.getETLedProxyUserAccount();
            Optional<Boolean> nativeReplicationEnabled = updateStoreQueryParams.getNativeReplicationEnabled();
            Optional pushStreamSourceAddress = updateStoreQueryParams.getPushStreamSourceAddress();
            Optional backupVersionRetentionMs = updateStoreQueryParams.getBackupVersionRetentionMs();
            Optional replicationFactor = updateStoreQueryParams.getReplicationFactor();
            Optional migrationDuplicateStore = updateStoreQueryParams.getMigrationDuplicateStore();
            Optional nativeReplicationSourceFabric = updateStoreQueryParams.getNativeReplicationSourceFabric();
            Optional<Boolean> activeActiveReplicationEnabled = updateStoreQueryParams.getActiveActiveReplicationEnabled();
            Optional regionsFilter = updateStoreQueryParams.getRegionsFilter();
            Optional storagePersona = updateStoreQueryParams.getStoragePersona();
            Optional storeViews = updateStoreQueryParams.getStoreViews();
            Optional latestSupersetSchemaId = updateStoreQueryParams.getLatestSupersetSchemaId();
            Optional replicateAllConfigs = updateStoreQueryParams.getReplicateAllConfigs();
            boolean z = replicateAllConfigs.isPresent() && ((Boolean) replicateAllConfigs.get()).booleanValue();
            LinkedList linkedList = new LinkedList();
            String str3 = "Store update error for " + str2 + " in cluster: " + str + ": ";
            Store store = getVeniceHelixAdmin().getStore(str, str2);
            if (store == null) {
                LOGGER.error(str3 + "store does not exist, and thus cannot be updated.");
                throw new VeniceException(str3 + "store does not exist, and thus cannot be updated.");
            }
            UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance();
            updateStore.clusterName = str;
            updateStore.storeName = str2;
            Optional map = owner.map(addToUpdatedConfigList(linkedList, "owner"));
            Objects.requireNonNull(store);
            updateStore.owner = (CharSequence) map.orElseGet(store::getOwner);
            if (store.isHybrid() && (partitionerClass.isPresent() || partitionerParams.isPresent())) {
                String str4 = str3 + "Cannot change partitioner class and parameters for hybrid stores";
                LOGGER.error(str4);
                throw new VeniceHttpException(400, str4, ErrorType.BAD_REQUEST);
            }
            if (partitionCount.isPresent()) {
                getVeniceHelixAdmin().preCheckStorePartitionCountUpdate(str, store, ((Integer) partitionCount.get()).intValue());
                updateStore.partitionNum = ((Integer) partitionCount.get()).intValue();
                linkedList.add("partition_count");
            } else {
                updateStore.partitionNum = store.getPartitionCount();
            }
            validateActiveActiveReplicationEnableConfigs(activeActiveReplicationEnabled, nativeReplicationEnabled, store);
            Optional<U> map2 = nativeReplicationEnabled.map(addToUpdatedConfigList(linkedList, "native_replication_enabled"));
            Objects.requireNonNull(store);
            updateStore.nativeReplicationEnabled = ((Boolean) map2.orElseGet(store::isNativeReplicationEnabled)).booleanValue();
            Optional map3 = pushStreamSourceAddress.map(addToUpdatedConfigList(linkedList, "push_stream_source_address"));
            Objects.requireNonNull(store);
            updateStore.pushStreamSourceAddress = (CharSequence) map3.orElseGet(store::getPushStreamSourceAddress);
            Optional<U> map4 = activeActiveReplicationEnabled.map(addToUpdatedConfigList(linkedList, "active_active_replication_enabled"));
            Objects.requireNonNull(store);
            updateStore.activeActiveReplicationEnabled = ((Boolean) map4.orElseGet(store::isActiveActiveReplicationEnabled)).booleanValue();
            if (storeViews.isPresent()) {
                validateStoreViewConfig((Map) storeViews.get(), store);
                updateStore.views = VeniceHelixAdmin.mergeNewViewConfigsIntoOldConfigs(store, (Map) storeViews.get());
                linkedList.add("store_view");
            }
            PartitionerConfig mergeNewSettingsIntoOldPartitionerConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldPartitionerConfig(store, partitionerClass, partitionerParams, amplificationFactor);
            if (partitionerClass.isPresent() || partitionerParams.isPresent() || amplificationFactor.isPresent()) {
                partitionerClass.ifPresent(str5 -> {
                    linkedList.add("partitioner_class");
                });
                partitionerParams.ifPresent(map5 -> {
                    linkedList.add("partitioner_params");
                });
                amplificationFactor.ifPresent(num -> {
                    linkedList.add("amplification_factor");
                });
                PartitionerConfigRecord partitionerConfigRecord = new PartitionerConfigRecord();
                partitionerConfigRecord.partitionerClass = mergeNewSettingsIntoOldPartitionerConfig.getPartitionerClass();
                partitionerConfigRecord.partitionerParams = CollectionUtils.getCharSequenceMapFromStringMap(mergeNewSettingsIntoOldPartitionerConfig.getPartitionerParams());
                partitionerConfigRecord.amplificationFactor = mergeNewSettingsIntoOldPartitionerConfig.getAmplificationFactor();
                try {
                    PartitionUtils.getVenicePartitioner(partitionerConfigRecord.partitionerClass.toString(), partitionerConfigRecord.amplificationFactor, new VeniceProperties(partitionerConfigRecord.partitionerParams), getKeySchema(str, str2).getSchema());
                    updateStore.partitionerConfig = partitionerConfigRecord;
                } catch (PartitionerSchemaMismatchException e) {
                    String str6 = str3 + e.getMessage();
                    LOGGER.error(str6);
                    throw new VeniceHttpException(400, str6, ErrorType.INVALID_SCHEMA);
                } catch (Exception e2) {
                    String str7 = str3 + "Partitioner Configs invalid, please verify that partitioner configs like classpath and parameters are correct!";
                    LOGGER.error(str7);
                    throw new VeniceHttpException(400, str7, ErrorType.INVALID_CONFIG);
                }
            }
            Optional map6 = enableReads.map(addToUpdatedConfigList(linkedList, "enable_reads"));
            Objects.requireNonNull(store);
            updateStore.enableReads = ((Boolean) map6.orElseGet(store::isEnableReads)).booleanValue();
            Optional map7 = enableWrites.map(addToUpdatedConfigList(linkedList, "enable_writes"));
            Objects.requireNonNull(store);
            updateStore.enableWrites = ((Boolean) map7.orElseGet(store::isEnableWrites)).booleanValue();
            Optional map8 = readQuotaInCU.map(addToUpdatedConfigList(linkedList, "read_quota_in_cu"));
            Objects.requireNonNull(store);
            updateStore.readQuotaInCU = ((Long) map8.orElseGet(store::getReadQuotaInCU)).longValue();
            updateStore.currentVersion = ((Integer) currentVersion.map(addToUpdatedConfigList(linkedList, "version")).orElse(-1)).intValue();
            Optional map9 = incrementalPushEnabled.map(addToUpdatedConfigList(linkedList, "incremental_push_enabled"));
            Objects.requireNonNull(store);
            updateStore.incrementalPushEnabled = ((Boolean) map9.orElseGet(store::isIncrementalPushEnabled)).booleanValue();
            hybridRewindSeconds.map(addToUpdatedConfigList(linkedList, "rewind_time_seconds"));
            hybridOffsetLagThreshold.map(addToUpdatedConfigList(linkedList, "offset_lag_to_go_online"));
            hybridTimeLagThreshold.map(addToUpdatedConfigList(linkedList, "time_lag_to_go_online"));
            hybridDataReplicationPolicy.map(addToUpdatedConfigList(linkedList, "data_replication_policy"));
            hybridBufferReplayPolicy.map(addToUpdatedConfigList(linkedList, "buffer_replay_policy"));
            HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldHybridStoreConfig(store, hybridRewindSeconds, hybridOffsetLagThreshold, hybridTimeLagThreshold, hybridDataReplicationPolicy, hybridBufferReplayPolicy);
            if (this.veniceHelixAdmin.isHybrid(store.getHybridStoreConfig()) && !this.veniceHelixAdmin.isHybrid(mergeNewSettingsIntoOldHybridStoreConfig) && updateStore.incrementalPushEnabled) {
                throw new VeniceHttpException(400, "Cannot convert store to batch-only, incremental push enabled stores require valid hybrid configs. Please disable incremental push if you'd like to convert the store to batch-only", ErrorType.BAD_REQUEST);
            }
            if (mergeNewSettingsIntoOldHybridStoreConfig == null) {
                updateStore.hybridStoreConfig = null;
            } else {
                HybridStoreConfigRecord hybridStoreConfigRecord = new HybridStoreConfigRecord();
                hybridStoreConfigRecord.offsetLagThresholdToGoOnline = mergeNewSettingsIntoOldHybridStoreConfig.getOffsetLagThresholdToGoOnline();
                hybridStoreConfigRecord.rewindTimeInSeconds = mergeNewSettingsIntoOldHybridStoreConfig.getRewindTimeInSeconds();
                hybridStoreConfigRecord.producerTimestampLagThresholdToGoOnlineInSeconds = mergeNewSettingsIntoOldHybridStoreConfig.getProducerTimestampLagThresholdToGoOnlineInSeconds();
                hybridStoreConfigRecord.dataReplicationPolicy = mergeNewSettingsIntoOldHybridStoreConfig.getDataReplicationPolicy().getValue();
                hybridStoreConfigRecord.bufferReplayPolicy = mergeNewSettingsIntoOldHybridStoreConfig.getBufferReplayPolicy().getValue();
                updateStore.hybridStoreConfig = hybridStoreConfigRecord;
            }
            if (((Boolean) incrementalPushEnabled.orElse(Boolean.valueOf(store.isIncrementalPushEnabled()))).booleanValue() && !this.veniceHelixAdmin.isHybrid(store.getHybridStoreConfig()) && !this.veniceHelixAdmin.isHybrid(mergeNewSettingsIntoOldHybridStoreConfig)) {
                LOGGER.info("Enabling incremental push for a batch store:{}. Converting it to a hybrid store with default configs.", str2);
                HybridStoreConfigRecord hybridStoreConfigRecord2 = new HybridStoreConfigRecord();
                hybridStoreConfigRecord2.rewindTimeInSeconds = 86400L;
                linkedList.add("rewind_time_seconds");
                hybridStoreConfigRecord2.offsetLagThresholdToGoOnline = SLEEP_INTERVAL_FOR_DATA_CONSUMPTION_IN_MS;
                linkedList.add("offset_lag_to_go_online");
                hybridStoreConfigRecord2.producerTimestampLagThresholdToGoOnlineInSeconds = -1L;
                linkedList.add("time_lag_to_go_online");
                hybridStoreConfigRecord2.dataReplicationPolicy = DataReplicationPolicy.NONE.getValue();
                linkedList.add("data_replication_policy");
                hybridStoreConfigRecord2.bufferReplayPolicy = BufferReplayPolicy.REWIND_FROM_EOP.getValue();
                linkedList.add("buffer_replay_policy");
                updateStore.hybridStoreConfig = hybridStoreConfigRecord2;
            }
            Optional map10 = storageQuotaInByte.map(addToUpdatedConfigList(linkedList, "storage_quota_in_byte"));
            Objects.requireNonNull(store);
            updateStore.storageQuotaInByte = ((Long) map10.orElseGet(store::getStorageQuotaInByte)).longValue();
            Optional map11 = accessControlled.map(addToUpdatedConfigList(linkedList, "access_controlled"));
            Objects.requireNonNull(store);
            updateStore.accessControlled = ((Boolean) map11.orElseGet(store::isAccessControlled)).booleanValue();
            updateStore.compressionStrategy = ((Integer) compressionStrategy.map(addToUpdatedConfigList(linkedList, "compression_strategy")).map((v0) -> {
                return v0.getValue();
            }).orElse(Integer.valueOf(store.getCompressionStrategy().getValue()))).intValue();
            Optional map12 = clientDecompressionEnabled.map(addToUpdatedConfigList(linkedList, "client_decompression_enabled"));
            Objects.requireNonNull(store);
            updateStore.clientDecompressionEnabled = ((Boolean) map12.orElseGet(store::getClientDecompressionEnabled)).booleanValue();
            Optional map13 = chunkingEnabled.map(addToUpdatedConfigList(linkedList, "chunking_enabled"));
            Objects.requireNonNull(store);
            updateStore.chunkingEnabled = ((Boolean) map13.orElseGet(store::isChunkingEnabled)).booleanValue();
            Optional map14 = rmdChunkingEnabled.map(addToUpdatedConfigList(linkedList, "rmd_chunking_enabled"));
            Objects.requireNonNull(store);
            updateStore.rmdChunkingEnabled = ((Boolean) map14.orElseGet(store::isRmdChunkingEnabled)).booleanValue();
            Optional map15 = batchGetLimit.map(addToUpdatedConfigList(linkedList, "batch_get_limit"));
            Objects.requireNonNull(store);
            updateStore.batchGetLimit = ((Integer) map15.orElseGet(store::getBatchGetLimit)).intValue();
            Optional map16 = numVersionsToPreserve.map(addToUpdatedConfigList(linkedList, "num_versions_to_preserve"));
            Objects.requireNonNull(store);
            updateStore.numVersionsToPreserve = ((Integer) map16.orElseGet(store::getNumVersionsToPreserve)).intValue();
            Optional map17 = storeMigration.map(addToUpdatedConfigList(linkedList, "store_migration"));
            Objects.requireNonNull(store);
            updateStore.isMigrating = ((Boolean) map17.orElseGet(store::isMigrating)).booleanValue();
            Optional map18 = writeComputationEnabled.map(addToUpdatedConfigList(linkedList, "write_computation_enabled"));
            Objects.requireNonNull(store);
            updateStore.writeComputationEnabled = ((Boolean) map18.orElseGet(store::isWriteComputationEnabled)).booleanValue();
            updateStore.replicationMetadataVersionID = ((Integer) replicationMetadataVersionID.map(addToUpdatedConfigList(linkedList, "replication_metadata_protocol_version_id")).orElse(Integer.valueOf(store.getRmdVersion()))).intValue();
            Optional map19 = readComputationEnabled.map(addToUpdatedConfigList(linkedList, "read_computation_enabled"));
            Objects.requireNonNull(store);
            updateStore.readComputationEnabled = ((Boolean) map19.orElseGet(store::isReadComputationEnabled)).booleanValue();
            Optional map20 = bootstrapToOnlineTimeoutInHours.map(addToUpdatedConfigList(linkedList, "bootstrap_to_online_timeout_in_hours"));
            Objects.requireNonNull(store);
            updateStore.bootstrapToOnlineTimeoutInHours = ((Integer) map20.orElseGet(store::getBootstrapToOnlineTimeoutInHours)).intValue();
            updateStore.leaderFollowerModelEnabled = true;
            updateStore.backupStrategy = ((BackupStrategy) backupStrategy.map(addToUpdatedConfigList(linkedList, "backup_strategy")).orElse(store.getBackupStrategy())).ordinal();
            updateStore.schemaAutoRegisterFromPushJobEnabled = ((Boolean) autoSchemaRegisterPushJobEnabled.map(addToUpdatedConfigList(linkedList, "auto_auto_register_for_pushjob_enabled")).orElse(Boolean.valueOf(store.isSchemaAutoRegisterFromPushJobEnabled()))).booleanValue();
            updateStore.hybridStoreDiskQuotaEnabled = ((Boolean) hybridStoreDiskQuotaEnabled.map(addToUpdatedConfigList(linkedList, "hybrid-store-disk-quota-enabled")).orElse(Boolean.valueOf(store.isHybridStoreDiskQuotaEnabled()))).booleanValue();
            regularVersionETLEnabled.map(addToUpdatedConfigList(linkedList, "regular_version_etl_enabled"));
            futureVersionETLEnabled.map(addToUpdatedConfigList(linkedList, "future_version_etl_enabled"));
            eTLedProxyUserAccount.map(addToUpdatedConfigList(linkedList, "etled_proxy_user_account"));
            updateStore.ETLStoreConfig = mergeNewSettingIntoOldETLStoreConfig(store, regularVersionETLEnabled, futureVersionETLEnabled, eTLedProxyUserAccount);
            Optional map21 = largestUsedVersionNumber.map(addToUpdatedConfigList(linkedList, "largest_used_version_number"));
            Objects.requireNonNull(store);
            updateStore.largestUsedVersionNumber = (Integer) map21.orElseGet(store::getLargestUsedVersionNumber);
            Optional map22 = backupVersionRetentionMs.map(addToUpdatedConfigList(linkedList, "backup_version_retention_ms"));
            Objects.requireNonNull(store);
            updateStore.backupVersionRetentionMs = ((Long) map22.orElseGet(store::getBackupVersionRetentionMs)).longValue();
            Optional map23 = replicationFactor.map(addToUpdatedConfigList(linkedList, "replication_factor"));
            Objects.requireNonNull(store);
            updateStore.replicationFactor = ((Integer) map23.orElseGet(store::getReplicationFactor)).intValue();
            Optional map24 = migrationDuplicateStore.map(addToUpdatedConfigList(linkedList, "migration_duplicate_store"));
            Objects.requireNonNull(store);
            updateStore.migrationDuplicateStore = ((Boolean) map24.orElseGet(store::isMigrationDuplicateStore)).booleanValue();
            Optional map25 = nativeReplicationSourceFabric.map(addToUpdatedConfigList(linkedList, "native_replication_source_fabric"));
            Objects.requireNonNull(store);
            updateStore.nativeReplicationSourceFabric = (CharSequence) map25.orElseGet(store::getNativeReplicationSourceFabric);
            updateStore.disableMetaStore = ((Boolean) updateStoreQueryParams.disableMetaStore().map(addToUpdatedConfigList(linkedList, "disable_meta_store")).orElse(false)).booleanValue();
            updateStore.disableDavinciPushStatusStore = ((Boolean) updateStoreQueryParams.disableDavinciPushStatusStore().map(addToUpdatedConfigList(linkedList, "disable_davinci_push_status_store")).orElse(false)).booleanValue();
            updateStore.storagePersona = (CharSequence) storagePersona.map(addToUpdatedConfigList(linkedList, "persona_name")).orElse(null);
            if (latestSupersetSchemaId.isPresent() && ((Integer) latestSupersetSchemaId.get()).intValue() != -1 && this.veniceHelixAdmin.getValueSchema(str, str2, ((Integer) latestSupersetSchemaId.get()).intValue()) == null) {
                throw new VeniceException("Unknown value schema id: " + latestSupersetSchemaId.get() + " in store: " + str2);
            }
            Optional map26 = latestSupersetSchemaId.map(addToUpdatedConfigList(linkedList, "latest_superset_schema_id"));
            Objects.requireNonNull(store);
            updateStore.latestSuperSetValueSchemaId = ((Integer) map26.orElseGet(store::getLatestSuperSetValueSchemaId)).intValue();
            StoragePersonaRepository storagePersonaRepository = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getStoragePersonaRepository();
            StoragePersona storagePersona2 = null;
            StoragePersona personaContainingStore = storagePersonaRepository.getPersonaContainingStore(store.getName());
            if (updateStoreQueryParams.getStoragePersona().isPresent()) {
                storagePersona2 = getVeniceHelixAdmin().getStoragePersona(str, (String) updateStoreQueryParams.getStoragePersona().get());
                if (storagePersona2 == null) {
                    throw new VeniceException("UpdateStore command failed for store " + str2 + ".  The provided StoragePersona " + ((String) updateStoreQueryParams.getStoragePersona().get()) + " does not exist.");
                }
            } else if (personaContainingStore != null) {
                storagePersona2 = personaContainingStore;
            }
            if (storagePersona2 != null) {
                Store store2 = getVeniceHelixAdmin().getStore(str, str2);
                store2.setStorageQuotaInByte(updateStore.getStorageQuotaInByte());
                storagePersonaRepository.validateAddUpdatedStore(storagePersona2, Optional.of(store2));
            }
            updateStore.replicateAllConfigs = z;
            if (z) {
                updateStore.updatedConfigsList = Collections.emptyList();
            } else {
                if (linkedList.size() == 0) {
                    String str8 = "UpdateStore command failed for store " + str2 + ". The command didn't change any specific store config and didn't specify \"--replicate-all-configs\" flag.";
                    LOGGER.error(str8);
                    throw new VeniceException(str8);
                }
                updateStore.updatedConfigsList = linkedList;
            }
            updateStore.regionsFilter = (CharSequence) regionsFilter.orElse(null);
            if ((updateStore.getActiveActiveReplicationEnabled() || updateStore.getWriteComputationEnabled()) && mergeNewSettingsIntoOldPartitionerConfig.getAmplificationFactor() > 1) {
                throw new VeniceHttpException(400, "Non-default amplification factor is not compatible with active-active replication and/or write compute.", ErrorType.BAD_REQUEST);
            }
            boolean z2 = ((Boolean) writeComputationEnabled.orElse(false)).booleanValue() && !store.isWriteComputationEnabled();
            if (z2) {
                addWriteComputeSchemaForStore(str, str2, true);
            }
            if (!this.veniceHelixAdmin.isHybrid(store.getHybridStoreConfig()) && this.veniceHelixAdmin.isHybrid(updateStore.hybridStoreConfig) && updateStore.partitionNum == 0) {
                updateStore.partitionNum = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getConfig().getNumberOfPartitionForHybrid();
                LOGGER.info("Enforcing default hybrid partition count:{} for a new hybrid store:{}.", Integer.valueOf(updateStore.partitionNum), str2);
                linkedList.add("partition_count");
            }
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.UPDATE_STORE.getValue();
            adminOperation.payloadUnion = updateStore;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);
            boolean z3 = ((Boolean) readComputationEnabled.orElse(false)).booleanValue() && !store.isReadComputationEnabled();
            if (!store.isSystemStore() && (z3 || z2)) {
                addSupersetSchemaForStore(str, str2, store.isActiveActiveReplicationEnabled());
            }
            if (z2) {
                LOGGER.info("Enabling write compute for the first time on store {} in cluster {}", str2, str);
                addWriteComputeSchemaForStore(str, str2, false);
            }
            if (activeActiveReplicationEnabled.orElse(false).booleanValue() && !store.isActiveActiveReplicationEnabled()) {
                updateReplicationMetadataSchemaForAllValueSchema(str, str2);
            }
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Validates every view described by the given string map against the given store.
     *
     * <p>The map is first converted with {@code StoreViewUtils.convertStringMapViewToViewConfig};
     * for each resulting {@code ViewConfig} a view instance is built through
     * {@code ViewUtils.getVeniceView} (with an empty {@link Properties}) and asked to
     * {@code validateConfigs()}. Whatever that call throws is propagated to the caller.
     *
     * @param map   string representation of the view configs to validate
     * @param store store handed to each view instance
     */
    private void validateStoreViewConfig(Map<String, String> map, Store store) {
        Map<String, ViewConfig> viewConfigsByName = StoreViewUtils.convertStringMapViewToViewConfig(map);
        for (ViewConfig viewConfig : viewConfigsByName.values()) {
            ViewUtils
                .getVeniceView(viewConfig.getViewClassName(), new Properties(), store, viewConfig.getViewParameters())
                .validateConfigs();
        }
    }

    /**
     * Picks the superset schema generator to use for the given cluster.
     *
     * <p>The external generator is returned only when one is configured AND the controller
     * config of the cluster reports {@code isParentExternalSupersetSchemaGenerationEnabled()};
     * in every other case the default generator is returned. The cluster config is only
     * consulted when an external generator is present.
     *
     * @param str name used to look up the controller config
     * @return the external generator when present and enabled, otherwise the default one
     */
    private SupersetSchemaGenerator getSupersetSchemaGenerator(String str) {
        if (!this.externalSupersetSchemaGenerator.isPresent()) {
            return this.defaultSupersetSchemaGenerator;
        }
        boolean externalGenerationEnabled =
            getMultiClusterConfigs().getControllerConfig(str).isParentExternalSupersetSchemaGenerationEnabled();
        if (externalGenerationEnabled) {
            return this.externalSupersetSchemaGenerator.get();
        }
        return this.defaultSupersetSchemaGenerator;
    }

    /**
     * Generates a superset schema from all value schemas of the store and registers it as a
     * value schema with the "update superset schema id" flag set.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param z    when {@code true}, the replication metadata schema is also updated for the
     *             newly registered superset schema
     */
    private void addSupersetSchemaForStore(String str, String str2, boolean z) {
        SupersetSchemaGenerator generator = getSupersetSchemaGenerator(str);
        SchemaEntry supersetEntry = generator.generateSupersetSchemaFromSchemas(getValueSchemas(str, str2));
        Schema supersetSchema = supersetEntry.getSchema();
        int supersetSchemaId = supersetEntry.getId();

        addValueSchemaEntry(str, str2, supersetSchema.toString(), supersetSchemaId, true);
        if (!z) {
            return;
        }
        updateReplicationMetadataSchema(str, str2, supersetSchema, supersetSchemaId);
    }

    /**
     * Converts each value schema of the store into its write-compute schema and, unless this
     * is a dry run, registers every converted schema as a derived schema.
     *
     * <p>A conversion failure is tolerated for every value schema except the one with the
     * highest id; failing on that one aborts with a {@code VeniceException}. All conversions
     * are attempted before anything is registered.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param z    dry-run flag: when {@code true} the schemas are only converted (validated)
     *             and nothing is registered
     */
    private void addWriteComputeSchemaForStore(String str, String str2, boolean z) {
        Collection<SchemaEntry> valueSchemaEntries = getValueSchemas(str, str2);
        ArrayList<SchemaEntry> writeComputeEntries = new ArrayList<>(valueSchemaEntries.size());
        int latestValueSchemaId =
            valueSchemaEntries.stream().map(SchemaEntry::getId).max(Comparator.naturalOrder()).get();

        for (SchemaEntry valueSchemaEntry : valueSchemaEntries) {
            try {
                Schema writeComputeSchema =
                    this.writeComputeSchemaConverter.convertFromValueRecordSchema(valueSchemaEntry.getSchema());
                writeComputeEntries.add(new SchemaEntry(valueSchemaEntry.getId(), writeComputeSchema));
            } catch (Exception e) {
                // Older value schemas are allowed to fail conversion; only the latest one is mandatory.
                if (valueSchemaEntry.getId() != latestValueSchemaId) {
                    continue;
                }
                throw new VeniceException("For store " + str2 + " cannot generate update schema for value schema ID :" + valueSchemaEntry.getId() + ", top level field probably missing defaults.", e);
            }
        }

        if (!z) {
            for (SchemaEntry writeComputeEntry : writeComputeEntries) {
                addDerivedSchema(str, str2, writeComputeEntry.getId(), writeComputeEntry.getSchemaStr());
            }
        }
    }

    /**
     * Applies the given cluster-config update by delegating to the admin returned by
     * {@code getVeniceHelixAdmin()}; no other work is done here.
     *
     * @param str                            passed through unchanged (presumably the cluster
     *                                       name, as elsewhere in this class; confirm against
     *                                       the {@code Admin} interface)
     * @param updateClusterConfigQueryParams requested changes, passed through unchanged
     */
    @Override // com.linkedin.venice.controller.Admin
    public void updateClusterConfig(String str, UpdateClusterConfigQueryParams updateClusterConfigQueryParams) {
        getVeniceHelixAdmin().updateClusterConfig(str, updateClusterConfigQueryParams);
    }

    /**
     * Rejects a request that turns on Active/Active replication while Native Replication is
     * not enabled.
     *
     * <p>Nothing is checked unless the first optional is present and {@code true}. The
     * effective Native Replication state is the second optional when present, otherwise the
     * store's current {@code isNativeReplicationEnabled()} value.
     *
     * @param optional  requested Active/Active replication flag, if any
     * @param optional2 requested Native Replication flag, if any
     * @param store     store whose current setting is the fallback
     * @throws VeniceHttpException (HTTP 400, {@code INVALID_CONFIG}) when Active/Active is
     *                             being enabled without Native Replication
     */
    private void validateActiveActiveReplicationEnableConfigs(Optional<Boolean> optional, Optional<Boolean> optional2, Store store) {
        boolean enablingActiveActive = optional.orElse(false);
        if (!enablingActiveActive) {
            return;
        }
        boolean nativeReplicationEnabled = optional2.orElseGet(store::isNativeReplicationEnabled);
        if (nativeReplicationEnabled) {
            return;
        }
        throw new VeniceHttpException(400, "Active/Active Replication cannot be enabled for store " + store.getName() + " since Native Replication is not enabled on it.", ErrorType.INVALID_CONFIG);
    }

    /**
     * Returns the storage engine overhead ratio reported by the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name; confirm against the
     *            {@code Admin} interface)
     * @return the ratio exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public double getStorageEngineOverheadRatio(String str) {
        final double overheadRatio = getVeniceHelixAdmin().getStorageEngineOverheadRatio(str);
        return overheadRatio;
    }

    /**
     * Looks up the key schema by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name (presumed from the convention used throughout this class)
     * @param str2 store name (presumed from the same convention)
     * @return the key schema entry exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public SchemaEntry getKeySchema(String str, String str2) {
        final SchemaEntry keySchema = getVeniceHelixAdmin().getKeySchema(str, str2);
        return keySchema;
    }

    /**
     * Fetches the value schemas by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name (presumed from the convention used throughout this class)
     * @param str2 store name (presumed from the same convention)
     * @return the collection exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public Collection<SchemaEntry> getValueSchemas(String str, String str2) {
        final Collection<SchemaEntry> valueSchemas = getVeniceHelixAdmin().getValueSchemas(str, str2);
        return valueSchemas;
    }

    /**
     * Fetches the derived schemas by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name (presumed from the convention used throughout this class)
     * @param str2 store name (presumed from the same convention)
     * @return the collection exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public Collection<DerivedSchemaEntry> getDerivedSchemas(String str, String str2) {
        final Collection<DerivedSchemaEntry> derivedSchemas = getVeniceHelixAdmin().getDerivedSchemas(str, str2);
        return derivedSchemas;
    }

    /**
     * Resolves the id of a value schema by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param str3 value schema string to look up
     * @return the id exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public int getValueSchemaId(String str, String str2, String str3) {
        final int valueSchemaId = getVeniceHelixAdmin().getValueSchemaId(str, str2, str3);
        return valueSchemaId;
    }

    /**
     * Resolves the generated id of a derived schema by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param str3 derived schema string to look up
     * @return the generated schema id exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public GeneratedSchemaID getDerivedSchemaId(String str, String str2, String str3) {
        final GeneratedSchemaID derivedSchemaId = getVeniceHelixAdmin().getDerivedSchemaId(str, str2, str3);
        return derivedSchemaId;
    }

    /**
     * Fetches a single value schema by id, delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param i    value schema id
     * @return the schema entry exactly as returned by the delegate; the store-update code in
     *         this class treats a {@code null} result as "unknown schema id"
     */
    @Override // com.linkedin.venice.controller.Admin
    public SchemaEntry getValueSchema(String str, String str2, int i) {
        final SchemaEntry valueSchema = getVeniceHelixAdmin().getValueSchema(str, str2, i);
        return valueSchema;
    }

    /**
     * Registers a new value schema for a store while holding the admin message lock for it.
     *
     * <p>Flow:
     * <ol>
     *   <li>The schema string is parsed with strict validation and the pre-condition check
     *       yields the id the new schema would get.</li>
     *   <li>If that check returns {@code -2}, nothing is sent; the entry is returned under the
     *       id the schema string already resolves to.</li>
     *   <li>When an existing superset/latest value schema is available and read or write
     *       computation is enabled, a new superset schema is generated from it and the new
     *       schema:
     *       <ul>
     *         <li>it equals the new schema: the new schema is registered and flagged as the
     *             superset schema;</li>
     *         <li>it equals the existing schema: the new schema is registered without touching
     *             the superset id;</li>
     *         <li>it differs from both and the store is not a system store: value schema and
     *             superset schema are registered together and the method returns.</li>
     *       </ul></li>
     *   <li>Otherwise the value schema alone is registered, followed by the replication
     *       metadata schema (Active/Active stores) and the derived write-compute schema
     *       (write-compute stores).</li>
     * </ol>
     *
     * <p>The lock is released exactly once, in the {@code finally} block. Earlier revisions
     * also released it by hand before each early {@code return}, which made the
     * {@code finally} release it a second time.
     *
     * @param str                                 cluster name
     * @param str2                                store name
     * @param str3                                value schema as a JSON string
     * @param directionalSchemaCompatibilityType  compatibility mode forwarded to the
     *                                            pre-condition checks
     * @return the schema entry that was registered, or the already-registered one
     */
    @Override // com.linkedin.venice.controller.Admin
    public SchemaEntry addValueSchema(String str, String str2, String str3, DirectionalSchemaCompatibilityType directionalSchemaCompatibilityType) {
        acquireAdminMessageLock(str, str2);
        try {
            Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(str3);
            int newValueSchemaId = getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId(str, str2, str3, directionalSchemaCompatibilityType);
            if (newValueSchemaId == -2) {
                // Sentinel from the pre-condition check: hand back the entry under the id the
                // schema string already resolves to instead of sending an admin message.
                return new SchemaEntry(getVeniceHelixAdmin().getValueSchemaId(str, str2, str3), str3);
            }

            Store store = getVeniceHelixAdmin().getStore(str, str2);
            Schema existingValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(str, store);
            boolean doUpdateSupersetSchemaId = false;
            if (existingValueSchema != null && (store.isReadComputationEnabled() || store.isWriteComputationEnabled())) {
                SupersetSchemaGenerator supersetSchemaGenerator = getSupersetSchemaGenerator(str);
                Schema newSupersetSchema = supersetSchemaGenerator.generateSupersetSchema(existingValueSchema, newValueSchema);
                String newSupersetSchemaStr = newSupersetSchema.toString();
                if (supersetSchemaGenerator.compareSchema(newSupersetSchema, newValueSchema)) {
                    // The new value schema is itself the superset schema.
                    doUpdateSupersetSchemaId = true;
                } else if (!supersetSchemaGenerator.compareSchema(newSupersetSchema, existingValueSchema) && !store.isSystemStore()) {
                    // The superset differs from both schemas: register value and superset schema together.
                    getVeniceHelixAdmin().checkPreConditionForAddValueSchemaAndGetNewSchemaId(str, str2, newSupersetSchemaStr, directionalSchemaCompatibilityType);
                    int supersetSchemaId = getVeniceHelixAdmin().getValueSchemaIdIgnoreFieldOrder(str, str2, newSupersetSchemaStr, (left, right) -> {
                        return supersetSchemaGenerator.compareSchema(left, right) ? 0 : 1;
                    });
                    if (supersetSchemaId == -1) {
                        // Lookup returned -1: use the id right after the new value schema's id.
                        supersetSchemaId = newValueSchemaId + 1;
                    }
                    return addValueAndSupersetSchemaEntries(str, str2, new SchemaEntry(newValueSchemaId, newValueSchema), new SchemaEntry(supersetSchemaId, newSupersetSchema), store.isWriteComputationEnabled());
                }
            }

            SchemaEntry addedValueSchemaEntry = addValueSchemaEntry(str, str2, str3, newValueSchemaId, doUpdateSupersetSchemaId);
            if (store.isActiveActiveReplicationEnabled()) {
                Schema latestValueSchema = getVeniceHelixAdmin().getSupersetOrLatestValueSchema(str, store);
                updateReplicationMetadataSchema(str, str2, latestValueSchema, getValueSchemaId(str, str2, latestValueSchema.toString()));
            }
            if (store.isWriteComputationEnabled()) {
                addDerivedSchema(str, str2, addedValueSchemaEntry.getId(), this.writeComputeSchemaConverter.convertFromValueRecordSchema(addedValueSchemaEntry.getSchema()).toString());
            }
            return addedValueSchemaEntry;
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Registers a value schema and its superset schema with a single
     * {@code SUPERSET_SCHEMA_CREATION} admin message and waits for it to be consumed.
     *
     * <p>After the message is consumed, the replication metadata schema is updated for both
     * entries, and, when requested, the write-compute schema of each entry is added as a
     * derived schema. Both write-compute schemas are converted before either is registered.
     *
     * @param str          cluster name
     * @param str2         store name
     * @param schemaEntry  new value schema entry
     * @param schemaEntry2 new superset schema entry
     * @param z            whether derived write-compute schemas are registered as well
     * @return the value schema entry that was passed in
     */
    private SchemaEntry addValueAndSupersetSchemaEntries(String str, String str2, SchemaEntry schemaEntry, SchemaEntry schemaEntry2, boolean z) {
        validateNewSupersetAndValueSchemaEntries(str2, str, schemaEntry, schemaEntry2);
        LOGGER.info("Adding value schema {} and superset schema {} to store: {} in cluster: {}", schemaEntry, schemaEntry2, str2, str);

        SupersetSchemaCreation creationMessage = (SupersetSchemaCreation) AdminMessageType.SUPERSET_SCHEMA_CREATION.getNewInstance();
        creationMessage.clusterName = str;
        creationMessage.storeName = str2;

        // Value schema part of the message.
        SchemaMeta valueSchemaMeta = new SchemaMeta();
        valueSchemaMeta.definition = schemaEntry.getSchemaStr();
        valueSchemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();
        creationMessage.valueSchema = valueSchemaMeta;
        creationMessage.valueSchemaId = schemaEntry.getId();

        // Superset schema part of the message.
        SchemaMeta supersetSchemaMeta = new SchemaMeta();
        supersetSchemaMeta.definition = schemaEntry2.getSchemaStr();
        supersetSchemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();
        creationMessage.supersetSchema = supersetSchemaMeta;
        creationMessage.supersetSchemaId = schemaEntry2.getId();

        AdminOperation message = new AdminOperation();
        message.operationType = AdminMessageType.SUPERSET_SCHEMA_CREATION.getValue();
        message.payloadUnion = creationMessage;
        sendAdminMessageAndWaitForConsumed(str, str2, message);

        updateReplicationMetadataSchema(str, str2, schemaEntry.getSchema(), schemaEntry.getId());
        updateReplicationMetadataSchema(str, str2, schemaEntry2.getSchema(), schemaEntry2.getId());

        if (!z) {
            return schemaEntry;
        }
        Schema valueWriteComputeSchema = this.writeComputeSchemaConverter.convertFromValueRecordSchema(schemaEntry.getSchema());
        Schema supersetWriteComputeSchema = this.writeComputeSchemaConverter.convertFromValueRecordSchema(schemaEntry2.getSchema());
        addDerivedSchema(str, str2, schemaEntry.getId(), valueWriteComputeSchema.toString());
        addDerivedSchema(str, str2, schemaEntry2.getId(), supersetWriteComputeSchema.toString());
        return schemaEntry;
    }

    /**
     * Ensures a new value schema entry and a new superset schema entry are really two
     * different schemas: they must differ both in id and in content (field order ignored).
     *
     * <p>Note the parameter order: the store name comes first, the cluster name second.
     *
     * @param str          store name (first placeholder of the error messages)
     * @param str2         cluster name (second placeholder of the error messages)
     * @param schemaEntry  new value schema entry
     * @param schemaEntry2 new superset schema entry
     * @throws IllegalArgumentException when the ids match or the schemas compare equal
     */
    private void validateNewSupersetAndValueSchemaEntries(String str, String str2, SchemaEntry schemaEntry, SchemaEntry schemaEntry2) {
        boolean sameId = schemaEntry.getId() == schemaEntry2.getId();
        if (sameId) {
            String message = String.format("Superset schema ID and value schema ID are expected to be different for store %s in cluster %s. Got ID: %d", str, str2, Integer.valueOf(schemaEntry.getId()));
            throw new IllegalArgumentException(message);
        }
        boolean sameSchema = AvroSchemaUtils.compareSchemaIgnoreFieldOrder(schemaEntry.getSchema(), schemaEntry2.getSchema());
        if (sameSchema) {
            String message = String.format("Superset and value schemas are expected to be different for store %s in cluster %s. Got schema: %s", str, str2, schemaEntry.getSchema());
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Sends a {@code VALUE_SCHEMA_CREATION} admin message for the given schema, waits until it
     * is consumed, then verifies that the schema resolves to the expected id.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param str3 value schema string
     * @param i    id the new schema is expected to receive
     * @param z    value written to {@code doUpdateSupersetSchemaID} on the message
     * @return a schema entry for the registered schema
     * @throws VeniceException when the id found after consumption differs from {@code i}
     */
    private SchemaEntry addValueSchemaEntry(String str, String str2, String str3, int i, boolean z) {
        LOGGER.info("Adding value schema: {} to store: {} in cluster: {}", str3, str2, str);

        SchemaMeta schemaDefinition = new SchemaMeta();
        schemaDefinition.definition = str3;
        schemaDefinition.schemaType = SchemaType.AVRO_1_4.getValue();

        ValueSchemaCreation creationMessage = (ValueSchemaCreation) AdminMessageType.VALUE_SCHEMA_CREATION.getNewInstance();
        creationMessage.clusterName = str;
        creationMessage.storeName = str2;
        creationMessage.schema = schemaDefinition;
        creationMessage.schemaId = i;
        creationMessage.doUpdateSupersetSchemaID = z;

        AdminOperation message = new AdminOperation();
        message.operationType = AdminMessageType.VALUE_SCHEMA_CREATION.getValue();
        message.payloadUnion = creationMessage;
        sendAdminMessageAndWaitForConsumed(str, str2, message);

        // Read the id back to confirm the schema landed under the id reserved for it.
        int registeredId = getValueSchemaId(str, str2, str3);
        if (registeredId == i) {
            return new SchemaEntry(registeredId, str3);
        }
        throw new VeniceException("Something bad happens, the expected new value schema id is: " + i + ", but got: " + registeredId);
    }

    /**
     * Not supported by this admin implementation.
     *
     * <p>The exception now names this operation ({@code addSupersetSchema}); it previously
     * reported {@code addValueSchema}, which pointed readers of the error at the wrong method.
     *
     * @throws VeniceUnsupportedOperationException always
     */
    @Override // com.linkedin.venice.controller.Admin
    public SchemaEntry addSupersetSchema(String str, String str2, String str3, int i, String str4, int i2) {
        throw new VeniceUnsupportedOperationException("addSupersetSchema");
    }

    /**
     * Not supported by this admin implementation: this overload, which takes an explicit
     * schema id and a flag, always fails.
     *
     * @throws VeniceUnsupportedOperationException always, naming {@code addValueSchema}
     */
    @Override // com.linkedin.venice.controller.Admin
    public SchemaEntry addValueSchema(String str, String str2, String str3, int i, boolean z) {
        throw new VeniceUnsupportedOperationException("addValueSchema");
    }

    /**
     * Registers a derived schema for a value schema of a store while holding the admin
     * message lock for it.
     *
     * <p>The pre-condition check yields the id for the new derived schema. When it returns
     * {@code -2}, nothing is sent and an entry is built from the generated schema version the
     * schema string already resolves to. Otherwise a {@code DERIVED_SCHEMA_CREATION} admin
     * message is sent and the method waits until it has been consumed.
     *
     * <p>The lock is released exactly once, in the {@code finally} block, on every exit path.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param i    id of the value schema the derived schema belongs to
     * @param str3 derived schema string
     * @return the entry for the registered (or already registered) derived schema
     */
    @Override // com.linkedin.venice.controller.Admin
    public DerivedSchemaEntry addDerivedSchema(String str, String str2, int i, String str3) {
        acquireAdminMessageLock(str, str2);
        try {
            int newDerivedSchemaId = this.veniceHelixAdmin.checkPreConditionForAddDerivedSchemaAndGetNewSchemaId(str, str2, i, str3);
            if (newDerivedSchemaId == -2) {
                // Sentinel from the pre-condition check: report the version the schema already has.
                return new DerivedSchemaEntry(i, getVeniceHelixAdmin().getDerivedSchemaId(str, str2, str3).getGeneratedSchemaVersion(), str3);
            }
            LOGGER.info("Adding derived schema: {} to store: {}, version: {} in cluster: {}", str3, str2, Integer.valueOf(i), str);

            DerivedSchemaCreation derivedSchemaCreation = (DerivedSchemaCreation) AdminMessageType.DERIVED_SCHEMA_CREATION.getNewInstance();
            derivedSchemaCreation.clusterName = str;
            derivedSchemaCreation.storeName = str2;
            SchemaMeta schemaMeta = new SchemaMeta();
            schemaMeta.definition = str3;
            schemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();
            derivedSchemaCreation.schema = schemaMeta;
            derivedSchemaCreation.valueSchemaId = i;
            derivedSchemaCreation.derivedSchemaId = newDerivedSchemaId;

            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.DERIVED_SCHEMA_CREATION.getValue();
            adminOperation.payloadUnion = derivedSchemaCreation;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);

            return new DerivedSchemaEntry(i, newDerivedSchemaId, str3);
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Not supported by this admin implementation: this overload, which takes two explicit
     * ids, always fails.
     *
     * @throws VeniceUnsupportedOperationException always, naming {@code addDerivedSchema}
     */
    @Override // com.linkedin.venice.controller.Admin
    public DerivedSchemaEntry addDerivedSchema(String str, String str2, int i, int i2, String str3) {
        throw new VeniceUnsupportedOperationException("addDerivedSchema");
    }

    /**
     * Not supported by this admin implementation.
     *
     * @throws VeniceUnsupportedOperationException always, naming {@code removeDerivedSchema}
     */
    @Override // com.linkedin.venice.controller.Admin
    public DerivedSchemaEntry removeDerivedSchema(String str, String str2, int i, int i2) {
        throw new VeniceUnsupportedOperationException("removeDerivedSchema");
    }

    /**
     * Fetches the replication metadata schemas by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name (presumed from the convention used throughout this class)
     * @param str2 store name (presumed from the same convention)
     * @return the collection exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String str, String str2) {
        final Collection<RmdSchemaEntry> rmdSchemas = getVeniceHelixAdmin().getReplicationMetadataSchemas(str, str2);
        return rmdSchemas;
    }

    /**
     * Looks up one replication metadata schema by delegating to the admin obtained from
     * {@code getVeniceHelixAdmin()}.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param i    value schema id
     * @param i2   replication metadata (RMD) protocol version id
     * @return the optional schema exactly as returned by the delegate
     */
    @Override // com.linkedin.venice.controller.Admin
    public Optional<Schema> getReplicationMetadataSchema(String str, String str2, int i, int i2) {
        final Optional<Schema> rmdSchema = getVeniceHelixAdmin().getReplicationMetadataSchema(str, str2, i, i2);
        return rmdSchema;
    }

    /**
     * Registers a replication metadata (RMD) schema for a value schema of a store while
     * holding the admin message lock for it.
     *
     * <p>When the admin reports that the schema is already present, nothing is sent and the
     * entry is returned. Otherwise a {@code REPLICATION_METADATA_SCHEMA_CREATION} admin message
     * is sent, the method waits until it has been consumed, and the stored schema is checked
     * against the requested one.
     *
     * <p>Any exception is logged and rethrown. The lock is released exactly once, in the
     * {@code finally} block, on every exit path.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param i    value schema id
     * @param i2   replication metadata version id
     * @param str3 replication metadata schema string
     * @return the entry for the registered (or already registered) schema
     */
    @Override // com.linkedin.venice.controller.Admin
    public RmdSchemaEntry addReplicationMetadataSchema(String str, String str2, int i, int i2, String str3) {
        acquireAdminMessageLock(str, str2);
        try {
            RmdSchemaEntry rmdSchemaEntry = new RmdSchemaEntry(i, i2, str3);
            if (getVeniceHelixAdmin().checkIfMetadataSchemaAlreadyPresent(str, str2, i, rmdSchemaEntry)) {
                LOGGER.info("Replication metadata schema already exists for store: {} in cluster: {} metadataSchema: {} replicationMetadataVersionId: {} valueSchemaId: {}", str2, str, str3, Integer.valueOf(i2), Integer.valueOf(i));
                return rmdSchemaEntry;
            }
            LOGGER.info("Adding Replication metadata schema: for store: {} in cluster: {} metadataSchema: {} replicationMetadataVersionId: {} valueSchemaId: {}", str2, str, str3, Integer.valueOf(i2), Integer.valueOf(i));

            MetadataSchemaCreation metadataSchemaCreation = (MetadataSchemaCreation) AdminMessageType.REPLICATION_METADATA_SCHEMA_CREATION.getNewInstance();
            metadataSchemaCreation.clusterName = str;
            metadataSchemaCreation.storeName = str2;
            metadataSchemaCreation.valueSchemaId = i;
            SchemaMeta schemaMeta = new SchemaMeta();
            schemaMeta.definition = str3;
            schemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();
            metadataSchemaCreation.metadataSchema = schemaMeta;
            metadataSchemaCreation.timestampMetadataVersionId = i2;

            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.REPLICATION_METADATA_SCHEMA_CREATION.getValue();
            adminOperation.payloadUnion = metadataSchemaCreation;
            sendAdminMessageAndWaitForConsumed(str, str2, adminOperation);

            // Confirm the schema that is now stored matches what was requested.
            validateRmdSchemaIsAddedAsExpected(str, str2, i, i2, AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(str3));
            return new RmdSchemaEntry(i, i2, str3);
        } catch (Exception e) {
            LOGGER.error("Error when adding replication metadata schema for store: {}, value schema id: {}", str2, Integer.valueOf(i), e);
            throw e;
        } finally {
            releaseAdminMessageLock(str, str2);
        }
    }

    /**
     * Checks that the RMD schema returned by {@link #getReplicationMetadataSchema} for the given coordinates
     * exists and matches {@code schema} when field order is ignored.
     *
     * @param str    cluster name (as used in the error messages)
     * @param str2   store name (as used in the error messages)
     * @param i      value schema id
     * @param i2     RMD protocol version id
     * @param schema the RMD schema that is expected
     * @throws VeniceException if no schema is found, or the one found differs from {@code schema}
     */
    private void validateRmdSchemaIsAddedAsExpected(String str, String str2, int i, int i2, Schema schema) {
        Optional<Schema> lookedUp = getReplicationMetadataSchema(str, str2, i, i2);
        if (!lookedUp.isPresent()) {
            throw new VeniceException(String.format("No replication metadata schema found for store %s in cluster %s with value schema ID %s and RMD protocol version ID %d", str2, str, i, i2));
        }
        Schema actualSchema = lookedUp.get();
        if (AvroSchemaUtils.compareSchemaIgnoreFieldOrder(actualSchema, schema)) {
            return;
        }
        throw new VeniceException(String.format("For store %s in cluster %s with value schema ID %d and RMD protocol version ID %d. Expected RMD schema %s. But got RMD schema: %s", str2, str, i, i2, schema.toString(true), actualSchema.toString(true)));
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void validateAndMaybeRetrySystemStoreAutoCreation(String str, String str2, VeniceSystemStoreType veniceSystemStoreType) {
        throw new VeniceUnsupportedOperationException("validateAndMaybeRetrySystemStoreAutoCreation");
    }

    /**
     * Runs {@code updateReplicationMetadataSchema} once for every entry returned by
     * {@code getValueSchemas(str, str2)}, passing the entry's schema and id.
     */
    private void updateReplicationMetadataSchemaForAllValueSchema(String str, String str2) {
        getValueSchemas(str, str2).forEach(
                valueSchema -> updateReplicationMetadataSchema(str, str2, valueSchema.getSchema(), valueSchema.getId()));
    }

    /**
     * Ensures the value schema with id {@code i} has an RMD schema for the current RMD version id.
     *
     * <p>When the underlying admin reports that one already exists, this only logs and returns. Otherwise an
     * RMD schema is generated from {@code schema} and added through {@code addReplicationMetadataSchema}.
     *
     * @param str    cluster name (as used in the log message)
     * @param str2   store name (as used in the log message)
     * @param schema value schema the RMD schema is generated from
     * @param i      id of that value schema
     */
    private void updateReplicationMetadataSchema(String str, String str2, Schema schema, int i) {
        final int rmdVersionId = getRmdVersionID(str2, str);
        final boolean alreadyHasRmdSchema = getVeniceHelixAdmin().checkIfValueSchemaAlreadyHasRmdSchema(str, str2, i, rmdVersionId);
        if (!alreadyHasRmdSchema) {
            String generatedRmdSchema = RmdSchemaGenerator.generateMetadataSchema(schema, rmdVersionId).toString();
            addReplicationMetadataSchema(str, str2, i, rmdVersionId, generatedRmdSchema);
            return;
        }
        LOGGER.info("Store {} in cluster {} already has a replication metadata schema for its value schema with ID {} and replication metadata version ID {}. So skip updating this value schema's RMD schema.", str2, str, i, rmdVersionId);
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<String> getStorageNodes(String str) {
        throw new VeniceUnsupportedOperationException("getStorageNodes");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getStorageNodesStatus(String str, boolean z) {
        throw new VeniceUnsupportedOperationException("getStorageNodesStatus");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void removeStorageNode(String str, String str2) {
        throw new VeniceUnsupportedOperationException("removeStorageNode");
    }

    /**
     * Returns the push status computed by {@code getOffLineJobStatus} for {@code str2}, using the controller
     * client map that the underlying admin returns for {@code str}.
     */
    @Override // com.linkedin.venice.controller.Admin
    public Admin.OfflinePushStatusInfo getOffLinePushStatus(String str, String str2) {
        return getOffLineJobStatus(str, str2, getVeniceHelixAdmin().getControllerClientMap(str));
    }

    /**
     * Returns the offline push status for {@code str2}, either aggregated over all regions or for one region.
     *
     * <p>With a {@code null} region ({@code str3}) this defers to the aggregated {@code getOffLineJobStatus}.
     * Otherwise only that region's controller client is queried for a detailed job status.
     *
     * @param str      passed to {@code getControllerClientMap} to obtain the per-region controller clients
     * @param str2     job/topic identifier passed to the status queries
     * @param optional forwarded to the aggregated status query; not used when a region is given
     * @param str3     region to query, or {@code null} to aggregate over all regions
     * @throws VeniceException if the region is not in the client map or its query returns an error
     */
    @Override // com.linkedin.venice.controller.Admin
    public Admin.OfflinePushStatusInfo getOffLinePushStatus(String str, String str2, Optional<String> optional, String str3) {
        Map<String, ControllerClient> clientsByRegion = getVeniceHelixAdmin().getControllerClientMap(str);
        if (str3 == null) {
            return getOffLineJobStatus(str, str2, clientsByRegion, optional);
        }
        if (!clientsByRegion.containsKey(str3)) {
            throw new VeniceException("Region " + str3 + " does not exist in " + clientsByRegion.keySet());
        }

        ControllerClient regionClient = clientsByRegion.get(str3);
        JobStatusQueryResponse regionResponse = regionClient.queryDetailedJobStatus(str2, str3);
        if (regionResponse.isError()) {
            throw new VeniceException("Couldn't query " + str3 + " for job " + str2 + " status: " + regionResponse.getError());
        }

        ExecutionStatus regionStatus = ExecutionStatus.valueOf(regionResponse.getStatus());
        String regionStatusDetails = (String) regionResponse.getOptionalStatusDetails().orElse(null);
        Admin.OfflinePushStatusInfo result = new Admin.OfflinePushStatusInfo(regionStatus, regionStatusDetails);
        result.setUncompletedPartitions(regionResponse.getUncompletedPartitions());
        return result;
    }

    // Convenience overload: same as the four-argument getOffLineJobStatus with Optional.empty()
    // passed as the last argument.
    Admin.OfflinePushStatusInfo getOffLineJobStatus(String str, String str2, Map<String, ControllerClient> map) {
        return getOffLineJobStatus(str, str2, map, Optional.empty());
    }

    /**
     * Aggregates the job status of Kafka topic {@code str2} over every controller client in {@code map} and,
     * when the aggregated status is terminal, decides whether the parent Kafka topic gets truncated.
     *
     * <p>Steps:
     * <ol>
     *   <li>Query each client. A response flagged as error, or a {@link VeniceException} raised while
     *       querying, is recorded as {@code UNKNOWN} for that region.</li>
     *   <li>Sort the collected statuses by their index in {@code VeniceHelixAdmin.STATUS_PRIORITIES} and take
     *       the first one as the aggregated status.</li>
     *   <li>If fewer than a majority of regions answered without an error response, report
     *       {@code PROGRESS}.</li>
     *   <li>For a terminal status, any error response turns the result into {@code ERROR}; the topic(s) are
     *       then truncated or deliberately kept, and the status details say which.</li>
     * </ol>
     *
     * @param str      cluster name used to look up the store
     * @param str2     Kafka topic of the push job
     * @param map      controller clients keyed by region
     * @param optional forwarded to {@code queryJobStatus}; its presence also affects the truncation decision
     * @return the aggregated status, per-region statuses, status details and per-region details
     */
    private Admin.OfflinePushStatusInfo getOffLineJobStatus(String str, String str2, Map<String, ControllerClient> map, Optional<String> optional) {
        Set<String> regions = map.keySet();
        ExecutionStatus aggregatedStatus = ExecutionStatus.NEW;
        String statusDetails = null;
        List<ExecutionStatus> statuses = new ArrayList<>();
        Map<String, String> extraInfo = new HashMap<>();
        Map<String, String> extraDetails = new HashMap<>();
        int failCount = 0;

        for (Map.Entry<String, ControllerClient> entry : map.entrySet()) {
            String region = entry.getKey();
            ControllerClient client = entry.getValue();
            try {
                String leaderControllerUrl = client.getLeaderControllerUrl();
                JobStatusQueryResponse response = client.queryJobStatus(str2, optional);
                if (response.isError()) {
                    failCount++;
                    LOGGER.warn("Couldn't query {} for job {} status: {}", region, str2, response.getError());
                    statuses.add(ExecutionStatus.UNKNOWN);
                    extraInfo.put(region, ExecutionStatus.UNKNOWN.toString());
                    extraDetails.put(region, leaderControllerUrl + " " + response.getError());
                } else {
                    statuses.add(ExecutionStatus.valueOf(response.getStatus()));
                    extraInfo.put(region, response.getStatus());
                    response.getOptionalStatusDetails()
                            .ifPresent(details -> extraDetails.put(region, leaderControllerUrl + " " + details));
                }
            } catch (VeniceException e) {
                // NOTE(review): this path records UNKNOWN but does not increment failCount, so such a
                // region still counts towards the majority below - confirm this is intended.
                LOGGER.warn("Couldn't query {} for job status of {}", region, str2, e);
                statuses.add(ExecutionStatus.UNKNOWN);
                extraInfo.put(region, ExecutionStatus.UNKNOWN.toString());
                extraDetails.put(region, "Failed to get leader controller url " + e.getMessage());
            }
        }

        // Order the statuses by their position in STATUS_PRIORITIES; the first one wins.
        statuses.sort(Comparator.comparingInt(VeniceHelixAdmin.STATUS_PRIORITIES::indexOf));
        if (!statuses.isEmpty()) {
            aggregatedStatus = statuses.get(0);
        }

        // Without a majority of regions answering successfully, keep reporting the job as in progress.
        int regionCount = regions.size();
        if (regionCount - failCount < (regionCount / 2) + 1) {
            aggregatedStatus = ExecutionStatus.PROGRESS;
        }

        if (aggregatedStatus.isTerminal()) {
            if (failCount > 0) {
                aggregatedStatus = ExecutionStatus.ERROR;
                statusDetails = failCount + "/" + regionCount + " DCs unreachable. ";
            }
            if (this.maxErroredTopicNumToKeep <= 0 || !aggregatedStatus.equals(ExecutionStatus.ERROR)) {
                Store store = getVeniceHelixAdmin().getStore(str, Version.parseStoreFromKafkaTopicName(str2));
                boolean erroredWithoutOptional = !optional.isPresent() && aggregatedStatus == ExecutionStatus.ERROR;
                boolean incrementalStoreWithoutOptional = !optional.isPresent() && store.isIncrementalPushEnabled();
                boolean nonIncrementalStoreWithoutError = !store.isIncrementalPushEnabled() && aggregatedStatus != ExecutionStatus.ERROR;
                boolean truncationApplies = erroredWithoutOptional || nonIncrementalStoreWithoutError || incrementalStoreWithoutOptional;
                if (truncationApplies && !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion()) {
                    LOGGER.info("Truncating kafka topic: {} with job status: {}", str2, aggregatedStatus);
                    truncateKafkaTopic(str2);
                    Optional<Version> version = store.getVersion(Version.parseVersionFromKafkaTopicName(str2));
                    if (version.isPresent() && version.get().getPushType().isStreamReprocessing()) {
                        truncateKafkaTopic(Version.composeStreamReprocessingTopic(store.getName(), version.get().getNumber()));
                    }
                    statusDetails = (statusDetails == null ? "" : statusDetails) + "Parent Kafka topic truncated";
                }
            } else {
                statusDetails = (statusDetails == null ? "" : statusDetails) + "Parent Kafka topic won't be truncated";
                LOGGER.info("The errored kafka topic {} won't be truncated since it will be used to investigate some Kafka related issue", str2);
            }
        }
        return new Admin.OfflinePushStatusInfo(aggregatedStatus, extraInfo, statusDetails, extraDetails);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public String getKafkaBootstrapServers(boolean z) {
        return getVeniceHelixAdmin().getKafkaBootstrapServers(z);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public String getNativeReplicationKafkaBootstrapServerAddress(String str) {
        return getVeniceHelixAdmin().getNativeReplicationKafkaBootstrapServerAddress(str);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()}, forwarding all arguments unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public String getNativeReplicationSourceFabric(String str, Store store, Optional<String> optional, Optional<String> optional2) {
        return getVeniceHelixAdmin().getNativeReplicationSourceFabric(str, store, optional, optional2);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isSSLEnabledForPush(String str, String str2) {
        return getVeniceHelixAdmin().isSSLEnabledForPush(str, str2);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isSslToKafka() {
        return getVeniceHelixAdmin().isSslToKafka();
    }

    /** Returns the topic manager provided by the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public TopicManager getTopicManager() {
        return getVeniceHelixAdmin().getTopicManager();
    }

    /**
     * Returns the topic manager that the admin returned by {@code getVeniceHelixAdmin()} provides for
     * {@code str}; the argument is forwarded unchanged.
     */
    @Override // com.linkedin.venice.controller.Admin
    public TopicManager getTopicManager(String str) {
        return getVeniceHelixAdmin().getTopicManager(str);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isLeaderControllerFor(String str) {
        return getVeniceHelixAdmin().isLeaderControllerFor(str);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public int calculateNumberOfPartitions(String str, String str2) {
        return getVeniceHelixAdmin().calculateNumberOfPartitions(str, str2);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public int getReplicationFactor(String str, String str2) {
        return getVeniceHelixAdmin().getReplicationFactor(str, str2);
    }

    /**
     * Returns the number of entries in the child data center controller URL map of the controller config
     * looked up with {@code str}.
     */
    @Override // com.linkedin.venice.controller.Admin
    public int getDatacenterCount(String str) {
        return getMultiClusterConfigs().getControllerConfig(str).getChildDataCenterControllerUrlMap().size();
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<Replica> getReplicas(String str, String str2) {
        throw new VeniceException("getReplicas is not supported!");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<Replica> getReplicasOfStorageNode(String str, String str2) {
        throw new VeniceException("getReplicasOfStorageNode is not supported!");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public NodeRemovableResult isInstanceRemovable(String str, String str2, List<String> list, boolean z) {
        throw new VeniceException("isInstanceRemovable is not supported!");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public Pair<NodeReplicasReadinessState, List<Replica>> nodeReplicaReadiness(String str, String str2) {
        throw new VeniceUnsupportedOperationException("nodeReplicaReadiness is not supported");
    }

    /**
     * Starts data recovery of one store version from a source fabric into a destination fabric.
     *
     * <p>The version is first looked up through the source fabric's controller client; the recovery request
     * is then sent through the destination fabric's controller client.
     *
     * @param str      cluster name used to obtain the fabric controller clients
     * @param str2     store name
     * @param i        version number
     * @param str3     source fabric
     * @param str4     destination fabric
     * @param z        forwarded to the destination controller's {@code dataRecovery} call
     * @param optional not used by this implementation
     * @throws VeniceException if the version is missing in the source fabric or the request returns an error
     */
    @Override // com.linkedin.venice.controller.Admin
    public void initiateDataRecovery(String str, String str2, int i, String str3, String str4, boolean z, Optional<Version> optional) {
        ControllerClient sourceFabricClient = getFabricBuildoutControllerClient(str, str3);
        ControllerClient destinationFabricClient = getFabricBuildoutControllerClient(str, str4);

        Optional<Version> sourceVersion = sourceFabricClient.getStore(str2).getStore().getVersion(i);
        if (!sourceVersion.isPresent()) {
            throw new VeniceException("Version: " + i + " does not exist in the given source fabric: " + str3);
        }

        ControllerResponse recoveryResponse = destinationFabricClient.dataRecovery(str3, str4, str2, i, true, z, sourceVersion);
        if (!recoveryResponse.isError()) {
            return;
        }
        throw new VeniceException("Failed to initiate data recovery in destination fabric, error: " + recoveryResponse.getError());
    }

    /**
     * Asks the destination fabric's controller to prepare for data recovery of a store version.
     *
     * <p>The amplification factor sent with the request is read from the store's partitioner config in the
     * source fabric; the {@code optional} argument is not consulted.
     *
     * @param str      cluster name used to obtain the fabric controller clients
     * @param str2     store name
     * @param i        version number
     * @param str3     source fabric
     * @param str4     destination fabric
     * @param optional not used by this implementation
     * @throws VeniceException if the destination controller returns an error response
     */
    @Override // com.linkedin.venice.controller.Admin
    public void prepareDataRecovery(String str, String str2, int i, String str3, String str4, Optional<Integer> optional) {
        ControllerClient destinationFabricClient = getFabricBuildoutControllerClient(str, str4);
        int sourceAmplificationFactor = getFabricBuildoutControllerClient(str, str3)
                .getStore(str2)
                .getStore()
                .getPartitionerConfig()
                .getAmplificationFactor();

        ControllerResponse prepareResponse =
                destinationFabricClient.prepareDataRecovery(str3, str4, str2, i, Optional.of(sourceAmplificationFactor));
        if (prepareResponse.isError()) {
            throw new VeniceException("Failed to prepare for data recovery in destination fabric, error: " + prepareResponse.getError());
        }
    }

    /**
     * Asks the destination fabric's controller whether a store version is ready for data recovery.
     *
     * <p>The amplification factor sent with the request is read from the store's partitioner config in the
     * source fabric; the {@code optional} argument is not consulted. Any exception raised along the way is
     * converted into a "not ready" answer carrying the exception message.
     *
     * @param str      cluster name used to obtain the fabric controller clients
     * @param str2     store name
     * @param i        version number
     * @param str3     source fabric
     * @param str4     destination fabric
     * @param optional not used by this implementation
     * @return a pair of (ready flag, reason)
     */
    @Override // com.linkedin.venice.controller.Admin
    public Pair<Boolean, String> isStoreVersionReadyForDataRecovery(String str, String str2, int i, String str3, String str4, Optional<Integer> optional) {
        try {
            ControllerClient destinationFabricClient = getFabricBuildoutControllerClient(str, str4);
            int sourceAmplificationFactor = getFabricBuildoutControllerClient(str, str3)
                    .getStore(str2)
                    .getStore()
                    .getPartitionerConfig()
                    .getAmplificationFactor();
            ReadyForDataRecoveryResponse readiness = destinationFabricClient
                    .isStoreVersionReadyForDataRecovery(str3, str4, str2, i, Optional.of(sourceAmplificationFactor));
            return new Pair<>(readiness.isReady(), readiness.getReason());
        } catch (Exception e) {
            // Best effort: report the failure as "not ready" instead of propagating it.
            return new Pair<>(false, e.getMessage());
        }
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public Instance getLeaderController(String str) {
        return getVeniceHelixAdmin().getLeaderController(str);
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void addInstanceToAllowlist(String str, String str2) {
        throw new VeniceException("addInstanceToAllowlist is not supported!");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void removeInstanceFromAllowList(String str, String str2) {
        throw new VeniceException("removeInstanceFromAllowList is not supported!");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public Set<String> getAllowlist(String str) {
        throw new VeniceException("getAllowlist is not supported!");
    }

    /**
     * Kills the offline push job of a Kafka topic by sending a {@code KILL_OFFLINE_PUSH_JOB} admin message.
     *
     * <p>When {@code maxErroredTopicNumToKeep} is 0, the topic is truncated first, along with its stream
     * reprocessing topic if the topic manager reports that one exists. The admin message lock for the store
     * is released exactly once, in a {@code finally} block, on every exit path after it was acquired.
     *
     * @param str  cluster name
     * @param str2 Kafka topic of the push job; the store name is parsed from it
     * @param z    not used by this implementation
     * @throws VeniceNoStoreException if the store parsed from the topic does not exist in the cluster
     */
    @Override // com.linkedin.venice.controller.Admin
    public void killOfflinePush(String str, String str2, boolean z) {
        String storeName = Version.parseStoreFromKafkaTopicName(str2);
        if (getStore(str, storeName) == null) {
            throw new VeniceNoStoreException(storeName, str);
        }
        acquireAdminMessageLock(str, storeName);
        try {
            getVeniceHelixAdmin().checkPreConditionForKillOfflinePush(str, str2);
            LOGGER.info("Killing offline push job for topic: {} in cluster: {}", str2, str);
            if (this.maxErroredTopicNumToKeep == 0) {
                // No errored topics are kept around, so truncate right away.
                LOGGER.info("Truncating topic when kill offline push job, topic: {}", str2);
                truncateKafkaTopic(str2);
                PubSubTopic streamReprocessingTopic =
                        this.pubSubTopicRepository.getTopic(Version.composeStreamReprocessingTopicFromVersionTopic(str2));
                if (getTopicManager().containsTopic(streamReprocessingTopic)) {
                    truncateKafkaTopic(streamReprocessingTopic.getName());
                }
            }

            KillOfflinePushJob killJob = (KillOfflinePushJob) AdminMessageType.KILL_OFFLINE_PUSH_JOB.getNewInstance();
            killJob.clusterName = str;
            killJob.kafkaTopic = str2;
            AdminOperation adminOperation = new AdminOperation();
            adminOperation.operationType = AdminMessageType.KILL_OFFLINE_PUSH_JOB.getValue();
            adminOperation.payloadUnion = killJob;
            sendAdminMessageAndWaitForConsumed(str, storeName, adminOperation);
        } finally {
            releaseAdminMessageLock(str, storeName);
        }
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public StorageNodeStatus getStorageNodesStatus(String str, String str2) {
        throw new VeniceUnsupportedOperationException("getStorageNodesStatus");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isStorageNodeNewerOrEqualTo(String str, String str2, StorageNodeStatus storageNodeStatus) {
        throw new VeniceUnsupportedOperationException("isStorageNodeNewerOrEqualTo");
    }

    /** Forwards both arguments, unchanged, to the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void setAdminConsumerService(String str, AdminConsumerService adminConsumerService) {
        getVeniceHelixAdmin().setAdminConsumerService(str, adminConsumerService);
    }

    /** Forwards all arguments, unchanged, to the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void skipAdminMessage(String str, long j, boolean z) {
        getVeniceHelixAdmin().skipAdminMessage(str, j, z);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public Long getLastSucceedExecutionId(String str) {
        return getVeniceHelixAdmin().getLastSucceedExecutionId(str);
    }

    /** Returns the {@code timer} field. */
    private Time getTimer() {
        return this.timer;
    }

    /** Replaces the {@code timer} field; package-private (presumably so tests can inject a time source). */
    void setTimer(Time time) {
        this.timer = time;
    }

    /**
     * Returns the admin command execution tracker registered under {@code str}, if any.
     *
     * <p>A single map lookup is used, so the result cannot become an {@code Optional.of(null)} failure when
     * the entry is absent or maps to {@code null} at the time of the read.
     *
     * @param str key into {@code adminCommandExecutionTrackers}
     * @return the tracker, or an empty {@link Optional} when none is registered
     */
    @Override // com.linkedin.venice.controller.Admin
    public Optional<AdminCommandExecutionTracker> getAdminCommandExecutionTracker(String str) {
        return Optional.ofNullable(this.adminCommandExecutionTrackers.get(str));
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, Long> getAdminTopicMetadata(String str, Optional<String> optional) {
        throw new VeniceUnsupportedOperationException("getAdminTopicMetadata");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void updateAdminTopicMetadata(String str, long j, Optional<String> optional, Optional<Long> optional2, Optional<Long> optional3) {
        throw new VeniceUnsupportedOperationException("updateAdminTopicMetadata");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public RoutersClusterConfig getRoutersClusterConfig(String str) {
        throw new VeniceUnsupportedOperationException("getRoutersClusterConfig");
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public void updateRoutersClusterConfig(String str, Optional<Boolean> optional, Optional<Boolean> optional2, Optional<Boolean> optional3, Optional<Integer> optional4) {
        throw new VeniceUnsupportedOperationException("updateRoutersClusterConfig");
    }

    /** Returns all push strategies as reported by {@code pushStrategyZKAccessor}. */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getAllStorePushStrategyForMigration() {
        return this.pushStrategyZKAccessor.getAllPushStrategies();
    }

    /** Stores a push strategy by forwarding both arguments, unchanged, to {@code pushStrategyZKAccessor}. */
    @Override // com.linkedin.venice.controller.Admin
    public void setStorePushStrategyForMigration(String str, String str2) {
        this.pushStrategyZKAccessor.setPushStrategy(str, str2);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public Pair<String, String> discoverCluster(String str) {
        return getVeniceHelixAdmin().discoverCluster(str);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public String getServerD2Service(String str) {
        return getVeniceHelixAdmin().getServerD2Service(str);
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceUnsupportedOperationException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> findAllBootstrappingVersions(String str) {
        throw new VeniceUnsupportedOperationException("findAllBootstrappingVersions");
    }

    /** Returns the writer factory provided by the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public VeniceWriterFactory getVeniceWriterFactory() {
        return getVeniceHelixAdmin().getVeniceWriterFactory();
    }

    /** Returns the consumer adapter factory provided by the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public PubSubConsumerAdapterFactory getVeniceConsumerFactory() {
        return getVeniceHelixAdmin().getVeniceConsumerFactory();
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public VeniceProperties getPubSubSSLProperties(String str) {
        return getVeniceHelixAdmin().getPubSubSSLProperties(str);
    }

    /**
     * Stops the given cluster on the underlying admin, closes the writer registered for it in
     * {@code veniceWriterMap} (if there is one), and records {@code false} for it in
     * {@code asyncSetupEnabledMap}.
     *
     * @param str key of the cluster to stop
     */
    @Override // com.linkedin.venice.controller.Admin
    public synchronized void stop(String str) {
        getVeniceHelixAdmin().stop(str);
        Optional.ofNullable(this.veniceWriterMap.get(str)).ifPresent(adminWriter -> adminWriter.close());
        this.asyncSetupEnabledMap.put(str, false);
    }

    /** Delegates to {@code stopVeniceController()} on the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void stopVeniceController() {
        getVeniceHelixAdmin().stopVeniceController();
    }

    /**
     * Shuts this admin down.
     *
     * <p>In order: stops every cluster that has an entry in {@code veniceWriterMap}, closes the underlying
     * admin and the terminal-state topic checker, closes the system store ACL synchronization task if one
     * exists, shuts down the executors and waits up to 30 seconds for each, and finally closes every client
     * held in {@code newFabricControllerClientMap}.
     */
    @Override // com.linkedin.venice.controller.Admin, java.lang.AutoCloseable, java.io.Closeable
    public synchronized void close() {
        for (String cluster : this.veniceWriterMap.keySet()) {
            stop(cluster);
        }
        getVeniceHelixAdmin().close();
        this.terminalStateTopicChecker.close();
        if (this.systemStoreAclSynchronizationTask != null) {
            this.systemStoreAclSynchronizationTask.close();
        }

        // Request shutdown of all executors first, then wait for each of them.
        final boolean hasAclSyncExecutor = this.systemStoreAclSynchronizationExecutor != null;
        this.topicCheckerExecutor.shutdownNow();
        this.asyncSetupExecutor.shutdownNow();
        if (hasAclSyncExecutor) {
            this.systemStoreAclSynchronizationExecutor.shutdownNow();
        }
        try {
            this.topicCheckerExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            this.asyncSetupExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            if (hasAclSyncExecutor) {
                this.systemStoreAclSynchronizationExecutor.awaitTermination(30L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and carry on closing the remaining resources.
            Thread.currentThread().interrupt();
        }

        this.newFabricControllerClientMap.forEach(
                (cluster, clientsByFabric) -> clientsByFabric.values()
                        .forEach(client -> Utils.closeQuietlyWithErrorLogged(new Closeable[]{client})));
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isLeaderControllerOfControllerCluster() {
        return getVeniceHelixAdmin().isLeaderControllerOfControllerCluster();
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isTopicTruncated(String str) {
        return getVeniceHelixAdmin().isTopicTruncated(str);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isTopicTruncatedBasedOnRetention(long j) {
        return getVeniceHelixAdmin().isTopicTruncatedBasedOnRetention(j);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public int getMinNumberOfUnusedKafkaTopicsToPreserve() {
        return getVeniceHelixAdmin().getMinNumberOfUnusedKafkaTopicsToPreserve();
    }

    /**
     * Truncates the named Kafka topic by delegating to the admin returned by {@code getVeniceHelixAdmin()},
     * and returns that call's boolean result unchanged.
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean truncateKafkaTopic(String str) {
        return getVeniceHelixAdmin().truncateKafkaTopic(str);
    }

    /**
     * Not supported by this implementation; always throws.
     *
     * @throws VeniceException unconditionally
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isResourceStillAlive(String str) {
        throw new VeniceException("VeniceParentHelixAdmin#isResourceStillAlive is not supported!");
    }

    /** Replaces the {@code offlinePushAccessor} field; package-private (presumably for test injection). */
    void setOfflinePushAccessor(ParentHelixOfflinePushAccessor parentHelixOfflinePushAccessor) {
        this.offlinePushAccessor = parentHelixOfflinePushAccessor;
    }

    /** Forwards all four arguments, in order, to the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void updateClusterDiscovery(String str, String str2, String str3, String str4) {
        getVeniceHelixAdmin().updateClusterDiscovery(str, str2, str3, str4);
    }

    /** Forwards the key and details, unchanged, to the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void sendPushJobDetails(PushJobStatusRecordKey pushJobStatusRecordKey, PushJobDetails pushJobDetails) {
        getVeniceHelixAdmin().sendPushJobDetails(pushJobStatusRecordKey, pushJobDetails);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public PushJobDetails getPushJobDetails(PushJobStatusRecordKey pushJobStatusRecordKey) {
        return getVeniceHelixAdmin().getPushJobDetails(pushJobStatusRecordKey);
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its result unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public BatchJobHeartbeatValue getBatchJobHeartbeatValue(BatchJobHeartbeatKey batchJobHeartbeatKey) {
        return getVeniceHelixAdmin().getBatchJobHeartbeatValue(batchJobHeartbeatKey);
    }

    /** Forwards all arguments, unchanged, to the admin returned by {@code getVeniceHelixAdmin()}. */
    @Override // com.linkedin.venice.controller.Admin
    public void writeEndOfPush(String str, String str2, int i, boolean z) {
        getVeniceHelixAdmin().writeEndOfPush(str, str2, i, z);
    }

    /** Always returns {@code true}; the argument is ignored. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean whetherEnableBatchPushFromAdmin(String str) {
        return true;
    }

    /** Delegates to the admin returned by {@code getVeniceHelixAdmin()} and returns its answer unchanged. */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isStoreMigrationAllowed(String str) {
        return getVeniceHelixAdmin().isStoreMigrationAllowed(str);
    }

    /**
     * Starts migrating a store from one cluster to another.
     *
     * <p>The store is first flagged as migrating through {@code updateStore}, the store config is then set
     * for migration on the underlying admin, and finally a {@code MIGRATE_STORE} admin message is sent to the
     * source cluster.
     *
     * @param str  source cluster name
     * @param str2 destination cluster name
     * @param str3 store name
     * @throws VeniceException if source and destination cluster are the same
     */
    @Override // com.linkedin.venice.controller.Admin
    public void migrateStore(String str, String str2, String str3) {
        if (str.equals(str2)) {
            throw new VeniceException("Source cluster and destination cluster cannot be the same!");
        }
        MigrateStore migrationPayload = (MigrateStore) AdminMessageType.MIGRATE_STORE.getNewInstance();
        migrationPayload.srcClusterName = str;
        migrationPayload.destClusterName = str2;
        migrationPayload.storeName = str3;

        UpdateStoreQueryParams markAsMigrating = new UpdateStoreQueryParams().setStoreMigration(true);
        updateStore(str, str3, markAsMigrating);
        getVeniceHelixAdmin().setStoreConfigForMigration(str3, str, str2);

        AdminOperation migrationMessage = new AdminOperation();
        migrationMessage.operationType = AdminMessageType.MIGRATE_STORE.getValue();
        migrationMessage.payloadUnion = migrationPayload;
        sendAdminMessageAndWaitForConsumed(str, str3, migrationMessage);
    }

    /**
     * Calls {@code updateClusterDiscovery} on the underlying admin with the arguments
     * {@code (str3, str, str2, str)}; note that {@code str} is passed twice, as the second and fourth argument.
     *
     * <p>NOTE(review): presumably {@code str} is the source cluster, {@code str2} the destination cluster and
     * {@code str3} the store name, mirroring {@code migrateStore} - confirm against the {@code Admin} interface.
     */
    @Override // com.linkedin.venice.controller.Admin
    public void completeMigration(String str, String str2, String str3) {
        getVeniceHelixAdmin().updateClusterDiscovery(str3, str, str2, str);
    }

    /**
     * Aborts a store migration by sending an {@code ABORT_MIGRATION} admin message to the source cluster.
     *
     * @param str  source cluster name
     * @param str2 destination cluster name
     * @param str3 store name
     * @throws VeniceException if source and destination cluster are the same
     */
    @Override // com.linkedin.venice.controller.Admin
    public void abortMigration(String str, String str2, String str3) {
        if (str.equals(str2)) {
            throw new VeniceException("Source cluster and destination cluster cannot be the same!");
        }
        AbortMigration abortPayload = (AbortMigration) AdminMessageType.ABORT_MIGRATION.getNewInstance();
        abortPayload.srcClusterName = str;
        abortPayload.destClusterName = str2;
        abortPayload.storeName = str3;

        AdminOperation abortMessage = new AdminOperation();
        abortMessage.operationType = AdminMessageType.ABORT_MIGRATION.getValue();
        abortMessage.payloadUnion = abortPayload;
        sendAdminMessageAndWaitForConsumed(str, str3, abortMessage);
    }

    /**
     * Builds an ETL config record from the store's current ETL config, overridden by whichever of the
     * optional new settings are present.
     *
     * @param store     store whose current ETL config supplies the fallback values
     * @param optional  new value for "regular version ETL enabled", if any
     * @param optional2 new value for "future version ETL enabled", if any
     * @param optional3 new ETL'd user proxy account, if any
     * @return the merged record
     * @throws VeniceException if ETL is being enabled while neither the new nor the current proxy account
     *         is a non-empty string
     */
    private ETLStoreConfigRecord mergeNewSettingIntoOldETLStoreConfig(Store store, Optional<Boolean> optional, Optional<Boolean> optional2, Optional<String> optional3) {
        ETLStoreConfig currentConfig = store.getEtlStoreConfig();

        boolean enablingEtl = optional.orElse(false) || optional2.orElse(false);
        if (enablingEtl) {
            boolean newAccountMissing = !optional3.isPresent() || optional3.get().isEmpty();
            if (newAccountMissing) {
                String currentAccount = currentConfig.getEtledUserProxyAccount();
                if (currentAccount == null || currentAccount.isEmpty()) {
                    throw new VeniceException("Cannot enable ETL for this store because etled user proxy account is not set");
                }
            }
        }

        ETLStoreConfigRecord merged = new ETLStoreConfigRecord();
        merged.etledUserProxyAccount = optional3.orElse(currentConfig.getEtledUserProxyAccount());
        merged.regularVersionETLEnabled = optional.orElse(currentConfig.isRegularVersionETLEnabled());
        merged.futureVersionETLEnabled = optional2.orElse(currentConfig.isFutureVersionETLEnabled());
        return merged;
    }

    /**
     * Provisions read/write ACLs for a store, and for each listed system store type, from a JSON access
     * permission document.
     *
     * <p>Does nothing unless both an authorizer service and an access permission string are present. The
     * work is done in two separate phases so that each failure is logged and wrapped exactly once with the
     * matching message: (1) parsing the {@code AccessPermissions} JSON, (2) building and setting the ACL
     * bindings.
     *
     * @param str      store name, used as the ACL resource name
     * @param optional JSON with an {@code AccessPermissions} object holding optional {@code Read} and
     *                 {@code Write} arrays of principal names
     * @param list     system store types that get an ACL binding derived from the store's binding
     * @throws VeniceException if the JSON cannot be parsed or the ACLs cannot be set
     */
    private void provisionAclsForStore(String str, Optional<String> optional, List<VeniceSystemStoreType> list) {
        if (!this.authorizerService.isPresent() || !optional.isPresent()) {
            return;
        }
        Resource resource = new Resource(str);
        Iterator<JsonNode> readPrincipals = null;
        Iterator<JsonNode> writePrincipals = null;

        // Phase 1: extract the principals from the access permission JSON.
        try {
            JsonNode permissions = ObjectMapperFactory.getInstance().readTree(optional.get()).path("AccessPermissions");
            if (permissions.has("Read")) {
                readPrincipals = permissions.path("Read").elements();
            }
            if (permissions.has("Write")) {
                writePrincipals = permissions.path("Write").elements();
            }
        } catch (Exception e) {
            LOGGER.error("ACLProvisioning: invalid accessPermission schema for store: {}", str, e);
            throw new VeniceException(e);
        }

        // Phase 2: build the binding and apply it to the store and its system stores.
        try {
            AclBinding aclBinding = new AclBinding(resource);
            if (readPrincipals != null) {
                while (readPrincipals.hasNext()) {
                    aclBinding.addAceEntry(new AceEntry(new Principal(readPrincipals.next().textValue()), Method.Read, Permission.ALLOW));
                }
            }
            if (writePrincipals != null) {
                while (writePrincipals.hasNext()) {
                    aclBinding.addAceEntry(new AceEntry(new Principal(writePrincipals.next().textValue()), Method.Write, Permission.ALLOW));
                }
            }
            this.authorizerService.get().setAcls(aclBinding);
            for (VeniceSystemStoreType systemStoreType : list) {
                this.authorizerService.get().setAcls(systemStoreType.generateSystemStoreAclBinding(aclBinding));
            }
        } catch (Exception e) {
            LOGGER.error("ACLProvisioning: failure when setting ACL's for store: {}", str, e);
            throw new VeniceException(e);
        }
    }

    /**
     * Reads the ACLs of a store from the authorizer service and renders the ALLOW entries as JSON of the
     * form {@code {"AccessPermissions": {"Read": [...], "Write": [...]}}}.
     *
     * <p>Entries with a permission other than {@code ALLOW}, or a method other than {@code Read} or
     * {@code Write}, are left out.
     *
     * @param str store name, used as the ACL resource name
     * @return the JSON string, or an empty string when the ACL binding is {@code null} or has no entries
     * @throws VeniceException wrapping any exception raised while fetching or serializing the ACLs
     */
    private String fetchAclsForStore(String str) {
        try {
            AclBinding storeAcls = this.authorizerService.get().describeAcls(new Resource(str));
            if (storeAcls == null) {
                LOGGER.error("ACLProvisioning: null ACL returned for store: {}", str);
                return "";
            }
            if (storeAcls.countAceEntries() == 0) {
                return "";
            }

            JsonNodeFactory nodes = JsonNodeFactory.instance;
            ObjectMapper mapper = ObjectMapperFactory.getInstance();
            ObjectNode root = nodes.objectNode();
            ObjectNode permissions = nodes.objectNode();
            ArrayNode readers = nodes.arrayNode();
            ArrayNode writers = nodes.arrayNode();

            for (AceEntry entry : storeAcls.getAceEntries()) {
                if (entry.getPermission() != Permission.ALLOW) {
                    continue;
                }
                if (entry.getMethod() == Method.Read) {
                    readers.add(entry.getPrincipal().getName());
                } else if (entry.getMethod() == Method.Write) {
                    writers.add(entry.getPrincipal().getName());
                }
            }

            permissions.replace("Read", readers);
            permissions.replace("Write", writers);
            root.replace("AccessPermissions", permissions);
            return mapper.writeValueAsString(root);
        } catch (Exception e) {
            LOGGER.error("ACLProvisioning: failure in getting ACL's for store: {}", str, e);
            throw new VeniceException(e);
        }
    }

    /**
     * Clears the ACLs of a store and of each given system store type; does nothing when no authorizer
     * service is configured.
     *
     * <p>For every system store type, both the ACLs and the resource itself are cleared.
     *
     * @param str name of the user store
     * @param list system store types whose derived system stores are cleaned up as well
     * @throws VeniceException wrapping any exception raised by the authorizer service
     */
    private void cleanUpAclsForStore(String str, List<VeniceSystemStoreType> list) {
        if (!this.authorizerService.isPresent()) {
            return;
        }
        try {
            this.authorizerService.get().clearAcls(new Resource(str));
            for (VeniceSystemStoreType systemStoreType : list) {
                Resource systemStoreResource = new Resource(systemStoreType.getSystemStoreName(str));
                this.authorizerService.get().clearAcls(systemStoreResource);
                this.authorizerService.get().clearResource(systemStoreResource);
            }
        } catch (Exception e) {
            LOGGER.error("ACLProvisioning: failure in deleting ACL's for store: {}", str, e);
            throw new VeniceException(e);
        }
    }

    /**
     * Replaces the ACLs of a store (and of its enabled system stores) while holding the store write lock.
     *
     * <p>The lock is managed by try-with-resources so it is released exactly once on every exit path.
     *
     * @param str cluster name
     * @param str2 store name
     * @param str3 access-permission document handed to {@code provisionAclsForStore}
     * @throws VeniceUnsupportedOperationException when no authorizer service is configured
     */
    @Override // com.linkedin.venice.controller.Admin
    public void updateAclForStore(String str, String str2, String str3) {
        try (AutoCloseableLock ignored = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getClusterLockManager().createStoreWriteLock(str2)) {
            LOGGER.info("ACLProvisioning: UpdateAcl for store: {} in cluster: {}", str2, str);
            if (!this.authorizerService.isPresent()) {
                throw new VeniceUnsupportedOperationException("updateAclForStore is not supported yet!");
            }
            Store store = getVeniceHelixAdmin().checkPreConditionForAclOp(str, str2);
            provisionAclsForStore(str2, Optional.of(str3), VeniceSystemStoreType.getEnabledSystemStoreTypes(store));
        }
    }

    /**
     * Applies the given ACL binding through the authorizer service while holding the write lock of the
     * regular store.
     *
     * <p>The lock is managed by try-with-resources so it is released exactly once on every exit path.
     *
     * @param str cluster name used to locate the lock manager and run the ACL pre-condition check
     * @param str2 name of the store whose write lock is taken
     * @param aclBinding binding passed unchanged to {@code setAcls}
     * @throws VeniceUnsupportedOperationException when no authorizer service is configured
     */
    public void updateSystemStoreAclForStore(String str, String str2, AclBinding aclBinding) {
        try (AutoCloseableLock ignored = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getClusterLockManager().createStoreWriteLock(str2)) {
            if (!this.authorizerService.isPresent()) {
                throw new VeniceUnsupportedOperationException("updateAclForStore is not supported yet!");
            }
            getVeniceHelixAdmin().checkPreConditionForAclOp(str, str2);
            this.authorizerService.get().setAcls(aclBinding);
        }
    }

    /**
     * Returns the JSON description of a store's ACLs, read while holding the store read lock.
     *
     * <p>The lock is managed by try-with-resources so it is released exactly once on every exit path.
     *
     * @param str cluster name
     * @param str2 store name
     * @return the value produced by {@code fetchAclsForStore} for the store
     * @throws VeniceUnsupportedOperationException when no authorizer service is configured
     */
    @Override // com.linkedin.venice.controller.Admin
    public String getAclForStore(String str, String str2) {
        try (AutoCloseableLock ignored = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getClusterLockManager().createStoreReadLock(str2)) {
            LOGGER.info("ACLProvisioning: GetAcl for store: {} in cluster: {}", str2, str);
            if (!this.authorizerService.isPresent()) {
                throw new VeniceUnsupportedOperationException("getAclForStore is not supported yet!");
            }
            getVeniceHelixAdmin().checkPreConditionForAclOp(str, str2);
            return fetchAclsForStore(str2);
        }
    }

    /**
     * Removes the ACLs of a store and of its enabled system stores while holding the store write lock.
     *
     * <p>When the store reports that it is migrating, deletion is skipped and only a log line is written.
     * The lock is managed by try-with-resources so it is released exactly once on every exit path.
     *
     * @param str cluster name
     * @param str2 store name
     * @throws VeniceUnsupportedOperationException when no authorizer service is configured
     */
    @Override // com.linkedin.venice.controller.Admin
    public void deleteAclForStore(String str, String str2) {
        try (AutoCloseableLock ignored = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getClusterLockManager().createStoreWriteLock(str2)) {
            LOGGER.info("ACLProvisioning: DeleteAcl for store: {} in cluster: {}", str2, str);
            if (!this.authorizerService.isPresent()) {
                throw new VeniceUnsupportedOperationException("deleteAclForStore is not supported yet!");
            }
            Store store = getVeniceHelixAdmin().checkPreConditionForAclOp(str, str2);
            if (store.isMigrating()) {
                LOGGER.info("Store {} is migrating! Skipping acl deletion!", str2);
            } else {
                cleanUpAclsForStore(str2, VeniceSystemStoreType.getEnabledSystemStoreTypes(store));
            }
        }
    }

    /**
     * Builds a CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER admin message and sends it, waiting until it is
     * consumed.
     *
     * @param str cluster name written into the message and used as the send target
     * @param veniceUserStoreType store type; its {@code toString()} value is written into the message
     * @param optional not read by this implementation
     * @param z value for the message's {@code enabled} flag
     * @param optional2 native replication source region, {@code null} in the message when absent
     * @param optional3 regions filter, {@code null} in the message when absent
     */
    @Override // com.linkedin.venice.controller.Admin
    public void configureNativeReplication(String str, VeniceUserStoreType veniceUserStoreType, Optional<String> optional, boolean z, Optional<String> optional2, Optional<String> optional3) {
        AdminMessageType messageType = AdminMessageType.CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER;
        ConfigureNativeReplicationForCluster payload = (ConfigureNativeReplicationForCluster) messageType.getNewInstance();
        payload.clusterName = str;
        payload.storeType = veniceUserStoreType.toString();
        payload.enabled = z;
        payload.nativeReplicationSourceRegion = optional2.orElse(null);
        payload.regionsFilter = optional3.orElse(null);

        AdminOperation message = new AdminOperation();
        message.operationType = messageType.getValue();
        message.payloadUnion = payload;
        sendAdminMessageAndWaitForConsumed(str, null, message);
    }

    /**
     * Builds a CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER admin message and sends it, waiting until it
     * is consumed.
     *
     * @param str cluster name written into the message and used as the send target
     * @param veniceUserStoreType store type; its {@code toString()} value is written into the message
     * @param optional not read by this implementation
     * @param z value for the message's {@code enabled} flag
     * @param optional2 regions filter, {@code null} in the message when absent
     */
    @Override // com.linkedin.venice.controller.Admin
    public void configureActiveActiveReplication(String str, VeniceUserStoreType veniceUserStoreType, Optional<String> optional, boolean z, Optional<String> optional2) {
        AdminMessageType messageType = AdminMessageType.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER;
        ConfigureActiveActiveReplicationForCluster payload = (ConfigureActiveActiveReplicationForCluster) messageType.getNewInstance();
        payload.clusterName = str;
        payload.storeType = veniceUserStoreType.toString();
        payload.enabled = z;
        payload.regionsFilter = optional2.orElse(null);

        AdminOperation message = new AdminOperation();
        message.operationType = messageType.getValue();
        message.payloadUnion = payload;
        sendAdminMessageAndWaitForConsumed(str, null, message);
    }

    /**
     * Collects, across all child controllers of a cluster, the stores that have stale regions and no
     * topic for a current push job.
     *
     * <p>Each child controller's store list is folded into one {@link StoreDataAudit} per store name, keyed
     * by the controller-client map key. A store is reported when its audit has at least one stale region
     * and {@code getTopicForCurrentPushJob} returns an empty result.
     *
     * @param str cluster name
     * @return audits of the stale stores, keyed by store name
     * @throws VeniceException wrapping any exception raised while querying or aggregating
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, StoreDataAudit> getClusterStaleStores(String str) {
        Map<String, StoreDataAudit> auditsByStore = new HashMap<>();
        Map<String, StoreDataAudit> staleStores = new HashMap<>();
        try {
            for (Map.Entry<String, ControllerClient> regionClient : getVeniceHelixAdmin().getControllerClientMap(str).entrySet()) {
                regionClient.getValue().getClusterStores(str).getStoreInfoList().forEach(storeInfo -> {
                    // Create the audit lazily and reuse it for every region reporting this store.
                    StoreDataAudit audit = auditsByStore.computeIfAbsent(storeInfo.getName(), name -> new StoreDataAudit());
                    audit.setStoreName(storeInfo.getName());
                    audit.insert(regionClient.getKey(), storeInfo);
                });
            }
            for (Map.Entry<String, StoreDataAudit> auditEntry : auditsByStore.entrySet()) {
                StoreDataAudit audit = auditEntry.getValue();
                // The push-job lookup is intentionally performed for every store, as before.
                Optional<String> currentPushTopic = getTopicForCurrentPushJob(str, audit.getStoreName(), false, false);
                if (audit.getStaleRegions().size() > 0 && !currentPushTopic.isPresent()) {
                    staleStores.put(auditEntry.getKey(), audit);
                }
            }
            return staleStores;
        } catch (Exception e) {
            throw new VeniceException("Something went wrong trying to fetch stale stores.", e);
        }
    }

    /**
     * Returns the largest used version number for a store, taking the maximum of the local store
     * graveyard value and the value reported by every child controller client.
     *
     * @param str forwarded to the controller-client lookup and to each child call (presumably the cluster
     *            name)
     * @param str2 forwarded to the graveyard and to each child call (presumably the store name)
     * @return the highest version number seen
     */
    @Override // com.linkedin.venice.controller.Admin
    public int getLargestUsedVersionFromStoreGraveyard(String str, String str2) {
        Map<String, ControllerClient> childClients = getVeniceHelixAdmin().getControllerClientMap(str);
        int largest = getVeniceHelixAdmin().getStoreGraveyard().getLargestUsedVersionNumber(str2);
        for (ControllerClient childClient : childClients.values()) {
            VersionResponse response = childClient.getStoreLargestUsedVersion(str, str2);
            largest = Math.max(largest, response.getVersion());
        }
        return largest;
    }

    /**
     * Not implemented by this class.
     *
     * @param str unused
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // com.linkedin.venice.controller.Admin
    public ArrayList<StoreInfo> getClusterStores(String str) {
        final String message = "This function has no implementation.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Not implemented by this class.
     *
     * @param str unused
     * @param str2 unused
     * @param z unused
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // com.linkedin.venice.controller.Admin
    public RegionPushDetails getRegionPushDetails(String str, String str2, boolean z) {
        final String message = "This function has no implementation.";
        throw new UnsupportedOperationException(message);
    }

    /**
     * Queries every child controller client for a store's push details and returns them keyed by the
     * controller-client map key, which is also written into each result as its region name.
     *
     * <p>Clients that return a {@code null} response or a response without details are skipped.
     *
     * @param str used to look up the controller clients (presumably the cluster name)
     * @param str2 forwarded to each child's {@code getRegionPushDetails} call (presumably the store name)
     * @param z forwarded unchanged to each child's {@code getRegionPushDetails} call
     * @return push details per region
     * @throws VeniceException wrapping any exception raised while querying
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, RegionPushDetails> listStorePushInfo(String str, String str2, boolean z) {
        Map<String, RegionPushDetails> detailsByRegion = new HashMap<>();
        try {
            for (Map.Entry<String, ControllerClient> regionClient : getVeniceHelixAdmin().getControllerClientMap(str).entrySet()) {
                RegionPushDetailsResponse response = regionClient.getValue().getRegionPushDetails(str2, z);
                if (response == null) {
                    continue;
                }
                RegionPushDetails details = response.getRegionPushDetails();
                if (details == null) {
                    continue;
                }
                details.setRegionName(regionClient.getKey());
                detailsByRegion.put(regionClient.getKey(), details);
            }
            return detailsByRegion;
        } catch (Exception e) {
            throw new VeniceException("Something went wrong trying to get store push info. ", e);
        }
    }

    /**
     * Verifies that resources of a previously deleted store are cleaned up before the store is
     * re-created, first through the local admin and then through every child controller client.
     *
     * @param str cluster name
     * @param str2 store name
     * @throws VeniceException when the local check fails or any child controller reports an error; the
     *         original exception is attached as the cause so its stack trace is not lost
     */
    @Override // com.linkedin.venice.controller.Admin
    public void checkResourceCleanupBeforeStoreCreation(String str, String str2) {
        try {
            getVeniceHelixAdmin().checkResourceCleanupBeforeStoreCreation(str, str2, false);
            getVeniceHelixAdmin().getControllerClientMap(str).forEach((colo, childClient) -> {
                ControllerResponse response = childClient.checkResourceCleanupForStoreCreation(str2);
                if (response.isError()) {
                    throw new VeniceException(response.getError() + " in colo: " + colo);
                }
            });
        } catch (VeniceException e) {
            // Same message as before, but keep the cause for diagnostics.
            throw new VeniceException("Encountered the following error during re-creation check, please try to recreate your store later: " + e.getMessage(), e);
        }
    }

    /**
     * Forwards to {@code isParent()} of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's answer
     */
    @Override // com.linkedin.venice.controller.Admin
    public boolean isParent() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.isParent();
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name)
     * @return the delegate's map
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getChildDataCenterControllerUrlMap(String str) {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getChildDataCenterControllerUrlMap(str);
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name)
     * @return the delegate's map
     */
    @Override // com.linkedin.venice.controller.Admin
    public Map<String, String> getChildDataCenterControllerD2Map(String str) {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getChildDataCenterControllerD2Map(str);
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name)
     * @return the delegate's result
     */
    @Override // com.linkedin.venice.controller.Admin
    public String getChildControllerD2ServiceName(String str) {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getChildControllerD2ServiceName(str);
    }

    /**
     * Forwards to {@code getStoreConfigRepo()} of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's store config repository
     */
    @Override // com.linkedin.venice.controller.Admin
    public HelixReadOnlyStoreConfigRepository getStoreConfigRepo() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getStoreConfigRepo();
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's repository
     */
    @Override // com.linkedin.venice.controller.Admin
    public HelixReadOnlyZKSharedSystemStoreRepository getReadOnlyZKSharedSystemStoreRepository() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getReadOnlyZKSharedSystemStoreRepository();
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's repository
     */
    @Override // com.linkedin.venice.controller.Admin
    public HelixReadOnlyZKSharedSchemaRepository getReadOnlyZKSharedSchemaRepository() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getReadOnlyZKSharedSchemaRepository();
    }

    /**
     * Forwards to {@code getMetaStoreWriter()} of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's meta store writer
     */
    @Override // com.linkedin.venice.controller.Admin
    public MetaStoreWriter getMetaStoreWriter() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getMetaStoreWriter();
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's optional record deleter
     */
    @Override // com.linkedin.venice.controller.Admin
    public Optional<PushStatusStoreRecordDeleter> getPushStatusStoreRecordDeleter() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getPushStatusStoreRecordDeleter();
    }

    /**
     * Returns the emergency source region from the multi-cluster configs.
     *
     * @return an empty optional when the configured value is the empty string, otherwise the value
     */
    @Override // com.linkedin.venice.controller.Admin
    public Optional<String> getEmergencySourceRegion() {
        String configuredRegion = getMultiClusterConfigs().getEmergencySourceRegion();
        if (configuredRegion.equals("")) {
            return Optional.empty();
        }
        return Optional.of(configuredRegion);
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name)
     * @return the delegate's result
     */
    @Override // com.linkedin.venice.controller.Admin
    public Optional<String> getAggregateRealTimeTopicSource(String str) {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getAggregateRealTimeTopicSource(str);
    }

    /**
     * Forwards to {@code getClustersLeaderOf()} of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's list
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<String> getClustersLeaderOf() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getClustersLeaderOf();
    }

    /**
     * Package-private accessor for the {@code veniceHelixAdmin} field.
     *
     * @return the field's current value
     */
    VeniceHelixAdmin getVeniceHelixAdmin() {
        return veniceHelixAdmin;
    }

    /**
     * Creates an identity function that, each time it is applied, appends the given name to the given
     * list before returning its argument unchanged.
     *
     * @param list list that receives the name on every application
     * @param str name to append
     * @param <T> type of the value passed through
     * @return the recording identity function
     */
    private <T> Function<T, T> addToUpdatedConfigList(List<CharSequence> list, String str) {
        return passedThrough -> {
            list.add(str);
            return passedThrough;
        };
    }

    /**
     * Forwards to the same-named method of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @return the delegate's value
     */
    @Override // com.linkedin.venice.controller.Admin
    public long getBackupVersionDefaultRetentionMs() {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getBackupVersionDefaultRetentionMs();
    }

    /**
     * Asks the controller client of the given colo to wipe a cluster.
     *
     * @param str cluster name
     * @param str2 colo whose controller client is used; also passed as the first argument of the
     *             client's {@code wipeCluster} call
     * @param optional forwarded unchanged to the client call
     * @param optional2 forwarded unchanged to the client call
     * @throws VeniceException when the client response reports an error
     */
    @Override // com.linkedin.venice.controller.Admin
    public void wipeCluster(String str, String str2, Optional<String> optional, Optional<Integer> optional2) {
        ControllerClient fabricClient = getFabricBuildoutControllerClient(str, str2);
        ControllerResponse response = fabricClient.wipeCluster(str2, optional, optional2);
        if (!response.isError()) {
            return;
        }
        throw new VeniceException("Could not wipe cluster " + str + " in colo: " + str2 + ". " + response.getError());
    }

    /**
     * Compares a store between two fabrics and collects property, schema and version differences.
     *
     * @param str cluster name used to obtain the two fabric controller clients
     * @param str2 store name
     * @param str3 first fabric
     * @param str4 second fabric
     * @return the populated comparison result
     * @throws IOException declared by the interface; not thrown directly by this method's own statements
     */
    @Override // com.linkedin.venice.controller.Admin
    public StoreComparisonInfo compareStore(String str, String str2, String str3, String str4) throws IOException {
        ControllerClient firstFabricClient = getFabricBuildoutControllerClient(str, str3);
        ControllerClient secondFabricClient = getFabricBuildoutControllerClient(str, str4);

        StoreComparisonInfo comparison = new StoreComparisonInfo();
        compareStoreProperties(str2, str3, str4, firstFabricClient, secondFabricClient, comparison);
        compareStoreSchemas(str2, str3, str4, firstFabricClient, secondFabricClient, comparison);
        compareStoreVersions(str2, str3, str4, firstFabricClient, secondFabricClient, comparison);
        return comparison;
    }

    /**
     * Records every store property whose value differs between two fabrics.
     *
     * <p>Both {@link StoreInfo} objects are converted to maps and compared key by key, iterating over the
     * first fabric's keys. The keys {@code coloToCurrentVersions}, {@code versions} and
     * {@code kafkaBrokerUrl} are ignored. Values are rendered with {@link String#valueOf(Object)} so a
     * property that is {@code null} or missing on one side is reported as {@code "null"} instead of
     * causing a {@link NullPointerException}.
     *
     * @param str store name
     * @param str2 first fabric
     * @param str3 second fabric
     * @param controllerClient client for the first fabric
     * @param controllerClient2 client for the second fabric
     * @param storeComparisonInfo sink for the detected differences
     * @throws VeniceException when either controller response reports an error
     */
    private static void compareStoreProperties(String str, String str2, String str3, ControllerClient controllerClient, ControllerClient controllerClient2, StoreComparisonInfo storeComparisonInfo) {
        StoreInfo firstStore = checkControllerResponse(controllerClient.getStore(str), str2).getStore();
        StoreInfo secondStore = checkControllerResponse(controllerClient2.getStore(str), str3).getStore();
        ObjectMapper mapper = ObjectMapperFactory.getInstance();
        Map<?, ?> firstProperties = mapper.convertValue(firstStore, Map.class);
        Map<?, ?> secondProperties = mapper.convertValue(secondStore, Map.class);
        for (Map.Entry<?, ?> property : firstProperties.entrySet()) {
            String propertyName = (String) property.getKey();
            if (propertyName.equals("coloToCurrentVersions") || propertyName.equals("versions") || propertyName.equals("kafkaBrokerUrl")) {
                continue;
            }
            Object firstValue = property.getValue();
            Object secondValue = secondProperties.get(propertyName);
            if (!Objects.equals(firstValue, secondValue)) {
                // String.valueOf is null-safe; a one-sided null is a legitimate difference to report.
                storeComparisonInfo.addPropertyDiff(str2, str3, propertyName, String.valueOf(firstValue), String.valueOf(secondValue));
            }
        }
    }

    /**
     * Records schema differences of a store between two fabrics: the key schema, the value and derived
     * schemas, and the replication metadata schemas.
     *
     * @param str store name
     * @param str2 first fabric
     * @param str3 second fabric
     * @param controllerClient client for the first fabric
     * @param controllerClient2 client for the second fabric
     * @param storeComparisonInfo sink for the detected differences
     * @throws VeniceException when any controller response reports an error
     */
    private static void compareStoreSchemas(String str, String str2, String str3, ControllerClient controllerClient, ControllerClient controllerClient2, StoreComparisonInfo storeComparisonInfo) {
        String firstKeySchema = checkControllerResponse(controllerClient.getKeySchema(str), str2).getSchemaStr();
        String secondKeySchema = checkControllerResponse(controllerClient2.getKeySchema(str), str3).getSchemaStr();
        if (!Objects.equals(firstKeySchema, secondKeySchema)) {
            storeComparisonInfo.addSchemaDiff(str2, str3, "key-schema", firstKeySchema, secondKeySchema);
        }

        MultiSchemaResponse.Schema[] firstValueSchemas = checkControllerResponse(controllerClient.getAllValueAndDerivedSchema(str), str2).getSchemas();
        MultiSchemaResponse.Schema[] secondValueSchemas = checkControllerResponse(controllerClient2.getAllValueAndDerivedSchema(str), str3).getSchemas();
        populateSchemaDiff(str2, str3, firstValueSchemas, secondValueSchemas, "derived-schema", storeComparisonInfo);

        MultiSchemaResponse.Schema[] firstRmdSchemas = checkControllerResponse(controllerClient.getAllReplicationMetadataSchemas(str), str2).getSchemas();
        MultiSchemaResponse.Schema[] secondRmdSchemas = checkControllerResponse(controllerClient2.getAllReplicationMetadataSchemas(str), str3).getSchemas();
        populateSchemaDiff(str2, str3, firstRmdSchemas, secondRmdSchemas, "timestamp-metadata-schema", storeComparisonInfo);
    }

    /**
     * Records version status differences of a store between two fabrics.
     *
     * <p>For each version of the first fabric: if the second fabric has the same version number, a diff
     * is recorded only when the statuses differ; otherwise the second side is reported as {@code ERROR}
     * when its largest used version number is at least that number, else {@code NOT_CREATED}. Versions
     * remaining only in the second fabric are reported the same way with the sides swapped.
     *
     * <p>Matched versions are removed from the list returned by the second store's {@code getVersions()}.
     *
     * @param str store name
     * @param str2 first fabric
     * @param str3 second fabric
     * @param controllerClient client for the first fabric
     * @param controllerClient2 client for the second fabric
     * @param storeComparisonInfo sink for the detected differences
     * @throws VeniceException when either controller response reports an error
     */
    private static void compareStoreVersions(String str, String str2, String str3, ControllerClient controllerClient, ControllerClient controllerClient2, StoreComparisonInfo storeComparisonInfo) {
        StoreInfo firstStore = checkControllerResponse(controllerClient.getStore(str), str2).getStore();
        StoreInfo secondStore = checkControllerResponse(controllerClient2.getStore(str), str3).getStore();
        List<Version> unmatchedSecondVersions = secondStore.getVersions();
        for (Version firstVersion : firstStore.getVersions()) {
            int versionNumber = firstVersion.getNumber();
            Optional<Version> counterpart = secondStore.getVersion(versionNumber);
            if (!counterpart.isPresent()) {
                VersionStatus impliedStatus = secondStore.getLargestUsedVersionNumber() >= versionNumber ? VersionStatus.ERROR : VersionStatus.NOT_CREATED;
                storeComparisonInfo.addVersionStateDiff(str2, str3, versionNumber, firstVersion.getStatus(), impliedStatus);
                continue;
            }
            Version secondVersion = counterpart.get();
            if (!firstVersion.getStatus().equals(secondVersion.getStatus())) {
                storeComparisonInfo.addVersionStateDiff(str2, str3, versionNumber, firstVersion.getStatus(), secondVersion.getStatus());
            }
            unmatchedSecondVersions.remove(secondVersion);
        }
        // Whatever is left exists only in the second fabric.
        for (Version leftover : unmatchedSecondVersions) {
            VersionStatus impliedStatus = firstStore.getLargestUsedVersionNumber() >= leftover.getNumber() ? VersionStatus.ERROR : VersionStatus.NOT_CREATED;
            storeComparisonInfo.addVersionStateDiff(str2, str3, leftover.getNumber(), impliedStatus, leftover.getStatus());
        }
    }

    /**
     * Returns the given response unchanged unless it reports an error.
     *
     * @param t response to inspect
     * @param str fabric name used in the error message
     * @param <T> concrete response type
     * @return {@code t}
     * @throws VeniceException when {@code t.isError()} is true
     */
    private static <T extends ControllerResponse> T checkControllerResponse(T t, String str) {
        if (!t.isError()) {
            return t;
        }
        throw new VeniceException("ControllerResponse from fabric " + str + " has error " + t);
    }

    /**
     * Records differences between two schema arrays.
     *
     * <p>Schemas are matched by a key built from their ids. A schema present on one side only is
     * reported with {@code "N/A"} on the other side; a schema present on both sides is reported when the
     * schema strings differ.
     *
     * @param str first fabric
     * @param str2 second fabric
     * @param schemaArr schemas of the first fabric
     * @param schemaArr2 schemas of the second fabric
     * @param str3 key prefix for schemas whose derived schema id is not {@code -1}
     * @param storeComparisonInfo sink for the detected differences
     */
    private static void populateSchemaDiff(String str, String str2, MultiSchemaResponse.Schema[] schemaArr, MultiSchemaResponse.Schema[] schemaArr2, String str3, StoreComparisonInfo storeComparisonInfo) {
        Map<String, String> secondSchemasByKey = new HashMap<>();
        for (MultiSchemaResponse.Schema secondSchema : schemaArr2) {
            secondSchemasByKey.put(schemaDiffKey(secondSchema, str3), secondSchema.getSchemaStr());
        }
        for (MultiSchemaResponse.Schema firstSchema : schemaArr) {
            String key = schemaDiffKey(firstSchema, str3);
            if (!secondSchemasByKey.containsKey(key)) {
                storeComparisonInfo.addSchemaDiff(str, str2, key, firstSchema.getSchemaStr(), "N/A");
                continue;
            }
            String secondSchemaStr = secondSchemasByKey.get(key);
            if (!firstSchema.getSchemaStr().equals(secondSchemaStr)) {
                storeComparisonInfo.addSchemaDiff(str, str2, key, firstSchema.getSchemaStr(), secondSchemaStr);
            }
            secondSchemasByKey.remove(key);
        }
        // Entries still in the map have no counterpart in the first fabric.
        for (Map.Entry<String, String> onlyInSecond : secondSchemasByKey.entrySet()) {
            storeComparisonInfo.addSchemaDiff(str, str2, onlyInSecond.getKey(), "N/A", onlyInSecond.getValue());
        }
    }

    /** Builds the comparison key: {@code value-schema-<id>} when the derived id is -1, else {@code <prefix>-<id>-<derivedId>}. */
    private static String schemaDiffKey(MultiSchemaResponse.Schema schema, String derivedPrefix) {
        if (schema.getDerivedSchemaId() == -1) {
            return "value-schema-" + schema.getId();
        }
        return derivedPrefix + "-" + schema.getId() + "-" + schema.getDerivedSchemaId();
    }

    /**
     * Copies a store's schemas, configs and admin execution id from a source fabric to a destination
     * fabric.
     *
     * <p>The admin message lock is held only while the source fabric's metadata is read and is released
     * exactly once, in a {@code finally} block. The schemas are then sorted by derived schema id and
     * schema id, the store is created in the destination with the first schema, the remaining schemas
     * are added, and the store configs and admin topic execution id are updated.
     *
     * @param str cluster name
     * @param str2 source fabric
     * @param str3 destination fabric
     * @param str4 store name
     * @return the source fabric's {@link StoreInfo}
     * @throws VeniceException when reading from the source or writing to the destination fails; the
     *         underlying exception is attached as the cause
     */
    @Override // com.linkedin.venice.controller.Admin
    public StoreInfo copyOverStoreSchemasAndConfigs(String str, String str2, String str3, String str4) {
        try {
            ControllerClient srcClient = getFabricBuildoutControllerClient(str, str2);
            ControllerClient destClient = getFabricBuildoutControllerClient(str, str3);

            long srcExecutionId;
            StoreInfo srcStore;
            String keySchema;
            MultiSchemaResponse.Schema[] schemas;
            acquireAdminMessageLock(str, str4);
            try {
                srcExecutionId = srcClient.getAdminTopicMetadata(Optional.of(str4)).getExecutionId();
                srcStore = srcClient.getStore(str4).getStore();
                keySchema = srcClient.getKeySchema(str4).getSchemaStr();
                schemas = srcClient.getAllValueAndDerivedSchema(str4).getSchemas();
            } catch (Exception e) {
                throw new VeniceException("Error when getting store " + str4 + " metadata from source fabric " + str2 + " Exception: " + e.getMessage(), e);
            } finally {
                // Single release point; later destination-side failures must not release again.
                releaseAdminMessageLock(str, str4);
            }

            // Sort so that entries with derived id -1 come first and ids ascend within each group.
            Arrays.sort(schemas, Comparator.comparingInt(MultiSchemaResponse.Schema::getDerivedSchemaId).thenComparingInt(MultiSchemaResponse.Schema::getId));

            destClient.createNewStore(srcStore.getName(), srcStore.getOwner(), keySchema, schemas[0].getSchemaStr());
            for (int i = 1; i < schemas.length; i++) {
                MultiSchemaResponse.Schema schema = schemas[i];
                if (schema.getDerivedSchemaId() == -1) {
                    destClient.addValueSchema(srcStore.getName(), schema.getSchemaStr(), schema.getId());
                } else {
                    destClient.addDerivedSchema(srcStore.getName(), schema.getId(), schema.getSchemaStr(), schema.getDerivedSchemaId());
                }
            }

            ControllerResponse updateStoreResponse = destClient.updateStore(srcStore.getName(), new UpdateStoreQueryParams(srcStore, false));
            if (updateStoreResponse.isError()) {
                throw new VeniceException("Failed to update store " + updateStoreResponse.getError());
            }
            ControllerResponse updateMetadataResponse = destClient.updateAdminTopicMetadata(srcExecutionId, Optional.of(str4), Optional.empty(), Optional.empty());
            if (updateMetadataResponse.isError()) {
                throw new VeniceException("Failed to update store's execution id " + updateMetadataResponse.getError());
            }
            return srcStore;
        } catch (Exception e) {
            // Chain the exception itself; its getCause() is frequently null and would hide the failure.
            throw new VeniceException("Error copying src fabric's metadata to dest fabric.", e);
        }
    }

    /**
     * Package-private accessor for the {@code lingeringStoreVersionChecker} field.
     *
     * @return the field's current value
     */
    LingeringStoreVersionChecker getLingeringStoreVersionChecker() {
        return lingeringStoreVersionChecker;
    }

    /**
     * Package-private accessor for the {@code multiClusterConfigs} field.
     *
     * @return the field's current value
     */
    VeniceControllerMultiClusterConfig getMultiClusterConfigs() {
        return multiClusterConfigs;
    }

    /**
     * Package-private accessor for the {@code systemStoreLifeCycleHelper} field.
     *
     * @return the field's current value
     */
    UserSystemStoreLifeCycleHelper getSystemStoreLifeCycleHelper() {
        return systemStoreLifeCycleHelper;
    }

    /**
     * Returns a controller client for the given cluster and fabric.
     *
     * <p>A client already present in the admin's controller-client map is returned as is. Otherwise a
     * client is created on demand and cached in {@code newFabricControllerClientMap}: a D2-based client
     * when both the child D2 ZK host and the D2 service name are non-blank, else a URL-based client when
     * the child controller URL is non-blank.
     *
     * @param str cluster name
     * @param str2 fabric name
     * @return the existing or newly created client
     * @throws VeniceException when neither D2 nor URL configuration is available for the fabric
     */
    private ControllerClient getFabricBuildoutControllerClient(String str, String str2) {
        Map<String, ControllerClient> knownClients = getVeniceHelixAdmin().getControllerClientMap(str);
        if (knownClients.containsKey(str2)) {
            return knownClients.get(str2);
        }
        ControllerClient fabricClient = this.newFabricControllerClientMap
                .computeIfAbsent(str, cluster -> new VeniceConcurrentHashMap<>())
                .computeIfAbsent(str2, fabric -> {
                    VeniceControllerConfig clusterConfig = this.multiClusterConfigs.getControllerConfig(str);
                    String d2ZkHost = clusterConfig.getChildControllerD2ZkHost(str2);
                    String d2Service = clusterConfig.getD2ServiceName();
                    boolean d2Configured = StringUtils.isNotBlank(d2ZkHost) && StringUtils.isNotBlank(d2Service);
                    if (d2Configured) {
                        return new D2ControllerClient(d2Service, str, d2ZkHost, this.sslFactory);
                    }
                    String controllerUrl = clusterConfig.getChildControllerUrl(str2);
                    // Returning null leaves the cache without a mapping for this fabric.
                    return StringUtils.isNotBlank(controllerUrl)
                            ? ControllerClient.constructClusterControllerClient(str, controllerUrl, this.sslFactory, this.token)
                            : null;
                });
        if (fabricClient != null) {
            return fabricClient;
        }
        throw new VeniceException("Could not construct child controller client for cluster " + str + " fabric " + str2 + ". child.cluster.d2 or child.cluster.url value is missing in parent controller");
    }

    /**
     * Validates a new storage persona and sends a CREATE_STORAGE_PERSONA admin message, waiting until it
     * is consumed.
     *
     * <p>Controller leadership for the cluster is checked first. The message is built before the
     * persona repository is consulted; nothing is sent when the name already exists or validation fails.
     *
     * @param str cluster name
     * @param str2 persona name
     * @param j quota number
     * @param set stores to enforce
     * @param set2 owners
     * @throws VeniceException when a persona with the same name already exists
     */
    @Override // com.linkedin.venice.controller.Admin
    public void createStoragePersona(String str, String str2, long j, Set<String> set, Set<String> set2) {
        getVeniceHelixAdmin().checkControllerLeadershipFor(str);

        AdminMessageType messageType = AdminMessageType.CREATE_STORAGE_PERSONA;
        CreateStoragePersona payload = (CreateStoragePersona) messageType.getNewInstance();
        payload.setClusterName(str);
        payload.setName(str2);
        payload.setQuotaNumber(j);
        payload.setStoresToEnforce(new ArrayList<>(set));
        payload.setOwners(new ArrayList<>(set2));

        AdminOperation message = new AdminOperation();
        message.operationType = messageType.getValue();
        message.payloadUnion = payload;

        StoragePersonaRepository personaRepository = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getStoragePersonaRepository();
        if (personaRepository.hasPersona(str2)) {
            throw new VeniceException("Persona with name " + str2 + " already exists");
        }
        personaRepository.validatePersona(str2, j, set, set2);
        sendAdminMessageAndWaitForConsumed(str, null, message);
    }

    /**
     * Forwards to {@code getStoragePersona} of the admin returned by {@link #getVeniceHelixAdmin()}.
     *
     * @param str passed through unchanged (presumably the cluster name)
     * @param str2 passed through unchanged (presumably the persona name)
     * @return the delegate's result
     */
    @Override // com.linkedin.venice.controller.Admin
    public StoragePersona getStoragePersona(String str, String str2) {
        VeniceHelixAdmin delegate = getVeniceHelixAdmin();
        return delegate.getStoragePersona(str, str2);
    }

    /**
     * Sends a DELETE_STORAGE_PERSONA admin message after checking controller leadership, waiting until
     * the message is consumed.
     *
     * @param str cluster name
     * @param str2 persona name
     */
    @Override // com.linkedin.venice.controller.Admin
    public void deleteStoragePersona(String str, String str2) {
        getVeniceHelixAdmin().checkControllerLeadershipFor(str);

        AdminMessageType messageType = AdminMessageType.DELETE_STORAGE_PERSONA;
        DeleteStoragePersona payload = (DeleteStoragePersona) messageType.getNewInstance();
        payload.setClusterName(str);
        payload.setName(str2);

        AdminOperation message = new AdminOperation();
        message.operationType = messageType.getValue();
        message.payloadUnion = payload;
        sendAdminMessageAndWaitForConsumed(str, null, message);
    }

    /**
     * Validates a storage persona update and sends an UPDATE_STORAGE_PERSONA admin message, waiting
     * until it is consumed.
     *
     * <p>Quota, stores and owners absent from the query params are written into the message as
     * {@code null}. Validation against the persona repository runs after the message is built and before
     * it is sent.
     *
     * @param str cluster name
     * @param str2 persona name
     * @param updateStoragePersonaQueryParams requested changes
     */
    @Override // com.linkedin.venice.controller.Admin
    public void updateStoragePersona(String str, String str2, UpdateStoragePersonaQueryParams updateStoragePersonaQueryParams) {
        getVeniceHelixAdmin().checkControllerLeadershipFor(str);

        AdminMessageType messageType = AdminMessageType.UPDATE_STORAGE_PERSONA;
        UpdateStoragePersona payload = (UpdateStoragePersona) messageType.getNewInstance();
        payload.setClusterName(str);
        payload.setName(str2);
        payload.setQuotaNumber((Long) updateStoragePersonaQueryParams.getQuota().orElse(null));
        payload.setStoresToEnforce((List) updateStoragePersonaQueryParams.getStoresToEnforceAsList().orElse(null));
        payload.setOwners((List) updateStoragePersonaQueryParams.getOwnersAsList().orElse(null));

        AdminOperation message = new AdminOperation();
        message.operationType = messageType.getValue();
        message.payloadUnion = payload;

        StoragePersonaRepository personaRepository = getVeniceHelixAdmin().getHelixVeniceClusterResources(str).getStoragePersonaRepository();
        personaRepository.validatePersonaUpdate(str2, updateStoragePersonaQueryParams);
        sendAdminMessageAndWaitForConsumed(str, null, message);
    }

    /**
     * Returns the storage persona associated with a store by forwarding both arguments,
     * unchanged, to the admin object returned by {@code getVeniceHelixAdmin()}.
     *
     * @param str  first lookup argument (presumably the cluster name; confirm against {@code Admin})
     * @param str2 second lookup argument (presumably the store name; confirm against {@code Admin})
     * @return whatever the delegate's {@code getPersonaAssociatedWithStore} call returns
     */
    @Override // com.linkedin.venice.controller.Admin
    public StoragePersona getPersonaAssociatedWithStore(String str, String str2) {
        final StoragePersona associatedPersona = getVeniceHelixAdmin().getPersonaAssociatedWithStore(str, str2);
        return associatedPersona;
    }

    /**
     * Returns the storage personas of a cluster by forwarding the argument, unchanged,
     * to the admin object returned by {@code getVeniceHelixAdmin()}.
     *
     * @param str lookup argument (presumably the cluster name; confirm against {@code Admin})
     * @return whatever the delegate's {@code getClusterStoragePersonas} call returns
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<StoragePersona> getClusterStoragePersonas(String str) {
        final List<StoragePersona> personas = getVeniceHelixAdmin().getClusterStoragePersonas(str);
        return personas;
    }

    /**
     * Not supported by this implementation: every call throws.
     *
     * @param str ignored
     * @return never returns normally
     * @throws VeniceUnsupportedOperationException always, constructed with the operation name
     */
    @Override // com.linkedin.venice.controller.Admin
    public List<String> cleanupInstanceCustomizedStates(String str) {
        final String operationName = "cleanupInstanceCustomizedStates";
        throw new VeniceUnsupportedOperationException(operationName);
    }

    /**
     * Returns the store graveyard exposed by the admin object that
     * {@code getVeniceHelixAdmin()} returns.
     *
     * @return whatever the delegate's {@code getStoreGraveyard} call returns
     */
    @Override // com.linkedin.venice.controller.Admin
    public StoreGraveyard getStoreGraveyard() {
        final StoreGraveyard graveyard = getVeniceHelixAdmin().getStoreGraveyard();
        return graveyard;
    }

    /**
     * Removes a store from the graveyard, first through every controller client
     * registered for the cluster and then in the graveyard returned by
     * {@link #getStoreGraveyard()}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>{@code checkKafkaTopicAndHelixResource} is invoked on the local admin.</li>
     *   <li>Each entry of the controller client map is asked to remove the store from its
     *       graveyard; the map key is reported as the "colo" in error messages.</li>
     *   <li>Only if no client reported an error is the store removed from the local graveyard.</li>
     * </ol>
     *
     * @param str  key used to look up the controller clients and passed to the local graveyard
     *             (presumably the cluster name; confirm against {@code Admin})
     * @param str2 name of the store to remove from the graveyard
     * @throws ResourceStillExistsException if a client responds with error type
     *                                      {@code RESOURCE_STILL_EXISTS}
     * @throws VeniceException              if a client responds with any other error
     */
    @Override // com.linkedin.venice.controller.Admin
    public void removeStoreFromGraveyard(String str, String str2) {
        getVeniceHelixAdmin().checkKafkaTopicAndHelixResource(str, str2, true, false, false);
        getVeniceHelixAdmin().getControllerClientMap(str).forEach((colo, client) -> {
            final ControllerResponse response = client.removeStoreFromGraveyard(str2);
            if (!response.isError()) {
                // Nothing to report for this colo.
                return;
            }
            // A resource that still exists gets its own exception type; everything else is a generic failure.
            if (ErrorType.RESOURCE_STILL_EXISTS.equals(response.getErrorType())) {
                throw new ResourceStillExistsException("Store graveyard " + str2 + " is not ready for removal in colo: " + colo);
            }
            throw new VeniceException("Error when removing store graveyard " + str2 + " in colo: " + colo + ". " + response.getError());
        });
        getStoreGraveyard().removeStoreFromGraveyard(str, str2);
    }
}
