package com.linkedin.venice.storagenode;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.admin.protocol.response.AdminResponseRecord;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.ServerAdminAction;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.JsonEncoder;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/storagenode/StorageNodeReadTest.class */
public class StorageNodeReadTest {
    private static final Logger LOGGER = LogManager.getLogger(StorageNodeReadTest.class);
    private VeniceClusterWrapper veniceCluster;
    private String storeVersionName;
    private int valueSchemaId;
    private String storeName;
    private int partitionCount;
    private String serverAddr;
    private String routerAddr;
    private VeniceKafkaSerializer keySerializer;
    private VeniceKafkaSerializer valueSerializer;
    private VeniceWriter<Object, Object, Object> veniceWriter;
    private AvroGenericStoreClient client;
    private final Base64.Encoder encoder = Base64.getUrlEncoder();

    @BeforeClass(alwaysRun = true)
    public void setUp() throws InterruptedException, ExecutionException, VeniceClientException {
        this.veniceCluster = ServiceFactory.getVeniceCluster(new VeniceClusterCreateOptions.Builder().build());
        this.serverAddr = this.veniceCluster.getVeniceServers().get(0).getAddress();
        this.routerAddr = "http://" + this.veniceCluster.getVeniceRouters().get(0).getAddress();
        VersionCreationResponse newStoreVersion = this.veniceCluster.getNewStoreVersion();
        this.storeVersionName = newStoreVersion.getKafkaTopic();
        this.storeName = Version.parseStoreFromKafkaTopicName(this.storeVersionName);
        this.valueSchemaId = 1;
        this.partitionCount = newStoreVersion.getPartitions();
        this.keySerializer = new VeniceAvroKafkaSerializer("\"string\"");
        this.valueSerializer = new VeniceAvroKafkaSerializer("\"string\"");
        this.veniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(this.storeVersionName).setKeySerializer(this.keySerializer).setValueSerializer(this.valueSerializer).build());
        this.client = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.veniceCluster.getRandomRouterURL()));
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.client});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceWriter});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceCluster});
    }

    private int getPartitionId(byte[] bArr) {
        return new DefaultVenicePartitioner().getPartitionId(bArr, this.partitionCount);
    }

    @Test(timeOut = 30000)
    public void testRead() throws Exception {
        pushSyntheticData("key_", "value_", 100, this.veniceCluster, this.veniceWriter, Version.parseVersionFromKafkaTopicName(this.storeVersionName));
        CloseableHttpAsyncClient createDefault = HttpAsyncClients.createDefault();
        try {
            createDefault.start();
            byte[] serialize = this.keySerializer.serialize((String) null, "key_0");
            HttpResponse httpResponse = (HttpResponse) createDefault.execute(new HttpGet("http://" + this.serverAddr + "/storage/" + this.storeVersionName + "/" + getPartitionId(serialize) + "/" + this.encoder.encodeToString(serialize) + "?f=b64"), (FutureCallback) null).get();
            InputStream content = httpResponse.getEntity().getContent();
            try {
                byte[] byteArray = IOUtils.toByteArray(content);
                Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Response did not return 200: " + new String(byteArray));
                Assert.assertEquals(this.valueSerializer.deserialize((String) null, byteArray).toString(), "value_0");
                Assert.assertEquals(httpResponse.getLastHeader("X-VENICE-RCU").getValue(), "1");
                if (content != null) {
                    content.close();
                }
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < 10; i++) {
                    MultiGetRouterRequestKeyV1 multiGetRouterRequestKeyV1 = new MultiGetRouterRequestKeyV1();
                    byte[] serialize2 = this.keySerializer.serialize((String) null, "key_" + i);
                    multiGetRouterRequestKeyV1.keyBytes = ByteBuffer.wrap(serialize2);
                    multiGetRouterRequestKeyV1.keyIndex = i;
                    multiGetRouterRequestKeyV1.partitionId = getPartitionId(serialize2);
                    arrayList.add(multiGetRouterRequestKeyV1);
                }
                byte[] serializeObjects = SerializerDeserializerFactory.getAvroGenericSerializer(MultiGetRouterRequestKeyV1.SCHEMA$).serializeObjects(arrayList);
                HttpPost httpPost = new HttpPost("http://" + this.serverAddr + "/storage/" + this.storeVersionName);
                BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
                basicHttpEntity.setContent(new ByteArrayInputStream(serializeObjects));
                httpPost.setEntity(basicHttpEntity);
                httpPost.setHeader("X-VENICE-API-VERSION", Integer.toString(ReadAvroProtocolDefinition.MULTI_GET_ROUTER_REQUEST_V1.getProtocolVersion()));
                RecordDeserializer avroSpecificDeserializer = SerializerDeserializerFactory.getAvroSpecificDeserializer(MultiGetResponseRecordV1.class);
                HttpResponse httpResponse2 = (HttpResponse) createDefault.execute(httpPost, (FutureCallback) null).get();
                Assert.assertEquals(httpResponse2.getLastHeader("X-VENICE-RCU").getValue(), String.valueOf(arrayList.size()));
                InputStream content2 = httpResponse2.getEntity().getContent();
                try {
                    byte[] byteArray2 = IOUtils.toByteArray(content2);
                    Assert.assertEquals(httpResponse2.getStatusLine().getStatusCode(), 200, "Response did not return 200: " + new String(byteArray2));
                    Iterable deserializeObjects = avroSpecificDeserializer.deserializeObjects(byteArray2);
                    HashMap hashMap = new HashMap();
                    deserializeObjects.forEach(multiGetResponseRecordV1 -> {
                        hashMap.put(Integer.valueOf(multiGetResponseRecordV1.keyIndex), this.valueSerializer.deserialize((String) null, multiGetResponseRecordV1.value.array()).toString());
                    });
                    Assert.assertEquals(hashMap.size(), 10);
                    for (int i2 = 0; i2 < 10; i2++) {
                        Assert.assertEquals((String) hashMap.get(Integer.valueOf(i2)), "value_" + i2);
                    }
                    if (content2 != null) {
                        content2.close();
                    }
                    HttpResponse httpResponse3 = (HttpResponse) createDefault.execute(new HttpGet("http://" + this.serverAddr + "/" + QueryAction.ADMIN.toString().toLowerCase() + "/" + this.storeVersionName + "/" + ServerAdminAction.DUMP_INGESTION_STATE.toString().toLowerCase()), (FutureCallback) null).get();
                    content = httpResponse3.getEntity().getContent();
                    try {
                        byte[] byteArray3 = IOUtils.toByteArray(content);
                        Assert.assertEquals(httpResponse3.getStatusLine().getStatusCode(), 200, "Response did not return 200: " + new String(byteArray3));
                        Object deserialize = SerializerDeserializerFactory.getAvroGenericDeserializer(AdminResponseRecord.SCHEMA$).deserialize((Object) null, byteArray3);
                        try {
                            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                            try {
                                GenericDatumWriter genericDatumWriter = new GenericDatumWriter(AdminResponseRecord.SCHEMA$);
                                JsonEncoder newJsonEncoder = AvroCompatibilityHelper.newJsonEncoder(AdminResponseRecord.SCHEMA$, byteArrayOutputStream, true);
                                genericDatumWriter.write(deserialize, newJsonEncoder);
                                newJsonEncoder.flush();
                                byteArrayOutputStream.flush();
                                LOGGER.info("Got an admin response: {}", byteArrayOutputStream);
                                byteArrayOutputStream.close();
                                GenericRecord genericRecord = (GenericRecord) deserialize;
                                Assert.assertNotNull(genericRecord.get("partitionConsumptionStates"));
                                Assert.assertTrue(((List) genericRecord.get("partitionConsumptionStates")).size() > 0);
                                if (content != null) {
                                    content.close();
                                }
                                if (createDefault != null) {
                                    createDefault.close();
                                }
                                for (boolean z : new boolean[]{false, true}) {
                                    String str = "{reuseObjectsForSerialization = " + z + "}";
                                    AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.storeName).setVeniceURL(this.routerAddr).setReuseObjectsForSerialization(z));
                                    try {
                                        HashSet hashSet = new HashSet();
                                        for (int i3 = 0; i3 < 10; i3++) {
                                            hashSet.add("key_" + i3);
                                        }
                                        hashSet.add("unknown_key");
                                        try {
                                            Map map = (Map) andStartGenericAvroClient.batchGet(hashSet).get();
                                            Assert.assertEquals(map.size(), 10, "Unexpected result size " + str);
                                            for (int i4 = 0; i4 < 10; i4++) {
                                                Assert.assertEquals(((CharSequence) map.get("key_" + i4)).toString(), "value_" + i4, "Key " + i4 + " does not have expected value " + str);
                                            }
                                        } catch (Exception e) {
                                            Assert.fail("Batch get failed " + str, e);
                                        }
                                        if (andStartGenericAvroClient != null) {
                                            andStartGenericAvroClient.close();
                                        }
                                    } catch (Throwable th) {
                                        if (andStartGenericAvroClient != null) {
                                            try {
                                                andStartGenericAvroClient.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        }
                                        throw th;
                                    }
                                }
                            } catch (Throwable th3) {
                                try {
                                    byteArrayOutputStream.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                                throw th3;
                            }
                        } catch (IOException e2) {
                            throw new VeniceException(e2);
                        }
                    } finally {
                        if (content != null) {
                            try {
                                content.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Throwable th6) {
            if (createDefault != null) {
                try {
                    createDefault.close();
                } catch (Throwable th7) {
                    th6.addSuppressed(th7);
                }
            }
            throw th6;
        }
    }

    @Test(timeOut = 60000)
    public void testDiskHealthCheckService() throws Exception {
        VeniceServerWrapper veniceServerWrapper = null;
        try {
            Properties properties = new Properties();
            properties.put("server.disk.health.check.interval.in.seconds", 5);
            properties.put("server.shutdown.ssd.unhealthy.time.ms", 1000);
            veniceServerWrapper = this.veniceCluster.addVeniceServer(properties);
            String address = veniceServerWrapper.getAddress();
            CloseableHttpAsyncClient createDefault = HttpAsyncClients.createDefault();
            try {
                createDefault.start();
                Assert.assertEquals(sendHeartbeatRequest(createDefault, address).getStatusLine().getStatusCode(), 200);
                Thread.sleep(TimeUnit.SECONDS.toMillis(5L));
                Assert.assertEquals(sendHeartbeatRequest(createDefault, address).getStatusLine().getStatusCode(), 200);
                FileUtils.deleteDirectory(veniceServerWrapper.getDataDirectory());
                Thread.sleep(TimeUnit.SECONDS.toMillis(5L));
                Assert.assertEquals(sendHeartbeatRequest(createDefault, address).getStatusLine().getStatusCode(), 500);
                if (createDefault != null) {
                    createDefault.close();
                }
                if (veniceServerWrapper != null) {
                    this.veniceCluster.removeVeniceServer(veniceServerWrapper.getPort());
                }
            } finally {
            }
        } catch (Throwable th) {
            if (veniceServerWrapper != null) {
                this.veniceCluster.removeVeniceServer(veniceServerWrapper.getPort());
            }
            throw th;
        }
    }

    private HttpResponse sendHeartbeatRequest(CloseableHttpAsyncClient closeableHttpAsyncClient, String str) throws Exception {
        return (HttpResponse) closeableHttpAsyncClient.execute(new HttpGet("http://" + str + "/" + QueryAction.HEALTH.toString().toLowerCase() + "?f=b64"), (FutureCallback) null).get();
    }

    private void pushSyntheticData(String str, String str2, int i, VeniceClusterWrapper veniceClusterWrapper, VeniceWriter<Object, Object, Object> veniceWriter, int i2) throws Exception {
        veniceWriter.broadcastStartOfPush(new HashMap());
        Future[] futureArr = new Future[i];
        for (int i3 = 0; i3 < i; i3++) {
            futureArr[i3] = veniceWriter.put(str + i3, str2 + i3, this.valueSchemaId);
        }
        for (int i4 = 0; i4 < i; i4++) {
            futureArr[i4].get();
        }
        veniceWriter.broadcastEndOfPush(new HashMap());
        String allControllersURLs = veniceClusterWrapper.getAllControllersURLs();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(ControllerClient.getStore(allControllersURLs, veniceClusterWrapper.getClusterName(), this.storeName).getStore().getCurrentVersion(), i2, "The new version is not activated yet!");
            for (int i5 = 0; i5 < i; i5++) {
                String str3 = str + i5;
                String str4 = null;
                try {
                    str4 = this.client.get(str3).get().toString();
                } catch (Exception e) {
                    LOGGER.error("Caught exception while trying to get data from the store", e);
                    Assert.fail("Caught exception while trying to get data from the store: " + e.getMessage());
                }
                Assert.assertNotNull(str4, "Key '" + str3 + "' is not in the store yet.");
                Assert.assertEquals(str4, str2 + i5, "Key '" + str3 + "' does not have the right value.");
            }
        });
    }
}
