package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.InvalidVeniceSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository.class */
public class HelixReadOnlySchemaRepository implements ReadOnlySchemaRepository, StoreDataChangedListener {
    private static final Logger logger = LogManager.getLogger(HelixReadOnlySchemaRepository.class);
    public static final int VALUE_SCHEMA_STARTING_ID = 1;
    private final ZkClient zkClient;
    private final HelixSchemaAccessor accessor;
    private final CachedResourceZkStateListener zkStateListener;
    private final ReadOnlyStoreRepository storeRepository;
    private final Map<String, SchemaData> schemaMap = new VeniceConcurrentHashMap();
    private final IZkChildListener keySchemaChildListener = new KeySchemaChildListener();
    private final IZkChildListener valueSchemaChildListener = new ValueSchemaChildListener();
    private final IZkChildListener derivedSchemaChildListener = new DerivedSchemaChildListener();
    private final IZkChildListener replicationMetadataSchemaChildListener = new ReplicationMetadataSchemaChildListener();
    private final ReadWriteLock schemaLock = new ReentrantReadWriteLock();

    /* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository$DerivedSchemaChildListener.class */
    private class DerivedSchemaChildListener extends SchemaChildListener {
        private DerivedSchemaChildListener() {
            super();
        }

        @Override // com.linkedin.venice.helix.HelixReadOnlySchemaRepository.SchemaChildListener
        void handleSchemaChanges(String str, List<String> list) {
            SchemaData schemaData = (SchemaData) HelixReadOnlySchemaRepository.this.schemaMap.get(str);
            for (String str2 : list) {
                String[] split = str2.split("-");
                if (split.length != 2) {
                    throw new VeniceException("unrecognized derivedSchema path format. Store: " + str + " path: " + str2);
                }
                if (schemaData.getDerivedSchema(Integer.parseInt(split[0]), Integer.parseInt(split[1])) == null) {
                    schemaData.addDerivedSchema(HelixReadOnlySchemaRepository.this.accessor.getDerivedSchema(str, str2));
                }
            }
        }
    }

    /* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository$KeySchemaChildListener.class */
    private class KeySchemaChildListener extends SchemaChildListener {
        private KeySchemaChildListener() {
            super();
        }

        @Override // com.linkedin.venice.helix.HelixReadOnlySchemaRepository.SchemaChildListener
        void handleSchemaChanges(String str, List<String> list) {
            ((SchemaData) HelixReadOnlySchemaRepository.this.schemaMap.get(str)).setKeySchema(HelixReadOnlySchemaRepository.this.accessor.getKeySchema(str));
        }
    }

    /* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository$ReplicationMetadataSchemaChildListener.class */
    private class ReplicationMetadataSchemaChildListener extends SchemaChildListener {
        private ReplicationMetadataSchemaChildListener() {
            super();
        }

        @Override // com.linkedin.venice.helix.HelixReadOnlySchemaRepository.SchemaChildListener
        void handleSchemaChanges(String str, List<String> list) {
            SchemaData schemaData = (SchemaData) HelixReadOnlySchemaRepository.this.schemaMap.get(str);
            for (String str2 : list) {
                String[] split = str2.split("-");
                if (split.length != 2) {
                    throw new VeniceException("unrecognized Schema path format. Store: " + str + " path: " + str2);
                }
                if (schemaData.getReplicationMetadataSchema(Integer.parseInt(split[0]), Integer.parseInt(split[1])) == null) {
                    schemaData.addReplicationMetadataSchema(HelixReadOnlySchemaRepository.this.accessor.getReplicationMetadataSchema(str, str2));
                }
            }
        }
    }

    /* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository$SchemaChildListener.class */
    private abstract class SchemaChildListener implements IZkChildListener {
        private SchemaChildListener() {
        }

        public void handleChildChange(String str, List<String> list) {
            String extractStoreNameFromSchemaPath = HelixReadOnlySchemaRepository.this.extractStoreNameFromSchemaPath(str);
            if (extractStoreNameFromSchemaPath == null) {
                HelixReadOnlySchemaRepository.logger.error("Invalid schema path: {}.", str);
                return;
            }
            if (list == null) {
                HelixReadOnlySchemaRepository.logger.info("currentChildren is null, which might be triggered by store deletion");
                return;
            }
            HelixReadOnlySchemaRepository.this.schemaLock.writeLock().lock();
            try {
                if (HelixReadOnlySchemaRepository.this.schemaMap.containsKey(extractStoreNameFromSchemaPath)) {
                    handleSchemaChanges(extractStoreNameFromSchemaPath, list);
                } else {
                    HelixReadOnlySchemaRepository.logger.error("Local schemaMap is missing store entry: {}, which should not happen.", extractStoreNameFromSchemaPath);
                }
            } finally {
                HelixReadOnlySchemaRepository.this.schemaLock.writeLock().unlock();
            }
        }

        abstract void handleSchemaChanges(String str, List<String> list);
    }

    /* loaded from: input_file:com/linkedin/venice/helix/HelixReadOnlySchemaRepository$ValueSchemaChildListener.class */
    private class ValueSchemaChildListener extends SchemaChildListener {
        private ValueSchemaChildListener() {
            super();
        }

        @Override // com.linkedin.venice.helix.HelixReadOnlySchemaRepository.SchemaChildListener
        void handleSchemaChanges(String str, List<String> list) {
            SchemaData schemaData = (SchemaData) HelixReadOnlySchemaRepository.this.schemaMap.get(str);
            for (String str2 : list) {
                if (schemaData.getValueSchema(Integer.parseInt(str2)) == null) {
                    schemaData.addValueSchema(HelixReadOnlySchemaRepository.this.accessor.getValueSchema(str, str2));
                }
            }
        }
    }

    public HelixReadOnlySchemaRepository(ReadOnlyStoreRepository readOnlyStoreRepository, ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, String str, int i, long j) {
        this.storeRepository = readOnlyStoreRepository;
        this.zkClient = zkClient;
        this.accessor = new HelixSchemaAccessor(zkClient, helixAdapterSerializer, str, i, j);
        readOnlyStoreRepository.registerStoreDataChangedListener(this);
        this.zkStateListener = new CachedResourceZkStateListener(this, i, j);
    }

    HelixReadOnlySchemaRepository(ReadOnlyStoreRepository readOnlyStoreRepository, ZkClient zkClient, HelixSchemaAccessor helixSchemaAccessor, int i, long j) {
        this.storeRepository = readOnlyStoreRepository;
        this.zkClient = zkClient;
        this.accessor = helixSchemaAccessor;
        readOnlyStoreRepository.registerStoreDataChangedListener(this);
        this.zkStateListener = new CachedResourceZkStateListener(this, i, j);
    }

    private SchemaData getSchemaDataFromCacheOrFetch(String str) {
        Store storeOrThrow = getStoreRepository().getStoreOrThrow(str);
        getSchemaLock().readLock().lock();
        try {
            SchemaData populateSchemaMap = populateSchemaMap(str, storeOrThrow);
            if (populateSchemaMap == null) {
                throw new VeniceNoStoreException(str);
            }
            return populateSchemaMap;
        } finally {
            getSchemaLock().readLock().unlock();
        }
    }

    private Object doSchemaOperation(String str, Function<SchemaData, Object> function) {
        return function.apply(getSchemaDataFromCacheOrFetch(str));
    }

    void maybeRegisterAndPopulateRmdSchema(Store store, SchemaData schemaData) {
        if (store.isActiveActiveReplicationEnabled()) {
            String name = store.getName();
            getAccessor().subscribeReplicationMetadataSchemaCreationChange(name, this.replicationMetadataSchemaChildListener);
            List<RmdSchemaEntry> allReplicationMetadataSchemas = getAccessor().getAllReplicationMetadataSchemas(name);
            Objects.requireNonNull(schemaData);
            allReplicationMetadataSchemas.forEach(schemaData::addReplicationMetadataSchema);
        }
    }

    void maybeRegisterAndPopulateUpdateSchema(Store store, SchemaData schemaData) {
        if (store.isWriteComputationEnabled()) {
            String name = store.getName();
            getAccessor().subscribeDerivedSchemaCreationChange(name, this.derivedSchemaChildListener);
            List<DerivedSchemaEntry> allDerivedSchemas = getAccessor().getAllDerivedSchemas(name);
            Objects.requireNonNull(schemaData);
            allDerivedSchemas.forEach(schemaData::addDerivedSchema);
        }
    }

    SchemaEntry forceRefreshSupersetSchemaWithRetry(String str) {
        Store store = getStoreRepository().getStore(str);
        int latestSuperSetValueSchemaId = store.getLatestSuperSetValueSchemaId();
        AtomicReference atomicReference = new AtomicReference();
        RetryUtils.executeWithMaxAttempt(() -> {
            try {
                getSchemaLock().writeLock().lock();
                SchemaData schemaData = getSchemaMap().get(str);
                forceRefreshSchemaData(store, schemaData);
                if (!isSupersetSchemaReadyToServe(store, schemaData, latestSuperSetValueSchemaId)) {
                    throw new InvalidVeniceSchemaException("Unable to refresh superset schema id: " + latestSuperSetValueSchemaId + " for store: " + store.getName());
                }
                atomicReference.set(schemaData.getValueSchema(latestSuperSetValueSchemaId));
                getSchemaLock().writeLock().unlock();
            } catch (Throwable th) {
                getSchemaLock().writeLock().unlock();
                throw th;
            }
        }, 3, Duration.ofMillis(100L), Collections.singletonList(InvalidVeniceSchemaException.class));
        return (SchemaEntry) atomicReference.get();
    }

    boolean isSupersetSchemaReadyToServe(Store store, SchemaData schemaData, int i) {
        if (schemaData.getValueSchema(i) == null) {
            logger.warn("Superset schema ID: {} for store: {} not found in schema cache.", Integer.valueOf(i), store.getName());
            return false;
        }
        if (store.isWriteComputationEnabled() && !schemaData.hasUpdateSchema(i)) {
            logger.warn("Update schema of superset schema ID: {} for store: {} not found in schema cache.", Integer.valueOf(i), store.getName());
            return false;
        }
        if (!store.isActiveActiveReplicationEnabled() || schemaData.hasRmdSchema(i)) {
            return true;
        }
        logger.warn("RMD schema of superset schema ID: {} for store: {} not found in schema cache.", Integer.valueOf(i), store.getName());
        return false;
    }

    void forceRefreshSchemaData(Store store, SchemaData schemaData) {
        String name = store.getName();
        List<SchemaEntry> allValueSchemas = getAccessor().getAllValueSchemas(name);
        Objects.requireNonNull(schemaData);
        allValueSchemas.forEach(schemaData::addValueSchema);
        if (store.isWriteComputationEnabled()) {
            List<DerivedSchemaEntry> allDerivedSchemas = getAccessor().getAllDerivedSchemas(name);
            Objects.requireNonNull(schemaData);
            allDerivedSchemas.forEach(schemaData::addDerivedSchema);
        }
        if (store.isActiveActiveReplicationEnabled()) {
            List<RmdSchemaEntry> allReplicationMetadataSchemas = getAccessor().getAllReplicationMetadataSchemas(name);
            Objects.requireNonNull(schemaData);
            allReplicationMetadataSchemas.forEach(schemaData::addReplicationMetadataSchema);
        }
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public SchemaEntry getKeySchema(String str) {
        return getSchemaDataFromCacheOrFetch(str).getKeySchema();
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public SchemaEntry getValueSchema(String str, int i) {
        return getValueSchemaInternally(str, i);
    }

    private SchemaEntry getValueSchemaInternally(String str, int i) {
        return getSchemaDataFromCacheOrFetch(str).getValueSchema(i);
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public boolean hasValueSchema(String str, int i) {
        return getValueSchemaInternally(str, i) != null;
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public int getValueSchemaId(String str, String str2) {
        return getSchemaDataFromCacheOrFetch(str).getSchemaID(new SchemaEntry(-1, str2));
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public GeneratedSchemaID getDerivedSchemaId(String str, String str2) {
        return getSchemaDataFromCacheOrFetch(str).getDerivedSchemaId(str2);
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public DerivedSchemaEntry getDerivedSchema(String str, int i, int i2) {
        return getSchemaDataFromCacheOrFetch(str).getDerivedSchema(i, i2);
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public Collection<SchemaEntry> getValueSchemas(String str) {
        return getSchemaDataFromCacheOrFetch(str).getValueSchemas();
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public Collection<DerivedSchemaEntry> getDerivedSchemas(String str) {
        return getSchemaDataFromCacheOrFetch(str).getDerivedSchemas();
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public SchemaEntry getSupersetOrLatestValueSchema(String str) {
        SchemaData schemaDataFromCacheOrFetch = getSchemaDataFromCacheOrFetch(str);
        int intValue = getSupersetSchemaID(str).intValue();
        if (intValue == -1) {
            intValue = schemaDataFromCacheOrFetch.getMaxValueSchemaId();
            if (intValue == -1) {
                throw new VeniceException(str + " doesn't have latest schema!");
            }
        }
        return schemaDataFromCacheOrFetch.getValueSchema(intValue);
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public SchemaEntry getSupersetSchema(String str) {
        SchemaData schemaDataFromCacheOrFetch = getSchemaDataFromCacheOrFetch(str);
        Integer supersetSchemaID = getSupersetSchemaID(str);
        if (supersetSchemaID.intValue() == -1) {
            return null;
        }
        return isSupersetSchemaReadyToServe(getStoreRepository().getStore(str), schemaDataFromCacheOrFetch, supersetSchemaID.intValue()) ? schemaDataFromCacheOrFetch.getValueSchema(supersetSchemaID.intValue()) : forceRefreshSupersetSchemaWithRetry(str);
    }

    private Integer getSupersetSchemaID(String str) {
        return Integer.valueOf(getStoreRepository().getStoreOrThrow(str).getLatestSuperSetValueSchemaId());
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public DerivedSchemaEntry getLatestDerivedSchema(String str, int i) {
        return (DerivedSchemaEntry) doSchemaOperation(str, schemaData -> {
            Optional max = schemaData.getDerivedSchemas().stream().filter(derivedSchemaEntry -> {
                return derivedSchemaEntry.getValueSchemaID() == i;
            }).max(Comparator.comparing((v0) -> {
                return v0.getId();
            }));
            if (max.isPresent()) {
                return max.get();
            }
            throw new VeniceException("Cannot find latest schema for store: " + str + ", value schema id: " + i);
        });
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public RmdSchemaEntry getReplicationMetadataSchema(String str, int i, int i2) {
        return (RmdSchemaEntry) doSchemaOperation(str, schemaData -> {
            return schemaData.getReplicationMetadataSchema(i, i2);
        });
    }

    @Override // com.linkedin.venice.meta.ReadOnlySchemaRepository
    public Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String str) {
        return (Collection) doSchemaOperation(str, schemaData -> {
            return schemaData.getReplicationMetadataSchemas();
        });
    }

    @Override // com.linkedin.venice.VeniceResource
    public void refresh() {
        logger.info("Starting to refresh schema map.");
        this.schemaLock.writeLock().lock();
        try {
            this.schemaMap.keySet().forEach(this::removeStoreSchemaFromLocal);
            this.schemaMap.clear();
            this.zkClient.subscribeStateChanges(this.zkStateListener);
            for (Store store : this.storeRepository.getAllStores()) {
                populateSchemaMap(store.getName(), store);
            }
            logger.info("Finished refreshing schema map.");
        } finally {
            this.schemaLock.writeLock().unlock();
        }
    }

    private SchemaData populateSchemaMap(String str, Store store) {
        return getSchemaMap().computeIfAbsent(str, str2 -> {
            logger.info("Try to fetch schema data for store: {}.", str);
            SchemaData schemaData = new SchemaData(str);
            this.accessor.subscribeKeySchemaCreationChange(str, this.keySchemaChildListener);
            schemaData.setKeySchema(this.accessor.getKeySchema(str));
            this.accessor.subscribeValueSchemaCreationChange(str, this.valueSchemaChildListener);
            List<SchemaEntry> allValueSchemas = this.accessor.getAllValueSchemas(str);
            Objects.requireNonNull(schemaData);
            allValueSchemas.forEach(schemaData::addValueSchema);
            maybeRegisterAndPopulateUpdateSchema(store, schemaData);
            maybeRegisterAndPopulateRmdSchema(store, schemaData);
            return schemaData;
        });
    }

    @Override // com.linkedin.venice.VeniceResource
    public void clear() {
        this.zkClient.unsubscribeStateChanges(this.zkStateListener);
        this.schemaLock.writeLock().lock();
        try {
            ((Set) this.schemaMap.values().stream().map((v0) -> {
                return v0.getStoreName();
            }).collect(Collectors.toSet())).forEach(this::removeStoreSchemaFromLocal);
        } finally {
            this.schemaLock.writeLock().unlock();
        }
    }

    private void removeStoreSchemaFromLocal(String str) {
        this.schemaLock.writeLock().lock();
        try {
            if (this.schemaMap.remove(str) == null) {
                return;
            }
            logger.info("Remove schema for store locally: {}.", str);
            this.accessor.unsubscribeKeySchemaCreationChange(str, this.keySchemaChildListener);
            this.accessor.unsubscribeValueSchemaCreationChange(str, this.valueSchemaChildListener);
            this.accessor.unsubscribeDerivedSchemaCreationChanges(str, this.derivedSchemaChildListener);
            this.accessor.unsubscribeReplicationMetadataSchemaCreationChanges(str, this.replicationMetadataSchemaChildListener);
        } finally {
            this.schemaLock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String extractStoreNameFromSchemaPath(String str) {
        String[] split = str.split("/");
        if (split.length != 5) {
            return null;
        }
        return split[3];
    }

    @Override // com.linkedin.venice.meta.StoreDataChangedListener
    public void handleStoreCreated(Store store) {
    }

    @Override // com.linkedin.venice.meta.StoreDataChangedListener
    public void handleStoreDeleted(String str) {
        removeStoreSchemaFromLocal(str);
    }

    @Override // com.linkedin.venice.meta.StoreDataChangedListener
    public void handleStoreChanged(Store store) {
        String name = store.getName();
        this.schemaLock.readLock().lock();
        try {
            SchemaData populateSchemaMap = populateSchemaMap(name, store);
            this.schemaLock.readLock().unlock();
            maybeRegisterAndPopulateUpdateSchema(store, populateSchemaMap);
            maybeRegisterAndPopulateRmdSchema(store, populateSchemaMap);
        } catch (Throwable th) {
            this.schemaLock.readLock().unlock();
            throw th;
        }
    }

    HelixSchemaAccessor getAccessor() {
        return this.accessor;
    }

    ReadOnlyStoreRepository getStoreRepository() {
        return this.storeRepository;
    }

    Map<String, SchemaData> getSchemaMap() {
        return this.schemaMap;
    }

    ReadWriteLock getSchemaLock() {
        return this.schemaLock;
    }
}
