package com.linkedin.venice.controller.init;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.Utils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/controller/init/ControllerClientBackedSystemSchemaInitializer.class */
public class ControllerClientBackedSystemSchemaInitializer {
    private static final Logger LOGGER = LogManager.getLogger(ControllerClientBackedSystemSchemaInitializer.class);
    private static final String DEFAULT_KEY_SCHEMA_STR = "\"int\"";
    private static final int DEFAULT_RETRY_TIMES = 10;
    private final AvroProtocolDefinition protocolDefinition;
    private final String clusterName;
    private final VeniceHelixAdmin admin;
    private final Schema keySchema;
    private final UpdateStoreQueryParams storeMetadataUpdate;
    private final boolean autoRegisterPartialUpdateSchema;
    private final boolean enforceSSLOnly;
    private ControllerClient controllerClient;

    public ControllerClientBackedSystemSchemaInitializer(AvroProtocolDefinition avroProtocolDefinition, String str, VeniceHelixAdmin veniceHelixAdmin, Schema schema, UpdateStoreQueryParams updateStoreQueryParams, boolean z, boolean z2) {
        this.protocolDefinition = avroProtocolDefinition;
        this.clusterName = str;
        this.admin = veniceHelixAdmin;
        this.keySchema = schema;
        this.storeMetadataUpdate = updateStoreQueryParams;
        this.autoRegisterPartialUpdateSchema = z;
        this.enforceSSLOnly = z2;
    }

    public void execute() {
        String systemStoreName = this.protocolDefinition.getSystemStoreName();
        Map allSchemasFromResources = Utils.getAllSchemasFromResources(this.protocolDefinition);
        this.controllerClient = ControllerClient.constructClusterControllerClient(this.clusterName, this.admin.getLeaderController(this.clusterName).getUrl(this.enforceSSLOnly), this.admin.getSslFactory());
        try {
            String str = (String) this.admin.discoverCluster(systemStoreName).getFirst();
            if (!str.equals(this.clusterName)) {
                LOGGER.warn("The system store for {} already exists in cluster {}, which is inconsistent with the config {} which specifies that it should be in cluster {}. Will abort the initialization routine.", this.protocolDefinition.name(), str, "controller.system.schema.cluster.name", this.clusterName);
                return;
            }
        } catch (VeniceNoStoreException e) {
            checkAndMayCreateSystemStore(systemStoreName, (Schema) allSchemasFromResources.get(1));
        }
        if (this.keySchema != null) {
            checkIfKeySchemaMatches(systemStoreName);
        }
        MultiSchemaResponse retryableRequest = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient -> {
            return controllerClient.getAllValueSchema(systemStoreName);
        });
        if (retryableRequest.isError()) {
            throw new VeniceException("Error when getting all value schemas from system store " + systemStoreName + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest.getError());
        }
        HashMap hashMap = new HashMap();
        Arrays.stream(retryableRequest.getSchemas()).forEach(schema -> {
            hashMap.put(Integer.valueOf(schema.getId()), AvroCompatibilityHelper.parse(new String[]{schema.getSchemaStr()}));
        });
        for (int i = 1; i <= this.protocolDefinition.getCurrentProtocolVersion(); i++) {
            Schema schema2 = (Schema) allSchemasFromResources.get(Integer.valueOf(i));
            if (schema2 == null) {
                throw new VeniceException("Invalid protocol definition: " + this.protocolDefinition.name() + " does not have a version " + i + " even though it is less than or equal to the current version (" + this.protocolDefinition.getCurrentProtocolVersion() + ").");
            }
            checkAndMayRegisterValueSchema(systemStoreName, i, (Schema) hashMap.get(Integer.valueOf(i)), schema2);
            if (this.autoRegisterPartialUpdateSchema) {
                checkAndMayRegisterPartialUpdateSchema(systemStoreName, i, schema2);
            }
        }
    }

    private void checkAndMayCreateSystemStore(String str, Schema schema) {
        StoreResponse retryableRequest = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient -> {
            return controllerClient.getStore(str);
        }, storeResponse -> {
            return Boolean.valueOf(storeResponse.getError().contains("does not exist"));
        });
        if (retryableRequest.isError()) {
            if (!retryableRequest.getError().contains("does not exist")) {
                throw new VeniceException("Error when getting system store " + str + " from cluster " + this.clusterName + " after retries. Error: " + retryableRequest.getError());
            }
            if (schema == null) {
                throw new VeniceException("Protocol definition: " + this.protocolDefinition.name() + " does not have version 1");
            }
            String schema2 = this.keySchema == null ? DEFAULT_KEY_SCHEMA_STR : this.keySchema.toString();
            String schema3 = schema.toString();
            NewStoreResponse retryableRequest2 = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient2 -> {
                return controllerClient2.createNewSystemStore(str, "venice-internal", schema2, schema3);
            }, newStoreResponse -> {
                return Boolean.valueOf(newStoreResponse.getError().contains("already exists"));
            });
            if (retryableRequest2.isError() && !retryableRequest2.getError().contains("already exists")) {
                throw new VeniceException("Error when creating system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest2.getError());
            }
            if (this.storeMetadataUpdate != null) {
                ControllerResponse retryableRequest3 = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient3 -> {
                    return controllerClient3.updateStore(str, this.storeMetadataUpdate);
                });
                if (retryableRequest3.isError()) {
                    throw new VeniceException("Error when updating system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest3.getError());
                }
                LOGGER.info("System store {} has been created.", str);
            }
        }
    }

    private void checkIfKeySchemaMatches(String str) {
        SchemaResponse retryableRequest = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient -> {
            return controllerClient.getKeySchema(str);
        });
        if (retryableRequest.isError()) {
            throw new VeniceException("Error when getting key schema from system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest.getError());
        }
        Schema parse = AvroCompatibilityHelper.parse(new String[]{retryableRequest.getSchemaStr()});
        if (parse.equals(this.keySchema)) {
            return;
        }
        LOGGER.error("Key Schema of {} in cluster {} is already registered but it is INCONSISTENT with the local definition.\nAlready registered: {}\nLocal definition: {}", str, this.clusterName, parse.toString(true), this.keySchema.toString(true));
    }

    private void checkAndMayRegisterValueSchema(String str, int i, Schema schema, Schema schema2) {
        if (schema == null) {
            SchemaResponse retryableRequest = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient -> {
                return controllerClient.addValueSchema(str, schema2.toString(), i);
            });
            if (retryableRequest.isError()) {
                throw new VeniceException("Error when adding value schema " + i + " to system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest.getError());
            }
            LOGGER.info("Added new schema v{} to system store {}.", Integer.valueOf(i), str);
            return;
        }
        if (schema.equals(schema2)) {
            LOGGER.info("Schema v{} in system store {} is already registered and consistent with the local definition.", Integer.valueOf(i), str);
        } else {
            LOGGER.warn("Schema v{} in system store {} is already registered but it is INCONSISTENT with the local definition.\nAlready registered: {}\nLocal definition: {}", Integer.valueOf(i), str, schema.toString(true), schema2.toString(true));
        }
    }

    private void checkAndMayRegisterPartialUpdateSchema(String str, int i, Schema schema) {
        String schema2 = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(schema).toString();
        SchemaResponse retryableRequest = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient -> {
            return controllerClient.getValueOrDerivedSchemaId(str, schema2);
        }, schemaResponse -> {
            return Boolean.valueOf(schemaResponse.getError().contains("Can not find any registered value schema nor derived schema"));
        });
        if (retryableRequest.isError()) {
            if (!retryableRequest.getError().contains("Can not find any registered value schema nor derived schema")) {
                throw new VeniceException("Error when getting derived schema from system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest.getError());
            }
            SchemaResponse retryableRequest2 = this.controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, controllerClient2 -> {
                return controllerClient2.addDerivedSchema(str, i, schema2);
            });
            if (retryableRequest2.isError()) {
                throw new VeniceException("Error when adding derived schema for value schema v" + i + " to system store " + str + " in cluster " + this.clusterName + " after retries. Error: " + retryableRequest2.getError());
            }
            LOGGER.info("Added derived schema v{} for value schema v{} to system store {}.", Integer.valueOf(retryableRequest2.getDerivedSchemaId()), Integer.valueOf(i), str);
        }
    }
}
