package com.linkedin.venice.listener;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.listener.response.MetadataResponse;
import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.storage.DiskHealthCheckService;
import com.linkedin.davinci.storage.MetadataRetriever;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.chunking.BatchGetChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.VeniceConstants;
import com.linkedin.venice.cleaner.ResourceReadUsageTracker;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.compute.ComputeOperationUtils;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.compute.ReadComputeOperator;
import com.linkedin.venice.compute.protocol.request.ComputeOperation;
import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType;
import com.linkedin.venice.compute.protocol.request.router.ComputeRouterRequestKeyV1;
import com.linkedin.venice.compute.protocol.response.ComputeResponseRecordV1;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.listener.request.AdminRequest;
import com.linkedin.venice.listener.request.ComputeRouterRequestWrapper;
import com.linkedin.venice.listener.request.DictionaryFetchRequest;
import com.linkedin.venice.listener.request.GetRouterRequest;
import com.linkedin.venice.listener.request.HealthCheckRequest;
import com.linkedin.venice.listener.request.MetadataFetchRequest;
import com.linkedin.venice.listener.request.MultiGetRouterRequestWrapper;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.listener.response.BinaryResponse;
import com.linkedin.venice.listener.response.ComputeResponseWrapper;
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.listener.response.MultiGetResponseWrapper;
import com.linkedin.venice.listener.response.StorageResponseObject;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.streaming.StreamingConstants;
import com.linkedin.venice.streaming.StreamingUtils;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ComputeUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponseStatus;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntLists;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.util.Utf8;
import org.apache.log4j.varia.ExternallyRolledFileAppender;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@ChannelHandler.Sharable
/* loaded from: input_file:com/linkedin/venice/listener/StorageReadRequestsHandler.class */
public class StorageReadRequestsHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) StorageReadRequestsHandler.class);
    private static final byte[] BINARY_DECODER_PARAM = new byte[16];
    private final DiskHealthCheckService diskHealthCheckService;
    private final ThreadPoolExecutor executor;
    private final ThreadPoolExecutor computeExecutor;
    private final StorageEngineRepository storageEngineRepository;
    private final ReadOnlyStoreRepository metadataRepository;
    private final ReadOnlySchemaRepository schemaRepo;
    private final MetadataRetriever metadataRetriever;
    private final boolean fastAvroEnabled;
    private final boolean parallelBatchGetEnabled;
    private final int parallelBatchGetChunkSize;
    private final boolean keyValueProfilingEnabled;
    private final VeniceServerConfig serverConfig;
    private final StorageEngineBackedCompressorFactory compressorFactory;
    private final Optional<ResourceReadUsageTracker> resourceReadUsageTracker;
    private final Map<String, VenicePartitioner> resourceToPartitionerMap = new VeniceConcurrentHashMap();
    private final Map<String, PartitionerConfig> resourceToPartitionConfigMap = new VeniceConcurrentHashMap();
    private final ThreadLocal<StorageExecReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(() -> {
        return new StorageExecReusableObjects();
    });
    private final Map<Utf8, Schema> computeResultSchemaCache = new VeniceConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/listener/StorageReadRequestsHandler$StorageExecReusableObjects.class */
    public static class StorageExecReusableObjects extends AvroSerializer.AvroSerializerReusableObjects {
        final ByteBuffer reusedByteBuffer;
        final LinkedHashMap<Schema, GenericRecord> reuseValueRecordMap;
        final LinkedHashMap<Schema, GenericRecord> reuseResultRecordMap;
        final BinaryDecoder binaryDecoder;

        private StorageExecReusableObjects() {
            this.reusedByteBuffer = ByteBuffer.allocate(1048576);
            this.reuseValueRecordMap = new LinkedHashMap<Schema, GenericRecord>(100, 0.75f, true) { // from class: com.linkedin.venice.listener.StorageReadRequestsHandler.StorageExecReusableObjects.1
                @Override // java.util.LinkedHashMap
                protected boolean removeEldestEntry(Map.Entry<Schema, GenericRecord> entry) {
                    return size() > 100;
                }
            };
            this.reuseResultRecordMap = new LinkedHashMap<Schema, GenericRecord>(100, 0.75f, true) { // from class: com.linkedin.venice.listener.StorageReadRequestsHandler.StorageExecReusableObjects.2
                @Override // java.util.LinkedHashMap
                protected boolean removeEldestEntry(Map.Entry<Schema, GenericRecord> entry) {
                    return size() > 100;
                }
            };
            this.binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(StorageReadRequestsHandler.BINARY_DECODER_PARAM, 0, StorageReadRequestsHandler.BINARY_DECODER_PARAM.length, null);
        }
    }

    public StorageReadRequestsHandler(ThreadPoolExecutor threadPoolExecutor, ThreadPoolExecutor threadPoolExecutor2, StorageEngineRepository storageEngineRepository, ReadOnlyStoreRepository readOnlyStoreRepository, ReadOnlySchemaRepository readOnlySchemaRepository, MetadataRetriever metadataRetriever, DiskHealthCheckService diskHealthCheckService, boolean z, boolean z2, int i, VeniceServerConfig veniceServerConfig, StorageEngineBackedCompressorFactory storageEngineBackedCompressorFactory, Optional<ResourceReadUsageTracker> optional) {
        this.executor = threadPoolExecutor;
        this.computeExecutor = threadPoolExecutor2;
        this.storageEngineRepository = storageEngineRepository;
        this.metadataRepository = readOnlyStoreRepository;
        this.schemaRepo = readOnlySchemaRepository;
        this.metadataRetriever = metadataRetriever;
        this.diskHealthCheckService = diskHealthCheckService;
        this.fastAvroEnabled = z;
        this.parallelBatchGetEnabled = z2;
        this.parallelBatchGetChunkSize = i;
        this.keyValueProfilingEnabled = veniceServerConfig.isKeyValueProfilingEnabled();
        this.serverConfig = veniceServerConfig;
        this.compressorFactory = storageEngineBackedCompressorFactory;
        this.resourceReadUsageTracker = optional;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        long nanoTime = System.nanoTime();
        if (obj instanceof RouterRequest) {
            RouterRequest routerRequest = (RouterRequest) obj;
            this.resourceReadUsageTracker.ifPresent(resourceReadUsageTracker -> {
                resourceReadUsageTracker.recordReadUsage(routerRequest.getResourceName());
            });
            if (routerRequest.shouldRequestBeTerminatedEarly()) {
                channelHandlerContext.writeAndFlush(new HttpShortcutResponse(new VeniceRequestEarlyTerminationException(routerRequest.getStoreName()).getMessage(), VeniceRequestEarlyTerminationException.getHttpResponseStatus()));
                return;
            } else if (this.parallelBatchGetEnabled && routerRequest.getRequestType().equals(RequestType.MULTI_GET)) {
                handleMultiGetRequestInParallel((MultiGetRouterRequestWrapper) routerRequest, this.parallelBatchGetChunkSize).whenComplete((readResponse, th) -> {
                    if (th == null) {
                        channelHandlerContext.writeAndFlush(readResponse);
                        return;
                    }
                    if (th instanceof VeniceRequestEarlyTerminationException) {
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse(((VeniceRequestEarlyTerminationException) th).getMessage(), VeniceRequestEarlyTerminationException.getHttpResponseStatus()));
                    } else if (th instanceof VeniceNoStoreException) {
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse("No storage exists for: " + ((VeniceNoStoreException) th).getStoreName(), HttpResponseStatus.BAD_REQUEST));
                    } else {
                        LOGGER.error("Exception thrown in parallel batch get for {}", routerRequest.getResourceName(), th);
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse(th.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR));
                    }
                });
                return;
            } else {
                ThreadPoolExecutor executor = getExecutor(routerRequest.getRequestType());
                executor.submit(() -> {
                    ReadResponse handleComputeRequest;
                    try {
                        if (routerRequest.shouldRequestBeTerminatedEarly()) {
                            throw new VeniceRequestEarlyTerminationException(routerRequest.getStoreName());
                        }
                        double latencyInMS = LatencyUtils.getLatencyInMS(nanoTime);
                        int size = executor.getQueue().size();
                        switch (routerRequest.getRequestType()) {
                            case SINGLE_GET:
                                handleComputeRequest = handleSingleGetRequest((GetRouterRequest) routerRequest);
                                break;
                            case MULTI_GET:
                                handleComputeRequest = handleMultiGetRequest((MultiGetRouterRequestWrapper) routerRequest);
                                break;
                            case COMPUTE:
                                handleComputeRequest = handleComputeRequest((ComputeRouterRequestWrapper) obj);
                                break;
                            default:
                                throw new VeniceException("Unknown request type: " + routerRequest.getRequestType());
                        }
                        handleComputeRequest.setStorageExecutionSubmissionWaitTime(latencyInMS);
                        handleComputeRequest.setStorageExecutionQueueLen(size);
                        handleComputeRequest.setRCU(ReadQuotaEnforcementHandler.getRcu(routerRequest));
                        if (routerRequest.isStreamingRequest()) {
                            handleComputeRequest.setStreamingResponse();
                        }
                        channelHandlerContext.writeAndFlush(handleComputeRequest);
                    } catch (VeniceNoStoreException e) {
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse("No storage exists for: " + e.getStoreName(), HttpResponseStatus.BAD_REQUEST));
                    } catch (VeniceRequestEarlyTerminationException e2) {
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse(e2.getMessage(), VeniceRequestEarlyTerminationException.getHttpResponseStatus()));
                    } catch (Exception e3) {
                        LOGGER.error("Exception thrown for {}", routerRequest.getResourceName(), e3);
                        channelHandlerContext.writeAndFlush(new HttpShortcutResponse(e3.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR));
                    }
                });
                return;
            }
        }
        if (obj instanceof HealthCheckRequest) {
            if (this.diskHealthCheckService.isDiskHealthy()) {
                channelHandlerContext.writeAndFlush(new HttpShortcutResponse(ExternallyRolledFileAppender.OK, HttpResponseStatus.OK));
                return;
            } else {
                channelHandlerContext.writeAndFlush(new HttpShortcutResponse("Venice storage node hardware is not healthy!", HttpResponseStatus.INTERNAL_SERVER_ERROR));
                LOGGER.error("Disk is not healthy according to the disk health check service: {}", this.diskHealthCheckService.getErrorMessage());
                return;
            }
        }
        if (obj instanceof DictionaryFetchRequest) {
            channelHandlerContext.writeAndFlush(handleDictionaryFetchRequest((DictionaryFetchRequest) obj));
            return;
        }
        if (obj instanceof AdminRequest) {
            channelHandlerContext.writeAndFlush(handleServerAdminRequest((AdminRequest) obj));
        } else if (obj instanceof MetadataFetchRequest) {
            channelHandlerContext.writeAndFlush(handleMetadataFetchRequest((MetadataFetchRequest) obj));
        } else {
            channelHandlerContext.writeAndFlush(new HttpShortcutResponse("Unrecognized object in StorageExecutionHandler", HttpResponseStatus.INTERNAL_SERVER_ERROR));
        }
    }

    private ThreadPoolExecutor getExecutor(RequestType requestType) {
        switch (requestType) {
            case SINGLE_GET:
            case MULTI_GET:
                return this.executor;
            case COMPUTE:
                return this.computeExecutor;
            default:
                throw new VeniceException("Request type " + requestType + " is not supported.");
        }
    }

    private VenicePartitioner getPartitioner(String str, PartitionerConfig partitionerConfig) {
        return this.resourceToPartitionerMap.computeIfAbsent(str, str2 -> {
            Properties properties = new Properties();
            if (partitionerConfig.getPartitionerParams() != null) {
                properties.putAll(partitionerConfig.getPartitionerParams());
            }
            return PartitionUtils.getVenicePartitioner(partitionerConfig.getPartitionerClass(), 1, new VeniceProperties(properties));
        });
    }

    private int getSubPartitionId(int i, String str, PartitionerConfig partitionerConfig, byte[] bArr) {
        if (partitionerConfig == null || partitionerConfig.getAmplificationFactor() == 1) {
            return i;
        }
        return (i * partitionerConfig.getAmplificationFactor()) + getPartitioner(str, partitionerConfig).getPartitionId(bArr, partitionerConfig.getAmplificationFactor());
    }

    private int getSubPartitionId(int i, String str, PartitionerConfig partitionerConfig, ByteBuffer byteBuffer) {
        if (partitionerConfig == null || partitionerConfig.getAmplificationFactor() == 1) {
            return i;
        }
        return (i * partitionerConfig.getAmplificationFactor()) + getPartitioner(str, partitionerConfig).getPartitionId(byteBuffer, partitionerConfig.getAmplificationFactor());
    }

    private PartitionerConfig getPartitionerConfig(String str) {
        return this.resourceToPartitionConfigMap.computeIfAbsent(str, str2 -> {
            try {
                PartitionerConfig partitionerConfig = null;
                Optional<Version> version = this.metadataRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(str)).getVersion(Version.parseVersionFromKafkaTopicName(str));
                if (version.isPresent()) {
                    partitionerConfig = version.get().getPartitionerConfig();
                    if (partitionerConfig == null) {
                        return new PartitionerConfigImpl();
                    }
                }
                return partitionerConfig;
            } catch (Exception e) {
                LOGGER.error("Can not acquire partitionerConfig. ", (Throwable) e);
                return null;
            }
        });
    }

    private ReadResponse handleSingleGetRequest(GetRouterRequest getRouterRequest) {
        String resourceName = getRouterRequest.getResourceName();
        int subPartitionId = getSubPartitionId(getRouterRequest.getPartition(), resourceName, getPartitionerConfig(resourceName), getRouterRequest.getKeyBytes());
        byte[] keyBytes = getRouterRequest.getKeyBytes();
        AbstractStorageEngine storageEngine = getStorageEngine(resourceName);
        boolean isChunked = storageEngine.isChunked();
        StorageResponseObject storageResponseObject = new StorageResponseObject();
        storageResponseObject.setCompressionStrategy(storageEngine.getCompressionStrategy());
        storageResponseObject.setDatabaseLookupLatency(0.0d);
        ValueRecord valueRecord = SingleGetChunkingAdapter.get(storageEngine, subPartitionId, keyBytes, isChunked, storageResponseObject);
        storageResponseObject.setValueRecord(valueRecord);
        if (this.keyValueProfilingEnabled) {
            storageResponseObject.setKeySizeList(IntLists.singleton(keyBytes.length));
            storageResponseObject.setValueSizeList(IntLists.singleton(storageResponseObject.isFound() ? valueRecord.getDataSize() : -1));
        }
        return storageResponseObject;
    }

    private CompletableFuture<ReadResponse> handleMultiGetRequestInParallel(MultiGetRouterRequestWrapper multiGetRouterRequestWrapper, int i) {
        String resourceName = multiGetRouterRequestWrapper.getResourceName();
        Iterable<MultiGetRouterRequestKeyV1> keys = multiGetRouterRequestWrapper.getKeys();
        AbstractStorageEngine storageEngine = getStorageEngine(resourceName);
        MultiGetResponseWrapper multiGetResponseWrapper = new MultiGetResponseWrapper(multiGetRouterRequestWrapper.getKeyCount());
        multiGetResponseWrapper.setCompressionStrategy(storageEngine.getCompressionStrategy());
        multiGetResponseWrapper.setDatabaseLookupLatency(0.0d);
        boolean isChunked = storageEngine.isChunked();
        ThreadPoolExecutor executor = getExecutor(RequestType.MULTI_GET);
        if (!(keys instanceof ArrayList)) {
            throw new VeniceException("'keys' in MultiGetResponseWrapper should be an ArrayList");
        }
        ArrayList arrayList = (ArrayList) keys;
        int size = arrayList.size();
        int ceil = (int) Math.ceil(size / i);
        ReentrantLock reentrantLock = new ReentrantLock();
        CompletableFuture[] completableFutureArr = new CompletableFuture[ceil];
        PartitionerConfig partitionerConfig = getPartitionerConfig(multiGetRouterRequestWrapper.getResourceName());
        IntArrayList intArrayList = this.keyValueProfilingEnabled ? new IntArrayList(size) : null;
        IntArrayList intArrayList2 = this.keyValueProfilingEnabled ? new IntArrayList(size) : null;
        for (int i2 = 0; i2 < ceil; i2++) {
            int i3 = i2;
            completableFutureArr[i2] = CompletableFuture.runAsync(() -> {
                if (multiGetRouterRequestWrapper.shouldRequestBeTerminatedEarly()) {
                    throw new VeniceRequestEarlyTerminationException(multiGetRouterRequestWrapper.getStoreName());
                }
                int min = Math.min((i3 + 1) * i, size);
                for (int i4 = i3 * i; i4 < min; i4++) {
                    MultiGetRouterRequestKeyV1 multiGetRouterRequestKeyV1 = (MultiGetRouterRequestKeyV1) arrayList.get(i4);
                    if (intArrayList != null) {
                        intArrayList.set(i4, multiGetRouterRequestKeyV1.keyBytes.remaining());
                    }
                    MultiGetResponseRecordV1 multiGetResponseRecordV1 = BatchGetChunkingAdapter.get(storageEngine, getSubPartitionId(multiGetRouterRequestKeyV1.partitionId, resourceName, partitionerConfig, multiGetRouterRequestKeyV1.keyBytes), multiGetRouterRequestKeyV1.keyBytes, isChunked, multiGetResponseWrapper);
                    if (multiGetResponseRecordV1 != null) {
                        multiGetResponseRecordV1.keyIndex = multiGetRouterRequestKeyV1.keyIndex;
                    } else if (multiGetRouterRequestWrapper.isStreamingRequest()) {
                        multiGetResponseRecordV1 = new MultiGetResponseRecordV1();
                        multiGetResponseRecordV1.keyIndex = Math.negateExact(multiGetRouterRequestKeyV1.keyIndex);
                        multiGetResponseRecordV1.schemaId = StreamingConstants.NON_EXISTING_KEY_SCHEMA_ID;
                        multiGetResponseRecordV1.value = StreamingUtils.EMPTY_BYTE_BUFFER;
                    }
                    if (multiGetResponseRecordV1 != null) {
                        if (intArrayList2 != null) {
                            intArrayList2.set(i4, multiGetResponseRecordV1.value.remaining());
                        }
                        reentrantLock.lock();
                        try {
                            multiGetResponseWrapper.addRecord(multiGetResponseRecordV1);
                            reentrantLock.unlock();
                        } catch (Throwable th) {
                            reentrantLock.unlock();
                            throw th;
                        }
                    } else if (intArrayList2 != null) {
                        intArrayList2.set(i4, -1);
                    }
                }
            }, executor);
        }
        return CompletableFuture.allOf(completableFutureArr).handle((r7, th) -> {
            if (th != null) {
                throw new VeniceException(th);
            }
            multiGetResponseWrapper.setKeySizeList(intArrayList);
            multiGetResponseWrapper.setValueSizeList(intArrayList2);
            return multiGetResponseWrapper;
        });
    }

    private ReadResponse handleMultiGetRequest(MultiGetRouterRequestWrapper multiGetRouterRequestWrapper) {
        String resourceName = multiGetRouterRequestWrapper.getResourceName();
        Iterable<MultiGetRouterRequestKeyV1> keys = multiGetRouterRequestWrapper.getKeys();
        PartitionerConfig partitionerConfig = getPartitionerConfig(multiGetRouterRequestWrapper.getResourceName());
        AbstractStorageEngine storageEngine = getStorageEngine(resourceName);
        MultiGetResponseWrapper multiGetResponseWrapper = new MultiGetResponseWrapper(multiGetRouterRequestWrapper.getKeyCount());
        multiGetResponseWrapper.setCompressionStrategy(storageEngine.getCompressionStrategy());
        multiGetResponseWrapper.setDatabaseLookupLatency(0.0d);
        boolean isChunked = storageEngine.isChunked();
        for (MultiGetRouterRequestKeyV1 multiGetRouterRequestKeyV1 : keys) {
            MultiGetResponseRecordV1 multiGetResponseRecordV1 = BatchGetChunkingAdapter.get(storageEngine, getSubPartitionId(multiGetRouterRequestKeyV1.partitionId, resourceName, partitionerConfig, multiGetRouterRequestKeyV1.keyBytes), multiGetRouterRequestKeyV1.keyBytes, isChunked, multiGetResponseWrapper);
            if (multiGetResponseRecordV1 != null) {
                multiGetResponseRecordV1.keyIndex = multiGetRouterRequestKeyV1.keyIndex;
            } else if (multiGetRouterRequestWrapper.isStreamingRequest()) {
                multiGetResponseRecordV1 = new MultiGetResponseRecordV1();
                multiGetResponseRecordV1.keyIndex = Math.negateExact(multiGetRouterRequestKeyV1.keyIndex);
                multiGetResponseRecordV1.schemaId = StreamingConstants.NON_EXISTING_KEY_SCHEMA_ID;
                multiGetResponseRecordV1.value = StreamingUtils.EMPTY_BYTE_BUFFER;
            }
            if (multiGetResponseRecordV1 != null) {
                multiGetResponseWrapper.addRecord(multiGetResponseRecordV1);
            }
        }
        return multiGetResponseWrapper;
    }

    private ReadResponse handleComputeRequest(ComputeRouterRequestWrapper computeRouterRequestWrapper) {
        String resourceName = computeRouterRequestWrapper.getResourceName();
        String storeName = computeRouterRequestWrapper.getStoreName();
        Iterable<ComputeRouterRequestKeyV1> keys = computeRouterRequestWrapper.getKeys();
        AbstractStorageEngine storageEngine = getStorageEngine(resourceName);
        PartitionerConfig partitionerConfig = getPartitionerConfig(computeRouterRequestWrapper.getResourceName());
        Schema schema = computeRouterRequestWrapper.getValueSchemaId() != -1 ? this.schemaRepo.getValueSchema(storeName, computeRouterRequestWrapper.getValueSchemaId()).getSchema() : this.schemaRepo.getSupersetOrLatestValueSchema(storeName).getSchema();
        ComputeRequestWrapper computeRequest = computeRouterRequestWrapper.getComputeRequest();
        Utf8 utf8 = (Utf8) computeRequest.getResultSchemaStr();
        Schema schema2 = this.computeResultSchemaCache.get(utf8);
        if (schema2 == null) {
            schema2 = Schema.parse(utf8.toString());
            ComputeUtils.checkResultSchema(schema2, schema, computeRequest.getComputeRequestVersion(), computeRequest.getOperations());
            this.computeResultSchemaCache.putIfAbsent(utf8, schema2);
        }
        ComputeResponseWrapper computeResponseWrapper = new ComputeResponseWrapper(computeRouterRequestWrapper.getKeyCount());
        CompressionStrategy compressionStrategy = storageEngine.getCompressionStrategy();
        boolean isChunked = storageEngine.isChunked();
        computeResponseWrapper.setReadComputeDeserializationLatency(0.0d);
        computeResponseWrapper.setDatabaseLookupLatency(0.0d);
        computeResponseWrapper.setReadComputeSerializationLatency(0.0d);
        computeResponseWrapper.setReadComputeLatency(0.0d);
        computeResponseWrapper.setCompressionStrategy(CompressionStrategy.NO_OP);
        StorageExecReusableObjects storageExecReusableObjects = this.threadLocalReusableObjects.get();
        Schema schema3 = schema;
        GenericRecord computeIfAbsent = storageExecReusableObjects.reuseValueRecordMap.computeIfAbsent(schema, schema4 -> {
            return new GenericData.Record(schema3);
        });
        Schema schema5 = schema2;
        GenericRecord computeIfAbsent2 = storageExecReusableObjects.reuseResultRecordMap.computeIfAbsent(schema2, schema6 -> {
            return new GenericData.Record(schema5);
        });
        ByteBuffer byteBuffer = storageExecReusableObjects.reusedByteBuffer;
        RecordSerializer<GenericRecord> fastAvroGenericSerializer = this.fastAvroEnabled ? FastSerializerDeserializerFactory.getFastAvroGenericSerializer(schema2) : SerializerDeserializerFactory.getAvroGenericSerializer(schema2);
        HashMap hashMap = new HashMap();
        VeniceCompressor compressor = this.compressorFactory.getCompressor(compressionStrategy, resourceName);
        for (ComputeRouterRequestKeyV1 computeRouterRequestKeyV1 : keys) {
            clearFieldsInReusedRecord(computeIfAbsent2, schema2);
            ComputeResponseRecordV1 computeResult = computeResult(storageEngine, storeName, computeRouterRequestKeyV1.keyBytes, computeRouterRequestKeyV1.keyIndex, getSubPartitionId(computeRouterRequestKeyV1.partitionId, resourceName, partitionerConfig, computeRouterRequestKeyV1.keyBytes), computeRequest.getComputeRequestVersion(), computeRequest.getOperations(), compressionStrategy, schema2, fastAvroGenericSerializer, computeIfAbsent, computeIfAbsent2, storageExecReusableObjects, isChunked, computeRouterRequestWrapper.isStreamingRequest(), computeResponseWrapper, hashMap, byteBuffer, compressor);
            if (computeResult != null) {
                computeResponseWrapper.addRecord(computeResult);
            }
        }
        return computeResponseWrapper;
    }

    private BinaryResponse handleDictionaryFetchRequest(DictionaryFetchRequest dictionaryFetchRequest) {
        return new BinaryResponse(this.metadataRetriever.getStoreVersionCompressionDictionary(dictionaryFetchRequest.getResourceName()));
    }

    private MetadataResponse handleMetadataFetchRequest(MetadataFetchRequest metadataFetchRequest) {
        return this.metadataRetriever.getMetadata(metadataFetchRequest.getStoreName());
    }

    private void clearFieldsInReusedRecord(GenericRecord genericRecord, Schema schema) {
        for (int i = 0; i < schema.getFields().size(); i++) {
            genericRecord.put(i, (Object) null);
        }
    }

    private ComputeResponseRecordV1 computeResult(AbstractStorageEngine abstractStorageEngine, String str, ByteBuffer byteBuffer, int i, int i2, int i3, List<ComputeOperation> list, CompressionStrategy compressionStrategy, Schema schema, RecordSerializer<GenericRecord> recordSerializer, GenericRecord genericRecord, GenericRecord genericRecord2, StorageExecReusableObjects storageExecReusableObjects, boolean z, boolean z2, ComputeResponseWrapper computeResponseWrapper, Map<String, Object> map, ByteBuffer byteBuffer2, VeniceCompressor veniceCompressor) {
        GenericRecord genericRecord3 = (GenericRecord) GenericRecordChunkingAdapter.INSTANCE.get(str, abstractStorageEngine, i2, ByteUtils.extractByteArray(byteBuffer), byteBuffer2, (ByteBuffer) genericRecord, storageExecReusableObjects.binaryDecoder, z, compressionStrategy, this.fastAvroEnabled, this.schemaRepo, (ReadResponse) computeResponseWrapper, veniceCompressor);
        if (genericRecord3 == null) {
            if (!z2) {
                return null;
            }
            ComputeResponseRecordV1 computeResponseRecordV1 = new ComputeResponseRecordV1();
            computeResponseRecordV1.keyIndex = Math.negateExact(i);
            computeResponseRecordV1.value = StreamingUtils.EMPTY_BYTE_BUFFER;
            return computeResponseRecordV1;
        }
        long nanoTime = System.nanoTime();
        HashMap hashMap = new HashMap();
        for (ComputeOperation computeOperation : list) {
            ReadComputeOperator operator = ComputeOperationType.valueOf(computeOperation).getOperator();
            String orElse = ComputeOperationUtils.validateNullableFieldAndGetErrorMsg(operator, genericRecord3, operator.getOperatorFieldName(computeOperation)).orElse(null);
            if (orElse != null) {
                operator.putDefaultResult(genericRecord2, operator.getResultFieldName(computeOperation));
                hashMap.put(operator.getResultFieldName(computeOperation), orElse);
            } else {
                incrementOperatorCount(computeResponseWrapper, computeOperation);
                operator.compute(i3, computeOperation, genericRecord3, genericRecord2, hashMap, map);
            }
        }
        for (Schema.Field field : schema.getFields()) {
            if (genericRecord2.get(field.pos()) == null) {
                if (field.name().equals(VeniceConstants.VENICE_COMPUTATION_ERROR_MAP_FIELD_NAME)) {
                    genericRecord2.put(field.pos(), hashMap);
                } else {
                    genericRecord2.put(field.pos(), genericRecord3.get(field.name()));
                }
            }
        }
        computeResponseWrapper.addReadComputeLatency(LatencyUtils.getLatencyInMS(nanoTime));
        ComputeResponseRecordV1 computeResponseRecordV12 = new ComputeResponseRecordV1();
        computeResponseRecordV12.keyIndex = i;
        long nanoTime2 = System.nanoTime();
        computeResponseRecordV12.value = ByteBuffer.wrap(recordSerializer.serialize(genericRecord2, storageExecReusableObjects));
        computeResponseWrapper.addReadComputeSerializationLatency(LatencyUtils.getLatencyInMS(nanoTime2));
        return computeResponseRecordV12;
    }

    private AdminResponse handleServerAdminRequest(AdminRequest adminRequest) {
        switch (adminRequest.getServerAdminAction()) {
            case DUMP_INGESTION_STATE:
                String storeVersion = adminRequest.getStoreVersion();
                Integer partition = adminRequest.getPartition();
                return this.metadataRetriever.getConsumptionSnapshots(storeVersion, partition == null ? ComplementSet.universalSet() : ComplementSet.of(partition));
            case DUMP_SERVER_CONFIGS:
                AdminResponse adminResponse = new AdminResponse();
                if (this.serverConfig == null) {
                    adminResponse.setError(true);
                    adminResponse.setMessage("Server config doesn't exist");
                } else {
                    adminResponse.addServerConfigs(this.serverConfig.getClusterProperties().toProperties());
                }
                return adminResponse;
            default:
                throw new VeniceException("Not a valid admin action: " + adminRequest.getServerAdminAction().toString());
        }
    }

    private void incrementOperatorCount(ComputeResponseWrapper computeResponseWrapper, ComputeOperation computeOperation) {
        switch (ComputeOperationType.valueOf(computeOperation)) {
            case DOT_PRODUCT:
                computeResponseWrapper.incrementDotProductCount();
                return;
            case COSINE_SIMILARITY:
                computeResponseWrapper.incrementCosineSimilarityCount();
                return;
            case HADAMARD_PRODUCT:
                computeResponseWrapper.incrementHadamardProductCount();
                return;
            case COUNT:
                computeResponseWrapper.incrementCountOperatorCount();
                return;
            default:
                return;
        }
    }

    private AbstractStorageEngine getStorageEngine(String str) {
        AbstractStorageEngine localStorageEngine = this.storageEngineRepository.getLocalStorageEngine(str);
        if (localStorageEngine == null) {
            throw new VeniceNoStoreException(str);
        }
        return localStorageEngine;
    }
}
