package com.linkedin.venice.samza;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemProducer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemFactoryTest.class */
public class VeniceSystemFactoryTest {
    private static final int TEST_TIMEOUT = 15000;
    private VeniceClusterWrapper cluster;

    @BeforeClass
    public void setUp() {
        this.cluster = ServiceFactory.getVeniceCluster();
    }

    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.cluster});
    }

    @Test
    public void testGetProducer() throws Exception {
        String str = "\"string\"";
        String str2 = "{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}";
        String uniqueString = Utils.getUniqueString("store");
        Schema convertFromValueRecordSchemaStr = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchemaStr("{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}");
        this.cluster.useControllerClient(controllerClient -> {
            TestUtils.assertCommand(controllerClient.createNewStore(uniqueString, "owner", str, str2));
            TestUtils.assertCommand(controllerClient.updateStore(uniqueString, new UpdateStoreQueryParams().setHybridRewindSeconds(10L).setHybridOffsetLagThreshold(10L)));
            TestUtils.assertCommand(controllerClient.addDerivedSchema(uniqueString, 1, convertFromValueRecordSchemaStr.toString()));
        });
        String str3 = "keystring";
        Schema schema = Schema.parse("{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}").getField("intArray").schema();
        GenericData.Record record = new GenericData.Record(Schema.parse("{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}"));
        record.put("string", (Object) null);
        record.put("number", Double.valueOf(0.0d));
        record.put("intArray", Collections.emptyList());
        this.cluster.createVersion(uniqueString, "\"string\"", "{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}", Stream.of(new AbstractMap.SimpleEntry("keystring", record)));
        SystemProducer systemProducer = null;
        try {
            AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.cluster.getRandomRouterURL()));
            try {
                systemProducer = IntegrationTestPushUtils.getSamzaProducer(this.cluster, uniqueString, Version.PushType.STREAM, new Pair[0]);
                GenericData.Record record2 = new GenericData.Record(Schema.parse("{\n  \"type\" : \"record\",\n  \"name\" : \"testRecord\",\n  \"fields\" : [ {\n    \"name\" : \"number\",\n    \"type\" : [ \"double\", \"null\" ],\n    \"default\" : 100.0\n  }, {\n    \"name\" : \"string\",\n    \"type\" : [ \"string\", \"null\" ],\n    \"default\" : \"100\"\n  }, {\n    \"name\" : \"intArray\",\n    \"type\" : {\n      \"type\" : \"array\",\n      \"items\" : \"int\"\n    },\n    \"default\" :  [ ]\n  } ]\n}"));
                record2.put("string", "somestring");
                record2.put("number", Double.valueOf(3.74d));
                record2.put("intArray", new GenericData.Array(schema, Collections.singletonList(1)));
                IntegrationTestPushUtils.sendStreamingRecord(systemProducer, uniqueString, "keystring", record2);
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, true, () -> {
                    try {
                        GenericRecord genericRecord = (GenericRecord) andStartGenericAvroClient.get(str3).get();
                        Assert.assertNotNull(genericRecord, "Value for key: '" + str3 + "' should not be null. This means not even the batch data made it in!");
                        Object obj = genericRecord.get("string");
                        Assert.assertNotNull(obj, "'string' field should not be null. This means the RT data did not make it in.");
                        Assert.assertEquals(obj.toString(), "somestring");
                        Object obj2 = genericRecord.get("number");
                        Assert.assertNotNull(obj2, "'number' field should not be null");
                        Assert.assertEquals(obj2, Double.valueOf(3.74d));
                        Assert.assertEquals(genericRecord.get("intArray"), new GenericData.Array(schema, Collections.singletonList(1)));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                IntegrationTestPushUtils.sendStreamingRecord(systemProducer, uniqueString, "keystring", null);
                TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.SECONDS, () -> {
                    try {
                        Assert.assertNull((GenericRecord) andStartGenericAvroClient.get(str3).get());
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                if (andStartGenericAvroClient != null) {
                    andStartGenericAvroClient.close();
                }
                if (systemProducer != null) {
                    systemProducer.stop();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (systemProducer != null) {
                systemProducer.stop();
            }
            throw th;
        }
    }

    @Test(timeOut = 15000)
    public void testSchemaMismatchError() throws Exception {
        String createStore = this.cluster.createStore(0);
        SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(this.cluster, createStore, Version.PushType.BATCH, new Pair[0]);
        try {
            Assert.assertThrows(SamzaException.class, () -> {
                IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, createStore, new byte[]{49, 50, 51}, 0);
            });
        } finally {
            samzaProducer.stop();
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "testSerializationParams")
    public static Object[][] testSerializationParams() {
        return new Object[]{new Object[]{5, 10, "\"int\""}, new Object[]{new Utf8("one"), new Utf8("two"), "\"string\""}, new Object[]{6L, 8L, "\"long\""}, new Object[]{Double.valueOf(9.12d), Double.valueOf(12.45d), "\"double\""}, new Object[]{Float.valueOf(1.6f), Float.valueOf(7.4f), "\"float\""}, new Object[]{ByteBuffer.wrap(new byte[]{1, 2, 3}), ByteBuffer.wrap(new byte[]{11, 12, 13}), "\"bytes\""}, new Object[]{true, false, "\"boolean\""}};
    }

    @Test(timeOut = 15000, dataProvider = "testSerializationParams")
    public void testSerialization(Object obj, Object obj2, String str) throws Exception {
        testSerializationCast(obj, obj, obj2, obj2, str);
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "testSerializationCastParams")
    public static Object[][] testSerializationCastParams() {
        GenericData.Record record = new GenericData.Record(Schema.parse("{\n  \"type\": \"record\",\n  \"name\": \"SomeRecord\",\n  \"fields\": [\n     {\"name\": \"int_field\", \"type\": \"int\", \"java\": \"abc\"}\n   ]\n}"));
        record.put("int_field", 100);
        GenericData.Record record2 = new GenericData.Record(Schema.parse("{\n  \"type\": \"record\",\n  \"name\": \"SomeRecord\",\n  \"fields\": [\n     {\"name\": \"int_field\", \"type\": \"int\"}\n   ]\n}"));
        record2.put("int_field", 200);
        return new Object[]{new Object[]{new byte[]{3, 4, 5}, ByteBuffer.wrap(new byte[]{3, 4, 5}), new byte[]{13, 14, 15}, ByteBuffer.wrap(new byte[]{13, 14, 15}), "\"bytes\""}, new Object[]{"three", "three", "four", new Utf8("four"), "\"string\""}, new Object[]{record, record, record2, record2, "{\n  \"type\": \"record\",\n  \"name\": \"SomeRecord\",\n  \"fields\": [\n     {\"name\": \"int_field\", \"type\": \"int\"}\n   ]\n}"}};
    }

    @Test(timeOut = 15000, dataProvider = "testSerializationCastParams")
    public void testSerializationCast(Object obj, Object obj2, Object obj3, Object obj4, String str) throws Exception {
        String uniqueString = Utils.getUniqueString("schema-test-store");
        this.cluster.useControllerClient(controllerClient -> {
            controllerClient.createNewStore(uniqueString, "owner", str, str);
            SystemProducer samzaProducer = IntegrationTestPushUtils.getSamzaProducer(this.cluster, uniqueString, Version.PushType.BATCH, new Pair[0]);
            try {
                IntegrationTestPushUtils.sendStreamingRecord(samzaProducer, uniqueString, obj, obj3);
                samzaProducer.stop();
                controllerClient.writeEndOfPush(uniqueString, 1);
                this.cluster.waitVersion(uniqueString, 1, controllerClient);
            } catch (Throwable th) {
                samzaProducer.stop();
                throw th;
            }
        });
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.cluster.getRandomRouterURL()));
        try {
            Object obj5 = andStartGenericAvroClient.get(obj2).get();
            Assert.assertEquals(obj5, obj4, "Expected [" + obj4 + "] of type " + obj4.getClass() + " but found [" + obj5 + "] of type " + obj5.getClass());
            if (andStartGenericAvroClient != null) {
                andStartGenericAvroClient.close();
            }
        } catch (Throwable th) {
            if (andStartGenericAvroClient != null) {
                try {
                    andStartGenericAvroClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeOut = 15000)
    public void testGetProducerRunningFabric() throws Exception {
        VeniceSystemFactory veniceSystemFactory = new VeniceSystemFactory();
        Map<String, String> samzaProducerConfig = IntegrationTestPushUtils.getSamzaProducerConfig(this.cluster, "test-store-sr", Version.PushType.STREAM_REPROCESSING);
        VeniceSystemProducer producer = veniceSystemFactory.getProducer("venice", new MapConfig(samzaProducerConfig), (MetricsRegistry) null);
        if (producer instanceof VeniceSystemProducer) {
            Assert.assertEquals(producer.getRunningFabric(), (String) null);
        }
        System.setProperty("com.linkedin.app.env", "dc-1");
        VeniceSystemProducer producer2 = veniceSystemFactory.getProducer("venice", new MapConfig(samzaProducerConfig), (MetricsRegistry) null);
        if (producer2 instanceof VeniceSystemProducer) {
            Assert.assertEquals(producer2.getRunningFabric(), "dc-1");
        }
        samzaProducerConfig.put("com.linkedin.app.env", "dc-0");
        VeniceSystemProducer producer3 = veniceSystemFactory.getProducer("venice", new MapConfig(samzaProducerConfig), (MetricsRegistry) null);
        if (producer3 instanceof VeniceSystemProducer) {
            Assert.assertEquals(producer3.getRunningFabric(), "dc-0");
        }
        samzaProducerConfig.put("com.linkedin.app.env", "dc-parent");
        VeniceSystemProducer producer4 = veniceSystemFactory.getProducer("venice", new MapConfig(samzaProducerConfig), (MetricsRegistry) null);
        if (producer4 instanceof VeniceSystemProducer) {
            Assert.assertEquals(producer4.getRunningFabric(), "dc-parent");
        }
    }
}
