package com.linkedin.venice.router.api;

import com.linkedin.alpini.base.concurrency.AsyncFuture;
import com.linkedin.alpini.base.concurrency.AsyncPromise;
import com.linkedin.alpini.netty4.misc.BasicHttpRequest;
import com.linkedin.alpini.router.api.PartitionDispatchHandler4;
import com.linkedin.alpini.router.api.ResourcePath;
import com.linkedin.alpini.router.api.RouterException;
import com.linkedin.alpini.router.api.Scatter;
import com.linkedin.alpini.router.api.ScatterGatherRequest;
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.api.path.VenicePath;
import com.linkedin.venice.router.httpclient.PortableHttpResponse;
import com.linkedin.venice.router.httpclient.StorageNodeClient;
import com.linkedin.venice.router.stats.AggHostHealthStats;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.RouteHttpRequestStats;
import com.linkedin.venice.router.stats.RouteHttpStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.router.streaming.VeniceChunkedResponse;
import com.linkedin.venice.router.throttle.PendingRequestThrottler;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/router/api/VeniceDispatcher.class */
public final class VeniceDispatcher implements PartitionDispatchHandler4<Instance, VenicePath, RouterKey> {
    private final VeniceRouterConfig routerConfig;
    private final ReadOnlyStoreRepository storeRepository;
    private final StorageNodeClient storageNodeClient;
    private final PendingRequestThrottler pendingRequestThrottler;
    private final RouteHttpRequestStats routeHttpRequestStats;
    private final RouterStats<RouteHttpStats> perRouteStatsByType;
    private final RouterStats<AggRouterHttpRequestStats> perStoreStatsByType;
    private final AggHostHealthStats aggHostHealthStats;
    private final long routerUnhealthyPendingConnThresholdPerRoute;
    private final boolean isStatefulHealthCheckEnabled;
    private final RouterStats<AggRouterHttpRequestStats> routerStats;
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) VeniceDispatcher.class);
    private static final Set<Integer> PASS_THROUGH_ERROR_CODES = Utils.setOf(Integer.valueOf(HttpResponseStatus.TOO_MANY_REQUESTS.code()));
    private static final Set<Integer> RETRIABLE_ERROR_CODES = Utils.setOf(Integer.valueOf(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()), Integer.valueOf(HttpResponseStatus.SERVICE_UNAVAILABLE.code()));
    private final VeniceConcurrentHashMap<Long, TimedCompletableFuture> responseFutureMap = new VeniceConcurrentHashMap<>();
    private final VeniceConcurrentHashMap<String, ReentrantLock> storageNodeLockMap = new VeniceConcurrentHashMap<>();
    private final AtomicLong uniqueRequestId = new AtomicLong(0);
    private final LeakedCompletableFutureCleanupService leakedCompletableFutureCleanupService = new LeakedCompletableFutureCleanupService();

    /* loaded from: input_file:com/linkedin/venice/router/api/VeniceDispatcher$LeakedCompletableFutureCleanupService.class */
    private class LeakedCompletableFutureCleanupService extends Thread {
        private final long pollIntervalMs;
        private final long cleanupThresholdMs;

        public LeakedCompletableFutureCleanupService() {
            super("LeakedCompletableFutureCleanupService");
            this.pollIntervalMs = VeniceDispatcher.this.routerConfig.getLeakedFutureCleanupPollIntervalMs();
            this.cleanupThresholdMs = VeniceDispatcher.this.routerConfig.getLeakedFutureCleanupThresholdMs();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Thread.sleep(this.pollIntervalMs);
                    VeniceDispatcher.this.responseFutureMap.forEach((l, timedCompletableFuture) -> {
                        if (System.currentTimeMillis() - timedCompletableFuture.requestTime >= this.cleanupThresholdMs) {
                            VeniceDispatcher.LOGGER.warn("Cleaning up the leaked response future: {}", timedCompletableFuture);
                            timedCompletableFuture.completeExceptionally(new VeniceException("Leaking response future"));
                            VeniceDispatcher.this.aggHostHealthStats.recordLeakedPendingRequestCount(timedCompletableFuture.hostName);
                        }
                    });
                } catch (InterruptedException e) {
                    VeniceDispatcher.LOGGER.info("LeakedCompletableFutureCleanupService was interrupt, will exit", (Throwable) e);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/api/VeniceDispatcher$TimedCompletableFuture.class */
    public static class TimedCompletableFuture<T> extends CompletableFuture<T> {
        private final long requestTime;
        private final String hostName;

        public TimedCompletableFuture(long j, String str) {
            this.requestTime = j;
            this.hostName = str;
        }

        @Override // java.util.concurrent.CompletableFuture
        public String toString() {
            return getClass().getName() + " for request to " + this.hostName + " at timestamp: " + this.requestTime;
        }
    }

    public VeniceDispatcher(VeniceRouterConfig veniceRouterConfig, ReadOnlyStoreRepository readOnlyStoreRepository, RouterStats<AggRouterHttpRequestStats> routerStats, MetricsRepository metricsRepository, StorageNodeClient storageNodeClient, RouteHttpRequestStats routeHttpRequestStats, AggHostHealthStats aggHostHealthStats, RouterStats<AggRouterHttpRequestStats> routerStats2) {
        this.routerConfig = veniceRouterConfig;
        this.routerUnhealthyPendingConnThresholdPerRoute = this.routerConfig.getRouterUnhealthyPendingConnThresholdPerRoute();
        this.isStatefulHealthCheckEnabled = this.routerConfig.isStatefulRouterHealthCheckEnabled();
        this.storeRepository = readOnlyStoreRepository;
        this.routeHttpRequestStats = routeHttpRequestStats;
        this.perRouteStatsByType = new RouterStats<>(requestType -> {
            return new RouteHttpStats(metricsRepository, requestType);
        });
        this.perStoreStatsByType = routerStats;
        this.storageNodeClient = storageNodeClient;
        this.pendingRequestThrottler = new PendingRequestThrottler(veniceRouterConfig.getMaxPendingRequest());
        this.aggHostHealthStats = aggHostHealthStats;
        this.leakedCompletableFutureCleanupService.start();
        this.routerStats = routerStats2;
    }

    public void dispatch(@Nonnull Scatter<Instance, VenicePath, RouterKey> scatter, @Nonnull ScatterGatherRequest<Instance, RouterKey> scatterGatherRequest, @Nonnull VenicePath venicePath, @Nonnull BasicHttpRequest basicHttpRequest, @Nonnull AsyncPromise<Instance> asyncPromise, @Nonnull AsyncPromise<List<FullHttpResponse>> asyncPromise2, @Nonnull AsyncPromise<HttpResponseStatus> asyncPromise3, @Nonnull AsyncFuture<Void> asyncFuture, @Nonnull Executor executor) throws RouterException {
        String storeName = venicePath.getStoreName();
        RequestType requestType = venicePath.getRequestType();
        venicePath.recordOriginalRequestStartTimestamp();
        if ((requestType.equals(RequestType.COMPUTE) || requestType.equals(RequestType.COMPUTE_STREAMING)) && !this.storeRepository.isReadComputationEnabled(storeName)) {
            throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), HttpResponseStatus.METHOD_NOT_ALLOWED, "Read compute is not enabled for the store. Please contact Venice team to enable the feature.");
        }
        if (scatterGatherRequest.getHosts().size() != 1) {
            throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), HttpResponseStatus.INTERNAL_SERVER_ERROR, "There should be only one chosen replica for the request: " + scatterGatherRequest);
        }
        Instance instance = scatterGatherRequest.getHosts().get(0);
        asyncPromise.setSuccess(instance);
        sendRequest(instance, venicePath, asyncPromise3).whenComplete((portableHttpResponse, th) -> {
            int statusCode;
            if (portableHttpResponse != null) {
                try {
                    statusCode = portableHttpResponse.getStatusCode();
                } catch (Throwable th) {
                    asyncPromise2.setFailure(th);
                    return;
                }
            } else {
                statusCode = 500;
            }
            int i = statusCode;
            if (!asyncPromise3.isCancelled() && RETRIABLE_ERROR_CODES.contains(Integer.valueOf(i))) {
                asyncPromise3.setSuccess(HttpResponseStatus.INTERNAL_SERVER_ERROR);
            } else {
                if (th != null) {
                    throw th;
                }
                if (i < 500) {
                    venicePath.markStorageNodeAsFast(instance.getNodeId());
                }
                asyncPromise2.setSuccess(Collections.singletonList(buildResponse(venicePath, portableHttpResponse)));
            }
        });
    }

    protected CompletableFuture<PortableHttpResponse> sendRequest(Instance instance, VenicePath venicePath, AsyncPromise<HttpResponseStatus> asyncPromise) throws RouterException {
        String storeName = venicePath.getStoreName();
        String host = instance.getHost();
        RequestType requestType = venicePath.getRequestType();
        long nanoTime = System.nanoTime();
        TimedCompletableFuture timedCompletableFuture = new TimedCompletableFuture(System.currentTimeMillis(), instance.getNodeId());
        if (!this.pendingRequestThrottler.put()) {
            this.routerStats.getStatsByType(requestType).recordRequestThrottledByRouterCapacity(storeName);
            throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), HttpResponseStatus.SERVICE_UNAVAILABLE, "Maximum number of pending request threshold reached! Current pending request count: " + this.pendingRequestThrottler.getCurrentPendingRequestCount());
        }
        ReentrantLock computeIfAbsent = this.storageNodeLockMap.computeIfAbsent(host, str -> {
            return new ReentrantLock();
        });
        computeIfAbsent.lock();
        try {
            long pendingRequestCount = this.routeHttpRequestStats.getPendingRequestCount(instance.getNodeId());
            if (this.isStatefulHealthCheckEnabled && pendingRequestCount > this.routerUnhealthyPendingConnThresholdPerRoute) {
                if (asyncPromise.isCancelled()) {
                    throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), HttpResponseStatus.SERVICE_UNAVAILABLE, "Too many pending request to storage node : " + host);
                }
                asyncPromise.setSuccess(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                timedCompletableFuture.completeExceptionally(new VeniceException("Triggering error retry, too many pending request to storage node :" + host));
                this.perStoreStatsByType.getStatsByType(venicePath.getRequestType()).recordErrorRetryAttemptTriggeredByPendingRequestCheck(storeName);
                if (1 != 0) {
                    this.pendingRequestThrottler.take();
                }
                computeIfAbsent.unlock();
                return timedCompletableFuture;
            }
            this.routeHttpRequestStats.recordPendingRequest(instance.getNodeId());
            long andIncrement = this.uniqueRequestId.getAndIncrement();
            this.responseFutureMap.put(Long.valueOf(andIncrement), timedCompletableFuture);
            try {
                venicePath.requestStorageNode(instance.getNodeId());
                StorageNodeClient storageNodeClient = this.storageNodeClient;
                Objects.requireNonNull(timedCompletableFuture);
                Consumer<PortableHttpResponse> consumer = (v1) -> {
                    r3.complete(v1);
                };
                Objects.requireNonNull(timedCompletableFuture);
                storageNodeClient.query(instance, venicePath, consumer, timedCompletableFuture::completeExceptionally, () -> {
                    return timedCompletableFuture.cancel(false);
                }, nanoTime);
            } catch (Throwable th) {
                timedCompletableFuture.completeExceptionally(th);
            }
            CompletableFuture whenComplete = timedCompletableFuture.whenComplete((portableHttpResponse, th2) -> {
                this.perRouteStatsByType.getStatsByType(requestType).recordResponseWaitingTime(instance.getHost(), LatencyUtils.getLatencyInMS(nanoTime));
                this.routeHttpRequestStats.recordFinishedRequest(instance.getNodeId());
                this.pendingRequestThrottler.take();
                this.responseFutureMap.remove(Long.valueOf(andIncrement));
            });
            if (0 != 0) {
                this.pendingRequestThrottler.take();
            }
            computeIfAbsent.unlock();
            return whenComplete;
        } catch (Throwable th3) {
            if (0 != 0) {
                this.pendingRequestThrottler.take();
            }
            computeIfAbsent.unlock();
            throw th3;
        }
    }

    protected VeniceFullHttpResponse buildResponse(VenicePath venicePath, PortableHttpResponse portableHttpResponse) throws IOException {
        ContentDecompressResult contentDecompressResult;
        int statusCode = portableHttpResponse.getStatusCode();
        ByteBuf contentInByteBuf = portableHttpResponse.getContentInByteBuf();
        if (PASS_THROUGH_ERROR_CODES.contains(Integer.valueOf(statusCode))) {
            return buildPlainTextResponse(HttpResponseStatus.valueOf(statusCode), contentInByteBuf);
        }
        CompressionStrategy compressionStrategy = VeniceResponseDecompressor.getCompressionStrategy(portableHttpResponse.getFirstHeader(HttpConstants.VENICE_COMPRESSION_STRATEGY));
        long j = 0;
        if (statusCode != 200 && statusCode != 404) {
            statusCode = 502;
        }
        if (statusCode == 200) {
            VeniceResponseDecompressor responseDecompressor = venicePath.getResponseDecompressor();
            if (venicePath.isStreamingRequest()) {
                VeniceChunkedResponse veniceChunkedResponse = venicePath.getChunkedResponse().get();
                if (venicePath.getRequestType().equals(RequestType.MULTI_GET_STREAMING)) {
                    Pair<ByteBuf, CompressionStrategy> processMultiGetResponseForStreaming = responseDecompressor.processMultiGetResponseForStreaming(compressionStrategy, contentInByteBuf);
                    veniceChunkedResponse.write(processMultiGetResponseForStreaming.getFirst(), processMultiGetResponseForStreaming.getSecond());
                } else {
                    veniceChunkedResponse.write(contentInByteBuf);
                }
                contentInByteBuf = Unpooled.EMPTY_BUFFER;
            } else {
                switch (venicePath.getRequestType()) {
                    case SINGLE_GET:
                        contentDecompressResult = responseDecompressor.decompressSingleGetContent(compressionStrategy, contentInByteBuf);
                        break;
                    case MULTI_GET:
                        contentDecompressResult = responseDecompressor.decompressMultiGetContent(compressionStrategy, contentInByteBuf);
                        break;
                    case COMPUTE:
                        contentDecompressResult = new ContentDecompressResult(contentInByteBuf, CompressionStrategy.NO_OP, 0L);
                        break;
                    default:
                        throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking(Optional.empty(), Optional.empty(), HttpResponseStatus.INTERNAL_SERVER_ERROR, "Unknown request type: " + venicePath.getRequestType());
                }
                contentInByteBuf = contentDecompressResult.getContent();
                compressionStrategy = contentDecompressResult.getCompressionStrategy();
                j = contentDecompressResult.getDecompressionTimeInNs();
            }
        } else {
            compressionStrategy = CompressionStrategy.NO_OP;
        }
        VeniceFullHttpResponse veniceFullHttpResponse = new VeniceFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode), contentInByteBuf, j);
        veniceFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, portableHttpResponse.getFirstHeader("Content-Type")).set(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(contentInByteBuf.readableBytes())).set(HttpConstants.VENICE_SCHEMA_ID, (Object) portableHttpResponse.getFirstHeader(HttpConstants.VENICE_SCHEMA_ID)).set(HttpConstants.VENICE_COMPRESSION_STRATEGY, (Object) Integer.valueOf(compressionStrategy.getValue())).set(HttpConstants.VENICE_REQUEST_RCU, portableHttpResponse.containsHeader(HttpConstants.VENICE_REQUEST_RCU) ? portableHttpResponse.getFirstHeader(HttpConstants.VENICE_REQUEST_RCU) : 1);
        return veniceFullHttpResponse;
    }

    public RouteHttpRequestStats getRouteHttpRequestStats() {
        return this.routeHttpRequestStats;
    }

    public PendingRequestThrottler getPendingRequestThrottler() {
        return this.pendingRequestThrottler;
    }

    protected VeniceFullHttpResponse buildPlainTextResponse(HttpResponseStatus httpResponseStatus, ByteBuf byteBuf) {
        VeniceFullHttpResponse veniceFullHttpResponse = new VeniceFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf, 0L);
        veniceFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain").set(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(byteBuf.readableBytes()));
        return veniceFullHttpResponse;
    }

    public void stop() {
        this.leakedCompletableFutureCleanupService.interrupt();
    }

    @Override // com.linkedin.alpini.router.api.PartitionDispatchHandler
    public /* bridge */ /* synthetic */ void dispatch(@Nonnull Scatter scatter, @Nonnull ScatterGatherRequest scatterGatherRequest, @Nonnull ResourcePath resourcePath, @Nonnull BasicHttpRequest basicHttpRequest, @Nonnull AsyncPromise asyncPromise, @Nonnull AsyncPromise<List<FullHttpResponse>> asyncPromise2, @Nonnull AsyncPromise<HttpResponseStatus> asyncPromise3, @Nonnull AsyncFuture asyncFuture, @Nonnull Executor executor) throws RouterException {
        dispatch((Scatter<Instance, VenicePath, RouterKey>) scatter, (ScatterGatherRequest<Instance, RouterKey>) scatterGatherRequest, (VenicePath) resourcePath, basicHttpRequest, (AsyncPromise<Instance>) asyncPromise, asyncPromise2, asyncPromise3, (AsyncFuture<Void>) asyncFuture, executor);
    }
}
