package com.linkedin.venice.producer.online;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.RandomRecordGenerator;
import com.linkedin.avroutil1.compatibility.RecordGenerationConfig;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.StoreJSONSerializer;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import com.linkedin.venice.writer.update.UpdateBuilder;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/producer/online/OnlineVeniceProducerTest.class */
public class OnlineVeniceProducerTest {
    private static final String storeName = "test_store";
    private static final String clusterName = "test-cluster";
    private static final String FIELD_NUMBER = "favorite_number";
    private static final String FIELD_COLOR = "favorite_color";
    private static final String FIELD_COMPANY = "favorite_company";
    private static final String TOTAL_OPERATION_METRIC_NAME = ".test_store--write_operation.OccurrenceRate";
    private static final String PUT_OPERATION_METRIC_NAME = ".test_store--put_operation.OccurrenceRate";
    private static final String DELETE_OPERATION_METRIC_NAME = ".test_store--delete_operation.OccurrenceRate";
    private static final String UPDATE_OPERATION_METRIC_NAME = ".test_store--update_operation.OccurrenceRate";
    private static final String SUCCESS_OPERATION_METRIC_NAME = ".test_store--success_write_operation.OccurrenceRate";
    private static final String FAILED_OPERATION_METRIC_NAME = ".test_store--failed_write_operation.OccurrenceRate";
    private static final String MIN_PENDING_OPERATION_METRIC_NAME = ".test_store--pending_write_operation.Min";
    private static final String MAX_PENDING_OPERATION_METRIC_NAME = ".test_store--pending_write_operation.Max";
    private static final ObjectMapper MAPPER = ObjectMapperFactory.getInstance();
    private static final StoreJSONSerializer STORE_SERIALIZER = new StoreJSONSerializer();
    private static final Schema KEY_SCHEMA = AvroCompatibilityHelper.parse(new String[]{"\"string\""});
    private static final Schema VALUE_SCHEMA_1 = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsStringQuietlyWithErrorLogged("RecordValueSchema1.avsc")});
    private static final Schema VALUE_SCHEMA_2 = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsStringQuietlyWithErrorLogged("RecordValueSchema2.avsc")});
    private static final Schema VALUE_SCHEMA_3 = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsStringQuietlyWithErrorLogged("RecordValueSchema3.avsc")});
    private static final Schema VALUE_SCHEMA_4 = AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsStringQuietlyWithErrorLogged("RecordValueSchema4.avsc")});
    private static final GenericRecord mockValue1 = getMockValue(VALUE_SCHEMA_1);
    private static final GenericRecord mockValue2 = getMockValue(VALUE_SCHEMA_2);
    private static final Schema UPDATE_SCHEMA_1 = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(VALUE_SCHEMA_1);
    private static final Schema UPDATE_SCHEMA_2 = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(VALUE_SCHEMA_2);
    private static final Schema UPDATE_SCHEMA_3 = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(VALUE_SCHEMA_3);
    private static final Schema UPDATE_SCHEMA_4 = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(VALUE_SCHEMA_4);
    private static final RecordSerializer<Object> keySerializer = getSerializer(KEY_SCHEMA);
    private static final RecordSerializer<Object> value1Serializer = getSerializer(VALUE_SCHEMA_1);
    private static final RecordSerializer<Object> value2Serializer = getSerializer(VALUE_SCHEMA_2);
    private static final RecordSerializer<Object> update2Serializer = getSerializer(UPDATE_SCHEMA_2);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/producer/online/OnlineVeniceProducerTest$TestOnlineVeniceProducer.class */
    public static class TestOnlineVeniceProducer<K, V> extends OnlineVeniceProducer<K, V> {
        private VeniceWriter<byte[], byte[], byte[]> mockVeniceWriter;
        private boolean failPubSubWrites;

        public TestOnlineVeniceProducer(AbstractAvroStoreClient abstractAvroStoreClient, SchemaReader schemaReader, VeniceProperties veniceProperties, MetricsRepository metricsRepository) {
            this(abstractAvroStoreClient, schemaReader, veniceProperties, metricsRepository, false);
        }

        public TestOnlineVeniceProducer(AbstractAvroStoreClient abstractAvroStoreClient, SchemaReader schemaReader, VeniceProperties veniceProperties, MetricsRepository metricsRepository, boolean z) {
            super(abstractAvroStoreClient, schemaReader, veniceProperties, metricsRepository, (ICProvider) null);
            this.failPubSubWrites = z;
            configureVeniceWriteMock();
        }

        protected VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(Properties properties, VeniceWriterOptions veniceWriterOptions) {
            if (this.mockVeniceWriter == null) {
                this.mockVeniceWriter = (VeniceWriter) Mockito.mock(VeniceWriter.class);
            }
            return this.mockVeniceWriter;
        }

        private void configureVeniceWriteMock() {
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 3)).when(this.mockVeniceWriter)).put((byte[]) ArgumentMatchers.any(), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (PubSubProducerCallback) ArgumentMatchers.any());
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 4)).when(this.mockVeniceWriter)).put((byte[]) ArgumentMatchers.any(), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (PubSubProducerCallback) ArgumentMatchers.any());
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 1)).when(this.mockVeniceWriter)).delete((byte[]) ArgumentMatchers.any(), (PubSubProducerCallback) ArgumentMatchers.any());
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 2)).when(this.mockVeniceWriter)).delete((byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (PubSubProducerCallback) ArgumentMatchers.any());
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 4)).when(this.mockVeniceWriter)).update((byte[]) ArgumentMatchers.any(), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (PubSubProducerCallback) ArgumentMatchers.any());
            ((VeniceWriter) Mockito.doAnswer(getPubSubProducerCallbackAnswer(this.failPubSubWrites, 4)).when(this.mockVeniceWriter)).update((byte[]) ArgumentMatchers.any(), (byte[]) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (PubSubProducerCallback) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        }

        private static Answer getPubSubProducerCallbackAnswer(boolean z, int i) {
            return z ? invocationOnMock -> {
                ((PubSubProducerCallback) invocationOnMock.getArguments()[i]).onCompletion((PubSubProduceResult) null, new VeniceException());
                return null;
            } : invocationOnMock2 -> {
                ((PubSubProducerCallback) invocationOnMock2.getArguments()[i]).onCompletion((PubSubProduceResult) null, (Exception) null);
                return null;
            };
        }
    }

    @Test
    public void testConstructor() throws IOException, ExecutionException, InterruptedException {
        new TestOnlineVeniceProducer(getMockStoreClient(), getKmeSchemaReader(), new VeniceProperties(new Properties()), new MetricsRepository()).close();
    }

    @Test
    public void testFailRequestTopic() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        MetricsRepository metricsRepository = new MetricsRepository();
        Properties properties = new Properties();
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setError("ERROR RESPONSE");
        AbstractAvroStoreClient mockStoreClientWithRequestTopicResponse = getMockStoreClientWithRequestTopicResponse(MAPPER.writeValueAsBytes(versionCreationResponse));
        Assert.assertThrows(VeniceException.class, () -> {
            new TestOnlineVeniceProducer(mockStoreClientWithRequestTopicResponse, kmeSchemaReader, new VeniceProperties(properties), metricsRepository);
        });
        AbstractAvroStoreClient mockStoreClientWithRequestTopicResponse2 = getMockStoreClientWithRequestTopicResponse(versionCreationResponse.getError().getBytes(StandardCharsets.UTF_8));
        Assert.assertThrows(VeniceException.class, () -> {
            new TestOnlineVeniceProducer(mockStoreClientWithRequestTopicResponse2, kmeSchemaReader, new VeniceProperties(properties), metricsRepository);
        });
    }

    @Test
    public void testPut() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass4 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            testOnlineVeniceProducer.asyncPut("KEY1", mockValue1).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).put(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ArgumentMatchers.eq(-2L), (PubSubProducerCallback) forClass4.capture());
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertEquals(value1Serializer.serialize(mockValue1), (byte[]) forClass2.getValue());
            Assert.assertEquals(1, ((Integer) forClass3.getValue()).intValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.asyncPut("KEY2", mockValue2).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).put(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ArgumentMatchers.eq(-2L), (PubSubProducerCallback) forClass4.capture());
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            Assert.assertEquals(value2Serializer.serialize(mockValue2), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPutWithLogicalTs() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass4 = ArgumentCaptor.forClass(Long.TYPE);
            ArgumentCaptor forClass5 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            testOnlineVeniceProducer.asyncPut(1000L, "KEY1", mockValue1).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).put(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Long) forClass4.capture()).longValue(), (PubSubProducerCallback) forClass5.capture());
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertEquals(value1Serializer.serialize(mockValue1), (byte[]) forClass2.getValue());
            Assert.assertEquals(1, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1000L, ((Long) forClass4.getValue()).longValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.asyncPut(1002L, "KEY2", mockValue2).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).put(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Long) forClass4.capture()).longValue(), (PubSubProducerCallback) forClass5.capture());
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            Assert.assertEquals(value2Serializer.serialize(mockValue2), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1002L, ((Long) forClass4.getValue()).longValue());
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut(-5L, "KEY1", mockValue1).get();
            });
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).put(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Long) forClass4.capture()).longValue(), (PubSubProducerCallback) forClass5.capture());
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPutWithInvalidSchema() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", true).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", "random_string").get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", 10).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", 10L).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", Double.valueOf(1.0d)).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", Float.valueOf(1.0f)).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", "bytes".getBytes(StandardCharsets.UTF_8)).get();
            });
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", VALUE_SCHEMA_1).get();
            });
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertTrue(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPutWithFailedWrite() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository, true);
        try {
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncPut("KEY1", mockValue1).get();
            });
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertTrue(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testDelete() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            testOnlineVeniceProducer.asyncDelete("KEY1").get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).delete(forClass.capture(), ArgumentMatchers.eq(-2L), (PubSubProducerCallback) forClass2.capture());
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.asyncDelete("KEY2").get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).delete(forClass.capture(), ArgumentMatchers.eq(-2L), (PubSubProducerCallback) forClass2.capture());
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testDeleteWithLogicalTs() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Long.TYPE);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            testOnlineVeniceProducer.asyncDelete(1000L, "KEY1").get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).delete(forClass.capture(), ((Long) forClass2.capture()).longValue(), (PubSubProducerCallback) forClass3.capture());
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertEquals(1000L, ((Long) forClass2.getValue()).longValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.asyncDelete(1002L, "KEY2").get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).delete(forClass.capture(), ((Long) forClass2.capture()).longValue(), (PubSubProducerCallback) forClass3.capture());
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            Assert.assertEquals(1002L, ((Long) forClass2.getValue()).longValue());
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncDelete(-1L, "KEY1").get();
            });
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).delete(forClass.capture(), ((Long) forClass2.capture()).longValue(), (PubSubProducerCallback) forClass3.capture());
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testDeleteWithFailedWrite() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository, true);
        try {
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncDelete("KEY1").get();
            });
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertTrue(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUpdate() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient(true);
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass4 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass5 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            testOnlineVeniceProducer.asyncUpdate("KEY1", obj -> {
                UpdateBuilder updateBuilder = (UpdateBuilder) obj;
                updateBuilder.setNewFieldValue(FIELD_NUMBER, 10L);
                updateBuilder.setNewFieldValue(FIELD_COMPANY, "LinkedIn");
            }).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ArgumentMatchers.eq(-2L));
            GenericData.Record record = new GenericData.Record(UPDATE_SCHEMA_2);
            record.put(FIELD_NUMBER, 10L);
            record.put(FIELD_COMPANY, "LinkedIn");
            record.put(FIELD_COLOR, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_COLOR));
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertEquals(update2Serializer.serialize(record), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1, ((Integer) forClass4.getValue()).intValue());
            testOnlineVeniceProducer.asyncUpdate("KEY2", obj2 -> {
                ((UpdateBuilder) obj2).setNewFieldValue(FIELD_COLOR, "green");
            }).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ArgumentMatchers.eq(-2L));
            GenericData.Record record2 = new GenericData.Record(UPDATE_SCHEMA_2);
            record2.put(FIELD_NUMBER, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_NUMBER));
            record2.put(FIELD_COMPANY, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_COMPANY));
            record2.put(FIELD_COLOR, "green");
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            Assert.assertEquals(update2Serializer.serialize(record2), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1, ((Integer) forClass4.getValue()).intValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncUpdate(-2L, "KEY1", obj3 -> {
                }).get();
            });
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ArgumentMatchers.eq(-2L));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUpdateWithLogicalTs() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient(true);
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass4 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass5 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            ArgumentCaptor forClass6 = ArgumentCaptor.forClass(Long.TYPE);
            testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj -> {
                UpdateBuilder updateBuilder = (UpdateBuilder) obj;
                updateBuilder.setNewFieldValue(FIELD_NUMBER, 10L);
                updateBuilder.setNewFieldValue(FIELD_COMPANY, "LinkedIn");
            }).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(1))).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ((Long) forClass6.capture()).longValue());
            GenericData.Record record = new GenericData.Record(UPDATE_SCHEMA_2);
            record.put(FIELD_NUMBER, 10L);
            record.put(FIELD_COMPANY, "LinkedIn");
            record.put(FIELD_COLOR, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_COLOR));
            Assert.assertEquals(keySerializer.serialize("KEY1"), (byte[]) forClass.getValue());
            Assert.assertEquals(update2Serializer.serialize(record), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1, ((Integer) forClass4.getValue()).intValue());
            Assert.assertEquals(1000L, ((Long) forClass6.getValue()).longValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.asyncUpdate(1002L, "KEY2", obj2 -> {
                ((UpdateBuilder) obj2).setNewFieldValue(FIELD_COLOR, "green");
            }).get();
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.times(2))).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ((Long) forClass6.capture()).longValue());
            GenericData.Record record2 = new GenericData.Record(UPDATE_SCHEMA_2);
            record2.put(FIELD_NUMBER, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_NUMBER));
            record2.put(FIELD_COMPANY, createFieldNoOpRecord(UPDATE_SCHEMA_2, FIELD_COMPANY));
            record2.put(FIELD_COLOR, "green");
            Assert.assertEquals(keySerializer.serialize("KEY2"), (byte[]) forClass.getValue());
            Assert.assertEquals(update2Serializer.serialize(record2), (byte[]) forClass2.getValue());
            Assert.assertEquals(2, ((Integer) forClass3.getValue()).intValue());
            Assert.assertEquals(1, ((Integer) forClass4.getValue()).intValue());
            Assert.assertEquals(1002L, ((Long) forClass6.getValue()).longValue());
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUpdateOnUnsupportedStore() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient();
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository);
        try {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(byte[].class);
            ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass4 = ArgumentCaptor.forClass(Integer.TYPE);
            ArgumentCaptor forClass5 = ArgumentCaptor.forClass(PubSubProducerCallback.class);
            ArgumentCaptor forClass6 = ArgumentCaptor.forClass(Long.TYPE);
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj -> {
                    UpdateBuilder updateBuilder = (UpdateBuilder) obj;
                    updateBuilder.setNewFieldValue(FIELD_NUMBER, 10L);
                    updateBuilder.setNewFieldValue(FIELD_COMPANY, "LinkedIn");
                }).get();
            });
            ((VeniceWriter) Mockito.verify(testOnlineVeniceProducer.mockVeniceWriter, Mockito.never())).update(forClass.capture(), forClass2.capture(), ((Integer) forClass3.capture()).intValue(), ((Integer) forClass4.capture()).intValue(), (PubSubProducerCallback) forClass5.capture(), ((Long) forClass6.capture()).longValue());
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertTrue(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testUpdateWithFailedWrite() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient(true);
        MetricsRepository metricsRepository = new MetricsRepository();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(new Properties()), metricsRepository, true);
        try {
            assertThrowsExceptionFromFuture(VeniceException.class, () -> {
                testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj -> {
                    UpdateBuilder updateBuilder = (UpdateBuilder) obj;
                    updateBuilder.setNewFieldValue(FIELD_NUMBER, 10L);
                    updateBuilder.setNewFieldValue(FIELD_COMPANY, "LinkedIn");
                }).get();
            });
            Assert.assertTrue(metricsRepository.getMetric(TOTAL_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertTrue(metricsRepository.getMetric(UPDATE_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(SUCCESS_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(PUT_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(DELETE_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertTrue(metricsRepository.getMetric(FAILED_OPERATION_METRIC_NAME).value() > 0.0d);
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MIN_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(0.0d));
            Assert.assertEquals(Double.valueOf(metricsRepository.getMetric(MAX_PENDING_OPERATION_METRIC_NAME).value()), Double.valueOf(1.0d));
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOperationsOnClosedProducer() throws IOException, ExecutionException, InterruptedException {
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(getMockStoreClient(true), kmeSchemaReader, new VeniceProperties(new Properties()), new MetricsRepository());
        testOnlineVeniceProducer.close();
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncPut("KEY1", mockValue1).get();
        });
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncPut(1000L, "KEY1", mockValue1).get();
        });
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncDelete("KEY1").get();
        });
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncDelete(1000L, "KEY1").get();
        });
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncUpdate("KEY1", obj -> {
            }).get();
        });
        assertThrowsExceptionFromFuture(VeniceException.class, () -> {
            testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj -> {
            }).get();
        });
    }

    @Test
    public void testFetchLatestValueAndUpdateSchemas() throws IOException, ExecutionException, InterruptedException {
        AbstractAvroStoreClient mockStoreClient = getMockStoreClient(true);
        SchemaReader kmeSchemaReader = getKmeSchemaReader();
        MetricsRepository metricsRepository = new MetricsRepository();
        Properties properties = new Properties();
        properties.put("client.producer.schema.refresh.interval.seconds", 1);
        TestOnlineVeniceProducer testOnlineVeniceProducer = new TestOnlineVeniceProducer(mockStoreClient, kmeSchemaReader, new VeniceProperties(properties), metricsRepository);
        try {
            testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj -> {
                UpdateBuilder updateBuilder = (UpdateBuilder) obj;
                updateBuilder.setNewFieldValue(FIELD_COLOR, "green");
                Assert.assertEquals(updateBuilder.build().getSchema().toString(), UPDATE_SCHEMA_2.toString());
            }).get();
            configureSchemaResponseMocks(mockStoreClient, Arrays.asList(VALUE_SCHEMA_1, VALUE_SCHEMA_2, VALUE_SCHEMA_3, VALUE_SCHEMA_4), 3, Arrays.asList(UPDATE_SCHEMA_1, UPDATE_SCHEMA_2, UPDATE_SCHEMA_3, UPDATE_SCHEMA_4), true);
            TestUtils.waitForNonDeterministicAssertion(10L, TimeUnit.MINUTES, () -> {
                try {
                    testOnlineVeniceProducer.asyncUpdate(1000L, "KEY1", obj2 -> {
                        UpdateBuilder updateBuilder = (UpdateBuilder) obj2;
                        updateBuilder.setNewFieldValue(FIELD_COLOR, "green");
                        Assert.assertEquals(updateBuilder.build().getSchema().toString(), UPDATE_SCHEMA_3.toString());
                    }).get();
                } catch (ExecutionException e) {
                    Assert.fail();
                }
            });
            testOnlineVeniceProducer.close();
        } catch (Throwable th) {
            try {
                testOnlineVeniceProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private SchemaReader getKmeSchemaReader() {
        SchemaReader schemaReader = (SchemaReader) Mockito.mock(SchemaReader.class);
        Mockito.when(schemaReader.getValueSchema(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion())).thenReturn(KafkaMessageEnvelope.SCHEMA$);
        return schemaReader;
    }

    private AbstractAvroStoreClient getMockStoreClient() throws IOException, ExecutionException, InterruptedException {
        return getMockStoreClient(false);
    }

    private AbstractAvroStoreClient getMockStoreClient(boolean z) throws IOException, ExecutionException, InterruptedException {
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        VersionImpl versionImpl = new VersionImpl(storeName, 1, "test-job-id");
        versionImpl.setPartitionCount(10);
        ZKStore zKStore = new ZKStore(storeName, "test-owner", System.currentTimeMillis(), PersistenceType.ROCKS_DB, RoutingStrategy.CONSISTENT_HASH, ReadStrategy.ANY_OF_ONLINE, OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION, 1, 1000L, 1000L, new HybridStoreConfigImpl(1000L, 1000L, -1L, DataReplicationPolicy.ACTIVE_ACTIVE, BufferReplayPolicy.REWIND_FROM_EOP), partitionerConfigImpl, 3);
        zKStore.setPartitionCount(10);
        zKStore.setVersions(Collections.singletonList(versionImpl));
        zKStore.setWriteComputationEnabled(z);
        AbstractAvroStoreClient abstractAvroStoreClient = (AbstractAvroStoreClient) Mockito.mock(AbstractAvroStoreClient.class);
        ((AbstractAvroStoreClient) Mockito.doReturn(storeName).when(abstractAvroStoreClient)).getStoreName();
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setPartitions(10);
        versionCreationResponse.setPartitionerClass(partitionerConfigImpl.getPartitionerClass());
        versionCreationResponse.setPartitionerParams(partitionerConfigImpl.getPartitionerParams());
        versionCreationResponse.setKafkaBootstrapServers("localhost:9092");
        versionCreationResponse.setKafkaTopic(Version.composeRealTimeTopic(storeName));
        versionCreationResponse.setAmplificationFactor(1);
        versionCreationResponse.setEnableSSL(false);
        CompletableFuture completableFuture = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(versionCreationResponse)).when(completableFuture)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture).when(abstractAvroStoreClient)).getRaw("request_topic/test_store");
        CompletableFuture completableFuture2 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(STORE_SERIALIZER.serialize(zKStore, (String) null)).when(completableFuture2)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture2).when(abstractAvroStoreClient)).getRaw("store_state/test_store");
        configureSchemaResponseMocks(abstractAvroStoreClient, Arrays.asList(VALUE_SCHEMA_1, VALUE_SCHEMA_2), 2, Arrays.asList(UPDATE_SCHEMA_1, UPDATE_SCHEMA_2), z);
        return abstractAvroStoreClient;
    }

    private void configureSchemaResponseMocks(AbstractAvroStoreClient abstractAvroStoreClient, List<Schema> list, int i, List<Schema> list2, boolean z) throws JsonProcessingException, ExecutionException, InterruptedException {
        String schema = KEY_SCHEMA.toString();
        SchemaResponse schemaResponse = new SchemaResponse();
        schemaResponse.setId(1);
        schemaResponse.setSchemaStr(schema);
        CompletableFuture completableFuture = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(schemaResponse)).when(completableFuture)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture).when(abstractAvroStoreClient)).getRaw("key_schema/test_store");
        MultiSchemaResponse.Schema[] schemaArr = new MultiSchemaResponse.Schema[list.size()];
        for (int i2 = 0; i2 < list.size(); i2++) {
            MultiSchemaResponse.Schema schema2 = new MultiSchemaResponse.Schema();
            schema2.setId(i2 + 1);
            schema2.setSchemaStr(list.get(i2).toString());
            schemaArr[i2] = schema2;
        }
        MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse();
        multiSchemaResponse.setSchemas(schemaArr);
        multiSchemaResponse.setCluster(clusterName);
        if (i > 0) {
            multiSchemaResponse.setSuperSetSchemaId(i);
        }
        CompletableFuture completableFuture2 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(multiSchemaResponse)).when(completableFuture2)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture2).when(abstractAvroStoreClient)).getRaw("value_schema/test_store");
        if (!z) {
            for (int i3 = 0; i3 < list2.size(); i3++) {
                SchemaResponse schemaResponse2 = new SchemaResponse();
                schemaResponse2.setError("Update schema doesn't exist for value schema id: " + (i3 + 1) + " of store: " + storeName);
                CompletableFuture completableFuture3 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
                ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(schemaResponse2)).when(completableFuture3)).get();
                ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture3).when(abstractAvroStoreClient)).getRaw("update_schema/test_store/" + (i3 + 1));
            }
            MultiSchemaResponse multiSchemaResponse2 = new MultiSchemaResponse();
            multiSchemaResponse2.setCluster(clusterName);
            multiSchemaResponse2.setName(storeName);
            multiSchemaResponse2.setSchemas(new MultiSchemaResponse.Schema[0]);
            CompletableFuture completableFuture4 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
            ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(multiSchemaResponse2)).when(completableFuture4)).get();
            ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture4).when(abstractAvroStoreClient)).getRaw("update_schema/test_store");
            return;
        }
        MultiSchemaResponse multiSchemaResponse3 = new MultiSchemaResponse();
        multiSchemaResponse3.setCluster(clusterName);
        multiSchemaResponse3.setName(storeName);
        MultiSchemaResponse.Schema[] schemaArr2 = new MultiSchemaResponse.Schema[list2.size()];
        for (int i4 = 0; i4 < list2.size(); i4++) {
            SchemaResponse schemaResponse3 = new SchemaResponse();
            schemaResponse3.setCluster(clusterName);
            schemaResponse3.setName(storeName);
            schemaResponse3.setId(i4 + 1);
            schemaResponse3.setDerivedSchemaId(1);
            schemaResponse3.setSchemaStr(list2.get(i4).toString());
            CompletableFuture completableFuture5 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
            ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(schemaResponse3)).when(completableFuture5)).get();
            ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture5).when(abstractAvroStoreClient)).getRaw("update_schema/test_store/" + (i4 + 1));
            MultiSchemaResponse.Schema schema3 = new MultiSchemaResponse.Schema();
            schema3.setId(i4 + 1);
            schema3.setDerivedSchemaId(1);
            schema3.setSchemaStr(list2.get(i4).toString());
            schemaArr2[i4] = schema3;
        }
        multiSchemaResponse3.setSchemas(schemaArr2);
        CompletableFuture completableFuture6 = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(MAPPER.writeValueAsBytes(multiSchemaResponse3)).when(completableFuture6)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture6).when(abstractAvroStoreClient)).getRaw("update_schema/test_store");
    }

    private AbstractAvroStoreClient getMockStoreClientWithRequestTopicResponse(byte[] bArr) throws IOException, ExecutionException, InterruptedException {
        AbstractAvroStoreClient abstractAvroStoreClient = (AbstractAvroStoreClient) Mockito.mock(AbstractAvroStoreClient.class);
        ((AbstractAvroStoreClient) Mockito.doReturn(storeName).when(abstractAvroStoreClient)).getStoreName();
        CompletableFuture completableFuture = (CompletableFuture) Mockito.mock(CompletableFuture.class);
        ((CompletableFuture) Mockito.doReturn(bArr).when(completableFuture)).get();
        ((AbstractAvroStoreClient) Mockito.doReturn(completableFuture).when(abstractAvroStoreClient)).getRaw("request_topic/" + storeName);
        return abstractAvroStoreClient;
    }

    private static GenericRecord getMockValue(Schema schema) {
        return (GenericRecord) new RandomRecordGenerator().randomGeneric(schema, RecordGenerationConfig.newConfig().withAvoidNulls(true));
    }

    private GenericRecord createFieldNoOpRecord(Schema schema, String str) {
        return new GenericData.Record((Schema) schema.getField(str).schema().getTypes().get(0));
    }

    private static RecordSerializer<Object> getSerializer(Schema schema) {
        return FastSerializerDeserializerFactory.getAvroGenericSerializer(schema);
    }

    private void assertThrowsExceptionFromFuture(Class cls, Assert.ThrowingRunnable throwingRunnable) {
        Throwable th = null;
        try {
            throwingRunnable.run();
        } catch (ExecutionException e) {
            if (e.getCause() != null && cls.isInstance(e.getCause())) {
                return;
            } else {
                th = e;
            }
        } catch (Throwable th2) {
            th = th2;
        }
        if (th == null) {
            Assert.fail("Expected exception to be thrown");
        }
        throw new AssertionError(th.getMessage(), th);
    }
}
