package com.linkedin.venice.router;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.r2.transport.http.common.HttpProtocolVersion;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.exceptions.ServiceDiscoveryException;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.client.store.StatTrackingStoreClient;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.router.httpclient.StorageNodeClientType;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/router/TestStreaming.class */
public class TestStreaming {
    private static final int MAX_KEY_LIMIT = 1000;
    private static final int LAST_KEY_INDEX_WITH_NON_NULL_VALUE = 500;
    private static final String NON_EXISTING_KEY1 = "a_unknown_key";
    private static final String NON_EXISTING_KEY2 = "z_unknown_key";
    private static final int NON_EXISTING_KEY_NUM = 2;
    private VeniceClusterWrapper veniceCluster;
    private String storeVersionName;
    private int valueSchemaId;
    private String storeName;
    private CompressorFactory compressorFactory;
    private VeniceKafkaSerializer keySerializer;
    private VeniceKafkaSerializer valueSerializer;
    private VeniceWriter<Object, Object, Object> veniceWriter;
    private static final String KEY_PREFIX = "key_";
    private static final String KEY_SCHEMA = "\"string\"";
    private static final Logger LOGGER = LogManager.getLogger(TestStreaming.class);
    private static final String VALUE_SCHEMA = "{\n  \"type\": \"record\",\n  \"name\": \"test_value_schema\",\n  \"fields\": [\n   {\"name\": \"int_field\", \"type\": \"int\"},\n   {\"name\": \"float_field\", \"type\": \"float\"},\n   {\"name\": \"nullable_string_field\", \"type\": [\"null\", \"string\"], \"default\": null}\n  ]\n}";
    private static final Schema VALUE_SCHEMA_OBJECT = Schema.parse(VALUE_SCHEMA);

    @BeforeClass
    public void setUp() throws InterruptedException, ExecutionException, VeniceClientException, IOException {
        System.setProperty("io.netty.leakDetection.maxRecords", "50");
        System.setProperty("io.netty.leakDetection.level", "paranoid");
        Utils.thisIsLocalhost();
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, NON_EXISTING_KEY_NUM, 0, NON_EXISTING_KEY_NUM, 100, true, false);
        Properties properties = new Properties();
        Properties properties2 = new Properties();
        properties2.put(VeniceServerWrapper.SERVER_ENABLE_SSL, "true");
        this.veniceCluster.addVeniceServer(properties2, properties);
        CompressionStrategy compressionStrategy = CompressionStrategy.GZIP;
        this.storeName = Utils.getUniqueString("venice-store");
        this.veniceCluster.getNewStore(this.storeName, KEY_SCHEMA, VALUE_SCHEMA);
        this.storeVersionName = this.veniceCluster.getNewVersion(this.storeName, false).getKafkaTopic();
        this.veniceCluster.useControllerClient(controllerClient -> {
            controllerClient.updateStore(this.storeName, new UpdateStoreQueryParams().setReadComputationEnabled(true).setReadQuotaInCU(20000000L));
        });
        this.valueSchemaId = 1;
        this.keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA);
        this.valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA);
        this.compressorFactory = new CompressorFactory();
        VeniceCompressor compressor = this.compressorFactory.getCompressor(compressionStrategy);
        this.veniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(this.storeVersionName).setKeySerializer(this.keySerializer).build());
        int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(this.storeVersionName);
        this.veniceWriter.broadcastStartOfPush(false, false, compressionStrategy, new HashMap());
        for (int i = 0; i < 10000; i++) {
            GenericData.Record record = new GenericData.Record(VALUE_SCHEMA_OBJECT);
            record.put("int_field", Integer.valueOf(i));
            record.put("float_field", Float.valueOf(i + 100.0f));
            if (i <= LAST_KEY_INDEX_WITH_NON_NULL_VALUE) {
                record.put("nullable_string_field", "nullable_string_field" + i);
            }
            this.veniceWriter.put(KEY_PREFIX + i, compressor.compress(this.valueSerializer.serialize("", record)), this.valueSchemaId).get();
        }
        this.veniceWriter.broadcastEndOfPush(new HashMap());
        String allControllersURLs = this.veniceCluster.getAllControllersURLs();
        TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
            return ControllerClient.getStore(allControllersURLs, this.veniceCluster.getClusterName(), this.storeName).getStore().getCurrentVersion() == parseVersionFromKafkaTopicName;
        });
    }

    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceCluster});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceWriter});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.compressorFactory});
    }

    private Properties getRouterProperties(boolean z, boolean z2, boolean z3) {
        Properties properties = new Properties();
        properties.put("router.long.tail.retry.for.single.get.threshold.ms", 1);
        properties.put("router.max.key_count.in.multiget.req", Integer.valueOf(MAX_KEY_LIMIT));
        properties.put("router.long.tail.retry.for.batch.get.threshold.ms", "1-:100");
        properties.put("router.storage.node.client.type", StorageNodeClientType.APACHE_HTTP_ASYNC_CLIENT.name());
        properties.put("router.client.decompression.enabled", Boolean.toString(z2));
        properties.put("router.http2.inbound.enabled", Boolean.toString(z3));
        return properties;
    }

    /* JADX WARN: Finally extract failed */
    @Test(timeOut = 300000, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testReadStreaming(boolean z) throws Exception {
        VeniceRouterWrapper addVeniceRouter = this.veniceCluster.addVeniceRouter(getRouterProperties(false, true, z));
        MetricsRepository metricsRepository = addVeniceRouter.getMetricsRepository();
        VeniceRouterWrapper addVeniceRouter2 = this.veniceCluster.addVeniceRouter(getRouterProperties(true, false, z));
        D2Client d2Client = null;
        Closeable closeable = null;
        try {
            d2Client = D2TestUtils.getD2Client(this.veniceCluster.getZk().getAddress(), true, z ? HttpProtocolVersion.HTTP_2 : HttpProtocolVersion.HTTP_1_1);
            D2TestUtils.startD2Client(d2Client);
            MetricsRepository metricsRepository2 = new MetricsRepository();
            closeable = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME).setD2Client(d2Client).setMetricsRepository(metricsRepository2).setUseFastAvro(false));
            StatTrackingStoreClient statTrackingStoreClient = (StatTrackingStoreClient) closeable;
            int i = 0;
            TreeSet treeSet = new TreeSet();
            treeSet.add(NON_EXISTING_KEY1);
            for (int i2 = 0; i2 < 998; i2++) {
                treeSet.add(KEY_PREFIX + i2);
            }
            treeSet.add(NON_EXISTING_KEY2);
            while (true) {
                i++;
                if (i > 10) {
                    break;
                }
                final VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
                final AtomicInteger atomicInteger = new AtomicInteger(0);
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                statTrackingStoreClient.streamingBatchGet(treeSet, new StreamingCallback<String, Object>() { // from class: com.linkedin.venice.router.TestStreaming.1
                    public void onRecordReceived(String str, Object obj) {
                        if (obj != null) {
                            veniceConcurrentHashMap.put(str, obj);
                        }
                        atomicInteger.getAndIncrement();
                    }

                    public void onCompletion(Optional<Exception> optional) {
                        countDownLatch.countDown();
                        if (optional.isPresent()) {
                            TestStreaming.LOGGER.info("MultiGet onCompletion invoked with Venice Exception", optional.get());
                            Assert.fail("Exception: " + optional.get() + " is not expected");
                        }
                    }
                });
                countDownLatch.await();
                Assert.assertEquals(atomicInteger.get(), MAX_KEY_LIMIT);
                Assert.assertEquals(veniceConcurrentHashMap.size(), 998);
                verifyMultiGetResult(veniceConcurrentHashMap);
                Map<String, Object> map = (Map) statTrackingStoreClient.streamingBatchGet(treeSet).get();
                Assert.assertEquals(map.size(), 998);
                verifyMultiGetResult(map);
                final AtomicInteger atomicInteger2 = new AtomicInteger(0);
                final VeniceConcurrentHashMap veniceConcurrentHashMap2 = new VeniceConcurrentHashMap();
                final CountDownLatch countDownLatch2 = new CountDownLatch(1);
                statTrackingStoreClient.compute().project(new String[]{"int_field", "nullable_string_field"}).streamingExecute(treeSet, new StreamingCallback<String, ComputeGenericRecord>() { // from class: com.linkedin.venice.router.TestStreaming.2
                    public void onRecordReceived(String str, ComputeGenericRecord computeGenericRecord) {
                        atomicInteger2.incrementAndGet();
                        if (computeGenericRecord != null) {
                            veniceConcurrentHashMap2.put(str, computeGenericRecord);
                        }
                    }

                    public void onCompletion(Optional<Exception> optional) {
                        countDownLatch2.countDown();
                        if (optional.isPresent()) {
                            TestStreaming.LOGGER.info("Compute onCompletion invoked with Venice Exception", optional.get());
                            Assert.fail("Exception: " + optional.get() + " is not expected");
                        }
                    }
                });
                countDownLatch2.await();
                Assert.assertEquals(atomicInteger2.get(), MAX_KEY_LIMIT);
                Assert.assertEquals(veniceConcurrentHashMap2.size(), 998);
                verifyComputeResult(veniceConcurrentHashMap2);
                Map<String, ComputeGenericRecord> map2 = (Map) statTrackingStoreClient.compute().project(new String[]{"int_field", "nullable_string_field"}).streamingExecute(treeSet).get();
                Assert.assertEquals(map2.size(), 998);
                verifyComputeResult(map2);
            }
            String str = "." + this.storeName;
            Map metrics = metricsRepository2.metrics();
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_request.OccurrenceRate").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_healthy_request_latency.Avg").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_response_ttfr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_response_tt50pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_response_tt90pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_response_tt95pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_response_tt99pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--multiget_streaming_healthy_request.OccurrenceRate").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_request.OccurrenceRate").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_healthy_request_latency.Avg").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_response_ttfr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_response_tt50pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_response_tt90pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_response_tt95pr.50thPercentile").toString())).value() > 0.0d);
            Assert.assertTrue(((Metric) metrics.get(new StringBuilder().append(str).append("--compute_streaming_response_tt99pr.50thPercentile").toString())).value() > 0.0d);
            LOGGER.info("The following metrics are Router metrics:");
            Iterator it = Arrays.asList(metricsRepository).iterator();
            while (it.hasNext()) {
                Map metrics2 = ((MetricsRepository) it.next()).metrics();
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--multiget_streaming_request.OccurrenceRate").toString())).value() > 0.0d);
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--multiget_streaming_latency.99thPercentile").toString())).value() > 0.0d);
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--multiget_streaming_fanout_request_count.Avg").toString())).value() > 0.0d);
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--compute_streaming_request.OccurrenceRate").toString())).value() > 0.0d);
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--compute_streaming_latency.99thPercentile").toString())).value() > 0.0d);
                Assert.assertTrue(((Metric) metrics2.get(new StringBuilder().append(str).append("--compute_streaming_fanout_request_count.Avg").toString())).value() > 0.0d);
            }
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{addVeniceRouter});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{addVeniceRouter2});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
            if (d2Client != null) {
                D2ClientUtils.shutdownClient(d2Client);
            }
        } catch (Throwable th) {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{addVeniceRouter});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{addVeniceRouter2});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
            if (d2Client != null) {
                D2ClientUtils.shutdownClient(d2Client);
            }
            throw th;
        }
    }

    private void verifyMultiGetResult(Map<String, Object> map) {
        for (int i = 0; i < 998; i++) {
            Object obj = map.get(KEY_PREFIX + i);
            Assert.assertTrue(obj instanceof GenericRecord);
            GenericRecord genericRecord = (GenericRecord) obj;
            Assert.assertEquals(genericRecord.get("int_field"), Integer.valueOf(i));
            Assert.assertEquals(genericRecord.get("float_field"), Float.valueOf(i + 100.0f));
        }
    }

    private void verifyComputeResult(Map<String, ComputeGenericRecord> map) {
        for (int i = 0; i < 998; i++) {
            String str = KEY_PREFIX + i;
            GenericRecord genericRecord = map.get(str);
            Assert.assertEquals(genericRecord.get("int_field"), Integer.valueOf(i));
            Assert.assertNull(genericRecord.get("float_field"));
            if (i <= LAST_KEY_INDEX_WITH_NON_NULL_VALUE) {
                Assert.assertTrue(genericRecord.get("nullable_string_field").toString().equals("nullable_string_field" + i));
            } else {
                Assert.assertNull(genericRecord.get("nullable_string_field"), "Field: 'nullable_string_field' should be 'null' for key: " + str);
            }
        }
    }

    @Test(timeOut = 30000)
    public void testWithNonExistingStore() throws ExecutionException, InterruptedException {
        String uniqueString = Utils.getUniqueString("non_existing_store");
        D2Client d2Client = null;
        Closeable closeable = null;
        VeniceRouterWrapper veniceRouterWrapper = null;
        try {
            try {
                try {
                    veniceRouterWrapper = this.veniceCluster.addVeniceRouter(new Properties());
                    d2Client = D2TestUtils.getD2Client(this.veniceCluster.getZk().getAddress(), false);
                    D2TestUtils.startD2Client(d2Client);
                    closeable = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME).setD2Client(d2Client));
                    closeable.get("test").get();
                    Assert.fail("An exception is expected here");
                    Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
                    Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
                    if (d2Client != null) {
                        D2ClientUtils.shutdownClient(d2Client);
                    }
                } catch (ServiceDiscoveryException e) {
                    Assert.assertTrue(e.getCause() instanceof VeniceNoStoreException);
                    Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
                    Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
                    if (d2Client != null) {
                        D2ClientUtils.shutdownClient(d2Client);
                    }
                }
            } catch (Throwable th) {
                Assert.fail("Unexpected exception received: " + th.getClass());
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
                if (d2Client != null) {
                    D2ClientUtils.shutdownClient(d2Client);
                }
            }
        } catch (Throwable th2) {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
            if (d2Client != null) {
                D2ClientUtils.shutdownClient(d2Client);
            }
            throw th2;
        }
    }

    @Test(timeOut = 30000)
    public void testWithForceClusterDiscovery() {
        Closeable closeable = null;
        VeniceRouterWrapper veniceRouterWrapper = null;
        try {
            try {
                veniceRouterWrapper = this.veniceCluster.addVeniceRouter(new Properties());
                closeable = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME).setD2Client(D2TestUtils.getD2Client(this.veniceCluster.getZk().getAddress(), false)).setForceClusterDiscoveryAtStartTime(true));
                Assert.fail("An exception is expected here");
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
            } catch (Throwable th) {
                if (!(th instanceof VeniceException) || !th.getMessage().contains("Failed to initializing Venice Client")) {
                    Assert.fail("Unexpected exception received: " + th.getClass());
                }
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
                Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
            }
        } catch (Throwable th2) {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{closeable});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{veniceRouterWrapper});
            throw th2;
        }
    }
}
