package com.linkedin.davinci.consumer;

import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SpecificRecordChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.memory.InMemoryStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.class */
public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer<K, V> {
    private final int partitionCount;
    protected ThinClientMetaStoreBasedRepository storeRepository;
    protected final ReadOnlySchemaRepository recordChangeEventSchemaRepository;
    protected final AbstractAvroChunkingAdapter<V> userEventChunkingAdapter;
    protected final SchemaReader schemaReader;
    private final String viewClassName;
    protected final RecordDeserializer<K> keyDeserializer;
    private final D2ControllerClient d2ControllerClient;
    protected final ReplicationMetadataSchemaRepository replicationMetadataSchemaRepository;
    protected final String storeName;
    protected final InMemoryStorageEngine inMemoryStorageEngine;
    protected final PubSubConsumerAdapter pubSubConsumer;
    protected final int[] currentValuePayloadSize;
    protected final ChangelogClientConfig changelogClientConfig;
    private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class);
    protected static final VeniceCompressor NO_OP_COMPRESSOR = new NoopCompressor();
    protected final CompressorFactory compressorFactory = new CompressorFactory();
    protected final HashMap<Integer, VeniceCompressor> compressorMap = new HashMap<>();
    protected final AbstractAvroChunkingAdapter<RecordChangeEvent> recordChangeEventChunkingAdapter = new SpecificRecordChunkingAdapter(RecordChangeEvent.class);
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private final RecordDeserializer<RecordChangeEvent> recordChangeDeserializer = FastSerializerDeserializerFactory.getFastAvroSpecificDeserializer(AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersionSchema(), RecordChangeEvent.class);
    protected final Map<Integer, List<Long>> currentVersionHighWatermarks = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl$SeekFunction.class */
    public interface SeekFunction {
        void apply(PubSubTopicPartition pubSubTopicPartition);
    }

    public VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumerAdapter) {
        this.pubSubConsumer = pubSubConsumerAdapter;
        this.storeName = changelogClientConfig.getStoreName();
        this.d2ControllerClient = changelogClientConfig.getD2ControllerClient();
        StoreResponse store = changelogClientConfig.getD2ControllerClient().getStore(this.storeName);
        if (store.isError()) {
            throw new VeniceException("Failed to get store info for store: " + this.storeName + " with error: " + store.getError());
        }
        StoreInfo store2 = store.getStore();
        this.changelogClientConfig = ChangelogClientConfig.cloneConfig(changelogClientConfig);
        this.partitionCount = store2.getPartitionCount();
        this.currentValuePayloadSize = new int[this.partitionCount];
        this.viewClassName = changelogClientConfig.getViewName();
        this.replicationMetadataSchemaRepository = new ReplicationMetadataSchemaRepository(this.d2ControllerClient);
        this.schemaReader = changelogClientConfig.getSchemaReader();
        Schema keySchema = this.schemaReader.getKeySchema();
        this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
        this.inMemoryStorageEngine = new InMemoryStorageEngine(this.storeName);
        this.storeRepository = new ThinClientMetaStoreBasedRepository(changelogClientConfig.getInnerClientConfig(), VeniceProperties.empty(), null);
        this.recordChangeEventSchemaRepository = new RecordChangeEventReadOnlySchemaRepository(this.storeRepository);
        Class specificValueClass = changelogClientConfig.getInnerClientConfig().getSpecificValueClass();
        if (specificValueClass != null) {
            this.userEventChunkingAdapter = new SpecificRecordChunkingAdapter(specificValueClass);
        } else {
            this.userEventChunkingAdapter = GenericChunkingAdapter.INSTANCE;
        }
        LOGGER.info("Start a change log consumer client for store: {}, with partition count: {} and view class: {} ", this.storeName, Integer.valueOf(this.partitionCount), this.viewClassName);
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> subscribe(Set<Integer> set) {
        return internalSubscribe(set, null);
    }

    public CompletableFuture<Void> internalSubscribe(Set<Integer> set, PubSubTopic pubSubTopic) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                this.storeRepository.start();
                this.storeRepository.subscribe(this.storeName);
                this.storeRepository.refresh();
                PubSubTopic currentServingVersionTopic = pubSubTopic == null ? getCurrentServingVersionTopic() : pubSubTopic;
                synchronized (this.pubSubConsumer) {
                    HashSet hashSet = new HashSet(this.pubSubConsumer.getAssignment());
                    for (PubSubTopicPartition pubSubTopicPartition : this.pubSubConsumer.getAssignment()) {
                        if (set.contains(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()))) {
                            this.pubSubConsumer.unSubscribe(pubSubTopicPartition);
                        }
                    }
                    List<PubSubTopicPartition> partitionListToSubscribe = getPartitionListToSubscribe(set, hashSet, currentServingVersionTopic);
                    List<PubSubTopicPartition> partitionListToSubscribe2 = getPartitionListToSubscribe(set, Collections.EMPTY_SET, currentServingVersionTopic);
                    for (PubSubTopicPartition pubSubTopicPartition2 : partitionListToSubscribe) {
                        if (!pubSubTopicPartition2.getPubSubTopic().getName().endsWith("_cc")) {
                            this.compressorMap.put(Integer.valueOf(pubSubTopicPartition2.getPartitionNumber()), getVersionCompressor(pubSubTopicPartition2));
                        }
                    }
                    Iterator<PubSubTopicPartition> it = partitionListToSubscribe2.iterator();
                    while (it.hasNext()) {
                        this.pubSubConsumer.subscribe(it.next(), -1L);
                    }
                }
                return null;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    protected VeniceCompressor getVersionCompressor(PubSubTopicPartition pubSubTopicPartition) {
        VeniceCompressor compressor;
        Store store = this.storeRepository.getStore(this.storeName);
        String name = pubSubTopicPartition.getPubSubTopic().getName();
        Version version = (Version) store.getVersion(Version.parseVersionFromVersionTopicName(name)).get();
        if (CompressionStrategy.ZSTD_WITH_DICT.equals(version.getCompressionStrategy())) {
            compressor = this.compressorFactory.getVersionSpecificCompressor(name);
            if (compressor == null) {
                compressor = this.compressorFactory.createVersionSpecificCompressorIfNotExist(version.getCompressionStrategy(), name, DictionaryUtils.readDictionaryFromKafka(name, new VeniceProperties(this.changelogClientConfig.getConsumerProperties())).array());
            }
        } else {
            compressor = this.compressorFactory.getCompressor(version.getCompressionStrategy());
        }
        return compressor;
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToBeginningOfPush(Set<Integer> set) {
        return internalSeek(set, getCurrentServingVersionTopic(), pubSubTopicPartition -> {
            this.pubSubConsumer.subscribe(pubSubTopicPartition, -1L);
        });
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToBeginningOfPush() {
        return seekToBeginningOfPush((Set) this.pubSubConsumer.getAssignment().stream().map(pubSubTopicPartition -> {
            return Integer.valueOf(pubSubTopicPartition.getPartitionNumber());
        }).collect(Collectors.toSet()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToEndOfPush(Set<Integer> set) {
        this.storeRepository.refresh();
        return internalSeek(set, this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(this.storeName, this.storeRepository.getStore(this.storeName).getCurrentVersion()) + "_cc"), pubSubTopicPartition -> {
            this.pubSubConsumer.subscribe(pubSubTopicPartition, -1L);
        });
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void pause() {
        pause((Set) this.pubSubConsumer.getAssignment().stream().map((v0) -> {
            return v0.getPartitionNumber();
        }).collect(Collectors.toSet()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void resume(Set<Integer> set) {
        synchronized (this.pubSubConsumer) {
            for (PubSubTopicPartition pubSubTopicPartition : this.pubSubConsumer.getAssignment()) {
                if (set.contains(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()))) {
                    this.pubSubConsumer.resume(pubSubTopicPartition);
                }
            }
        }
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void resume() {
        resume((Set) this.pubSubConsumer.getAssignment().stream().map((v0) -> {
            return v0.getPartitionNumber();
        }).collect(Collectors.toSet()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void pause(Set<Integer> set) {
        synchronized (this.pubSubConsumer) {
            for (PubSubTopicPartition pubSubTopicPartition : this.pubSubConsumer.getAssignment()) {
                if (set.contains(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()))) {
                    this.pubSubConsumer.pause(pubSubTopicPartition);
                }
            }
        }
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToEndOfPush() {
        return seekToEndOfPush((Set) this.pubSubConsumer.getAssignment().stream().map(pubSubTopicPartition -> {
            return Integer.valueOf(pubSubTopicPartition.getPartitionNumber());
        }).collect(Collectors.toSet()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToTail(Set<Integer> set) {
        return internalSeek(set, this.pubSubTopicRepository.getTopic(getCurrentServingVersionTopic() + "_cc"), pubSubTopicPartition -> {
            pubSubConsumerSeek(pubSubTopicPartition, this.pubSubConsumer.endOffset(pubSubTopicPartition));
        });
    }

    private PubSubTopic getCurrentServingVersionTopic() {
        this.storeRepository.refresh();
        return this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(this.storeName, this.storeRepository.getStore(this.storeName).getCurrentVersion()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToTail() {
        return seekToTail((Set) this.pubSubConsumer.getAssignment().stream().map(pubSubTopicPartition -> {
            return Integer.valueOf(pubSubTopicPartition.getPartitionNumber());
        }).collect(Collectors.toSet()));
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> set) {
        return CompletableFuture.supplyAsync(() -> {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                VeniceChangeCoordinate veniceChangeCoordinate = (VeniceChangeCoordinate) it.next();
                PubSubTopic topic = this.pubSubTopicRepository.getTopic(veniceChangeCoordinate.getTopic());
                PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(topic, veniceChangeCoordinate.getPartition().intValue());
                internalSeek(Collections.singleton(veniceChangeCoordinate.getPartition()), topic, pubSubTopicPartition -> {
                    pubSubConsumerSeek(pubSubTopicPartitionImpl, Long.valueOf(veniceChangeCoordinate.getPosition().getOffset()));
                }).join();
            }
            return null;
        });
    }

    private void pubSubConsumerSeek(PubSubTopicPartition pubSubTopicPartition, Long l) {
        if (l.longValue() == -1) {
            this.pubSubConsumer.subscribe(pubSubTopicPartition, -1L);
        } else {
            this.pubSubConsumer.subscribe(pubSubTopicPartition, l.longValue() - 1);
        }
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> subscribeAll() {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < this.partitionCount; i++) {
            hashSet.add(Integer.valueOf(i));
        }
        return subscribe(hashSet);
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> map) {
        this.storeRepository.refresh();
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeKafkaTopic(this.storeName, this.storeRepository.getStore(this.storeName).getCurrentVersion()) + "_cc");
        HashMap hashMap = new HashMap();
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            hashMap.put(new PubSubTopicPartitionImpl(topic, entry.getKey().intValue()), entry.getValue());
        }
        return internalSeek(map.keySet(), topic, pubSubTopicPartition -> {
            Long offsetForTime = this.pubSubConsumer.offsetForTime(pubSubTopicPartition, ((Long) hashMap.get(pubSubTopicPartition)).longValue());
            if (offsetForTime == null) {
                offsetForTime = this.pubSubConsumer.endOffset(pubSubTopicPartition);
            }
            pubSubConsumerSeek(pubSubTopicPartition, offsetForTime);
        });
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> seekToTimestamp(Long l) {
        HashSet hashSet = new HashSet(this.pubSubConsumer.getAssignment());
        HashMap hashMap = new HashMap();
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            hashMap.put(Integer.valueOf(((PubSubTopicPartition) it.next()).getPartitionNumber()), l);
        }
        return seekToTimestamps(hashMap);
    }

    public CompletableFuture<Void> internalSeek(Set<Integer> set, PubSubTopic pubSubTopic, SeekFunction seekFunction) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (this.pubSubConsumer) {
                for (PubSubTopicPartition pubSubTopicPartition : this.pubSubConsumer.getAssignment()) {
                    this.currentVersionHighWatermarks.remove(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()));
                    if (set.contains(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()))) {
                        this.pubSubConsumer.unSubscribe(pubSubTopicPartition);
                    }
                }
                for (PubSubTopicPartition pubSubTopicPartition2 : getPartitionListToSubscribe(set, Collections.EMPTY_SET, pubSubTopic)) {
                    if (!pubSubTopicPartition2.getPubSubTopic().getName().endsWith("_cc")) {
                        this.compressorMap.put(Integer.valueOf(pubSubTopicPartition2.getPartitionNumber()), getVersionCompressor(pubSubTopicPartition2));
                    }
                    seekFunction.apply(pubSubTopicPartition2);
                }
            }
            return null;
        });
    }

    private List<PubSubTopicPartition> getPartitionListToSubscribe(Set<Integer> set, Set<PubSubTopicPartition> set2, PubSubTopic pubSubTopic) {
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            PubSubTopicPartitionImpl pubSubTopicPartitionImpl = new PubSubTopicPartitionImpl(pubSubTopic, it.next().intValue());
            if (!set2.contains(pubSubTopicPartitionImpl)) {
                arrayList.add(pubSubTopicPartitionImpl);
            }
        }
        arrayList.addAll(set2);
        return arrayList;
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void unsubscribe(Set<Integer> set) {
        synchronized (this.pubSubConsumer) {
            HashSet<PubSubTopicPartition> hashSet = new HashSet(this.pubSubConsumer.getAssignment());
            HashSet hashSet2 = new HashSet();
            for (PubSubTopicPartition pubSubTopicPartition : hashSet) {
                if (set.contains(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()))) {
                    hashSet2.add(pubSubTopicPartition);
                }
            }
            this.pubSubConsumer.batchUnsubscribe(hashSet2);
        }
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void unsubscribeAll() {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < this.partitionCount; i++) {
            hashSet.add(Integer.valueOf(i));
        }
        unsubscribe(hashSet);
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long j) {
        return internalPoll(j, "_cc");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> internalPoll(long j, String str) {
        Map poll;
        ArrayList arrayList = new ArrayList();
        synchronized (this.pubSubConsumer) {
            poll = this.pubSubConsumer.poll(j);
        }
        for (Map.Entry<K, V> entry : poll.entrySet()) {
            PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) entry.getKey();
            for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage : (List) entry.getValue()) {
                if (((KafkaKey) pubSubMessage.getKey()).isControlMessage()) {
                    if (handleControlMessage((ControlMessage) ((KafkaMessageEnvelope) pubSubMessage.getValue()).getPayloadUnion(), pubSubTopicPartition, str)) {
                        break;
                    }
                } else {
                    Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage = convertPubSubMessageToPubSubChangeEventMessage(pubSubMessage, pubSubTopicPartition);
                    Objects.requireNonNull(arrayList);
                    convertPubSubMessageToPubSubChangeEventMessage.ifPresent((v1) -> {
                        r1.add(v1);
                    });
                }
            }
        }
        return arrayList;
    }

    protected boolean handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String str) {
        ControlMessageType valueOf = ControlMessageType.valueOf(controlMessage);
        if (valueOf.equals(ControlMessageType.END_OF_PUSH)) {
            LOGGER.info("End of Push message received for version {} for store {}", Integer.valueOf(Version.parseVersionFromKafkaTopicName(pubSubTopicPartition.getPubSubTopic().getName())), this.storeName);
            switchToNewTopic(pubSubTopicPartition.getPubSubTopic(), "_cc", Integer.valueOf(pubSubTopicPartition.getPartitionNumber()));
            return true;
        }
        if (valueOf.equals(ControlMessageType.VERSION_SWAP)) {
            return handleVersionSwapControlMessage(controlMessage, pubSubTopicPartition, "_cc");
        }
        return false;
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <T> T bufferAndAssembleRecordChangeEvent(PubSubTopicPartition pubSubTopicPartition, int i, byte[] bArr, ByteBuffer byteBuffer, long j, AbstractAvroChunkingAdapter<T> abstractAvroChunkingAdapter, Lazy<RecordDeserializer<T>> lazy, int i2, ReadOnlySchemaRepository readOnlySchemaRepository) {
        T t = null;
        VeniceCompressor veniceCompressor = pubSubTopicPartition.getPubSubTopic().isVersionTopic() ? this.compressorMap.get(Integer.valueOf(pubSubTopicPartition.getPartitionNumber())) : NO_OP_COMPRESSOR;
        if (!this.inMemoryStorageEngine.containsPartition(pubSubTopicPartition.getPartitionNumber())) {
            this.inMemoryStorageEngine.addStoragePartition(pubSubTopicPartition.getPartitionNumber());
        }
        if (i == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
            this.inMemoryStorageEngine.put(pubSubTopicPartition.getPartitionNumber(), bArr, ValueRecord.create(i, byteBuffer.array()).serialize());
            return null;
        }
        if (i == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
            this.inMemoryStorageEngine.put(pubSubTopicPartition.getPartitionNumber(), bArr, ValueRecord.create(i, byteBuffer.array()).serialize());
            try {
                t = abstractAvroChunkingAdapter.get((AbstractStorageEngine) this.inMemoryStorageEngine, i2, pubSubTopicPartition.getPartitionNumber(), ByteBuffer.wrap(bArr), false, (boolean) null, (BinaryDecoder) null, (ReadResponse) null, veniceCompressor.getCompressionStrategy(), true, readOnlySchemaRepository, this.storeName, veniceCompressor);
            } catch (Exception e) {
                LOGGER.warn("Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}", Long.valueOf(j), pubSubTopicPartition.getPubSubTopic().getName());
            }
        } else {
            try {
                t = ((RecordDeserializer) lazy.get()).deserialize(veniceCompressor.decompress(byteBuffer));
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
        this.inMemoryStorageEngine.dropPartition(pubSubTopicPartition.getPartitionNumber());
        return t;
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, PubSubTopicPartition pubSubTopicPartition) {
        Lazy of;
        AbstractAvroChunkingAdapter<RecordChangeEvent> abstractAvroChunkingAdapter;
        int intValue;
        ReadOnlySchemaRepository readOnlySchemaRepository;
        List<Long> extractOffsetVectorFromMessage;
        Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> empty = Optional.empty();
        byte[] key = ((KafkaKey) pubSubMessage.getKey()).getKey();
        if (MessageType.valueOf((KafkaMessageEnvelope) pubSubMessage.getValue()).equals(MessageType.PUT)) {
            Put put = (Put) ((KafkaMessageEnvelope) pubSubMessage.getValue()).payloadUnion;
            if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
                Schema valueSchema = this.schemaReader.getValueSchema(put.schemaId);
                of = Lazy.of(() -> {
                    return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(valueSchema, valueSchema);
                });
                abstractAvroChunkingAdapter = this.userEventChunkingAdapter;
                intValue = AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersion();
                readOnlySchemaRepository = this.storeRepository;
            } else {
                of = Lazy.of(() -> {
                    return this.recordChangeDeserializer;
                });
                abstractAvroChunkingAdapter = this.recordChangeEventChunkingAdapter;
                intValue = this.schemaReader.getLatestValueSchemaId().intValue();
                readOnlySchemaRepository = this.recordChangeEventSchemaRepository;
            }
            Object bufferAndAssembleRecordChangeEvent = bufferAndAssembleRecordChangeEvent(pubSubTopicPartition, put.getSchemaId(), key, put.getPutValue(), ((Long) pubSubMessage.getOffset()).longValue(), abstractAvroChunkingAdapter, of, intValue, readOnlySchemaRepository);
            if (bufferAndAssembleRecordChangeEvent == null) {
                return Optional.empty();
            }
            int payloadSize = pubSubMessage.getPayloadSize();
            if (bufferAndAssembleRecordChangeEvent instanceof RecordChangeEvent) {
                RecordChangeEvent recordChangeEvent = (RecordChangeEvent) bufferAndAssembleRecordChangeEvent;
                extractOffsetVectorFromMessage = recordChangeEvent.replicationCheckpointVector;
                empty = Optional.of(convertChangeEventToPubSubMessage(recordChangeEvent, this.keyDeserializer.deserialize(key), pubSubTopicPartition, (Long) pubSubMessage.getOffset(), Long.valueOf(pubSubMessage.getPubSubMessageTime()), payloadSize));
            } else {
                extractOffsetVectorFromMessage = extractOffsetVectorFromMessage(put.getReplicationMetadataVersionId(), put.getReplicationMetadataPayload());
                empty = Optional.of(new ImmutableChangeCapturePubSubMessage(this.keyDeserializer.deserialize(key), new ChangeEvent(null, bufferAndAssembleRecordChangeEvent), pubSubTopicPartition, ((Long) pubSubMessage.getOffset()).longValue(), pubSubMessage.getPubSubMessageTime(), payloadSize));
            }
            if (filterRecordByVersionSwapHighWatermarks(extractOffsetVectorFromMessage, pubSubTopicPartition)) {
                empty = Optional.empty();
            }
        }
        return empty;
    }

    protected List<Long> extractOffsetVectorFromMessage(int i, ByteBuffer byteBuffer) {
        if (i <= 0) {
            return new ArrayList();
        }
        GenericData.Array array = (GenericData.Array) ((GenericRecord) SerializerDeserializerFactory.getAvroGenericDeserializer(Schema.parse(this.replicationMetadataSchemaRepository.getReplicationMetadataSchemaById(this.storeName, i).getSchemaStr())).deserialize(byteBuffer)).get("replication_checkpoint_vector");
        ArrayList arrayList = new ArrayList();
        Iterator it = array.iterator();
        while (it.hasNext()) {
            arrayList.add((Long) it.next());
        }
        return arrayList;
    }

    protected boolean handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String str) {
        if (!ControlMessageType.valueOf(controlMessage).equals(ControlMessageType.VERSION_SWAP)) {
            return false;
        }
        VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion;
        LOGGER.info("Obtain version swap message: {} and versions swap high watermarks: {}", versionSwap, versionSwap.getLocalHighWatermarks());
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(versionSwap.newServingVersionTopic.toString());
        if (RmdUtils.hasOffsetAdvanced(this.currentVersionHighWatermarks.getOrDefault(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()), Collections.EMPTY_LIST), versionSwap.getLocalHighWatermarks())) {
            this.currentVersionHighWatermarks.put(Integer.valueOf(pubSubTopicPartition.getPartitionNumber()), versionSwap.getLocalHighWatermarks());
        }
        switchToNewTopic(topic, str, Integer.valueOf(pubSubTopicPartition.getPartitionNumber()));
        this.inMemoryStorageEngine.drop();
        return true;
    }

    private PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> convertChangeEventToPubSubMessage(RecordChangeEvent recordChangeEvent, K k, PubSubTopicPartition pubSubTopicPartition, Long l, Long l2, int i) {
        V v = null;
        if (recordChangeEvent.currentValue != null && recordChangeEvent.currentValue.getSchemaId() > 0) {
            this.currentValuePayloadSize[pubSubTopicPartition.getPartitionNumber()] = recordChangeEvent.currentValue.getValue().array().length;
            v = deserializeValueFromBytes(recordChangeEvent.currentValue.getValue(), recordChangeEvent.currentValue.getSchemaId());
        }
        V v2 = null;
        if (recordChangeEvent.previousValue != null && recordChangeEvent.previousValue.getSchemaId() > 0) {
            v2 = deserializeValueFromBytes(recordChangeEvent.previousValue.getValue(), recordChangeEvent.previousValue.getSchemaId());
        }
        return new ImmutableChangeCapturePubSubMessage(k, new ChangeEvent(v2, v), pubSubTopicPartition, l.longValue(), l2.longValue(), i);
    }

    private V deserializeValueFromBytes(ByteBuffer byteBuffer, int i) {
        Schema valueSchema = this.schemaReader.getValueSchema(i);
        RecordDeserializer fastAvroGenericDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(valueSchema, valueSchema);
        if (byteBuffer != null) {
            return (V) fastAvroGenericDeserializer.deserialize(byteBuffer);
        }
        return null;
    }

    private boolean filterRecordByVersionSwapHighWatermarks(List<Long> list, PubSubTopicPartition pubSubTopicPartition) {
        int partitionNumber = pubSubTopicPartition.getPartitionNumber();
        return (list == null || !this.currentVersionHighWatermarks.containsKey(Integer.valueOf(partitionNumber)) || RmdUtils.hasOffsetAdvanced(this.currentVersionHighWatermarks.getOrDefault(Integer.valueOf(partitionNumber), Collections.EMPTY_LIST), list)) ? false : true;
    }

    protected void switchToNewTopic(PubSubTopic pubSubTopic, String str, Integer num) {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(pubSubTopic.getName() + str);
        Set<Integer> singleton = Collections.singleton(num);
        unsubscribe(singleton);
        try {
            internalSubscribe(singleton, topic).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new VeniceException("Subscribe to new topic:" + topic + " is not successful, error: " + e);
        }
    }

    protected PubSubTopicPartition getPubSubTopicPartitionFromConsumerRecord(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
        return new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(consumerRecord.topic()), consumerRecord.partition());
    }

    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void close() {
        unsubscribeAll();
        this.pubSubConsumer.close();
    }

    protected void setStoreRepository(ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository) {
        this.storeRepository = thinClientMetaStoreBasedRepository;
    }
}
