package com.linkedin.davinci.store.view;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.client.change.capture.protocol.ValueBytes;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.views.ChangeCaptureView;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/store/view/ChangeCaptureViewWriter.class */
public class ChangeCaptureViewWriter extends VeniceViewWriter {
    private static final Logger LOGGER = LogManager.getLogger(ChangeCaptureViewWriter.class);
    private final ChangeCaptureView internalView;
    private VeniceWriter veniceWriter;
    private final Object2IntMap<String> kafkaClusterUrlToIdMap;
    private final int maxColoIdValue;

    public ChangeCaptureViewWriter(VeniceConfigLoader veniceConfigLoader, Store store, Schema schema, Map<String, String> map) {
        super(veniceConfigLoader, store, schema, map);
        this.internalView = new ChangeCaptureView(veniceConfigLoader.getCombinedProperties().toProperties(), store, map);
        this.kafkaClusterUrlToIdMap = veniceConfigLoader.getVeniceServerConfig().getKafkaClusterUrlToIdMap();
        this.maxColoIdValue = ((Integer) this.kafkaClusterUrlToIdMap.values().stream().max((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElse(-1)).intValue();
    }

    @Override // com.linkedin.davinci.store.view.VeniceViewWriter
    public void processRecord(ByteBuffer byteBuffer, ByteBuffer byteBuffer2, byte[] bArr, int i, int i2, int i3, GenericRecord genericRecord) {
        RecordChangeEvent recordChangeEvent = new RecordChangeEvent();
        recordChangeEvent.currentValue = constructValueBytes(byteBuffer, i2);
        recordChangeEvent.previousValue = constructValueBytes(byteBuffer2, i3);
        recordChangeEvent.key = ByteBuffer.wrap(bArr);
        recordChangeEvent.replicationCheckpointVector = RmdUtils.extractOffsetVectorFromRmd(genericRecord);
        if (this.veniceWriter == null) {
            initializeVeniceWriter(i);
        }
        try {
            this.veniceWriter.put(bArr, recordChangeEvent, 1).get();
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Failed to produce to Change Capture view topic for store: {} version: {}", this.store.getName(), Integer.valueOf(i));
            throw new VeniceException(e);
        }
    }

    @Override // com.linkedin.davinci.store.view.VeniceViewWriter
    public void processControlMessage(ControlMessage controlMessage, int i, PartitionConsumptionState partitionConsumptionState, int i2) {
        List<Long> emptyList;
        if ((controlMessage.getControlMessageUnion() instanceof VersionSwap) && partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) {
            VersionSwap versionSwap = (VersionSwap) controlMessage.getControlMessageUnion();
            if (Version.parseVersionFromVersionTopicName(versionSwap.oldServingVersionTopic.toString()) != i2) {
                return;
            }
            Map<String, Long> latestProcessedUpstreamRTOffsetMap = partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap();
            if (this.maxColoIdValue > -1) {
                emptyList = new ArrayList(Collections.nCopies(this.maxColoIdValue + 1, 0L));
                for (String str : latestProcessedUpstreamRTOffsetMap.keySet()) {
                    emptyList.set(this.kafkaClusterUrlToIdMap.getInt(str), partitionConsumptionState.getLatestProcessedUpstreamRTOffsetMap().get(str));
                }
            } else {
                emptyList = Collections.emptyList();
            }
            if (this.veniceWriter == null) {
                initializeVeniceWriter(i2);
            }
            this.veniceWriter.sendControlMessage(constructVersionSwapControlMessage(versionSwap, emptyList), partitionConsumptionState.getPartition(), Collections.emptyMap(), (PubSubProducerCallback) null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER);
        }
    }

    public Map<String, VeniceProperties> getTopicNamesAndConfigsForVersion(int i) {
        return this.internalView.getTopicNamesAndConfigsForVersion(i);
    }

    public String getWriterClassName() {
        return this.internalView.getWriterClassName();
    }

    public void close() {
        this.internalView.close();
        if (this.veniceWriter != null) {
            this.veniceWriter.close();
        }
    }

    void setVeniceWriter(VeniceWriter veniceWriter) {
        this.veniceWriter = veniceWriter;
    }

    VeniceWriterOptions buildWriterOptions(int i) {
        VeniceWriterOptions.Builder builder = new VeniceWriterOptions.Builder(getTopicNamesAndConfigsForVersion(i).keySet().stream().findAny().get());
        builder.setValueSerializer(new VeniceAvroKafkaSerializer(RecordChangeEvent.getClassSchema()));
        Version version = (Version) this.store.getVersion(i).get();
        PartitionerConfig partitionerConfig = version.getPartitionerConfig();
        if (partitionerConfig != null) {
            builder.setPartitioner(PartitionUtils.getVenicePartitioner(partitionerConfig));
        }
        builder.setChunkingEnabled(version.isChunkingEnabled());
        return builder.build();
    }

    private synchronized void initializeVeniceWriter(int i) {
        if (this.veniceWriter != null) {
            return;
        }
        this.veniceWriter = new VeniceWriterFactory(this.props).createVeniceWriter(buildWriterOptions(i));
    }

    private ValueBytes constructValueBytes(ByteBuffer byteBuffer, int i) {
        if (byteBuffer == null) {
            return null;
        }
        ValueBytes valueBytes = new ValueBytes();
        valueBytes.schemaId = i;
        valueBytes.value = byteBuffer;
        return valueBytes;
    }

    private ControlMessage constructVersionSwapControlMessage(VersionSwap versionSwap, List<Long> list) {
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = ControlMessageType.VERSION_SWAP.getValue();
        controlMessage.controlMessageUnion = ControlMessageType.VERSION_SWAP.getNewInstance();
        VersionSwap versionSwap2 = new VersionSwap();
        versionSwap2.oldServingVersionTopic = versionSwap.oldServingVersionTopic;
        versionSwap2.newServingVersionTopic = versionSwap.newServingVersionTopic;
        versionSwap2.localHighWatermarks = list;
        controlMessage.controlMessageUnion = versionSwap2;
        return controlMessage;
    }
}
