package com.linkedin.davinci.store.view;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.class */
public class ChangeCaptureViewWriterTest {
    private static final Schema SCHEMA = AvroCompatibilityHelper.parse(new String[]{"\"string\""});
    private static final byte[] KEY = "fishy_name".getBytes();
    private static final ByteBuffer OLD_VALUE = ByteBuffer.wrap("herring".getBytes());
    private static final ByteBuffer NEW_VALUE = ByteBuffer.wrap("silver_darling".getBytes());
    private static final String STORE_NAME = "Clupea-pallasii";
    private static final String PUSH_JOB_ID = "sitka-sound";
    public static final String LTX_1 = "ltx1";
    public static final String LVA_1 = "lva1";
    public static final String LOR_1 = "lor1";

    @Test
    public void testConstructVersionSwapMessage() {
        HashMap hashMap = new HashMap();
        hashMap.put(LOR_1, 111L);
        hashMap.put(LTX_1, 99L);
        hashMap.put(LVA_1, 22222L);
        PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        Mockito.when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER);
        Mockito.when(partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap()).thenReturn(hashMap);
        Mockito.when(Integer.valueOf(partitionConsumptionState.getPartition())).thenReturn(1);
        PartitionConsumptionState partitionConsumptionState2 = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
        Mockito.when(partitionConsumptionState2.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.STANDBY);
        Mockito.when(partitionConsumptionState2.getLatestProcessedUpstreamRTOffsetMap()).thenReturn(hashMap);
        Mockito.when(Integer.valueOf(partitionConsumptionState2.getPartition())).thenReturn(1);
        VersionSwap versionSwap = new VersionSwap();
        versionSwap.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 1);
        versionSwap.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2);
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageUnion = versionSwap;
        Store store = (Store) Mockito.mock(Store.class);
        VeniceProperties veniceProperties = new VeniceProperties();
        Object2IntOpenHashMap object2IntOpenHashMap = new Object2IntOpenHashMap();
        object2IntOpenHashMap.put(LTX_1, 0);
        object2IntOpenHashMap.put(LVA_1, 1);
        object2IntOpenHashMap.put(LOR_1, 2);
        Future future = (Future) Mockito.mock(Future.class);
        VeniceWriter veniceWriter = (VeniceWriter) Mockito.mock(VeniceWriter.class);
        Mockito.when(veniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(future);
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        Mockito.when(veniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(object2IntOpenHashMap);
        VeniceConfigLoader veniceConfigLoader = (VeniceConfigLoader) Mockito.mock(VeniceConfigLoader.class);
        Mockito.when(veniceConfigLoader.getCombinedProperties()).thenReturn(veniceProperties);
        Mockito.when(veniceConfigLoader.getVeniceServerConfig()).thenReturn(veniceServerConfig);
        ChangeCaptureViewWriter changeCaptureViewWriter = new ChangeCaptureViewWriter(veniceConfigLoader, store, SCHEMA, Collections.emptyMap());
        changeCaptureViewWriter.setVeniceWriter(veniceWriter);
        changeCaptureViewWriter.processControlMessage(controlMessage, 1, partitionConsumptionState2, 1);
        ((VeniceWriter) Mockito.verify(veniceWriter, Mockito.never())).sendControlMessage((ControlMessage) Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), (PubSubProducerCallback) Mockito.any(), (LeaderMetadataWrapper) Mockito.any());
        ControlMessage controlMessage2 = new ControlMessage();
        controlMessage2.controlMessageUnion = new EndOfIncrementalPush();
        changeCaptureViewWriter.processControlMessage(controlMessage2, 1, partitionConsumptionState, 1);
        ((VeniceWriter) Mockito.verify(veniceWriter, Mockito.never())).sendControlMessage((ControlMessage) Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), (PubSubProducerCallback) Mockito.any(), (LeaderMetadataWrapper) Mockito.any());
        VersionSwap versionSwap2 = new VersionSwap();
        versionSwap2.oldServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 2);
        versionSwap2.newServingVersionTopic = Version.composeKafkaTopic(STORE_NAME, 3);
        controlMessage2.controlMessageUnion = versionSwap2;
        changeCaptureViewWriter.processControlMessage(controlMessage2, 1, partitionConsumptionState, 1);
        ((VeniceWriter) Mockito.verify(veniceWriter, Mockito.never())).sendControlMessage((ControlMessage) Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), (PubSubProducerCallback) Mockito.any(), (LeaderMetadataWrapper) Mockito.any());
        changeCaptureViewWriter.processControlMessage(controlMessage, 1, partitionConsumptionState, 1);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ControlMessage.class);
        ((VeniceWriter) Mockito.verify(veniceWriter)).sendControlMessage((ControlMessage) forClass.capture(), Mockito.eq(1), Mockito.anyMap(), (PubSubProducerCallback) Mockito.isNull(), (LeaderMetadataWrapper) Mockito.eq(VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER));
        ControlMessage controlMessage3 = (ControlMessage) forClass.getValue();
        Assert.assertEquals(controlMessage3.controlMessageType, 8);
        Assert.assertTrue(controlMessage3.controlMessageUnion instanceof VersionSwap);
        VersionSwap versionSwap3 = (VersionSwap) controlMessage3.controlMessageUnion;
        Assert.assertEquals(versionSwap3.oldServingVersionTopic, Version.composeKafkaTopic(STORE_NAME, 1));
        Assert.assertEquals(versionSwap3.newServingVersionTopic, Version.composeKafkaTopic(STORE_NAME, 2));
        Assert.assertEquals(versionSwap3.localHighWatermarks.get(0), hashMap.get(LTX_1));
        Assert.assertEquals(versionSwap3.localHighWatermarks.get(1), hashMap.get(LVA_1));
        Assert.assertEquals(versionSwap3.localHighWatermarks.get(2), hashMap.get(LOR_1));
    }

    @Test
    public void testBuildWriterOptions() {
        Store store = (Store) Mockito.mock(Store.class);
        VersionImpl versionImpl = new VersionImpl(STORE_NAME, 1, PUSH_JOB_ID);
        Mockito.when(store.getVersion(1)).thenReturn(Optional.of(versionImpl));
        Mockito.when(store.getName()).thenReturn(STORE_NAME);
        VeniceProperties veniceProperties = new VeniceProperties();
        Object2IntOpenHashMap object2IntOpenHashMap = new Object2IntOpenHashMap();
        Mockito.when(((VeniceWriter) Mockito.mock(VeniceWriter.class)).put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn((Future) Mockito.mock(Future.class));
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        Mockito.when(veniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(object2IntOpenHashMap);
        VeniceConfigLoader veniceConfigLoader = (VeniceConfigLoader) Mockito.mock(VeniceConfigLoader.class);
        Mockito.when(veniceConfigLoader.getCombinedProperties()).thenReturn(veniceProperties);
        Mockito.when(veniceConfigLoader.getVeniceServerConfig()).thenReturn(veniceServerConfig);
        VeniceWriterOptions buildWriterOptions = new ChangeCaptureViewWriter(veniceConfigLoader, store, SCHEMA, Collections.emptyMap()).buildWriterOptions(1);
        Assert.assertEquals(buildWriterOptions.getTopicName(), "Clupea-pallasii_v1_cc");
        Assert.assertFalse(buildWriterOptions.isChunkingEnabled());
        Assert.assertEquals(versionImpl.getPartitionerConfig().getPartitionerClass(), buildWriterOptions.getPartitioner().getClass().getCanonicalName());
    }

    @Test
    public void testProcessRecord() {
        Store store = (Store) Mockito.mock(Store.class);
        VeniceProperties veniceProperties = new VeniceProperties();
        Object2IntOpenHashMap object2IntOpenHashMap = new Object2IntOpenHashMap();
        Future future = (Future) Mockito.mock(Future.class);
        VeniceWriter veniceWriter = (VeniceWriter) Mockito.mock(VeniceWriter.class);
        Mockito.when(veniceWriter.put(Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(future);
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        Mockito.when(veniceServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(object2IntOpenHashMap);
        VeniceConfigLoader veniceConfigLoader = (VeniceConfigLoader) Mockito.mock(VeniceConfigLoader.class);
        Mockito.when(veniceConfigLoader.getCombinedProperties()).thenReturn(veniceProperties);
        Mockito.when(veniceConfigLoader.getVeniceServerConfig()).thenReturn(veniceServerConfig);
        ChangeCaptureViewWriter changeCaptureViewWriter = new ChangeCaptureViewWriter(veniceConfigLoader, store, SCHEMA, Collections.emptyMap());
        Schema generateMetadataSchema = RmdSchemaGenerator.generateMetadataSchema(SCHEMA, 1);
        List asList = Arrays.asList(1L, 2L, 3L);
        GenericData.Record record = new GenericData.Record(generateMetadataSchema);
        record.put("timestamp", 20L);
        record.put("replication_checkpoint_vector", asList);
        changeCaptureViewWriter.setVeniceWriter(veniceWriter);
        changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, 1, record);
        changeCaptureViewWriter.processRecord(NEW_VALUE, (ByteBuffer) null, KEY, 1, 1, 1, record);
        changeCaptureViewWriter.processRecord((ByteBuffer) null, OLD_VALUE, KEY, 1, 1, 1, record);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(RecordChangeEvent.class);
        ((VeniceWriter) Mockito.verify(veniceWriter, Mockito.atLeastOnce())).put(forClass.capture(), forClass2.capture(), Mockito.eq(1));
        List allValues = forClass2.getAllValues();
        List allValues2 = forClass.getAllValues();
        Assert.assertEquals((byte[]) allValues2.get(0), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(0)).key.array(), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(0)).replicationCheckpointVector, asList);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(0)).currentValue.value, NEW_VALUE);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(0)).previousValue.value, OLD_VALUE);
        Assert.assertEquals((byte[]) allValues2.get(1), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(1)).key.array(), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(1)).replicationCheckpointVector, asList);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(1)).currentValue.value, NEW_VALUE);
        Assert.assertNull(((RecordChangeEvent) allValues.get(1)).previousValue);
        Assert.assertEquals((byte[]) allValues2.get(2), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(2)).key.array(), KEY);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(2)).replicationCheckpointVector, asList);
        Assert.assertNull(((RecordChangeEvent) allValues.get(2)).currentValue);
        Assert.assertEquals(((RecordChangeEvent) allValues.get(2)).previousValue.value, OLD_VALUE);
        changeCaptureViewWriter.close();
        ((VeniceWriter) Mockito.verify(veniceWriter)).close();
    }
}
