package com.linkedin.venice.client.schema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.AvroSchemaUtils;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/client/schema/RouterBackedSchemaReader.class */
/**
 * {@link SchemaReader} that retrieves a store's key and value schemas through the raw-request API
 * ({@code getRaw}) of an {@link AbstractAvroStoreClient} and caches the results in memory.
 *
 * <p>Value schemas are cached in both directions (id to schema and schema to id). A cache miss triggers a
 * fetch of all value schemas of the store; fetches are serialized on this instance's monitor.
 */
public class RouterBackedSchemaReader implements SchemaReader {
    public static final String TYPE_KEY_SCHEMA = "key_schema";
    public static final String TYPE_VALUE_SCHEMA = "value_schema";

    private static final Logger LOGGER = LogManager.getLogger(RouterBackedSchemaReader.class);
    private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
    private static final Function<SchemaEntry, Schema> SCHEMA_EXTRACTOR = SchemaEntry::getSchema;
    private static final Function<SchemaEntry, Integer> SCHEMA_ID_EXTRACTOR = SchemaEntry::getId;

    /** Total number of attempts made for one schema request before giving up. */
    private static final int MAX_FETCH_ATTEMPTS = 3;
    /** Delay handed to the retry utility between two attempts. */
    private static final Duration FETCH_RETRY_DELAY = Duration.ofNanos(1L);
    /** Sentinel used while scanning a schema response to mean "no schema id selected yet". */
    private static final int NO_SCHEMA_ID = -1;

    private final Optional<Schema> readerSchema;
    private volatile Schema keySchema;
    /** Value schema id -> schema. */
    private final Map<Integer, Schema> valueSchemaMap;
    /** Reverse index of {@link #valueSchemaMap}: value schema -> id. */
    private final Map<Schema, Integer> valueSchemaMapR;
    private final AtomicReference<SchemaEntry> latestValueSchemaEntry;
    private final String storeName;
    private final AbstractAvroStoreClient storeClient;
    private final Predicate<Schema> preferredSchemaFilter;
    private final ICProvider icProvider;

    /**
     * Creates a reader without a reader schema, without a preferred-schema filter and without an
     * {@link ICProvider}.
     *
     * @param clientSupplier supplies the store client used for all schema requests
     */
    RouterBackedSchemaReader(Supplier<AbstractAvroStoreClient> clientSupplier) throws VeniceClientException {
        this(clientSupplier, Optional.empty(), Optional.empty());
    }

    /**
     * Creates a reader without an {@link ICProvider}.
     *
     * @param clientSupplier supplies the store client used for all schema requests
     * @param readerSchema optional reader schema that fetched value schemas are checked against
     * @param preferredSchemaFilter optional filter selecting which value schemas may become the "latest" one
     */
    public RouterBackedSchemaReader(
            Supplier<AbstractAvroStoreClient> clientSupplier,
            Optional<Schema> readerSchema,
            Optional<Predicate<Schema>> preferredSchemaFilter) {
        this(clientSupplier, readerSchema, preferredSchemaFilter, null);
    }

    /**
     * @param clientSupplier supplies the store client used for all schema requests; it is invoked exactly once
     * @param readerSchema optional reader schema; when present it is validated here and later used by
     *        {@link #preemptiveSchemaVerification(Schema, String, int)}
     * @param preferredSchemaFilter optional filter selecting which value schemas may become the "latest" one;
     *        when absent every schema is accepted
     * @param icProvider optional (nullable) provider through which single-schema requests are issued
     */
    public RouterBackedSchemaReader(
            Supplier<AbstractAvroStoreClient> clientSupplier,
            Optional<Schema> readerSchema,
            Optional<Predicate<Schema>> preferredSchemaFilter,
            ICProvider icProvider) {
        this.valueSchemaMap = new VeniceConcurrentHashMap<>();
        this.valueSchemaMapR = new VeniceConcurrentHashMap<>();
        this.latestValueSchemaEntry = new AtomicReference<>();
        this.storeClient = clientSupplier.get();
        this.storeName = this.storeClient.getStoreName();
        this.readerSchema = readerSchema;
        this.preferredSchemaFilter = preferredSchemaFilter.orElse(schema -> true);
        readerSchema.ifPresent(AvroSchemaUtils::validateAvroSchemaStr);
        this.icProvider = icProvider;
    }

    /**
     * Returns the key schema of the store, fetching it on first use and caching it afterwards.
     *
     * @throws VeniceClientException if the fetch fails or yields no key schema
     */
    public Schema getKeySchema() {
        Schema cached = this.keySchema;
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have populated the field while we waited.
            if (this.keySchema != null) {
                return this.keySchema;
            }
            SchemaEntry keySchemaEntry = fetchKeySchema();
            if (keySchemaEntry == null) {
                throw new VeniceClientException("Key Schema of store: " + this.storeName + " doesn't exist");
            }
            this.keySchema = keySchemaEntry.getSchema();
            return this.keySchema;
        }
    }

    /**
     * Returns the value schema registered under {@code id}, refreshing the cache once on a miss.
     *
     * @param id value schema id
     * @return the schema, or {@code null} (after logging a warning) if it is still unknown after the refresh
     * @throws RuntimeException wrapping any {@link VeniceException} raised by the refresh
     */
    public Schema getValueSchema(int id) {
        Schema cached = this.valueSchemaMap.get(id);
        if (cached != null) {
            return cached;
        }
        try {
            synchronized (this) {
                // Another thread may have refreshed while we were waiting for the lock.
                if (this.valueSchemaMap.get(id) == null) {
                    refreshAllValueSchema();
                }
            }
            Schema refreshed = this.valueSchemaMap.get(id);
            if (refreshed == null) {
                LOGGER.warn("Got null value schema from Venice for store: {} and id: {}", this.storeName, id);
            }
            return refreshed;
        } catch (VeniceException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the schema of the latest value schema entry, fetching all value schemas first if none is cached.
     *
     * @return the latest value schema, or {@code null} if none could be determined
     */
    public Schema getLatestValueSchema() throws VeniceClientException {
        return ensureLatestValueSchemaIsFetched(SCHEMA_EXTRACTOR);
    }

    /**
     * Returns the id of the latest value schema entry, fetching all value schemas first if none is cached.
     *
     * @return the latest value schema id, or {@code null} if none could be determined
     */
    public Integer getLatestValueSchemaId() throws VeniceClientException {
        return ensureLatestValueSchemaIsFetched(SCHEMA_ID_EXTRACTOR);
    }

    /**
     * Returns the id under which {@code schema} is registered, refreshing the cache once on a miss.
     *
     * @param schema value schema to look up
     * @throws VeniceClientException if the schema is still unknown after the refresh, or the refresh fails
     */
    public int getValueSchemaId(Schema schema) {
        Integer cachedId = this.valueSchemaMapR.get(schema);
        if (cachedId != null) {
            return cachedId;
        }
        synchronized (this) {
            // Another thread may have refreshed while we were waiting for the lock.
            if (!this.valueSchemaMapR.containsKey(schema)) {
                refreshAllValueSchema();
            }
        }
        Integer refreshedId = this.valueSchemaMapR.get(schema);
        if (refreshedId != null) {
            return refreshedId;
        }
        throw new VeniceClientException("Could not find schema: " + schema + ". for store " + this.storeName);
    }

    /**
     * Closes the underlying store client. An {@link IOException} raised while closing is handed to the
     * logger at error level instead of being propagated.
     */
    public void close() throws IOException {
        IOUtils.closeQuietly(this.storeClient, LOGGER::error);
    }

    /**
     * Makes sure a latest value schema entry has been looked up, then projects it with {@code extractor}.
     *
     * @param extractor maps the latest {@link SchemaEntry} to the value the caller wants
     * @return the extracted value, or {@code null} if no latest entry exists even after a refresh
     */
    private <T> T ensureLatestValueSchemaIsFetched(Function<SchemaEntry, T> extractor) {
        if (this.latestValueSchemaEntry.get() == null) {
            synchronized (this) {
                if (this.latestValueSchemaEntry.get() == null) {
                    refreshAllValueSchema();
                }
            }
        }
        SchemaEntry latest = this.latestValueSchemaEntry.get();
        return latest == null ? null : extractor.apply(latest);
    }

    /**
     * Issues one raw request for {@code requestPath}, going through {@link #icProvider} when one is configured.
     *
     * @return the future for the raw response bytes
     */
    // The store client is held as a raw type, so the compiler cannot verify the future's element type.
    @SuppressWarnings("unchecked")
    private CompletableFuture<byte[]> sendSingleSchemaRequest(String requestPath) throws Exception {
        if (this.icProvider != null) {
            return (CompletableFuture<byte[]>) this.icProvider
                    .call(getClass().getCanonicalName(), () -> this.storeClient.getRaw(requestPath));
        }
        return (CompletableFuture<byte[]>) this.storeClient.getRaw(requestPath);
    }

    /**
     * Fetches and parses the single schema served at {@code requestPath}.
     *
     * @param requestPath request path handed to the store client
     * @param isValueSchema when {@code true} the parsed schema additionally goes through
     *        {@link #preemptiveSchemaVerification(Schema, String, int)}
     * @return the schema entry, or {@code null} if the response body is {@code null}
     * @throws VeniceClientException if the response reports an error or anything else goes wrong
     */
    private SchemaEntry fetchSingleSchema(String requestPath, boolean isValueSchema) throws VeniceClientException {
        try {
            // The request is issued INSIDE the retried supplier: waiting again on an already-failed future
            // would only rethrow the same ExecutionException, so every attempt needs a fresh request.
            byte[] response = RetryUtils.executeWithMaxAttempt(
                    () -> sendSingleSchemaRequest(requestPath).get(),
                    MAX_FETCH_ATTEMPTS,
                    FETCH_RETRY_DELAY,
                    Arrays.asList(ExecutionException.class));
            if (response == null) {
                LOGGER.warn("Requested schema doesn't exist for request path: {}", requestPath);
                return null;
            }
            SchemaResponse schemaResponse = OBJECT_MAPPER.readValue(response, SchemaResponse.class);
            if (schemaResponse.isError()) {
                throw new VeniceClientException(
                        "Received an error while fetching schema. " + getExceptionDetails(requestPath)
                                + ", error message: " + schemaResponse.getError());
            }
            String schemaStr = schemaResponse.getSchemaStr();
            int schemaId = schemaResponse.getId();
            Schema schema = Schema.parse(schemaStr);
            if (isValueSchema) {
                schema = preemptiveSchemaVerification(schema, schemaStr, schemaId);
            }
            return new SchemaEntry(schemaId, schema);
        } catch (VeniceClientException e) {
            throw e;
        } catch (Exception e) {
            throw new VeniceClientException(
                    "Got exception while trying to fetch single schema. " + getExceptionDetails(requestPath), e);
        }
    }

    /** Builds the context suffix (store, path, client) shared by this class's error messages. */
    private String getExceptionDetails(String requestPath) {
        return "Store: " + this.storeName + ", path: " + requestPath + ", storeClient: " + this.storeClient;
    }

    /** Fetches the key schema entry of the store; see {@link #fetchSingleSchema(String, boolean)}. */
    private SchemaEntry fetchKeySchema() throws VeniceClientException {
        return fetchSingleSchema(TYPE_KEY_SCHEMA + "/" + this.storeName, false);
    }

    /**
     * Fetches all value schemas of the store, adds them to both caches and updates the latest value schema
     * entry. Returns without changes if the response body is {@code null}.
     *
     * <p>NOTE(review): unlike {@link #fetchSingleSchema(String, boolean)} this request is not routed through
     * {@link #icProvider} — confirm whether that is intentional.
     *
     * @throws VeniceClientException if the response reports an error or anything else goes wrong
     */
    private void refreshAllValueSchema() throws VeniceClientException {
        String requestPath = TYPE_VALUE_SCHEMA + "/" + this.storeName;
        try {
            byte[] response = RetryUtils.executeWithMaxAttempt(
                    () -> (byte[]) this.storeClient.getRaw(requestPath).get(),
                    MAX_FETCH_ATTEMPTS,
                    FETCH_RETRY_DELAY,
                    Arrays.asList(ExecutionException.class));
            if (response == null) {
                LOGGER.info("Got null for request path: {}", requestPath);
                return;
            }
            MultiSchemaResponse schemaResponse = OBJECT_MAPPER.readValue(response, MultiSchemaResponse.class);
            if (schemaResponse.isError()) {
                throw new VeniceClientException(
                        "Received an error while fetching schema. " + getExceptionDetails(requestPath)
                                + ", error message: " + schemaResponse.getError());
            }

            int highestSchemaId = NO_SCHEMA_ID;
            int highestPreferredSchemaId = NO_SCHEMA_ID;
            for (MultiSchemaResponse.Schema responseSchema : schemaResponse.getSchemas()) {
                int schemaId = responseSchema.getId();
                String schemaStr = responseSchema.getSchemaStr();
                Schema parsedSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(schemaStr);
                Schema writerSchema = preemptiveSchemaVerification(parsedSchema, schemaStr, schemaId);
                this.valueSchemaMap.put(schemaId, writerSchema);
                this.valueSchemaMapR.put(writerSchema, schemaId);
                if (highestSchemaId == NO_SCHEMA_ID || highestSchemaId < schemaId) {
                    highestSchemaId = schemaId;
                }
                if (this.preferredSchemaFilter.test(writerSchema)
                        && (highestPreferredSchemaId == NO_SCHEMA_ID || highestPreferredSchemaId < schemaId)) {
                    highestPreferredSchemaId = schemaId;
                }
            }

            synchronized (this) {
                // Precedence for "latest": highest id accepted by the preferred filter, then the superset
                // schema id reported by the response, then the plain highest id.
                if (highestPreferredSchemaId != NO_SCHEMA_ID) {
                    advanceLatestValueSchemaTo(highestPreferredSchemaId);
                } else if (schemaResponse.getSuperSetSchemaId() != -1) {
                    // The superset schema replaces the current entry unconditionally.
                    int superSetSchemaId = schemaResponse.getSuperSetSchemaId();
                    this.latestValueSchemaEntry
                            .set(new SchemaEntry(superSetSchemaId, this.valueSchemaMap.get(superSetSchemaId)));
                } else if (highestSchemaId != NO_SCHEMA_ID) {
                    advanceLatestValueSchemaTo(highestSchemaId);
                }
            }
        } catch (VeniceClientException e) {
            // Already carries full context (e.g. the error-response case above); do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            throw new VeniceClientException(
                    "Got exception while trying to fetch all value schemas. " + getExceptionDetails(requestPath), e);
        }
    }

    /**
     * Points the latest value schema entry at {@code schemaId} unless the current entry already has an id
     * that is greater than or equal to it.
     */
    private void advanceLatestValueSchemaTo(int schemaId) {
        SchemaEntry current = this.latestValueSchemaEntry.get();
        if (current == null || current.getId() < schemaId) {
            this.latestValueSchemaEntry.set(new SchemaEntry(schemaId, this.valueSchemaMap.get(schemaId)));
        }
    }

    /**
     * Checks a writer schema against the configured reader schema and, if resolution reports errors, tries to
     * repair it by regenerating the writer schema with the reader schema's namespace.
     *
     * @param writerSchema parsed writer schema
     * @param writerSchemaStr string form of the writer schema, used to regenerate it with a namespace
     * @param schemaId writer schema id; only used in log messages
     * @return the regenerated schema if it resolves against the reader schema without errors; otherwise
     *         {@code writerSchema} unchanged (also when no reader schema is configured or the check throws)
     */
    Schema preemptiveSchemaVerification(Schema writerSchema, String writerSchemaStr, int schemaId) {
        if (!this.readerSchema.isPresent()) {
            return writerSchema;
        }
        Schema reader = this.readerSchema.get();
        try {
            if (!AvroSchemaUtils.schemaResolveHasErrors(writerSchema, reader)) {
                return writerSchema;
            }
            LOGGER.info("Schema error detected during preemptive schema check for store {} with writer schema id {} Reader schema: {} Writer schema: {}", this.storeName, schemaId, reader, writerSchemaStr);
            Schema namespacedSchema = AvroSchemaUtils.generateSchemaWithNamespace(writerSchemaStr, reader.getNamespace());
            if (AvroSchemaUtils.schemaResolveHasErrors(namespacedSchema, reader)) {
                LOGGER.info("Schema error cannot be resolved with writer schema namespace fix");
                return writerSchema;
            }
            LOGGER.info("Schema error can be resolved with writer schema namespace fix. Replacing writer schema for store {} and schema id {}", this.storeName, schemaId);
            return namespacedSchema;
        } catch (Exception e) {
            // Best effort only: any failure of the check falls back to the schema as received.
            LOGGER.info("Unknown exception during preemptive schema verification", e);
            return writerSchema;
        }
    }
}
