package com.linkedin.venice.router.api;

import com.linkedin.alpini.base.concurrency.AsyncPromise;
import com.linkedin.alpini.netty4.misc.BasicFullHttpRequest;
import com.linkedin.alpini.router.api.RouterException;
import com.linkedin.alpini.router.api.Scatter;
import com.linkedin.alpini.router.api.ScatterGatherRequest;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.MockHttpServerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.LiveInstanceMonitor;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.api.path.VenicePath;
import com.linkedin.venice.router.httpclient.ApacheHttpAsyncStorageNodeClient;
import com.linkedin.venice.router.stats.AggHostHealthStats;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.RouteHttpRequestStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.utils.TestUtils;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpGet;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/router/api/TestVeniceDispatcher.class */
public class TestVeniceDispatcher {
    @Test
    public void testErrorRetry() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.add((HttpResponseStatus) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise2)).setSuccess((HttpResponseStatus) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.INTERNAL_SERVER_ERROR, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(arrayList.get(0), HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }, false, CompressionStrategy.NO_OP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void testErrorRetryOnPendingCheckFail() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(true, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.add((HttpResponseStatus) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise2)).setSuccess((HttpResponseStatus) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 2);
                Assert.assertEquals(arrayList.get(0), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                Assert.assertEquals(mockDispatcher.getPendingRequestThrottler().getCurrentPendingRequestCount(), 0L);
            }, false, CompressionStrategy.NO_OP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void testErrorRetryOnPendingCheckLeak() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, true);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.add((HttpResponseStatus) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise2)).setSuccess((HttpResponseStatus) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 0);
                ((RouteHttpRequestStats) Mockito.verify(mockDispatcher.getRouteHttpRequestStats(), Mockito.times(1))).recordFinishedRequest((String) Mockito.any());
                ((RouteHttpRequestStats) Mockito.verify(mockDispatcher.getRouteHttpRequestStats(), Mockito.times(1))).getPendingRequestCount((String) Mockito.any());
            }, true, CompressionStrategy.NO_OP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void passesThroughHttp429() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.addAll((Collection) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise)).setSuccess((List) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.TOO_MANY_REQUESTS, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).status(), HttpResponseStatus.TOO_MANY_REQUESTS);
            }, false, CompressionStrategy.NO_OP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void passThroughCompressedDataIfClientSupportsDecompressionForSingleGet() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.addAll((Collection) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise)).setSuccess((List) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).status(), HttpResponseStatus.OK);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).headers().get("X-VENICE-COMPRESSION-STRATEGY"), String.valueOf(CompressionStrategy.GZIP.getValue()));
                Assert.assertTrue(((FullHttpResponse) arrayList.get(0)).headers().contains("X-VENICE-RCU"));
            }, false, CompressionStrategy.GZIP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void decompressRecordIfClientDoesntSupportsDecompressionForSingleGet() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.addAll((Collection) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise)).setSuccess((List) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.SINGLE_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).status(), HttpResponseStatus.OK);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).headers().get("X-VENICE-COMPRESSION-STRATEGY"), String.valueOf(CompressionStrategy.NO_OP.getValue()));
            }, false, CompressionStrategy.ZSTD_WITH_DICT);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void passThroughCompressedDataIfClientSupportsDecompressionForMultiGet() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.addAll((Collection) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise)).setSuccess((List) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.MULTI_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).status(), HttpResponseStatus.OK);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).headers().get("X-VENICE-COMPRESSION-STRATEGY"), String.valueOf(CompressionStrategy.GZIP.getValue()));
            }, false, CompressionStrategy.GZIP);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    @Test
    public void decompressRecordIfClientDoesntSupportsDecompressionForMultiGet() {
        VeniceDispatcher mockDispatcher = getMockDispatcher(false, false);
        try {
            AsyncPromise asyncPromise = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            AsyncPromise asyncPromise2 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
            ArrayList arrayList = new ArrayList();
            ((AsyncPromise) Mockito.doAnswer(invocationOnMock -> {
                arrayList.addAll((Collection) invocationOnMock.getArgument(0));
                return null;
            }).when(asyncPromise)).setSuccess((List) Mockito.any());
            triggerResponse(mockDispatcher, RequestType.MULTI_GET, HttpResponseStatus.OK, asyncPromise2, asyncPromise, () -> {
                Assert.assertEquals(arrayList.size(), 1);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).status(), HttpResponseStatus.OK);
                Assert.assertEquals(((FullHttpResponse) arrayList.get(0)).headers().get("X-VENICE-COMPRESSION-STRATEGY"), String.valueOf(CompressionStrategy.NO_OP.getValue()));
            }, false, CompressionStrategy.ZSTD_WITH_DICT);
            mockDispatcher.stop();
        } catch (Throwable th) {
            mockDispatcher.stop();
            throw th;
        }
    }

    private VeniceDispatcher getMockDispatcher(boolean z, boolean z2) {
        VeniceRouterConfig veniceRouterConfig = (VeniceRouterConfig) Mockito.mock(VeniceRouterConfig.class);
        ((VeniceRouterConfig) Mockito.doReturn(2).when(veniceRouterConfig)).getHttpClientPoolSize();
        ((VeniceRouterConfig) Mockito.doReturn(10).when(veniceRouterConfig)).getMaxOutgoingConn();
        ((VeniceRouterConfig) Mockito.doReturn(5).when(veniceRouterConfig)).getMaxOutgoingConnPerRoute();
        ((VeniceRouterConfig) Mockito.doReturn(10L).when(veniceRouterConfig)).getMaxPendingRequest();
        ((VeniceRouterConfig) Mockito.doReturn(false).when(veniceRouterConfig)).isSslToStorageNodes();
        ((VeniceRouterConfig) Mockito.doReturn(Long.valueOf(TimeUnit.MINUTES.toMillis(1L))).when(veniceRouterConfig)).getLeakedFutureCleanupPollIntervalMs();
        ((VeniceRouterConfig) Mockito.doReturn(Long.valueOf(TimeUnit.MINUTES.toMillis(1L))).when(veniceRouterConfig)).getLeakedFutureCleanupThresholdMs();
        ((VeniceRouterConfig) Mockito.doReturn(24).when(veniceRouterConfig)).getIoThreadCountInPoolMode();
        ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        MetricsRepository metricsRepository = new MetricsRepository();
        RouterStats routerStats = (RouterStats) Mockito.mock(RouterStats.class);
        RouteHttpRequestStats routeHttpRequestStats = (RouteHttpRequestStats) Mockito.mock(RouteHttpRequestStats.class);
        Mockito.when(routerStats.getStatsByType((RequestType) Mockito.any())).thenReturn(Mockito.mock(AggRouterHttpRequestStats.class));
        if (z) {
            ((VeniceRouterConfig) Mockito.doReturn(true).when(veniceRouterConfig)).isStatefulRouterHealthCheckEnabled();
            ((VeniceRouterConfig) Mockito.doReturn(5).when(veniceRouterConfig)).getRouterUnhealthyPendingConnThresholdPerRoute();
            ((RouteHttpRequestStats) Mockito.doReturn(10L).when(routeHttpRequestStats)).getPendingRequestCount(Mockito.anyString());
        }
        if (z2) {
            ((VeniceRouterConfig) Mockito.doReturn(1L).doReturn(0L).when(veniceRouterConfig)).getMaxPendingRequest();
            ((VeniceRouterConfig) Mockito.doReturn(true).when(veniceRouterConfig)).isStatefulRouterHealthCheckEnabled();
            ((VeniceRouterConfig) Mockito.doReturn(15).when(veniceRouterConfig)).getRouterUnhealthyPendingConnThresholdPerRoute();
            ((RouteHttpRequestStats) Mockito.doReturn(10L).when(routeHttpRequestStats)).getPendingRequestCount(Mockito.anyString());
        }
        return new VeniceDispatcher(veniceRouterConfig, readOnlyStoreRepository, routerStats, metricsRepository, new ApacheHttpAsyncStorageNodeClient(veniceRouterConfig, Optional.empty(), metricsRepository, (LiveInstanceMonitor) Mockito.mock(LiveInstanceMonitor.class)), routeHttpRequestStats, (AggHostHealthStats) Mockito.mock(AggHostHealthStats.class), (RouterStats) Mockito.mock(RouterStats.class));
    }

    private void triggerResponse(VeniceDispatcher veniceDispatcher, RequestType requestType, HttpResponseStatus httpResponseStatus, AsyncPromise asyncPromise, AsyncPromise asyncPromise2, Runnable runnable, boolean z, CompressionStrategy compressionStrategy) {
        Scatter scatter = (Scatter) Mockito.mock(Scatter.class);
        ScatterGatherRequest scatterGatherRequest = (ScatterGatherRequest) Mockito.mock(ScatterGatherRequest.class);
        RouterStats routerStats = (RouterStats) Mockito.mock(RouterStats.class);
        ((RouterStats) Mockito.doReturn((AggRouterHttpRequestStats) Mockito.mock(AggRouterHttpRequestStats.class)).when(routerStats)).getStatsByType((RequestType) Mockito.any());
        RouterExceptionAndTrackingUtils.setRouterStats(routerStats);
        VenicePath venicePath = (VenicePath) Mockito.mock(VenicePath.class);
        ((VenicePath) Mockito.doReturn("test_store").when(venicePath)).getStoreName();
        ((VenicePath) Mockito.doReturn(requestType).when(venicePath)).getRequestType();
        ((VenicePath) Mockito.doReturn(HttpMethod.GET).when(venicePath)).getHttpMethod();
        if (requestType.equals(RequestType.SINGLE_GET)) {
            ((VenicePath) Mockito.doReturn(Integer.toString(ReadAvroProtocolDefinition.SINGLE_GET_ROUTER_REQUEST_V1.getProtocolVersion())).when(venicePath)).getVeniceApiVersionHeader();
        } else if (requestType.equals(RequestType.MULTI_GET)) {
            ((VenicePath) Mockito.doReturn(Integer.toString(ReadAvroProtocolDefinition.MULTI_GET_ROUTER_REQUEST_V1.getProtocolVersion())).when(venicePath)).getVeniceApiVersionHeader();
        }
        BasicFullHttpRequest basicFullHttpRequest = (BasicFullHttpRequest) Mockito.mock(BasicFullHttpRequest.class);
        HashMap hashMap = new HashMap();
        hashMap.put("X-VENICE-SUPPORTED-COMPRESSION-STRATEGY", String.valueOf(CompressionStrategy.GZIP.getValue()));
        DefaultHttpHeaders defaultHttpHeaders = new DefaultHttpHeaders();
        for (Map.Entry entry : hashMap.entrySet()) {
            defaultHttpHeaders.add((String) entry.getKey(), entry.getValue());
        }
        ((BasicFullHttpRequest) Mockito.doReturn(defaultHttpHeaders).when(basicFullHttpRequest)).headers();
        CompressorFactory compressorFactory = (CompressorFactory) Mockito.mock(CompressorFactory.class);
        VeniceCompressor veniceCompressor = (VeniceCompressor) Mockito.mock(VeniceCompressor.class);
        VeniceCompressor veniceCompressor2 = (VeniceCompressor) Mockito.mock(VeniceCompressor.class);
        try {
            ((VeniceCompressor) Mockito.doReturn(ByteBuffer.allocate(10)).when(veniceCompressor)).decompress((ByteBuffer) Mockito.any(ByteBuffer.class));
        } catch (IOException e) {
            Assert.fail();
        }
        ((CompressorFactory) Mockito.doReturn(veniceCompressor).when(compressorFactory)).getVersionSpecificCompressor(Mockito.anyString());
        ((CompressorFactory) Mockito.doAnswer(invocationOnMock -> {
            return CompressionStrategy.NO_OP.equals(invocationOnMock.getArgument(0)) ? veniceCompressor2 : veniceCompressor;
        }).when(compressorFactory)).getCompressor((CompressionStrategy) Mockito.any());
        ((VenicePath) Mockito.doReturn(new VeniceResponseDecompressor(true, routerStats, basicFullHttpRequest, "test_store", 1, compressorFactory)).when(venicePath)).getResponseDecompressor();
        AsyncPromise asyncPromise3 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
        AsyncPromise asyncPromise4 = (AsyncPromise) Mockito.mock(AsyncPromise.class);
        Executor executor = runnable2 -> {
            runnable2.run();
        };
        MockHttpServerWrapper mockHttpServer = ServiceFactory.getMockHttpServer("mock_storage_node");
        try {
            DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            mockHttpServer.addResponseForUriPattern(".*", defaultFullHttpResponse);
            defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "avro/binary").set("X-VENICE-STORE-VERSION", "1").set(HttpHeaderNames.CONTENT_LENGTH, "0").set("X-VENICE-SCHEMA-ID", "1").set("X-VENICE-COMPRESSION-STRATEGY", Integer.valueOf(compressionStrategy.getValue()));
            String address = mockHttpServer.getAddress();
            Instance instance = new Instance(address, mockHttpServer.getHost(), mockHttpServer.getPort());
            ArrayList arrayList = new ArrayList();
            arrayList.add(instance);
            ((ScatterGatherRequest) Mockito.doReturn(arrayList).when(scatterGatherRequest)).getHosts();
            ((VenicePath) Mockito.doReturn(new HttpGet("http://" + address + "/mock_get")).when(venicePath)).composeRouterRequest((String) Mockito.any());
            try {
                veniceDispatcher.dispatch(scatter, scatterGatherRequest, venicePath, basicFullHttpRequest, asyncPromise3, asyncPromise2, asyncPromise, asyncPromise4, executor);
            } catch (RouterException e2) {
                if (!z) {
                    throw new VeniceException(e2);
                }
            }
            TestUtils.waitForNonDeterministicAssertion(2L, TimeUnit.SECONDS, () -> {
                runnable.run();
            });
            if (mockHttpServer != null) {
                mockHttpServer.close();
            }
        } catch (Throwable th) {
            if (mockHttpServer != null) {
                try {
                    mockHttpServer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
