package com.linkedin.venice.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerApiConstants;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponseV2;
import com.linkedin.venice.controllerapi.LeaderControllerResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
import com.linkedin.venice.helix.HelixHybridStoreQuotaRepository;
import com.linkedin.venice.helix.StoreJSONSerializer;
import com.linkedin.venice.helix.SystemStoreJSONSerializer;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreConfigRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.SystemStore;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.router.api.RouterResourceType;
import com.linkedin.venice.router.api.VenicePathParser;
import com.linkedin.venice.router.api.VenicePathParserHelper;
import com.linkedin.venice.routerapi.HybridStoreQuotaStatusResponse;
import com.linkedin.venice.routerapi.PushStatusResponse;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.routerapi.ResourceStateResponse;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.NettyUtils;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateExpiredException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@ChannelHandler.Sharable
/* loaded from: input_file:com/linkedin/venice/router/MetaDataHandler.class */
public class MetaDataHandler extends SimpleChannelInboundHandler<HttpRequest> {
    private final RoutingDataRepository routingDataRepository;
    private final ReadOnlySchemaRepository schemaRepo;
    private final ReadOnlyStoreConfigRepository storeConfigRepo;
    private final Map<String, String> clusterToD2Map;
    private final Map<String, String> clusterToServerD2Map;
    private final Optional<HelixHybridStoreQuotaRepository> hybridStoreQuotaRepository;
    private final ReadOnlyStoreRepository storeRepository;
    private final String clusterName;
    private final String zkAddress;
    private final String kafkaBootstrapServers;
    static final String REQUEST_TOPIC_ERROR_WRITES_DISABLED = "Write operations to the store are disabled.";
    static final String REQUEST_TOPIC_ERROR_BATCH_ONLY_STORE = "Online writes are only supported for hybrid stores.";
    static final String REQUEST_TOPIC_ERROR_NO_CURRENT_VERSION = "Store doesn't have an active version. Please push data to the store.";
    static final String REQUEST_TOPIC_ERROR_MISSING_CURRENT_VERSION = "Store has a current version, but the configs for the current version are not present. This is unexpected.";
    static final String REQUEST_TOPIC_ERROR_CURRENT_VERSION_NOT_HYBRID = "Online writes are only supported for stores with a current version capable of receiving hybrid writes.";
    static final String REQUEST_TOPIC_ERROR_FORMAT_UNSUPPORTED_PARTITIONER = "Expected partitioner class %s cannot be found.";
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) MetaDataHandler.class);
    private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
    private static final StoreJSONSerializer STORE_SERIALIZER = new StoreJSONSerializer();
    private static final SystemStoreJSONSerializer SYSTEM_STORE_SERIALIZER = new SystemStoreJSONSerializer();
    private static final RedundantExceptionFilter EXCEPTION_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    static final String REQUEST_TOPIC_ERROR_UNSUPPORTED_REPLICATION_POLICY = "Online writes are only supported for hybrid stores that have " + DataReplicationPolicy.ACTIVE_ACTIVE + " or " + DataReplicationPolicy.NON_AGGREGATE + " data replication policy.";

    public MetaDataHandler(RoutingDataRepository routingDataRepository, ReadOnlySchemaRepository readOnlySchemaRepository, ReadOnlyStoreConfigRepository readOnlyStoreConfigRepository, Map<String, String> map, Map<String, String> map2, ReadOnlyStoreRepository readOnlyStoreRepository, Optional<HelixHybridStoreQuotaRepository> optional, String str, String str2, String str3) {
        this.routingDataRepository = routingDataRepository;
        this.schemaRepo = readOnlySchemaRepository;
        this.storeConfigRepo = readOnlyStoreConfigRepository;
        this.clusterToD2Map = map;
        this.clusterToServerD2Map = map2;
        this.hybridStoreQuotaRepository = optional;
        this.storeRepository = readOnlyStoreRepository;
        this.clusterName = str;
        this.zkAddress = str2;
        this.kafkaBootstrapServers = str3;
    }

    @Override // io.netty.channel.SimpleChannelInboundHandler
    public void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws IOException {
        VenicePathParserHelper parseRequest = VenicePathParserHelper.parseRequest(httpRequest);
        switch (parseRequest.getResourceType()) {
            case TYPE_LEADER_CONTROLLER:
            case TYPE_LEADER_CONTROLLER_LEGACY:
                handleControllerLookup(channelHandlerContext);
                return;
            case TYPE_KEY_SCHEMA:
                handleKeySchemaLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_VALUE_SCHEMA:
                handleValueSchemaLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_LATEST_VALUE_SCHEMA:
                handleLatestValueSchemaLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_GET_UPDATE_SCHEMA:
                handleUpdateSchemaLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_CLUSTER_DISCOVERY:
                handleD2ServiceLookup(channelHandlerContext, parseRequest, httpRequest.headers());
                return;
            case TYPE_RESOURCE_STATE:
                handleResourceStateLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_PUSH_STATUS:
                handlePushStatusLookUp(channelHandlerContext, parseRequest);
                return;
            case TYPE_STREAM_HYBRID_STORE_QUOTA:
                handleStreamHybridStoreQuotaStatusLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_STREAM_REPROCESSING_HYBRID_STORE_QUOTA:
                handleStreamReprocessingHybridStoreQuotaStatusLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_STORE_STATE:
                handleStoreStateLookup(channelHandlerContext, parseRequest);
                return;
            case TYPE_REQUEST_TOPIC:
                handleRequestTopic(channelHandlerContext, parseRequest, httpRequest);
                return;
            default:
                ReferenceCountUtil.retain(httpRequest);
                channelHandlerContext.fireChannelRead((Object) httpRequest);
                return;
        }
    }

    private void handleControllerLookup(ChannelHandlerContext channelHandlerContext) throws IOException {
        LeaderControllerResponse leaderControllerResponse = new LeaderControllerResponse();
        leaderControllerResponse.setCluster(this.clusterName);
        leaderControllerResponse.setUrl(this.routingDataRepository.getLeaderController().getUrl());
        LOGGER.info("For cluster: {}, the leader controller url: {}, last refreshed at {}", leaderControllerResponse.getCluster(), leaderControllerResponse.getUrl(), Long.valueOf(this.routingDataRepository.getLeaderControllerChangeTimeMs()));
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(leaderControllerResponse), true, channelHandlerContext);
    }

    private void handleKeySchemaLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + VenicePathParser.TYPE_KEY_SCHEMA + "/${storeName}");
        SchemaEntry keySchema = this.schemaRepo.getKeySchema(resourceName);
        if (keySchema == null) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Key schema for store: " + resourceName + " doesn't exist").getBytes(), false, channelHandlerContext);
            return;
        }
        SchemaResponse schemaResponse = new SchemaResponse();
        schemaResponse.setCluster(this.clusterName);
        schemaResponse.setName(resourceName);
        schemaResponse.setId(keySchema.getId());
        schemaResponse.setSchemaStr(keySchema.getSchema().toString());
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(schemaResponse), true, channelHandlerContext);
    }

    private void handleValueSchemaLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + VenicePathParser.TYPE_VALUE_SCHEMA + "/${storeName} or /" + VenicePathParser.TYPE_VALUE_SCHEMA + "/${storeName}/${valueSchemaId}");
        String key = venicePathParserHelper.getKey();
        if (key != null && !key.isEmpty()) {
            SchemaResponse schemaResponse = new SchemaResponse();
            schemaResponse.setCluster(this.clusterName);
            schemaResponse.setName(resourceName);
            SchemaEntry valueSchema = this.schemaRepo.getValueSchema(resourceName, Integer.parseInt(key));
            if (valueSchema == null) {
                NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Value schema doesn't exist for schema id: " + key + " of store: " + resourceName).getBytes(), false, channelHandlerContext);
                return;
            }
            schemaResponse.setId(valueSchema.getId());
            schemaResponse.setSchemaStr(valueSchema.getSchema().toString());
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(schemaResponse), true, channelHandlerContext);
            return;
        }
        MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse();
        multiSchemaResponse.setCluster(this.clusterName);
        multiSchemaResponse.setName(resourceName);
        int latestSuperSetValueSchemaId = this.storeRepository.getStore(resourceName).getLatestSuperSetValueSchemaId();
        if (latestSuperSetValueSchemaId != -1) {
            multiSchemaResponse.setSuperSetSchemaId(latestSuperSetValueSchemaId);
        }
        Collection<SchemaEntry> valueSchemas = this.schemaRepo.getValueSchemas(resourceName);
        MultiSchemaResponse.Schema[] schemaArr = new MultiSchemaResponse.Schema[valueSchemas.size()];
        for (SchemaEntry schemaEntry : valueSchemas) {
            int id = schemaEntry.getId();
            schemaArr[id - 1] = new MultiSchemaResponse.Schema();
            schemaArr[id - 1].setId(id);
            schemaArr[id - 1].setSchemaStr(schemaEntry.getSchema().toString());
        }
        multiSchemaResponse.setSchemas(schemaArr);
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(multiSchemaResponse), true, channelHandlerContext);
    }

    private void handleLatestValueSchemaLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + RouterResourceType.TYPE_LATEST_VALUE_SCHEMA + "/${storeName}");
        SchemaResponse schemaResponse = new SchemaResponse();
        schemaResponse.setCluster(this.clusterName);
        schemaResponse.setName(resourceName);
        SchemaEntry supersetOrLatestValueSchema = this.schemaRepo.getSupersetOrLatestValueSchema(resourceName);
        if (supersetOrLatestValueSchema == null) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.INTERNAL_SERVER_ERROR, ("Latest value schema doesn't exist for store: " + resourceName).getBytes(), false, channelHandlerContext);
        } else {
            schemaResponse.setId(supersetOrLatestValueSchema.getId());
            schemaResponse.setSchemaStr(supersetOrLatestValueSchema.getSchemaStr());
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(schemaResponse), true, channelHandlerContext);
        }
    }

    private void handleUpdateSchemaLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + RouterResourceType.TYPE_GET_UPDATE_SCHEMA + "/${storeName} or /" + RouterResourceType.TYPE_GET_UPDATE_SCHEMA + "/${storeName}/${valueSchemaId}");
        String key = venicePathParserHelper.getKey();
        if (key != null && !key.isEmpty()) {
            int parseInt = Integer.parseInt(key);
            Optional<DerivedSchemaEntry> latestUpdateSchemaWithValueSchemaId = getLatestUpdateSchemaWithValueSchemaId(resourceName, parseInt);
            if (!latestUpdateSchemaWithValueSchemaId.isPresent()) {
                NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Update schema doesn't exist for value schema id: " + key + " of store: " + resourceName).getBytes(), false, channelHandlerContext);
                return;
            }
            SchemaResponse schemaResponse = new SchemaResponse();
            schemaResponse.setCluster(this.clusterName);
            schemaResponse.setName(resourceName);
            schemaResponse.setId(parseInt);
            schemaResponse.setDerivedSchemaId(latestUpdateSchemaWithValueSchemaId.get().getId());
            schemaResponse.setSchemaStr(latestUpdateSchemaWithValueSchemaId.get().getSchemaStr());
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(schemaResponse), true, channelHandlerContext);
            return;
        }
        MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse();
        multiSchemaResponse.setCluster(this.clusterName);
        multiSchemaResponse.setName(resourceName);
        int latestSuperSetValueSchemaId = this.storeRepository.getStore(resourceName).getLatestSuperSetValueSchemaId();
        if (latestSuperSetValueSchemaId != -1) {
            multiSchemaResponse.setSuperSetSchemaId(latestSuperSetValueSchemaId);
        }
        Collection<DerivedSchemaEntry> derivedSchemas = this.schemaRepo.getDerivedSchemas(resourceName);
        MultiSchemaResponse.Schema[] schemaArr = new MultiSchemaResponse.Schema[derivedSchemas.size()];
        for (DerivedSchemaEntry derivedSchemaEntry : derivedSchemas) {
            int valueSchemaID = derivedSchemaEntry.getValueSchemaID();
            schemaArr[valueSchemaID - 1] = new MultiSchemaResponse.Schema();
            schemaArr[valueSchemaID - 1].setSchemaStr(derivedSchemaEntry.getSchema().toString());
            schemaArr[valueSchemaID - 1].setDerivedSchemaId(derivedSchemaEntry.getId());
            schemaArr[valueSchemaID - 1].setId(valueSchemaID);
        }
        multiSchemaResponse.setSchemas(schemaArr);
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(multiSchemaResponse), true, channelHandlerContext);
    }

    private void handleD2ServiceLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper, HttpHeaders httpHeaders) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + VenicePathParser.TYPE_CLUSTER_DISCOVERY + "/${storeName}");
        Optional<StoreConfig> storeConfig = this.storeConfigRepo.getStoreConfig(resourceName);
        if (!storeConfig.isPresent() || StringUtils.isEmpty(storeConfig.get().getCluster())) {
            setupErrorD2DiscoveryResponseAndFlush(HttpResponseStatus.NOT_FOUND, "Cluster for store: " + resourceName + " doesn't exist", httpHeaders, channelHandlerContext);
            return;
        }
        String cluster = storeConfig.get().getCluster();
        String d2ServiceByClusterName = getD2ServiceByClusterName(cluster);
        if (StringUtils.isEmpty(d2ServiceByClusterName)) {
            setupErrorD2DiscoveryResponseAndFlush(HttpResponseStatus.NOT_FOUND, "D2 service for store: " + resourceName + " doesn't exist", httpHeaders, channelHandlerContext);
            return;
        }
        String serverD2ServiceByClusterName = getServerD2ServiceByClusterName(cluster);
        if (!httpHeaders.contains(D2ServiceDiscoveryResponseV2.D2_SERVICE_DISCOVERY_RESPONSE_V2_ENABLED)) {
            D2ServiceDiscoveryResponse d2ServiceDiscoveryResponse = new D2ServiceDiscoveryResponse();
            d2ServiceDiscoveryResponse.setCluster(storeConfig.get().getCluster());
            d2ServiceDiscoveryResponse.setName(storeConfig.get().getStoreName());
            d2ServiceDiscoveryResponse.setD2Service(d2ServiceByClusterName);
            d2ServiceDiscoveryResponse.setServerD2Service(serverD2ServiceByClusterName);
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(d2ServiceDiscoveryResponse), true, channelHandlerContext);
            return;
        }
        D2ServiceDiscoveryResponseV2 d2ServiceDiscoveryResponseV2 = new D2ServiceDiscoveryResponseV2();
        d2ServiceDiscoveryResponseV2.setCluster(storeConfig.get().getCluster());
        d2ServiceDiscoveryResponseV2.setName(storeConfig.get().getStoreName());
        d2ServiceDiscoveryResponseV2.setD2Service(d2ServiceByClusterName);
        d2ServiceDiscoveryResponseV2.setServerD2Service(serverD2ServiceByClusterName);
        d2ServiceDiscoveryResponseV2.setZkAddress(this.zkAddress);
        d2ServiceDiscoveryResponseV2.setKafkaBootstrapServers(this.kafkaBootstrapServers);
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(d2ServiceDiscoveryResponseV2), true, channelHandlerContext);
    }

    private void setupErrorD2DiscoveryResponseAndFlush(HttpResponseStatus httpResponseStatus, String str, HttpHeaders httpHeaders, ChannelHandlerContext channelHandlerContext) throws IOException {
        D2ServiceDiscoveryResponse d2ServiceDiscoveryResponseV2 = httpHeaders.contains(D2ServiceDiscoveryResponseV2.D2_SERVICE_DISCOVERY_RESPONSE_V2_ENABLED) ? new D2ServiceDiscoveryResponseV2() : new D2ServiceDiscoveryResponse();
        d2ServiceDiscoveryResponseV2.setError(str);
        if (httpResponseStatus.equals(HttpResponseStatus.NOT_FOUND)) {
            d2ServiceDiscoveryResponseV2.setErrorType(ErrorType.STORE_NOT_FOUND);
        }
        NettyUtils.setupResponseAndFlush(httpResponseStatus, OBJECT_MAPPER.writeValueAsBytes(d2ServiceDiscoveryResponseV2), true, channelHandlerContext);
    }

    private void handleResourceStateLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        boolean z = true;
        checkResourceName(resourceName, "/" + VenicePathParser.TYPE_RESOURCE_STATE + "/${resourceName}");
        if (!Version.isVersionTopic(resourceName)) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, ("Invalid resource name: " + resourceName).getBytes(), false, channelHandlerContext);
            return;
        }
        String parseStoreFromKafkaTopicName = Version.parseStoreFromKafkaTopicName(resourceName);
        if (this.storeRepository.getStore(parseStoreFromKafkaTopicName) == null) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot fetch the state for resource: " + resourceName + " because the store: " + parseStoreFromKafkaTopicName + " cannot be found in cluster: " + this.clusterName).getBytes(), false, channelHandlerContext);
            return;
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < this.routingDataRepository.getNumberOfPartitions(resourceName); i++) {
            try {
                List<ReplicaState> replicaStates = this.routingDataRepository.getReplicaStates(resourceName, i);
                if (replicaStates.isEmpty()) {
                    arrayList2.add(Integer.valueOf(i));
                } else {
                    if (z) {
                        z = replicaStates.stream().filter((v0) -> {
                            return v0.isReadyToServe();
                        }).count() > ((long) (replicaStates.size() / 2));
                    }
                    arrayList.addAll(replicaStates);
                }
            } catch (VeniceNoHelixResourceException e) {
                NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot find metadata for resource: " + resourceName).getBytes(), false, channelHandlerContext);
                return;
            }
        }
        ResourceStateResponse resourceStateResponse = new ResourceStateResponse();
        if (!arrayList2.isEmpty()) {
            resourceStateResponse.setUnretrievablePartitions(arrayList2);
            resourceStateResponse.setError("Unable to retrieve replica states for partition(s): " + Arrays.toString(arrayList2.toArray()));
            z = false;
        }
        resourceStateResponse.setCluster(this.clusterName);
        resourceStateResponse.setName(resourceName);
        resourceStateResponse.setReplicaStates(arrayList);
        resourceStateResponse.setReadyToServe(z);
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(resourceStateResponse), true, channelHandlerContext);
    }

    private void handlePushStatusLookUp(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/push_status/${resourceName}");
        if (!this.storeConfigRepo.getStoreConfig(Version.parseStoreFromKafkaTopicName(resourceName)).isPresent()) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot fetch the push status for resource: " + resourceName + " because the store: " + Version.parseStoreFromKafkaTopicName(resourceName) + " cannot be found").getBytes(), false, channelHandlerContext);
        } else {
            PushStatusResponse pushStatusResponse = new PushStatusResponse();
            pushStatusResponse.setName(resourceName);
            pushStatusResponse.setError("Only support getting push status for stores running in Leader/Follower mode");
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, OBJECT_MAPPER.writeValueAsBytes(pushStatusResponse), true, channelHandlerContext);
        }
    }

    private void handleStreamHybridStoreQuotaStatusLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/stream_hybrid_store_quota/${storeName}");
        if (this.storeConfigRepo.getStoreConfig(resourceName).isPresent()) {
            prepareHybridStoreQuotaStatusResponse(Version.composeKafkaTopic(resourceName, this.storeRepository.getStore(resourceName).getCurrentVersion()), channelHandlerContext);
        } else {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot fetch the hybrid store quota status for store: " + resourceName + " because the store: " + resourceName + " cannot be found").getBytes(), false, channelHandlerContext);
        }
    }

    private void handleStreamReprocessingHybridStoreQuotaStatusLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/stream_reprocessing_hybrid_store_quota/${resourceName}");
        if (this.storeConfigRepo.getStoreConfig(Version.parseStoreFromKafkaTopicName(resourceName)).isPresent()) {
            prepareHybridStoreQuotaStatusResponse(resourceName, channelHandlerContext);
        } else {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot fetch the hybrid store quota status for resource: " + resourceName + " because the store: " + Version.parseStoreFromKafkaTopicName(resourceName) + " cannot be found").getBytes(), false, channelHandlerContext);
        }
    }

    private void handleStoreStateLookup(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper) throws IOException {
        byte[] serialize;
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/store_state/${storeName}");
        Store store = this.storeRepository.getStore(resourceName);
        if (store == null) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, ("Cannot fetch the store state for store: " + resourceName + " because the store cannot be found in cluster: " + this.clusterName).getBytes(), false, channelHandlerContext);
            return;
        }
        if (store instanceof SystemStore) {
            serialize = SYSTEM_STORE_SERIALIZER.serialize(((SystemStore) store).getSerializableSystemStore(), (String) null);
        } else {
            serialize = STORE_SERIALIZER.serialize(store, (String) null);
        }
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, serialize, true, channelHandlerContext);
    }

    private void handleRequestTopic(ChannelHandlerContext channelHandlerContext, VenicePathParserHelper venicePathParserHelper, HttpRequest httpRequest) throws IOException {
        HybridStoreConfig hybridStoreConfig;
        String resourceName = venicePathParserHelper.getResourceName();
        checkResourceName(resourceName, "/" + VenicePathParser.TYPE_REQUEST_TOPIC + "/${storeName}");
        Store store = this.storeRepository.getStore(resourceName);
        if (!store.isEnableWrites()) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, REQUEST_TOPIC_ERROR_WRITES_DISABLED.getBytes(), false, channelHandlerContext);
            return;
        }
        if (!store.isHybrid()) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, REQUEST_TOPIC_ERROR_BATCH_ONLY_STORE.getBytes(), false, channelHandlerContext);
            return;
        }
        int currentVersion = store.getCurrentVersion();
        if (currentVersion == 0) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, REQUEST_TOPIC_ERROR_NO_CURRENT_VERSION.getBytes(), false, channelHandlerContext);
            return;
        }
        Optional<Version> version = store.getVersion(currentVersion);
        if (!version.isPresent()) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.INTERNAL_SERVER_ERROR, REQUEST_TOPIC_ERROR_MISSING_CURRENT_VERSION.getBytes(), false, channelHandlerContext);
            return;
        }
        Version version2 = version.get();
        if (!version2.isUseVersionLevelHybridConfig()) {
            hybridStoreConfig = store.getHybridStoreConfig();
        } else {
            if (version2.getHybridStoreConfig() == null) {
                NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, REQUEST_TOPIC_ERROR_CURRENT_VERSION_NOT_HYBRID.getBytes(), false, channelHandlerContext);
                return;
            }
            hybridStoreConfig = version2.getHybridStoreConfig();
        }
        DataReplicationPolicy dataReplicationPolicy = hybridStoreConfig.getDataReplicationPolicy();
        if (!dataReplicationPolicy.equals(DataReplicationPolicy.NON_AGGREGATE) && !dataReplicationPolicy.equals(DataReplicationPolicy.ACTIVE_ACTIVE)) {
            NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, REQUEST_TOPIC_ERROR_UNSUPPORTED_REPLICATION_POLICY.getBytes(), false, channelHandlerContext);
            return;
        }
        PartitionerConfig partitionerConfig = store.getPartitionerConfig();
        Map<String, String> extractQueryParameters = venicePathParserHelper.extractQueryParameters(httpRequest);
        if (extractQueryParameters.get(ControllerApiConstants.PARTITIONERS) != null) {
            boolean z = false;
            String[] split = extractQueryParameters.get(ControllerApiConstants.PARTITIONERS).split(",");
            int length = split.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                if (split[i].equals(partitionerConfig.getPartitionerClass())) {
                    z = true;
                    break;
                }
                i++;
            }
            if (!z) {
                NettyUtils.setupResponseAndFlush(HttpResponseStatus.BAD_REQUEST, String.format(REQUEST_TOPIC_ERROR_FORMAT_UNSUPPORTED_PARTITIONER, partitionerConfig.getPartitionerClass()).getBytes(), false, channelHandlerContext);
                return;
            }
        }
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setCluster(this.clusterName);
        versionCreationResponse.setName(resourceName);
        versionCreationResponse.setPartitions(version2.getPartitionCount());
        versionCreationResponse.setKafkaTopic(Version.composeRealTimeTopic(resourceName));
        versionCreationResponse.setCompressionStrategy(CompressionStrategy.NO_OP);
        versionCreationResponse.setAmplificationFactor(1);
        versionCreationResponse.setKafkaBootstrapServers(this.kafkaBootstrapServers);
        versionCreationResponse.setDaVinciPushStatusStoreEnabled(store.isDaVinciPushStatusStoreEnabled());
        versionCreationResponse.setPartitionerClass(partitionerConfig.getPartitionerClass());
        versionCreationResponse.setPartitionerParams(partitionerConfig.getPartitionerParams());
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(versionCreationResponse), true, channelHandlerContext);
    }

    private void prepareHybridStoreQuotaStatusResponse(String str, ChannelHandlerContext channelHandlerContext) throws IOException {
        HybridStoreQuotaStatusResponse hybridStoreQuotaStatusResponse = new HybridStoreQuotaStatusResponse();
        hybridStoreQuotaStatusResponse.setName(str);
        if (this.hybridStoreQuotaRepository.isPresent()) {
            hybridStoreQuotaStatusResponse.setQuotaStatus(this.hybridStoreQuotaRepository.get().getHybridStoreQuotaStatus(str));
        } else {
            hybridStoreQuotaStatusResponse.setQuotaStatus(HybridStoreQuotaStatus.UNKNOWN);
        }
        NettyUtils.setupResponseAndFlush(HttpResponseStatus.OK, OBJECT_MAPPER.writeValueAsBytes(hybridStoreQuotaStatusResponse), true, channelHandlerContext);
    }

    private String getD2ServiceByClusterName(String str) {
        return this.clusterToD2Map.get(str);
    }

    private String getServerD2ServiceByClusterName(String str) {
        return this.clusterToServerD2Map.get(str);
    }

    private void checkResourceName(String str, String str2) {
        if (StringUtils.isEmpty(str)) {
            throw new VeniceException("Resource name required, valid path should be : " + str2);
        }
    }

    private Optional<DerivedSchemaEntry> getLatestUpdateSchemaWithValueSchemaId(String str, int i) {
        DerivedSchemaEntry derivedSchemaEntry = null;
        for (DerivedSchemaEntry derivedSchemaEntry2 : this.schemaRepo.getDerivedSchemas(str)) {
            if (derivedSchemaEntry2.getValueSchemaID() == i && (derivedSchemaEntry == null || derivedSchemaEntry2.getId() > derivedSchemaEntry.getId())) {
                derivedSchemaEntry = derivedSchemaEntry2;
            }
        }
        return Optional.ofNullable(derivedSchemaEntry);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        String str = inetSocketAddress.getHostName() + ":" + inetSocketAddress.getPort();
        if (!EXCEPTION_FILTER.isRedundantException(inetSocketAddress.getHostName(), th)) {
            LOGGER.error("Got exception while handling meta data request from {}. ", str, th);
        }
        try {
            try {
                if (ExceptionUtils.recursiveClassEquals(th, CertificateExpiredException.class)) {
                    NettyUtils.setupResponseAndFlush(HttpResponseStatus.UNAUTHORIZED, "Your certificate has expired. Please renew.".getBytes(), false, channelHandlerContext);
                    LOGGER.info("Sent an error message to client about expired certificate");
                } else {
                    NettyUtils.setupResponseAndFlush(HttpResponseStatus.INTERNAL_SERVER_ERROR, ExceptionUtils.stackTraceToString(th).getBytes(), false, channelHandlerContext);
                }
                channelHandlerContext.channel().close();
            } catch (Exception e) {
                LOGGER.error("Got exception while trying to send error response", (Throwable) e);
                channelHandlerContext.channel().close();
            }
        } catch (Throwable th2) {
            channelHandlerContext.channel().close();
            throw th2;
        }
    }
}
