package com.linkedin.davinci.consumer;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.client.change.capture.protocol.ValueBytes;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfPush;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.StartOfPush;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.class */
public class VeniceChangelogConsumerImplTest {
    private String storeName;
    private RecordSerializer<String> keySerializer;
    private RecordSerializer<String> valueSerializer;
    private Schema rmdSchema;
    private SchemaReader schemaReader;

    @BeforeMethod
    public void setUp() {
        this.storeName = Utils.getUniqueString();
        this.schemaReader = (SchemaReader) Mockito.mock(SchemaReader.class);
        Schema parse = AvroCompatibilityHelper.parse(new String[]{"\"string\""});
        ((SchemaReader) Mockito.doReturn(parse).when(this.schemaReader)).getKeySchema();
        Schema parse2 = AvroCompatibilityHelper.parse(new String[]{"\"string\""});
        ((SchemaReader) Mockito.doReturn(parse2).when(this.schemaReader)).getValueSchema(1);
        this.rmdSchema = RmdSchemaGenerator.generateMetadataSchema(parse2, 1);
        this.keySerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(parse);
        this.valueSerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(parse2);
    }

    @Test
    public void testConsumeBeforeAndAfterImage() throws ExecutionException, InterruptedException {
        D2ControllerClient d2ControllerClient = (D2ControllerClient) Mockito.mock(D2ControllerClient.class);
        StoreResponse storeResponse = (StoreResponse) Mockito.mock(StoreResponse.class);
        StoreInfo storeInfo = (StoreInfo) Mockito.mock(StoreInfo.class);
        ((StoreInfo) Mockito.doReturn(1).when(storeInfo)).getCurrentVersion();
        ((StoreInfo) Mockito.doReturn(2).when(storeInfo)).getPartitionCount();
        ((StoreResponse) Mockito.doReturn(storeInfo).when(storeResponse)).getStore();
        ((D2ControllerClient) Mockito.doReturn(storeResponse).when(d2ControllerClient)).getStore(this.storeName);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        ((Consumer) Mockito.doReturn(new HashSet()).when(consumer)).assignment();
        String composeKafkaTopic = Version.composeKafkaTopic(this.storeName, 2);
        String composeKafkaTopic2 = Version.composeKafkaTopic(this.storeName, 1);
        String str = composeKafkaTopic + "_cc";
        prepareChangeCaptureRecordsToBePolled(0L, 5L, consumer, composeKafkaTopic2 + "_cc", 0, composeKafkaTopic2, composeKafkaTopic);
        VeniceChangelogConsumerImpl veniceChangelogConsumerImpl = new VeniceChangelogConsumerImpl(new ChangelogClientConfig().setD2ControllerClient(d2ControllerClient).setSchemaReader(this.schemaReader).setStoreName(this.storeName).setViewName("changeCaptureView"), consumer);
        veniceChangelogConsumerImpl.setReadOnlySchemaRepository((ThinClientMetaStoreBasedRepository) Mockito.mock(ThinClientMetaStoreBasedRepository.class));
        veniceChangelogConsumerImpl.subscribe(new HashSet(Arrays.asList(0))).get();
        ((Consumer) Mockito.verify(consumer)).assign(Arrays.asList(new TopicPartition(composeKafkaTopic2, 0)));
        List list = (List) veniceChangelogConsumerImpl.poll(100L);
        for (int i = 0; i < 5; i++) {
            ChangeEvent changeEvent = (ChangeEvent) ((PubSubMessage) list.get(i)).getValue();
            Assert.assertEquals(((Utf8) changeEvent.getCurrentValue()).toString(), "newValue" + i);
            Assert.assertEquals(((Utf8) changeEvent.getPreviousValue()).toString(), "oldValue" + i);
        }
        ((Consumer) Mockito.verify(consumer)).assign(Arrays.asList(new TopicPartition(str, 0)));
        Assert.assertTrue(((List) veniceChangelogConsumerImpl.poll(100L)).isEmpty());
    }

    @Test
    public void testConsumeAfterImage() throws ExecutionException, InterruptedException {
        D2ControllerClient d2ControllerClient = (D2ControllerClient) Mockito.mock(D2ControllerClient.class);
        StoreResponse storeResponse = (StoreResponse) Mockito.mock(StoreResponse.class);
        StoreInfo storeInfo = (StoreInfo) Mockito.mock(StoreInfo.class);
        ((StoreInfo) Mockito.doReturn(1).when(storeInfo)).getCurrentVersion();
        ((StoreInfo) Mockito.doReturn(2).when(storeInfo)).getPartitionCount();
        ((StoreResponse) Mockito.doReturn(storeInfo).when(storeResponse)).getStore();
        ((D2ControllerClient) Mockito.doReturn(storeResponse).when(d2ControllerClient)).getStore(this.storeName);
        MultiSchemaResponse multiSchemaResponse = (MultiSchemaResponse) Mockito.mock(MultiSchemaResponse.class);
        MultiSchemaResponse.Schema schema = (MultiSchemaResponse.Schema) Mockito.mock(MultiSchemaResponse.Schema.class);
        ((MultiSchemaResponse.Schema) Mockito.doReturn(this.rmdSchema.toString()).when(schema)).getSchemaStr();
        ((MultiSchemaResponse) Mockito.doReturn(new MultiSchemaResponse.Schema[]{schema}).when(multiSchemaResponse)).getSchemas();
        ((D2ControllerClient) Mockito.doReturn(multiSchemaResponse).when(d2ControllerClient)).getAllReplicationMetadataSchemas(this.storeName);
        Consumer consumer = (Consumer) Mockito.mock(Consumer.class);
        String composeKafkaTopic = Version.composeKafkaTopic(this.storeName, 1);
        String str = composeKafkaTopic + "_cc";
        prepareVersionTopicRecordsToBePolled(0L, 5L, consumer, composeKafkaTopic, 0, true);
        VeniceAfterImageConsumerImpl veniceAfterImageConsumerImpl = new VeniceAfterImageConsumerImpl(new ChangelogClientConfig().setD2ControllerClient(d2ControllerClient).setSchemaReader(this.schemaReader).setStoreName(this.storeName).setViewName(""), consumer);
        veniceAfterImageConsumerImpl.setReadOnlySchemaRepository((ThinClientMetaStoreBasedRepository) Mockito.mock(ThinClientMetaStoreBasedRepository.class));
        veniceAfterImageConsumerImpl.subscribe(new HashSet(Arrays.asList(0))).get();
        ((Consumer) Mockito.verify(consumer)).assign(Arrays.asList(new TopicPartition(composeKafkaTopic, 0)));
        List list = (List) veniceAfterImageConsumerImpl.poll(100L);
        for (int i = 0; i < 5; i++) {
            Assert.assertEquals(((Utf8) ((ChangeEvent) ((PubSubMessage) list.get(i)).getValue()).getCurrentValue()).toString(), "newValue" + i);
        }
        ((Consumer) Mockito.verify(consumer)).assign(Arrays.asList(new TopicPartition(str, 0)));
        prepareChangeCaptureRecordsToBePolled(0L, 10L, consumer, str, 0, composeKafkaTopic, "");
        List list2 = (List) veniceAfterImageConsumerImpl.poll(100L);
        Assert.assertFalse(list2.isEmpty());
        Assert.assertEquals(list2.size(), 10);
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertEquals(((Utf8) ((ChangeEvent) ((PubSubMessage) list2.get(i2)).getValue()).getCurrentValue()).toString(), "newValue" + i2);
        }
        veniceAfterImageConsumerImpl.close();
        ((Consumer) Mockito.verify(consumer, Mockito.times(2))).assign((Collection) Mockito.any());
        ((Consumer) Mockito.verify(consumer)).close();
    }

    private void prepareChangeCaptureRecordsToBePolled(long j, long j2, Consumer consumer, String str, int i, String str2, String str3) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(constructStartOfPushMessage(str2, i));
        HashMap hashMap = new HashMap();
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 >= j2) {
                break;
            }
            arrayList.add(constructChangeCaptureConsumerRecord(str, i, "oldValue" + j4, "newValue" + j4, "key" + j4, Arrays.asList(Long.valueOf(j4), Long.valueOf(j4))));
            j3 = j4 + 1;
        }
        if (!str3.isEmpty()) {
            arrayList.add(constructVersionSwapMessage(str2, str2, str3, i, Arrays.asList(Long.valueOf(j2), Long.valueOf(j2))));
        }
        hashMap.put(new TopicPartition(str, i), arrayList);
        ((Consumer) Mockito.doReturn(new ConsumerRecords(hashMap)).when(consumer)).poll(100L);
    }

    private void prepareVersionTopicRecordsToBePolled(long j, long j2, Consumer consumer, String str, int i, boolean z) {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 >= j2) {
                break;
            }
            arrayList.add(constructConsumerRecord(str, i, "newValue" + j4, "key" + j4, Arrays.asList(Long.valueOf(j4), Long.valueOf(j4))));
            j3 = j4 + 1;
        }
        if (z) {
            arrayList.add(constructEndOfPushMessage(str, i));
        }
        hashMap.put(new TopicPartition(str, i), arrayList);
        ((Consumer) Mockito.doReturn(new ConsumerRecords(hashMap)).when(consumer)).poll(100L);
    }

    private ConsumerRecord<KafkaKey, KafkaMessageEnvelope> constructVersionSwapMessage(String str, String str2, String str3, int i, List<Long> list) {
        KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, (byte[]) null);
        VersionSwap versionSwap = new VersionSwap();
        versionSwap.oldServingVersionTopic = str2;
        versionSwap.newServingVersionTopic = str3;
        versionSwap.localHighWatermarks = list;
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageUnion = versionSwap;
        controlMessage.controlMessageType = ControlMessageType.VERSION_SWAP.getValue();
        kafkaMessageEnvelope.payloadUnion = controlMessage;
        return new ConsumerRecord<>(str, i, 0L, kafkaKey, kafkaMessageEnvelope);
    }

    private ConsumerRecord<KafkaKey, KafkaMessageEnvelope> constructChangeCaptureConsumerRecord(String str, int i, String str2, String str3, String str4, List<Long> list) {
        ValueBytes valueBytes = new ValueBytes();
        valueBytes.schemaId = 1;
        valueBytes.value = ByteBuffer.wrap(this.valueSerializer.serialize(str2));
        ValueBytes valueBytes2 = new ValueBytes();
        valueBytes2.schemaId = 1;
        valueBytes2.value = ByteBuffer.wrap(this.valueSerializer.serialize(str3));
        RecordChangeEvent recordChangeEvent = new RecordChangeEvent();
        recordChangeEvent.currentValue = valueBytes2;
        recordChangeEvent.previousValue = valueBytes;
        recordChangeEvent.key = ByteBuffer.wrap(str4.getBytes());
        recordChangeEvent.replicationCheckpointVector = list;
        RecordSerializer fastAvroGenericSerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersionSchema());
        fastAvroGenericSerializer.serialize(recordChangeEvent);
        return new ConsumerRecord<>(str, i, 0L, new KafkaKey(MessageType.PUT, this.keySerializer.serialize(str4)), new KafkaMessageEnvelope(Integer.valueOf(MessageType.PUT.getValue()), new ProducerMetadata(), new Put(ByteBuffer.wrap(fastAvroGenericSerializer.serialize(recordChangeEvent)), 0, 0, ByteBuffer.allocate(0)), (LeaderMetadata) null));
    }

    private ConsumerRecord<KafkaKey, KafkaMessageEnvelope> constructConsumerRecord(String str, int i, String str2, String str3, List<Long> list) {
        GenericData.Record record = new GenericData.Record(this.rmdSchema);
        record.put("timestamp", 0L);
        record.put("replication_checkpoint_vector", list);
        return new ConsumerRecord<>(str, i, 0L, new KafkaKey(MessageType.PUT, this.keySerializer.serialize(str3)), new KafkaMessageEnvelope(Integer.valueOf(MessageType.PUT.getValue()), new ProducerMetadata(), new Put(ByteBuffer.wrap(this.valueSerializer.serialize(str2)), 1, 1, RmdUtils.serializeRmdRecord(this.rmdSchema, record)), (LeaderMetadata) null));
    }

    private ConsumerRecord<KafkaKey, KafkaMessageEnvelope> constructEndOfPushMessage(String str, int i) {
        KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, (byte[]) null);
        EndOfPush endOfPush = new EndOfPush();
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageUnion = endOfPush;
        controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue();
        kafkaMessageEnvelope.payloadUnion = controlMessage;
        return new ConsumerRecord<>(str, i, 0L, kafkaKey, kafkaMessageEnvelope);
    }

    private ConsumerRecord<KafkaKey, KafkaMessageEnvelope> constructStartOfPushMessage(String str, int i) {
        KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, (byte[]) null);
        StartOfPush startOfPush = new StartOfPush();
        startOfPush.compressionStrategy = CompressionStrategy.NO_OP.getValue();
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageUnion = startOfPush;
        controlMessage.controlMessageType = ControlMessageType.START_OF_PUSH.getValue();
        kafkaMessageEnvelope.payloadUnion = controlMessage;
        return new ConsumerRecord<>(str, i, 0L, kafkaKey, kafkaMessageEnvelope);
    }
}
