package com.linkedin.venice.endToEnd;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.router.api.VenicePathParser;
import com.linkedin.venice.routerapi.ReplicaState;
import com.linkedin.venice.routerapi.ResourceStateResponse;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.IOUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:com/linkedin/venice/endToEnd/TestHelixCustomizedView.class */
public class TestHelixCustomizedView {
    private VeniceClusterWrapper veniceCluster;
    private HelixAdmin admin;
    private ControllerClient controllerClient;
    private String storeVersionName;
    private int valueSchemaId;
    private String storeName;
    private final int replicationFactor = 2;
    private VeniceKafkaSerializer keySerializer;
    private VeniceKafkaSerializer valueSerializer;
    private VeniceWriter<Object, Object, Object> veniceWriter;
    private static final String KEY_SCHEMA_STR = "\"string\"";
    private static final String VALUE_FIELD_NAME = "int_field";
    private static final Logger LOGGER = LogManager.getLogger(TestHelixCustomizedView.class);
    private static final String VALUE_SCHEMA_STR = "{\n\"type\": \"record\",\n\"name\": \"test_value_schema\",\n\"fields\": [\n  {\"name\": \"int_field\", \"type\": \"int\"}]\n}";
    private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(VALUE_SCHEMA_STR);

    @BeforeClass(alwaysRun = true)
    public void setUp() throws VeniceClientException {
        Utils.thisIsLocalhost();
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, 0, 0, 2, 10, true, false);
        this.admin = new ZKHelixAdmin(this.veniceCluster.getZk().getAddress());
        this.veniceCluster.addVeniceRouter(new Properties());
        Properties properties = new Properties();
        this.veniceCluster.addVeniceServer(new Properties(), properties);
        this.veniceCluster.addVeniceServer(new Properties(), properties);
        this.storeVersionName = this.veniceCluster.getNewStoreVersion(KEY_SCHEMA_STR, VALUE_SCHEMA_STR).getKafkaTopic();
        this.storeName = Version.parseStoreFromKafkaTopicName(this.storeVersionName);
        this.valueSchemaId = 1;
        this.controllerClient = new ControllerClient(this.veniceCluster.getClusterName(), this.veniceCluster.getAllControllersURLs());
        this.keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STR);
        this.valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR);
        this.veniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(this.storeVersionName).setKeySerializer(this.keySerializer).setValueSerializer(this.valueSerializer).build());
        VeniceOfflinePushMonitorAccessor veniceOfflinePushMonitorAccessor = new VeniceOfflinePushMonitorAccessor(this.veniceCluster.getClusterName(), ZkClientFactory.newZkClient(this.veniceCluster.getZk().getAddress()), new HelixAdapterSerializer(), 3, 1000L);
        HelixUtils.create(veniceOfflinePushMonitorAccessor.getOfflinePushStatusAccessor(), veniceOfflinePushMonitorAccessor.getOfflinePushStatuesParentPath() + "/invalid_topic", (Object) null);
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceCluster});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.controllerClient});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceWriter});
        if (this.admin != null) {
            this.admin.close();
        }
    }

    @Test(timeOut = 60000)
    public void testUpdatingCustomizedState() throws Exception {
        int parseVersionFromKafkaTopicName = Version.parseVersionFromKafkaTopicName(this.storeVersionName);
        Assert.assertEquals(getResourceStateFromRouter().getName(), this.storeVersionName);
        for (int i = 0; i < 10; i++) {
            GenericData.Record record = new GenericData.Record(VALUE_SCHEMA);
            record.put(VALUE_FIELD_NAME, Integer.valueOf(i));
            this.veniceWriter.put("key_" + i, record, this.valueSchemaId).get();
        }
        this.veniceWriter.broadcastEndOfPush(new HashMap());
        TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
            return this.controllerClient.getStore(this.storeName).getStore().getCurrentVersion() == parseVersionFromKafkaTopicName;
        });
        LOGGER.info("The current version is {}", Integer.valueOf(this.controllerClient.getStore(this.storeName).getStore().getCurrentVersion()));
        ResourceStateResponse resourceStateFromRouter = getResourceStateFromRouter();
        Assert.assertEquals(resourceStateFromRouter.getName(), this.storeVersionName);
        for (ReplicaState replicaState : resourceStateFromRouter.getReplicaStates()) {
            Assert.assertTrue(replicaState.isReadyToServe());
            Assert.assertEquals(ExecutionStatus.COMPLETED.name(), replicaState.getVenicePushStatus());
        }
    }

    private ResourceStateResponse getResourceStateFromRouter() throws IOException, ExecutionException, InterruptedException {
        String randomRouterURL = this.veniceCluster.getRandomRouterURL();
        CloseableHttpAsyncClient build = HttpAsyncClients.custom().setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(1000).build()).build();
        try {
            build.start();
            HttpResponse httpResponse = (HttpResponse) build.execute(new HttpGet(randomRouterURL + "/" + VenicePathParser.TYPE_RESOURCE_STATE + "/" + this.storeVersionName), (FutureCallback) null).get();
            InputStream content = httpResponse.getEntity().getContent();
            try {
                String iOUtils = IOUtils.toString(content);
                if (content != null) {
                    content.close();
                }
                Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200, "Failed to get resource state for " + this.storeVersionName + ". Response: " + iOUtils);
                ResourceStateResponse resourceStateResponse = (ResourceStateResponse) ObjectMapperFactory.getInstance().readValue(iOUtils.getBytes(), ResourceStateResponse.class);
                if (build != null) {
                    build.close();
                }
                return resourceStateResponse;
            } finally {
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
