package com.linkedin.venice.samza;

import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import com.linkedin.venice.writer.update.UpdateBuilderImpl;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemProducerTest.class */
public class VeniceSystemProducerTest {
    @Test
    public void testPartialUpdateConversion() {
        VeniceSystemProducer veniceSystemProducer = new VeniceSystemProducer("zookeeper.com:2181", "zookeeper.com:2181", "ChildController", "test_store", Version.PushType.BATCH, "push-job-id-1", "dc-0", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty(), SystemTime.INSTANCE);
        MultiSchemaResponse.Schema schema = new MultiSchemaResponse.Schema();
        schema.setSchemaStr("{\"type\":\"record\",\"name\":\"nameRecord\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastName\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");
        schema.setId(1);
        schema.setDerivedSchemaId(-1);
        MultiSchemaResponse.Schema schema2 = new MultiSchemaResponse.Schema();
        schema2.setSchemaStr("{\"type\":\"record\",\"name\":\"nameRecordWriteOpRecord\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"firstName\",\"type\":[{\"type\":\"record\",\"name\":\"NoOp\",\"fields\":[]},\"string\"],\"default\":{}},{\"name\":\"lastName\",\"type\":[\"NoOp\",\"string\"],\"default\":{}},{\"name\":\"age\",\"type\":[\"NoOp\",\"int\"],\"default\":{}}]}");
        schema2.setId(1);
        schema2.setDerivedSchemaId(1);
        char[] cArr = new char[5];
        Arrays.fill(cArr, 'f');
        String str = new String(cArr);
        Arrays.fill(cArr, 'l');
        String str2 = new String(cArr);
        UpdateBuilderImpl updateBuilderImpl = new UpdateBuilderImpl(Schema.parse(schema2.getSchemaStr()));
        updateBuilderImpl.setNewFieldValue("firstName", str);
        updateBuilderImpl.setNewFieldValue("lastName", str2);
        GenericRecord build = updateBuilderImpl.build();
        D2ControllerClient d2ControllerClient = (D2ControllerClient) Mockito.mock(D2ControllerClient.class);
        MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse();
        multiSchemaResponse.setSchemas(new MultiSchemaResponse.Schema[0]);
        Mockito.when(d2ControllerClient.getAllValueAndDerivedSchema(Mockito.anyString())).thenReturn(multiSchemaResponse);
        veniceSystemProducer.setControllerClient(d2ControllerClient);
        Assert.assertThrows(() -> {
            veniceSystemProducer.convertPartialUpdateToFullPut(new Pair(1, 1), build);
        });
        MultiSchemaResponse multiSchemaResponse2 = new MultiSchemaResponse();
        multiSchemaResponse2.setSchemas(new MultiSchemaResponse.Schema[]{schema, schema2});
        D2ControllerClient d2ControllerClient2 = (D2ControllerClient) Mockito.mock(D2ControllerClient.class);
        Mockito.when(d2ControllerClient2.getAllValueAndDerivedSchema(Mockito.anyString())).thenReturn(multiSchemaResponse2);
        veniceSystemProducer.setControllerClient(d2ControllerClient2);
        GenericRecord genericRecord = (GenericRecord) veniceSystemProducer.convertPartialUpdateToFullPut(new Pair(1, 1), build);
        Assert.assertNotNull(genericRecord);
        Assert.assertEquals(genericRecord.getSchema().toString(), schema.getSchemaStr());
        Assert.assertEquals(genericRecord.get("firstName"), build.get("firstName"));
        Assert.assertEquals(genericRecord.get("lastName"), build.get("lastName"));
        Assert.assertEquals(genericRecord.get("age"), -1);
        OutgoingMessageEnvelope outgoingMessageEnvelope = new OutgoingMessageEnvelope(new SystemStream("venice", "storeName"), "key1", build);
        Assert.assertThrows(() -> {
            veniceSystemProducer.send("venice", outgoingMessageEnvelope);
        });
    }

    @Test(dataProvider = "BatchOrStreamReprocessing")
    public void testGetVeniceWriter(Version.PushType pushType) {
        VeniceSystemProducer veniceSystemProducer = (VeniceSystemProducer) Mockito.spy(new VeniceSystemProducer("zookeeper.com:2181", "zookeeper.com:2181", "ChildController", "test_store", pushType, "push-job-id-1", "dc-0", true, (VeniceSystemFactory) null, Optional.empty(), Optional.empty(), SystemTime.INSTANCE));
        VeniceWriter veniceWriter = (VeniceWriter) Mockito.mock(VeniceWriter.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Properties.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(VeniceWriterOptions.class);
        ((VeniceSystemProducer) Mockito.doReturn(veniceWriter).when(veniceSystemProducer)).constructVeniceWriter((Properties) forClass.capture(), (VeniceWriterOptions) forClass2.capture());
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setKafkaBootstrapServers("venice-kafka.db:2023");
        versionCreationResponse.setPartitions(2);
        versionCreationResponse.setKafkaTopic("test_store_v1");
        VeniceWriter veniceWriter2 = veniceSystemProducer.getVeniceWriter(versionCreationResponse);
        Properties properties = (Properties) forClass.getValue();
        VeniceWriterOptions veniceWriterOptions = (VeniceWriterOptions) forClass2.getValue();
        Assert.assertNotNull(veniceWriter2);
        Assert.assertEquals(veniceWriter2, veniceWriter);
        Assert.assertEquals(properties.getProperty("kafka.bootstrap.servers"), "venice-kafka.db:2023");
        Assert.assertEquals(veniceWriterOptions.getTopicName(), "test_store_v1");
        if (pushType != Version.PushType.BATCH && pushType != Version.PushType.STREAM_REPROCESSING) {
            Assert.assertNull(veniceWriterOptions.getPartitionCount());
        } else {
            Assert.assertNotNull(veniceWriterOptions.getPartitionCount());
            Assert.assertEquals(veniceWriterOptions.getPartitionCount().intValue(), 2);
        }
    }

    @DataProvider(name = "BatchOrStreamReprocessing")
    public Version.PushType[] batchOrStreamReprocessing() {
        return new Version.PushType[]{Version.PushType.BATCH, Version.PushType.STREAM_REPROCESSING, Version.PushType.STREAM, Version.PushType.INCREMENTAL};
    }
}
