package com.linkedin.venice;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelperCommon;
import com.linkedin.avroutil1.compatibility.AvroVersion;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;

/* loaded from: input_file:com/linkedin/venice/VeniceClusterInitializer.class */
public class VeniceClusterInitializer implements Closeable {
    private static final Logger LOGGER = LogManager.getLogger(VeniceClusterInitializer.class);
    public static final String VALUE_SCHEMA_STR = "{  \"namespace\": \"example.compute\",      \"type\": \"record\",          \"name\": \"MemberFeature\",         \"fields\": [                 { \"name\": \"id\", \"type\": \"string\", \"default\": \"default_id\"},                      { \"name\": \"name\", \"type\": \"string\", \"default\": \"default_name\"},                    { \"name\": \"boolean_field\", \"type\": \"boolean\", \"default\": false},                    { \"name\": \"int_field\", \"type\": \"int\", \"default\": 0},                    { \"name\": \"float_field\", \"type\": \"float\", \"default\": 0},                    { \"name\": \"namemap\", \"type\":  {\"type\" : \"map\", \"values\" : \"int\" }},                    { \"name\": \"member_feature\", \"type\": { \"type\": \"array\", \"items\": \"float\" }, \"default\": []},         { \"name\": \"ZookeeperAddress\", \"type\": \"string\"}  ]        }       ";
    public static final String KEY_SCHEMA_STR = "\"string\"";
    public static final String KEY_PREFIX = "key_";
    public static final String ID_FIELD_PREFIX = "id_";
    public static final String ZK_ADDRESS_FIELD = "ZookeeperAddress";
    public static final int ENTRY_COUNT = 1000;
    private final VeniceClusterWrapper veniceCluster;
    private final AvroGenericStoreClient<String, Object> regularStoreClient;
    private final int valueSchemaId;
    private final String pushVersionTopic;
    private final int pushVersion;
    private final ControllerClient controllerClient;
    private final String storeName;
    private final VeniceKafkaSerializer keySerializer;
    private final VeniceKafkaSerializer valueSerializer;

    public VeniceClusterInitializer(String str, int i) {
        Properties properties = new Properties();
        properties.put("server.promotion.to.leader.replica.delay.seconds", 1L);
        this.veniceCluster = ServiceFactory.getVeniceCluster(1, 1, 1, 2, 100, false, false, properties);
        Properties properties2 = new Properties();
        properties2.put("server.compute.fast.avro.enabled", true);
        this.veniceCluster.addVeniceServer(new Properties(), properties2);
        Properties properties3 = new Properties();
        properties3.put("router.long.tail.retry.for.single.get.threshold.ms", 1);
        properties3.put("router.long.tail.retry.for.batch.get.threshold.ms", "1-:1");
        properties3.put("router.smart.long.tail.retry.enabled", false);
        properties3.put("listener.port", Integer.toString(i));
        this.veniceCluster.addVeniceRouter(properties3);
        String str2 = "http://" + ((VeniceRouterWrapper) this.veniceCluster.getVeniceRouters().get(0)).getAddress();
        LOGGER.info("Router address: {}", str2);
        this.storeName = str;
        this.controllerClient = this.veniceCluster.getControllerClient();
        TestUtils.assertCommand(this.controllerClient.createNewStore(str, "test_owner", KEY_SCHEMA_STR, VALUE_SCHEMA_STR));
        this.veniceCluster.createMetaSystemStore(str);
        UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams();
        updateStoreQueryParams.setReadComputationEnabled(true);
        TestUtils.assertCommand(this.controllerClient.updateStore(str, updateStoreQueryParams));
        VersionCreationResponse assertCommand = TestUtils.assertCommand(this.controllerClient.requestTopicForWrites(str, 10240000L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
        this.pushVersion = assertCommand.getVersion();
        this.pushVersionTopic = assertCommand.getKafkaTopic();
        this.valueSchemaId = 1;
        this.keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA_STR);
        this.valueSerializer = new VeniceAvroKafkaSerializer(VALUE_SCHEMA_STR);
        this.regularStoreClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(str).setVeniceURL(str2));
        try {
            pushSyntheticData();
            LOGGER.info("Finished pushing the synthetic data");
            try {
                quickVerification();
                LOGGER.info("Finished quick verification");
            } catch (Exception e) {
                throw new VeniceException("Failed to validate the push", e);
            }
        } catch (Exception e2) {
            throw new VeniceException("Failed to push synthetic data", e2);
        }
    }

    private void quickVerification() throws ExecutionException, InterruptedException {
        Object obj = this.regularStoreClient.get("key_0").get();
        if (obj == null) {
            throw new VeniceException("Failed to retrieve value for key: key_0");
        }
        if (!(obj instanceof GenericRecord)) {
            throw new VeniceException("The returned value should be a GenericRecord");
        }
        String obj2 = ((GenericRecord) obj).get("id").toString();
        if (!obj2.equals("id_0")) {
            throw new VeniceException("Expected the value for 'id' field: id_0, but got: " + obj2);
        }
    }

    private void pushSyntheticData() throws ExecutionException, InterruptedException {
        Schema parse = Schema.parse(VALUE_SCHEMA_STR);
        ArrayList arrayList = new ArrayList(ENTRY_COUNT);
        for (int i = 0; i < 1000; i++) {
            GenericData.Record record = new GenericData.Record(parse);
            record.put("id", ID_FIELD_PREFIX + i);
            record.put("name", "name_" + i);
            record.put("namemap", Collections.emptyMap());
            record.put("boolean_field", true);
            record.put("int_field", 10);
            record.put("float_field", Float.valueOf(10.0f));
            record.put(ZK_ADDRESS_FIELD, this.veniceCluster.getZk().getAddress());
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(Float.valueOf(i + 1));
            arrayList2.add(Float.valueOf((i + 1) * 10));
            record.put("member_feature", arrayList2);
            arrayList.add(i, this.valueSerializer.serialize(this.pushVersionTopic, record));
        }
        VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(this.veniceCluster.getKafka().getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(this.pushVersionTopic).setKeySerializer(this.keySerializer).build());
        try {
            createVeniceWriter.broadcastStartOfPush(Collections.emptyMap());
            Future[] futureArr = new Future[ENTRY_COUNT];
            for (int i2 = 0; i2 < 1000; i2++) {
                futureArr[i2] = createVeniceWriter.put(KEY_PREFIX + i2, (byte[]) arrayList.get(i2), this.valueSchemaId);
            }
            for (int i3 = 0; i3 < 1000; i3++) {
                futureArr[i3].get();
            }
            createVeniceWriter.broadcastEndOfPush(new HashMap());
            if (createVeniceWriter != null) {
                createVeniceWriter.close();
            }
            TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                if (this.controllerClient.queryJobStatus(this.pushVersionTopic).getStatus().equals(ExecutionStatus.ERROR.name())) {
                    throw new VeniceException("Push failed.");
                }
                int currentVersion = this.controllerClient.getStore(this.storeName).getStore().getCurrentVersion();
                if (currentVersion == this.pushVersion) {
                    this.veniceCluster.refreshAllRouterMetaData();
                }
                Assert.assertEquals(currentVersion, this.pushVersion, "New version not online yet.");
            });
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.regularStoreClient});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.controllerClient});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceCluster});
    }

    public static void main(String[] strArr) {
        LOGGER.info("Avro version in VeniceClusterInitializer: {}", AvroCompatibilityHelperCommon.getRuntimeAvroVersion());
        Assert.assertEquals(AvroCompatibilityHelperCommon.getRuntimeAvroVersion(), AvroVersion.AVRO_1_9);
        Assert.assertEquals(strArr.length, 2, "Store name and router port arguments are expected");
        VeniceClusterInitializer veniceClusterInitializer = new VeniceClusterInitializer(strArr[0], Integer.parseInt(strArr[1]));
        Runtime runtime = Runtime.getRuntime();
        Objects.requireNonNull(veniceClusterInitializer);
        runtime.addShutdownHook(new Thread(veniceClusterInitializer::close));
    }
}
