package com.linkedin.venice.listener;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.listener.response.MetadataResponse;
import com.linkedin.davinci.storage.DiskHealthCheckService;
import com.linkedin.davinci.storage.MetadataRetriever;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.venice.admin.protocol.response.ConsumptionStateSnapshot;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.listener.ErrorCountAppender;
import com.linkedin.venice.listener.request.AdminRequest;
import com.linkedin.venice.listener.request.GetRouterRequest;
import com.linkedin.venice.listener.request.HealthCheckRequest;
import com.linkedin.venice.listener.request.MetadataFetchRequest;
import com.linkedin.venice.listener.request.MultiGetRouterRequestWrapper;
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.listener.response.MultiGetResponseWrapper;
import com.linkedin.venice.listener.response.StorageResponseObject;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.ServerAdminAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.metadata.response.VersionProperties;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.unit.kafka.SimplePartitioner;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.TestUtils;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/listener/StorageReadRequestsHandlerTest.class */
/**
 * Tests for {@link StorageReadRequestsHandler}.
 *
 * <p>Every test hands one request object to {@code channelRead} together with a mocked
 * {@link ChannelHandlerContext} whose {@code writeAndFlush} records its argument, then waits for
 * that recorded response and asserts on its type and content.
 */
public class StorageReadRequestsHandlerTest {
    /** Pause between two checks for a recorded response, in milliseconds. */
    private static final long RESPONSE_POLL_INTERVAL_MS = 10L;

    /** Number of pauses allowed before waiting for a response is reported as a timeout. */
    private static final int MAX_RESPONSE_POLLS = 200;

    /**
     * Single GET: the storage engine is stubbed to return a serialized {@link ValueRecord} for
     * partition 3 / key "testkey"; the handler must answer with a {@link StorageResponseObject}
     * carrying the same value bytes and schema id.
     */
    @Test
    public static void storageExecutionHandlerPassesRequestsAndGeneratesResponses() throws Exception {
        String topic = "temp-test-topic_v1";
        String keyString = "testkey";
        String valueString = "testvalue";
        int schemaId = 1;
        int partition = 3;

        List<Object> responses = newResponseSink();
        byte[] storedValue = ValueRecord.create(schemaId, valueString.getBytes(StandardCharsets.UTF_8)).serialize();
        GetRouterRequest request = GetRouterRequest.parseGetHttpRequest(
                new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/storage/" + topic + "/" + partition + "/" + keyString));

        AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class);
        Mockito.doReturn(storedValue).when(storageEngine).get(partition, ByteBuffer.wrap(keyString.getBytes(StandardCharsets.UTF_8)));
        StorageEngineRepository storageEngineRepository = Mockito.mock(StorageEngineRepository.class);
        Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(topic);

        ReadOnlyStoreRepository storeRepository = mockStoreRepository(Optional.empty());
        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            newHandler(executor, storageEngineRepository, storeRepository, Mockito.mock(ReadOnlySchemaRepository.class),
                    Mockito.mock(MetadataRetriever.class), null, false).channelRead(context, request);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            StorageResponseObject response = (StorageResponseObject) responses.get(0);
            Assert.assertEquals(response.getValueRecord().getDataInBytes(), valueString.getBytes(StandardCharsets.UTF_8));
            Assert.assertEquals(response.getValueRecord().getSchemaId(), schemaId);
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Health check: with a {@link DiskHealthCheckService} that reports a healthy disk, a
     * {@link HealthCheckRequest} must be answered with an {@link HttpShortcutResponse} of status OK.
     */
    @Test
    public void testDiskHealthCheckService() throws Exception {
        DiskHealthCheckService healthCheckService = Mockito.mock(DiskHealthCheckService.class);
        Mockito.doReturn(true).when(healthCheckService).isDiskHealthy();

        List<Object> responses = newResponseSink();
        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            StorageReadRequestsHandler handler = newHandler(executor, Mockito.mock(StorageEngineRepository.class),
                    Mockito.mock(ReadOnlyStoreRepository.class), Mockito.mock(ReadOnlySchemaRepository.class),
                    Mockito.mock(MetadataRetriever.class), healthCheckService, false);
            handler.channelRead(context, new HealthCheckRequest());
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertTrue(responses.get(0) instanceof HttpShortcutResponse);
            Assert.assertEquals(((HttpShortcutResponse) responses.get(0)).getStatus(), HttpResponseStatus.OK);
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Blocks until {@code list} holds at least one element, checking every
     * {@value #RESPONSE_POLL_INTERVAL_MS} ms.
     *
     * @param list the list the mocked channel context records responses into
     * @throws RuntimeException if the list is still empty after {@value #MAX_RESPONSE_POLLS} pauses
     * @throws Exception if the waiting thread is interrupted while sleeping
     */
    private static void waitUntilStorageExecutionHandlerRespond(List<Object> list) throws Exception {
        for (int polls = 0; list.isEmpty(); polls++) {
            if (polls >= MAX_RESPONSE_POLLS) {
                throw new RuntimeException("Timeout waiting for StorageExecutionHandler output to appear");
            }
            Thread.sleep(RESPONSE_POLL_INTERVAL_MS);
        }
    }

    /**
     * Multi-get of ten keys. Every request key is sent with {@code partitionId = 0}, while the
     * storage engine is stubbed at the partition {@link SimplePartitioner} computes from the key
     * bytes; the version's partitioner config uses {@link SimplePartitioner} with amplification
     * factor 3. The response must contain all ten values keyed by their original index.
     *
     * @param bool forwarded as the second boolean argument of the {@link StorageReadRequestsHandler}
     *     constructor; NOTE(review): its meaning is defined by that constructor, which is not
     *     visible here
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public static void testMultiGetNotUsingKeyBytes(Boolean bool) throws Exception {
        String topic = "temp-test-topic_v1";
        int keyCount = 10;
        // Used both for the partitioner config and for computing the stubbed partition;
        // presumably this mirrors how the handler derives the partition to read — confirm.
        int amplificationFactor = 3;

        List<Object> responses = newResponseSink();
        RecordSerializer<MultiGetRouterRequestKeyV1> requestSerializer =
                SerializerDeserializerFactory.getAvroGenericSerializer(MultiGetRouterRequestKeyV1.SCHEMA$);
        VeniceAvroKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer("\"string\"");
        SimplePartitioner partitioner = new SimplePartitioner();
        AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class);

        List<MultiGetRouterRequestKeyV1> requestKeys = new ArrayList<>();
        Map<Integer, String> expectedValues = new HashMap<>();
        for (int i = 0; i < keyCount; i++) {
            byte[] keyBytes = keySerializer.serialize((String) null, "key_" + i);
            int stubbedPartition = partitioner.getPartitionId(keyBytes, amplificationFactor);

            MultiGetRouterRequestKeyV1 requestKey = new MultiGetRouterRequestKeyV1();
            requestKey.keyBytes = ByteBuffer.wrap(keyBytes);
            requestKey.keyIndex = i;
            requestKey.partitionId = 0;
            requestKeys.add(requestKey);

            String value = "value_" + i;
            Mockito.doReturn(ValueRecord.create(1, value.getBytes(StandardCharsets.UTF_8)).serialize())
                    .when(storageEngine).get(stubbedPartition, ByteBuffer.wrap(keyBytes));
            expectedValues.put(i, value);
        }

        DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                "/storage/" + topic, Unpooled.wrappedBuffer(requestSerializer.serializeObjects(requestKeys)));
        httpRequest.headers().set("X-VENICE-API-VERSION",
                Integer.valueOf(ReadAvroProtocolDefinition.MULTI_GET_ROUTER_REQUEST_V1.getProtocolVersion()));
        MultiGetRouterRequestWrapper request = MultiGetRouterRequestWrapper.parseMultiGetHttpRequest(httpRequest);

        StorageEngineRepository storageEngineRepository = Mockito.mock(StorageEngineRepository.class);
        Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(topic);

        PartitionerConfigImpl partitionerConfig = new PartitionerConfigImpl();
        partitionerConfig.setPartitionerClass(SimplePartitioner.class.getName());
        partitionerConfig.setAmplificationFactor(amplificationFactor);
        Version version = Mockito.mock(Version.class);
        Mockito.when(version.getPartitionerConfig()).thenReturn(partitionerConfig);
        ReadOnlyStoreRepository storeRepository = mockStoreRepository(Optional.of(version));

        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            newHandler(executor, storageEngineRepository, storeRepository, Mockito.mock(ReadOnlySchemaRepository.class),
                    Mockito.mock(MetadataRetriever.class), null, bool.booleanValue()).channelRead(context, request);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            Assert.assertTrue(responses.get(0) instanceof MultiGetResponseWrapper);
            Iterable<MultiGetResponseRecordV1> responseRecords =
                    SerializerDeserializerFactory.getAvroSpecificDeserializer(MultiGetResponseRecordV1.class)
                            .deserializeObjects(((MultiGetResponseWrapper) responses.get(0)).getResponseBody().array());
            Map<Integer, String> actualValues = new HashMap<>();
            for (MultiGetResponseRecordV1 responseRecord : responseRecords) {
                actualValues.put(responseRecord.keyIndex, new String(responseRecord.value.array(), StandardCharsets.UTF_8));
            }

            Assert.assertEquals(actualValues.size(), keyCount);
            for (int i = 0; i < keyCount; i++) {
                Assert.assertEquals(actualValues.get(i), expectedValues.get(i));
            }
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Failure path: the storage engine repository throws a {@link VeniceException} on lookup. The
     * handler must answer with an INTERNAL_SERVER_ERROR {@link HttpShortcutResponse} carrying the
     * exception message, and the {@link ErrorCountAppender} attached to the handler's logger must
     * have counted at least one matching message.
     */
    @Test
    public static void storageExecutionHandlerLogsExceptions() throws Exception {
        String topic = "temp-test-topic_v1";
        String exceptionMessage = "Exception thrown in Mock";

        List<Object> responses = newResponseSink();
        GetRouterRequest request = GetRouterRequest.parseGetHttpRequest(
                new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/storage/" + topic + "/3/testkey"));

        // No storage engine is stubbed: the repository lookup itself is forced to fail.
        StorageEngineRepository storageEngineRepository = Mockito.mock(StorageEngineRepository.class);
        Mockito.doThrow(new VeniceException(exceptionMessage)).when(storageEngineRepository).getLocalStorageEngine(topic);

        ReadOnlyStoreRepository storeRepository = mockStoreRepository(Optional.empty());
        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            AtomicInteger errorLogCount = new AtomicInteger();
            ErrorCountAppender errorCountAppender = new ErrorCountAppender.Builder().setErrorMessageCounter(errorLogCount).setExceptionMessage(exceptionMessage).m0build();
            errorCountAppender.start();
            // NOTE(review): the appender is attached here but never detached or stopped afterwards,
            // so it stays registered on the handler's logger for later tests — confirm this is intended.
            LogManager.getContext(false).getConfiguration().addLoggerAppender(LogManager.getLogger(StorageReadRequestsHandler.class), errorCountAppender);

            newHandler(executor, storageEngineRepository, storeRepository, Mockito.mock(ReadOnlySchemaRepository.class),
                    Mockito.mock(MetadataRetriever.class), null, false).channelRead(context, request);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            HttpShortcutResponse response = (HttpShortcutResponse) responses.get(0);
            Assert.assertEquals(response.getStatus(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
            Assert.assertEquals(response.getMessage(), exceptionMessage);
            Assert.assertTrue(errorLogCount.get() > 0);
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Admin request: the {@link MetadataRetriever} is stubbed to return an {@link AdminResponse}
     * with one partition consumption state (partition 12345) for "test_store_v1"; the handler must
     * write that response back with the expected schema id header.
     */
    @Test
    public static void testAdminRequestsPassInStorageExecutionHandler() throws Exception {
        String topic = "test_store_v1";
        int expectedPartitionId = 12345;

        List<Object> responses = newResponseSink();
        // Locale.ROOT keeps the URI segment stable regardless of the JVM's default locale.
        String uri = "/" + QueryAction.ADMIN.toString().toLowerCase(Locale.ROOT) + "/" + topic + "/"
                + ServerAdminAction.DUMP_INGESTION_STATE.toString();
        AdminRequest request = AdminRequest.parseAdminHttpRequest(
                new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri));

        AdminResponse stubbedResponse = new AdminResponse();
        stubbedResponse.addPartitionConsumptionState(new PartitionConsumptionState(expectedPartitionId, 1,
                new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer()), false));
        MetadataRetriever metadataRetriever = Mockito.mock(MetadataRetriever.class);
        Mockito.doReturn(stubbedResponse).when(metadataRetriever).getConsumptionSnapshots(Mockito.eq(topic), Mockito.any());

        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            newHandler(executor, Mockito.mock(StorageEngineRepository.class), Mockito.mock(ReadOnlyStoreRepository.class),
                    Mockito.mock(ReadOnlySchemaRepository.class), metadataRetriever, null, false).channelRead(context, request);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            Assert.assertTrue(responses.get(0) instanceof AdminResponse);
            AdminResponse response = (AdminResponse) responses.get(0);
            Assert.assertEquals(response.getResponseRecord().partitionConsumptionStates.size(), 1);
            Assert.assertEquals(((ConsumptionStateSnapshot) response.getResponseRecord().partitionConsumptionStates.get(0)).partitionId, expectedPartitionId);
            Assert.assertEquals(response.getResponseSchemaIdHeader(), AvroProtocolDefinition.SERVER_ADMIN_RESPONSE_V1.getCurrentProtocolVersion());
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Metadata request: the {@link MetadataRetriever} is stubbed to return a populated
     * {@link MetadataResponse} for "test_store_name"; the handler must write back a response whose
     * record exposes the same version metadata, key schema and value schemas.
     */
    @Test
    public static void testMetadataFetchRequestsPassInStorageExecutionHandler() throws Exception {
        String storeName = "test_store_name";
        Map<CharSequence, CharSequence> keySchema = Collections.singletonMap("test_key_schema_id", "test_key_schema");
        Map<CharSequence, CharSequence> valueSchemas = Collections.singletonMap("test_value_schema_id", "test_value_schemas");

        List<Object> responses = newResponseSink();
        // Locale.ROOT keeps the URI segment stable regardless of the JVM's default locale.
        String uri = "/" + QueryAction.METADATA.toString().toLowerCase(Locale.ROOT) + "/" + storeName;
        MetadataFetchRequest request = MetadataFetchRequest.parseGetHttpRequest(
                new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri));

        VersionProperties versionProperties = new VersionProperties(1, 0, 1, "test_partitioner_class",
                Collections.singletonMap("test_partitioner_param", "test_param"), 2);
        MetadataResponse stubbedResponse = new MetadataResponse();
        stubbedResponse.setVersions(Collections.singletonList(1));
        stubbedResponse.setVersionMetadata(versionProperties);
        stubbedResponse.setKeySchema(keySchema);
        stubbedResponse.setValueSchemas(valueSchemas);
        MetadataRetriever metadataRetriever = Mockito.mock(MetadataRetriever.class);
        Mockito.doReturn(stubbedResponse).when(metadataRetriever).getMetadata(Mockito.eq(storeName));

        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            newHandler(executor, Mockito.mock(StorageEngineRepository.class), Mockito.mock(ReadOnlyStoreRepository.class),
                    Mockito.mock(ReadOnlySchemaRepository.class), metadataRetriever, null, false).channelRead(context, request);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            Assert.assertTrue(responses.get(0) instanceof MetadataResponse);
            MetadataResponse response = (MetadataResponse) responses.get(0);
            Assert.assertEquals(response.getResponseRecord().getVersionMetadata(), versionProperties);
            Assert.assertEquals(response.getResponseRecord().getKeySchema(), keySchema);
            Assert.assertEquals(response.getResponseRecord().getValueSchemas(), valueSchemas);
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Unknown input: a plain {@link Object} passed to {@code channelRead} must be answered with an
     * INTERNAL_SERVER_ERROR {@link HttpShortcutResponse} and the "Unrecognized object" message.
     */
    @Test
    public static void testUnrecognizedRequestInStorageExecutionHandler() throws Exception {
        List<Object> responses = newResponseSink();
        Object unrecognizedRequest = new Object();

        ChannelHandlerContext context = mockContextRecordingInto(responses);
        ThreadPoolExecutor executor = newSingleThreadExecutor();
        try {
            newHandler(executor, Mockito.mock(StorageEngineRepository.class), Mockito.mock(ReadOnlyStoreRepository.class),
                    Mockito.mock(ReadOnlySchemaRepository.class), Mockito.mock(MetadataRetriever.class), null, false)
                    .channelRead(context, unrecognizedRequest);
            waitUntilStorageExecutionHandlerRespond(responses);

            Assert.assertEquals(responses.size(), 1);
            Assert.assertTrue(responses.get(0) instanceof HttpShortcutResponse);
            HttpShortcutResponse response = (HttpShortcutResponse) responses.get(0);
            Assert.assertEquals(response.getStatus(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
            Assert.assertEquals(response.getMessage(), "Unrecognized object in StorageExecutionHandler");
        } finally {
            TestUtils.shutdownExecutor(executor);
        }
    }

    /**
     * Creates the list that recorded responses are appended to. It is a synchronized list because
     * the handler may call {@code writeAndFlush} from the executor thread while the test thread
     * polls the list's size.
     */
    private static List<Object> newResponseSink() {
        return Collections.synchronizedList(new ArrayList<>());
    }

    /** Creates the one-thread executor (work queue capacity 2) that every test hands to the handler. */
    private static ThreadPoolExecutor newSingleThreadExecutor() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
    }

    /**
     * Creates a mocked {@link ChannelHandlerContext} that returns an unpooled allocator from
     * {@code alloc()} and appends every object passed to {@code writeAndFlush} to {@code responses}.
     */
    private static ChannelHandlerContext mockContextRecordingInto(List<Object> responses) {
        ChannelHandlerContext context = Mockito.mock(ChannelHandlerContext.class);
        Mockito.doReturn(new UnpooledByteBufAllocator(true)).when(context).alloc();
        Mockito.when(context.writeAndFlush(Mockito.any())).then(invocation -> {
            responses.add(invocation.getArguments()[0]);
            return null;
        });
        return context;
    }

    /** Creates a mocked {@link VeniceServerConfig} whose RocksDB server config is itself a mock. */
    private static VeniceServerConfig mockServerConfig() {
        VeniceServerConfig serverConfig = Mockito.mock(VeniceServerConfig.class);
        Mockito.doReturn(Mockito.mock(RocksDBServerConfig.class)).when(serverConfig).getRocksDBServerConfig();
        return serverConfig;
    }

    /**
     * Creates a mocked {@link ReadOnlyStoreRepository} whose {@code getStoreOrThrow} returns, for
     * any store name, a mocked {@link Store} that answers {@code getVersion} with {@code version}
     * for any version number.
     */
    private static ReadOnlyStoreRepository mockStoreRepository(Optional<Version> version) {
        Store store = Mockito.mock(Store.class);
        Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(version);
        ReadOnlyStoreRepository storeRepository = Mockito.mock(ReadOnlyStoreRepository.class);
        Mockito.when(storeRepository.getStoreOrThrow(Mockito.anyString())).thenReturn(store);
        return storeRepository;
    }

    /**
     * Builds the handler under test with the arguments shared by all tests: {@code executor} for
     * both executor arguments, {@code false} for the first boolean flag, {@code 10} for the int
     * argument, a mocked server config and compressor factory, and an empty trailing
     * {@link Optional}.
     *
     * @param diskHealthCheckService may be {@code null}; only the health-check test supplies one
     * @param secondFlag value for the second boolean constructor argument; NOTE(review): the
     *     meaning of both flags and of the int {@code 10} is defined by the constructor, which is
     *     not visible here
     */
    private static StorageReadRequestsHandler newHandler(
            ThreadPoolExecutor executor,
            StorageEngineRepository storageEngineRepository,
            ReadOnlyStoreRepository storeRepository,
            ReadOnlySchemaRepository schemaRepository,
            MetadataRetriever metadataRetriever,
            DiskHealthCheckService diskHealthCheckService,
            boolean secondFlag) {
        return new StorageReadRequestsHandler(executor, executor, storageEngineRepository, storeRepository,
                schemaRepository, metadataRetriever, diskHealthCheckService, false, secondFlag, 10, mockServerConfig(),
                Mockito.mock(StorageEngineBackedCompressorFactory.class), Optional.empty());
    }
}
