package com.linkedin.venice.router.api;

import com.linkedin.alpini.base.misc.HeaderNames;
import com.linkedin.alpini.base.misc.Metrics;
import com.linkedin.alpini.base.misc.TimeValue;
import com.linkedin.alpini.netty4.misc.BasicFullHttpRequest;
import com.linkedin.alpini.router.api.MetricNames;
import com.linkedin.alpini.router.api.ResponseAggregatorFactory;
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceStoreIsMigratedException;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.api.path.VenicePath;
import com.linkedin.venice.router.api.routing.helix.HelixGroupSelector;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.router.streaming.SuccessfulStreamingResponse;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.utils.LatencyUtils;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/router/api/VeniceResponseAggregator.class */
/**
 * Combines the sub-responses gathered for one router request into the single
 * {@link FullHttpResponse} handed back to the caller, and records per-store router statistics
 * (fan-out count, latencies, healthy/tardy/throttled/unhealthy classification, response size).
 *
 * <p>The tardy thresholds default to 10 seconds for every request type and can be overridden with
 * the {@code with*TardyThreshold} methods.
 */
public class VeniceResponseAggregator implements ResponseAggregatorFactory<BasicFullHttpRequest, FullHttpResponse> {
    private static final Logger LOGGER = LogManager.getLogger(VeniceResponseAggregator.class);

    /** Final statuses that are classified as healthy (then split into fast vs. tardy by latency). */
    private static final List<HttpResponseStatus> HEALTHY_STATUSES = Arrays.asList(HttpResponseStatus.OK, HttpResponseStatus.NOT_FOUND);

    /** Default tardy threshold applied to every request type until overridden. */
    private static final long DEFAULT_TARDY_THRESHOLD_IN_MS = TimeUnit.SECONDS.toMillis(10);

    /** Headers (name -> required value) every OK multi-get sub-response must carry. */
    public static final Map<CharSequence, String> MULTI_GET_VALID_HEADER_MAP = new HashMap<>();

    /** Headers (name -> required value) every OK compute sub-response must carry. */
    public static final Map<CharSequence, String> COMPUTE_VALID_HEADER_MAP = new HashMap<>();

    static {
        MULTI_GET_VALID_HEADER_MAP.put(HttpHeaderNames.CONTENT_TYPE, HttpConstants.AVRO_BINARY);
        MULTI_GET_VALID_HEADER_MAP.put(HttpConstants.VENICE_SCHEMA_ID, Integer.toString(ReadAvroProtocolDefinition.MULTI_GET_RESPONSE_V1.getProtocolVersion()));
        COMPUTE_VALID_HEADER_MAP.put(HttpHeaderNames.CONTENT_TYPE, HttpConstants.AVRO_BINARY);
        COMPUTE_VALID_HEADER_MAP.put(HttpConstants.VENICE_SCHEMA_ID, Integer.toString(ReadAvroProtocolDefinition.COMPUTE_RESPONSE_V1.getProtocolVersion()));
    }

    private final RouterStats<AggRouterHttpRequestStats> routerStats;
    private final Optional<MetaStoreShadowReader> metaStoreShadowReaderOptional;
    private HelixGroupSelector helixGroupSelector;
    private long singleGetTardyThresholdInMs = DEFAULT_TARDY_THRESHOLD_IN_MS;
    private long multiGetTardyThresholdInMs = DEFAULT_TARDY_THRESHOLD_IN_MS;
    private long computeTardyThresholdInMs = DEFAULT_TARDY_THRESHOLD_IN_MS;

    /**
     * @param routerStats per-request-type stats holders used for all recordings
     * @param optional optional shadow reader consulted after the response has been aggregated
     */
    public VeniceResponseAggregator(RouterStats<AggRouterHttpRequestStats> routerStats, Optional<MetaStoreShadowReader> optional) {
        this.routerStats = routerStats;
        this.metaStoreShadowReaderOptional = optional;
    }

    /**
     * Sets the latency below which a healthy single-get counts as fast.
     *
     * @return this aggregator, for chaining
     */
    public VeniceResponseAggregator withSingleGetTardyThreshold(long threshold, TimeUnit unit) {
        this.singleGetTardyThresholdInMs = unit.toMillis(threshold);
        return this;
    }

    /**
     * Sets the latency below which a healthy multi-get (streaming or not) counts as fast.
     *
     * @return this aggregator, for chaining
     */
    public VeniceResponseAggregator withMultiGetTardyThreshold(long threshold, TimeUnit unit) {
        this.multiGetTardyThresholdInMs = unit.toMillis(threshold);
        return this;
    }

    /**
     * Sets the latency below which a healthy compute request (streaming or not) counts as fast.
     *
     * @return this aggregator, for chaining
     */
    public VeniceResponseAggregator withComputeTardyThreshold(long threshold, TimeUnit unit) {
        this.computeTardyThresholdInMs = unit.toMillis(threshold);
        return this;
    }

    /**
     * Installs the Helix group selector notified when a non-retry request finishes.
     * May be called only once; a second call throws the tracked exception built below.
     */
    public void initHelixGroupSelector(HelixGroupSelector helixGroupSelector) {
        if (this.helixGroupSelector != null) {
            throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                    Optional.empty(),
                    Optional.empty(),
                    HttpResponseStatus.INTERNAL_SERVER_ERROR,
                    "HelixGroupSelector has already been initialized before, and no further update expected!");
        }
        this.helixGroupSelector = helixGroupSelector;
    }

    /**
     * Builds the final response for a request from its gathered sub-responses and records stats.
     *
     * <p>When the metrics carry no {@link VenicePath}, the first sub-response is returned as-is,
     * except that a {@code MOVED_PERMANENTLY} response gets a {@code d2://} {@code Location}
     * header when a D2 service name can be extracted from its error-message header.
     *
     * @param request the original client request (its URI is used to build the redirect location)
     * @param metrics request metrics; must not be null
     * @param gatheredResponses sub-responses; must not be empty
     * @return the aggregated (or passed-through) response
     */
    @Override
    @Nonnull
    public FullHttpResponse buildResponse(
            @Nonnull BasicFullHttpRequest request,
            Metrics metrics,
            @Nonnull List<FullHttpResponse> gatheredResponses) {
        if (gatheredResponses.isEmpty()) {
            throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                    Optional.empty(), Optional.empty(), HttpResponseStatus.BAD_GATEWAY, "Received empty response!");
        }
        if (metrics == null) {
            throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                    Optional.empty(), Optional.empty(), HttpResponseStatus.INTERNAL_SERVER_ERROR, "'metrics' should not be null");
        }
        VenicePath venicePath = (VenicePath) metrics.getPath();
        if (venicePath == null) {
            return buildResponseWithoutPath(request, gatheredResponses.get(0));
        }

        if (!venicePath.isRetryRequest() && this.helixGroupSelector != null && venicePath.getHelixGroupId() >= 0) {
            this.helixGroupSelector.finishRequest(venicePath.getRequestId(), venicePath.getHelixGroupId());
        }

        RequestType requestType = venicePath.getRequestType();
        AggRouterHttpRequestStats stats = this.routerStats.getStatsByType(requestType);
        String storeName = venicePath.getStoreName();
        int versionNumber = venicePath.getVersionNumber();

        FullHttpResponse finalResponse;
        if (venicePath.isStreamingRequest()) {
            finalResponse = buildStreamingResponse(gatheredResponses, storeName, versionNumber);
        } else {
            switch (requestType) {
                case SINGLE_GET:
                    finalResponse = gatheredResponses.get(0);
                    break;
                case MULTI_GET:
                    finalResponse = processMultiGetResponses(gatheredResponses, storeName, versionNumber);
                    break;
                case COMPUTE:
                    finalResponse = processComputeResponses(gatheredResponses, storeName);
                    break;
                default:
                    throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                            Optional.empty(),
                            Optional.empty(),
                            HttpResponseStatus.INTERNAL_SERVER_ERROR,
                            "Unknown request type: " + requestType);
            }
        }
        stats.recordFanoutRequestCount(storeName, gatheredResponses.size());

        if (this.metaStoreShadowReaderOptional.isPresent()) {
            MetaStoreShadowReader shadowReader = this.metaStoreShadowReaderOptional.get();
            if (shadowReader.shouldPerformShadowRead(venicePath, finalResponse)) {
                this.routerStats.getStatsByType(RequestType.SINGLE_GET)
                        .recordMetaStoreShadowRead(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName));
                finalResponse = shadowReader.shadowReadMetaStore(venicePath, finalResponse);
            }
        }

        // Status is read after the optional shadow read because that step may replace the response.
        HttpResponseStatus status = finalResponse.status();
        Map<String, TimeValue> timings = metrics.getMetrics();

        TimeValue serverTime = timings.get(MetricNames.ROUTER_SERVER_TIME.name());
        if (serverTime != null) {
            recordServerLatency(stats, venicePath, requestType, storeName, status, toMillis(serverTime));
        }
        TimeValue responseWaitTime = timings.get(MetricNames.ROUTER_RESPONSE_WAIT_TIME.name());
        if (responseWaitTime != null) {
            stats.recordResponseWaitingTime(storeName, toMillis(responseWaitTime));
        }
        TimeValue parseUriTime = timings.get(MetricNames.ROUTER_PARSE_URI.name());
        if (parseUriTime != null) {
            stats.recordRequestParsingLatency(storeName, toMillis(parseUriTime));
        }
        TimeValue routingTime = timings.get(MetricNames.ROUTER_ROUTING_TIME.name());
        if (routingTime != null) {
            stats.recordRequestRoutingLatency(storeName, toMillis(routingTime));
        }

        if (HEALTHY_STATUSES.contains(status) && !venicePath.isStreamingRequest()) {
            stats.recordResponseSize(storeName, finalResponse.content().readableBytes());
        }
        stats.recordResponse(storeName);
        return finalResponse;
    }

    /**
     * Handles the case where no {@link VenicePath} is attached to the metrics: returns the given
     * response, adding a {@code d2://} redirect location to a {@code MOVED_PERMANENTLY} response
     * when the D2 service name is available.
     */
    private FullHttpResponse buildResponseWithoutPath(BasicFullHttpRequest request, FullHttpResponse response) {
        try {
            if (response.status().equals(HttpResponseStatus.MOVED_PERMANENTLY)) {
                String d2ServiceName =
                        VeniceStoreIsMigratedException.getD2ServiceName(response.headers().get(HeaderNames.X_ERROR_MESSAGE));
                if (StringUtils.isEmpty(d2ServiceName)) {
                    LOGGER.error("D2 service name is not available for request redirection");
                } else {
                    URI requestUri = new URI(request.uri());
                    String redirectUri =
                            new URI("d2", d2ServiceName, requestUri.getPath(), requestUri.getQuery(), requestUri.getFragment())
                                    .toString();
                    LOGGER.info("redirect the request to {}", redirectUri);
                    response.setStatus(HttpResponseStatus.MOVED_PERMANENTLY);
                    response.headers().set(HttpHeaderNames.LOCATION, redirectUri);
                }
            }
            return response;
        } catch (URISyntaxException e) {
            // The tracked exception below carries no cause, so log the parse failure here.
            LOGGER.warn("Failed to parse uri: {}", request.uri(), e);
            throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                    Optional.empty(), Optional.empty(), HttpResponseStatus.BAD_REQUEST, "Failed to parse uri");
        }
    }

    /**
     * Records the router-to-server latency and classifies the request as healthy, tardy,
     * throttled or unhealthy based on the final status and the per-type tardy threshold.
     */
    private void recordServerLatency(
            AggRouterHttpRequestStats stats,
            VenicePath venicePath,
            RequestType requestType,
            String storeName,
            HttpResponseStatus status,
            double latencyInMs) {
        stats.recordLatency(storeName, latencyInMs);
        if (HEALTHY_STATUSES.contains(status)) {
            this.routerStats.getStatsByType(RequestType.SINGLE_GET)
                    .recordReadQuotaUsage(storeName, venicePath.getPartitionKeys().size());
            if (isFastRequest(latencyInMs, requestType)) {
                stats.recordHealthyRequest(storeName, latencyInMs);
            } else {
                stats.recordTardyRequest(storeName, latencyInMs);
            }
        } else if (status.equals(HttpResponseStatus.TOO_MANY_REQUESTS)) {
            LOGGER.debug("request is rejected by storage node because quota is exceeded");
            stats.recordThrottledRequest(storeName, latencyInMs);
        } else {
            LOGGER.debug("Unhealthy request detected, latency: {}ms, response status: {}", latencyInMs, status);
            stats.recordUnhealthyRequest(storeName, latencyInMs);
        }
    }

    /** Converts a recorded timing to milliseconds via its nanosecond raw value. */
    private static double toMillis(TimeValue timing) {
        return LatencyUtils.convertLatencyFromNSToMS(timing.getRawValue(TimeUnit.NANOSECONDS));
    }

    /** Returns true when the response status equals {@code 200 OK} (by value, not identity). */
    private static boolean isOk(FullHttpResponse response) {
        return HttpResponseStatus.OK.equals(response.status());
    }

    /**
     * For streaming requests: validates that all sub-responses agree on compression strategy,
     * then returns the first non-OK sub-response, or a {@link SuccessfulStreamingResponse} when
     * every sub-response is OK.
     */
    private FullHttpResponse buildStreamingResponse(List<FullHttpResponse> responses, String storeName, int version) {
        validateAndExtractCompressionStrategy(responses, storeName, version);
        for (FullHttpResponse response : responses) {
            if (!isOk(response)) {
                return response;
            }
        }
        return new SuccessfulStreamingResponse();
    }

    /**
     * @return true when {@code latencyInMs} is strictly below the tardy threshold for the type
     * @throws VeniceException for a request type with no configured threshold
     */
    private boolean isFastRequest(double latencyInMs, RequestType requestType) {
        switch (requestType) {
            case SINGLE_GET:
                return latencyInMs < (double) this.singleGetTardyThresholdInMs;
            case MULTI_GET:
            case MULTI_GET_STREAMING:
                return latencyInMs < (double) this.multiGetTardyThresholdInMs;
            case COMPUTE:
            case COMPUTE_STREAMING:
                return latencyInMs < (double) this.computeTardyThresholdInMs;
            default:
                throw new VeniceException("Unknown request type: " + requestType);
        }
    }

    /** Maps a compression header value to a strategy; a missing header means {@code NO_OP}. */
    private static CompressionStrategy getCompressionStrategy(String headerValue) {
        if (headerValue == null) {
            return CompressionStrategy.NO_OP;
        }
        return CompressionStrategy.valueOf(Integer.parseInt(headerValue));
    }

    private static CompressionStrategy getResponseCompressionStrategy(HttpResponse response) {
        return getCompressionStrategy(response.headers().get(HttpConstants.VENICE_COMPRESSION_STRATEGY));
    }

    /**
     * Ensures every sub-response reports the same compression strategy as the first one.
     *
     * @return the common strategy, or {@code null} when {@code responses} is empty
     */
    private CompressionStrategy validateAndExtractCompressionStrategy(List<FullHttpResponse> responses, String storeName, int version) {
        CompressionStrategy expected = null;
        for (FullHttpResponse response : responses) {
            CompressionStrategy actual = getResponseCompressionStrategy(response);
            if (expected == null) {
                expected = actual;
            }
            if (actual != expected) {
                throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                        Optional.of(storeName),
                        Optional.of(RequestType.MULTI_GET),
                        HttpResponseStatus.BAD_GATEWAY,
                        String.format(
                                "Inconsistent compression strategy returned. Store: %s; Version: %d, ExpectedCompression: %d, ResponseCompression: %d",
                                storeName,
                                version,
                                expected.getValue(),
                                actual.getValue()));
            }
        }
        return expected;
    }

    /**
     * Verifies that {@code response} carries every header in {@code expectedHeaders} with exactly
     * the expected value; throws a tracked {@code BAD_GATEWAY} exception otherwise.
     *
     * @param subResponseDescription text appended to the missing-header message
     *        (e.g. {@code "compute sub-response"})
     */
    private static void validateExpectedHeaders(
            FullHttpResponse response,
            Map<CharSequence, String> expectedHeaders,
            String storeName,
            RequestType requestType,
            String subResponseDescription) {
        for (Map.Entry<CharSequence, String> expectedHeader : expectedHeaders.entrySet()) {
            CharSequence headerName = expectedHeader.getKey();
            String expectedValue = expectedHeader.getValue();
            String actualValue = response.headers().get(headerName);
            if (actualValue == null) {
                throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                        Optional.of(storeName),
                        Optional.of(requestType),
                        HttpResponseStatus.BAD_GATEWAY,
                        "Header: " + headerName + " is expected in " + subResponseDescription);
            }
            if (!expectedValue.equals(actualValue)) {
                throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(
                        Optional.of(storeName),
                        Optional.of(requestType),
                        HttpResponseStatus.BAD_GATEWAY,
                        "Incompatible header received for " + headerName + ", values: " + expectedValue + ", " + actualValue);
            }
        }
    }

    /**
     * Concatenates the bodies of all compute sub-responses into one OK response.
     *
     * <p>The first non-OK sub-response encountered is returned unchanged. Each OK sub-response
     * must carry the headers in {@link #COMPUTE_VALID_HEADER_MAP}. The result carries those
     * headers plus content length, {@code NO_OP} compression strategy and the summed RCU.
     */
    protected FullHttpResponse processComputeResponses(List<FullHttpResponse> responses, String storeName) {
        CompositeByteBuf content = Unpooled.compositeBuffer();
        int totalRcu = 0;
        for (FullHttpResponse response : responses) {
            if (!isOk(response)) {
                return response;
            }
            validateExpectedHeaders(response, COMPUTE_VALID_HEADER_MAP, storeName, RequestType.COMPUTE, "compute sub-response");
            totalRcu += getRCU(response);
            content.addComponent(true, response.content());
        }
        DefaultFullHttpResponse aggregated = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        COMPUTE_VALID_HEADER_MAP.forEach((name, value) -> aggregated.headers().add(name, value));
        aggregated.headers().add(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(content.readableBytes()));
        aggregated.headers().add(HttpConstants.VENICE_COMPRESSION_STRATEGY, Integer.valueOf(CompressionStrategy.NO_OP.getValue()));
        aggregated.headers().add(HttpConstants.VENICE_REQUEST_RCU, Integer.valueOf(totalRcu));
        return aggregated;
    }

    /**
     * Reads the RCU header of a sub-response.
     *
     * @return the header parsed as a decimal int, or 1 when it is absent or not a valid int
     */
    private int getRCU(FullHttpResponse response) {
        String rcuHeader = response.headers().get(HttpConstants.VENICE_REQUEST_RCU);
        if (rcuHeader == null) {
            return 1;
        }
        try {
            return Integer.parseInt(rcuHeader);
        } catch (NumberFormatException ignored) {
            // A malformed RCU header must not fail the whole request; fall back to the default of 1.
            return 1;
        }
    }

    /**
     * Concatenates the bodies of all multi-get sub-responses into one OK response.
     *
     * <p>All sub-responses must agree on compression strategy. The first non-OK sub-response
     * encountered is returned with its compression-strategy header forced to {@code NO_OP}.
     * Each OK sub-response must carry the headers in {@link #MULTI_GET_VALID_HEADER_MAP}.
     * Compressed size and decompression time are recorded only when both totals are positive.
     */
    protected FullHttpResponse processMultiGetResponses(List<FullHttpResponse> responses, String storeName, int version) {
        long totalContentSize = 0;
        long totalDecompressionTimeInNs = 0;
        int totalRcu = 0;
        CompositeByteBuf content = Unpooled.compositeBuffer();
        CompressionStrategy compressionStrategy = validateAndExtractCompressionStrategy(responses, storeName, version);
        for (FullHttpResponse response : responses) {
            if (!isOk(response)) {
                response.headers().set(HttpConstants.VENICE_COMPRESSION_STRATEGY, Integer.valueOf(CompressionStrategy.NO_OP.getValue()));
                return response;
            }
            content.addComponent(true, response.content());
            validateExpectedHeaders(response, MULTI_GET_VALID_HEADER_MAP, storeName, RequestType.MULTI_GET, "multi-get sub-response");
            totalRcu += getRCU(response);
            totalContentSize += response.content().readableBytes();
            if (response instanceof VeniceFullHttpResponse) {
                totalDecompressionTimeInNs += ((VeniceFullHttpResponse) response).getDecompressionTimeInNs();
            }
        }
        if (totalContentSize > 0 && totalDecompressionTimeInNs > 0) {
            AggRouterHttpRequestStats multiGetStats = this.routerStats.getStatsByType(RequestType.MULTI_GET);
            multiGetStats.recordCompressedResponseSize(storeName, totalContentSize);
            multiGetStats.recordDecompressionTime(storeName, LatencyUtils.convertLatencyFromNSToMS(totalDecompressionTimeInNs));
        }
        DefaultFullHttpResponse aggregated = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        MULTI_GET_VALID_HEADER_MAP.forEach((name, value) -> aggregated.headers().add(name, value));
        aggregated.headers().add(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(content.readableBytes()));
        aggregated.headers().add(HttpConstants.VENICE_COMPRESSION_STRATEGY, Integer.valueOf(compressionStrategy.getValue()));
        aggregated.headers().add(HttpConstants.VENICE_REQUEST_RCU, Integer.valueOf(totalRcu));
        return aggregated;
    }
}
