package com.linkedin.davinci.consumer;

import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SpecificRecordChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.memory.InMemoryStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.StartOfPush;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdConstants;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.ChangeCaptureView;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.class */
/**
 * Changelog consumer that polls a store's Kafka topics through a supplied {@link Consumer} and
 * turns the consumed records into {@link ChangeEvent} messages.
 *
 * <p>The consumer starts on the version topic of the store's current version and moves to another
 * topic when it sees an END_OF_PUSH or VERSION_SWAP control message (see
 * {@code handleControlMessage} and {@code switchToNewTopic}).
 *
 * @param <K> deserialized key type
 * @param <V> deserialized value type
 */
public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsumer<K, V> {
    // Partition count of the store, taken from the controller's StoreInfo in the constructor.
    private final int partitionCount;
    // Schema repository created in the constructor; started/subscribed/refreshed on every subscribe().
    protected ThinClientMetaStoreBasedRepository readOnlySchemaRepository;
    // Wraps readOnlySchemaRepository; handed to the chunking adapter for non-version-topic records.
    protected final ReadOnlySchemaRepository recordChangeEventSchemaRepository;
    // Chunking adapter for user values: specific-record based when a specific value class is configured,
    // otherwise GenericChunkingAdapter.INSTANCE.
    protected final AbstractAvroChunkingAdapter<V> userEventChunkingAdapter;
    // Source of the key schema and of value schemas by id.
    protected final SchemaReader schemaReader;
    // View name from the client config; only logged in this class.
    private final String viewClassName;
    // Partition ids added by subscribe() and removed by unsubscribe().
    private final Set<Integer> subscribedPartitions;
    protected final RecordDeserializer<K> keyDeserializer;
    private final D2ControllerClient d2ControllerClient;
    // Used to look up replication metadata schemas when extracting offset vectors from PUT messages.
    protected final ReplicationMetadataSchemaRepository replicationMetadataSchemaRepository;
    protected final String storeName;
    // Current version of the store at construction time.
    protected final int storeCurrentVersion;
    protected final boolean storeChunkingEnabled;
    // Buffers chunks and chunk manifests until a chunked value can be assembled.
    protected final InMemoryStorageEngine inMemoryStorageEngine;
    protected final Consumer<KafkaKey, KafkaMessageEnvelope> kafkaConsumer;
    // Topic currently being consumed; replaced by switchToNewTopic().
    protected String currentTopic;
    // Per-partition length of the backing array of the most recently converted "current value".
    protected final int[] currentValuePayloadSize;
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) VeniceChangelogConsumerImpl.class);
    protected static final VeniceCompressor NO_OP_COMPRESSOR = new NoopCompressor();
    // Compressor applied to version-topic records; replaced when a START_OF_PUSH message is handled.
    protected VeniceCompressor currentCompressor = NO_OP_COMPRESSOR;
    protected final CompressorFactory compressorFactory = new CompressorFactory();
    protected final AbstractAvroChunkingAdapter<RecordChangeEvent> recordChangeEventChunkingAdapter = new SpecificRecordChunkingAdapter(RecordChangeEvent.class);
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    private final RecordDeserializer<RecordChangeEvent> recordChangeDeserializer = FastSerializerDeserializerFactory.getFastAvroSpecificDeserializer(AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersionSchema(), RecordChangeEvent.class);
    // Per-partition watermarks promoted to currentVersionHighWatermarks on END_OF_PUSH.
    // NOTE(review): nothing in this file writes to this map — presumably subclasses populate it; confirm.
    protected Map<Integer, List<Long>> currentVersionTempHighWatermarks = new HashMap();
    // Per-partition high watermarks; records whose offset vector does not exceed them are filtered out.
    protected final Map<Integer, List<Long>> currentVersionHighWatermarks = new HashMap();

    /**
     * Builds a changelog consumer for the store named in {@code changelogClientConfig}.
     *
     * <p>Store metadata (current version, partition count, chunking flag) is fetched from the
     * controller; the initial topic is the version topic of the store's current version.
     *
     * @param changelogClientConfig client configuration (store name, controller client, schema reader, view name)
     * @param consumer Kafka consumer used for all topic reads
     * @throws VeniceException if the controller reports an error for the store lookup
     */
    public VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, Consumer<KafkaKey, KafkaMessageEnvelope> consumer) {
        this.kafkaConsumer = consumer;
        this.storeName = changelogClientConfig.getStoreName();
        this.d2ControllerClient = changelogClientConfig.getD2ControllerClient();

        // Fetch the store metadata once; everything version/partition related derives from it.
        StoreResponse storeResponse = changelogClientConfig.getD2ControllerClient().getStore(this.storeName);
        if (storeResponse.isError()) {
            throw new VeniceException("Failed to get store info for store: " + this.storeName + " with error: " + storeResponse.getError());
        }
        StoreInfo storeInfo = storeResponse.getStore();
        this.storeCurrentVersion = storeInfo.getCurrentVersion();
        this.partitionCount = storeInfo.getPartitionCount();
        this.storeChunkingEnabled = storeInfo.isChunkingEnabled();
        this.currentValuePayloadSize = new int[this.partitionCount];

        this.viewClassName = changelogClientConfig.getViewName();
        this.currentTopic = Version.composeKafkaTopic(this.storeName, this.storeCurrentVersion);
        this.replicationMetadataSchemaRepository = new ReplicationMetadataSchemaRepository(this.d2ControllerClient);
        this.schemaReader = changelogClientConfig.getSchemaReader();
        this.subscribedPartitions = new HashSet<>();

        // Keys are read with the key schema as both writer and reader schema.
        Schema keySchema = this.schemaReader.getKeySchema();
        this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);

        this.inMemoryStorageEngine = new InMemoryStorageEngine(this.storeName);
        this.readOnlySchemaRepository = new ThinClientMetaStoreBasedRepository(changelogClientConfig.getInnerClientConfig(), new VeniceProperties(), null);
        this.recordChangeEventSchemaRepository = new RecordChangeEventReadOnlySchemaRepository(this.readOnlySchemaRepository);

        // Without a configured specific value class, fall back to the generic adapter.
        Class specificValueClass = changelogClientConfig.getInnerClientConfig().getSpecificValueClass();
        if (specificValueClass == null) {
            this.userEventChunkingAdapter = GenericChunkingAdapter.INSTANCE;
        } else {
            this.userEventChunkingAdapter = new SpecificRecordChunkingAdapter(specificValueClass);
        }

        LOGGER.info("Start a change log consumer client for store: {}, current version: {}, with partition count: {} and view class: {} ", this.storeName, this.storeCurrentVersion, this.partitionCount, this.viewClassName);
    }

    /**
     * Asynchronously subscribes to the given partitions of the current topic.
     *
     * <p>The schema repository is started, subscribed to the store and refreshed first. Partitions
     * that are not yet assigned are then added to the consumer's existing assignment and rewound to
     * the beginning; partitions that were already assigned keep their assignment and position.
     *
     * @param set partition ids to subscribe to
     * @return a future that completes once the consumer assignment has been updated
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> subscribe(Set<Integer> set) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                this.readOnlySchemaRepository.start();
                this.readOnlySchemaRepository.subscribe(this.storeName);
                this.readOnlySchemaRepository.refresh();

                Set<TopicPartition> currentAssignment = this.kafkaConsumer.assignment();
                List<TopicPartition> newPartitions = getPartitionListToSubscribe(set, currentAssignment);

                // Consumer.assign() replaces the whole assignment, so the existing partitions must be
                // passed again together with the new ones or they would silently be dropped.
                List<TopicPartition> fullAssignment = new ArrayList<>(currentAssignment);
                fullAssignment.addAll(newPartitions);
                this.kafkaConsumer.assign(fullAssignment);

                // Only rewind the partitions that were just added.
                this.kafkaConsumer.seekToBeginning(newPartitions);
                this.subscribedPartitions.addAll(set);
                return null;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executing thread can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Subscribes to every partition of the store, i.e. partition ids {@code 0} (inclusive) to
     * {@code partitionCount} (exclusive).
     *
     * @return the future returned by {@link #subscribe(Set)}
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public CompletableFuture<Void> subscribeAll() {
        Set<Integer> allPartitions = new HashSet<>();
        int partition = 0;
        while (partition < this.partitionCount) {
            allPartitions.add(partition);
            partition++;
        }
        return subscribe(allPartitions);
    }

    /**
     * Returns the topic-partitions of the current topic, for the requested partition ids, that are
     * not already contained in {@code set2}.
     *
     * @param set requested partition ids
     * @param set2 topic-partitions to exclude (the caller passes the consumer's current assignment)
     * @return the topic-partitions that still need to be assigned
     */
    private List<TopicPartition> getPartitionListToSubscribe(Set<Integer> set, Set<TopicPartition> set2) {
        List<TopicPartition> toSubscribe = new ArrayList<>(set2.size());
        for (Integer partitionId : set) {
            TopicPartition candidate = new TopicPartition(this.currentTopic, partitionId);
            if (set2.contains(candidate)) {
                continue;
            }
            toSubscribe.add(candidate);
        }
        return toSubscribe;
    }

    /**
     * Removes the given partitions of the current topic from the consumer's assignment and from the
     * set of subscribed partitions.
     *
     * <p>The argument is copied before anything is modified, so it is safe to pass the internal
     * subscribed-partition set itself (as {@code switchToNewTopic} does). The consumer is re-assigned
     * at most once, and only if at least one of the partitions was actually assigned.
     *
     * @param set partition ids to unsubscribe from
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void unsubscribe(Set<Integer> set) {
        // Snapshot first: `set` may be this.subscribedPartitions, which is modified below, and
        // removing from a HashSet while iterating it throws ConcurrentModificationException.
        List<Integer> partitionsToRemove = new ArrayList<>(set);

        Set<TopicPartition> remainingAssignment = new HashSet<>(this.kafkaConsumer.assignment());
        boolean assignmentChanged = false;
        for (Integer partitionId : partitionsToRemove) {
            if (remainingAssignment.remove(new TopicPartition(this.currentTopic, partitionId))) {
                assignmentChanged = true;
            }
        }
        if (assignmentChanged) {
            this.kafkaConsumer.assign(remainingAssignment);
        }
        this.subscribedPartitions.removeAll(partitionsToRemove);
    }

    /**
     * Unsubscribes from every partition of the store, i.e. partition ids {@code 0} (inclusive) to
     * {@code partitionCount} (exclusive).
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void unsubscribeAll() {
        Set<Integer> allPartitions = new HashSet<>();
        int partition = 0;
        while (partition < this.partitionCount) {
            allPartitions.add(partition);
            partition++;
        }
        unsubscribe(allPartitions);
    }

    /**
     * Polls the consumer, using the change-capture topic suffix for any topic switch that happens
     * while processing the polled records.
     *
     * @param j poll timeout passed through to the Kafka consumer
     * @return the change events converted from the polled records
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public Collection<PubSubMessage<K, ChangeEvent<V>, Long>> poll(long j) {
        String topicSuffix = ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX;
        return internalPoll(j, topicSuffix);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Polls the Kafka consumer once and converts the returned records into change events.
     *
     * <p>Data records are converted and collected when the conversion yields a message. Control
     * messages are passed to {@code handleControlMessage}; when that returns {@code true} the
     * messages collected so far are returned immediately and the rest of the polled batch is not
     * processed.
     *
     * @param j poll timeout passed to the Kafka consumer
     * @param str topic suffix forwarded to the control message handling
     * @return the change events collected from this poll, possibly empty
     */
    public Collection<PubSubMessage<K, ChangeEvent<V>, Long>> internalPoll(long j, String str) {
        List<PubSubMessage<K, ChangeEvent<V>, Long>> messages = new ArrayList<>();
        for (ConsumerRecord<KafkaKey, KafkaMessageEnvelope> record : this.kafkaConsumer.poll(j)) {
            PubSubTopicPartition topicPartition = getPubSubTopicPartitionFromConsumerRecord(record);
            if (!record.key().isControlMessage()) {
                convertConsumerRecordToPubSubChangeEventMessage(record, topicPartition).ifPresent(messages::add);
            } else if (handleControlMessage((ControlMessage) record.value().payloadUnion, topicPartition, str)) {
                return messages;
            }
        }
        return messages;
    }

    /**
     * Reacts to a control message read from the given topic-partition.
     *
     * <ul>
     *   <li>START_OF_PUSH: replaces the current compressor with one built from the message's
     *       compression strategy and (optional) dictionary; returns {@code false}.</li>
     *   <li>END_OF_PUSH: promotes the partition's temporary high watermarks (if any) and switches
     *       to the current topic name plus the change-capture suffix; returns {@code true}.</li>
     *   <li>VERSION_SWAP: delegates to {@code handleVersionSwapControlMessage}.</li>
     *   <li>anything else: returns {@code false}.</li>
     * </ul>
     *
     * <p>NOTE(review): {@code str} is never read here; the change-capture suffix is hard-coded for
     * both the END_OF_PUSH and the VERSION_SWAP path. Confirm whether the caller's suffix was meant
     * to be forwarded.
     *
     * @param controlMessage the control message to handle
     * @param pubSubTopicPartition topic-partition the message was read from
     * @param str topic suffix supplied by the caller (currently unused)
     * @return {@code true} if the consumer switched topics as a result of this message
     */
    protected boolean handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String str) {
        ControlMessageType messageType = ControlMessageType.valueOf(controlMessage);

        if (messageType.equals(ControlMessageType.START_OF_PUSH)) {
            StartOfPush startOfPush = (StartOfPush) controlMessage.controlMessageUnion;
            byte[] dictionary = startOfPush.compressionDictionary == null ? null : startOfPush.compressionDictionary.array();
            CompressionStrategy strategy = CompressionStrategy.valueOf(startOfPush.compressionStrategy);
            String topicName = pubSubTopicPartition.getPubSubTopic().getName();
            this.currentCompressor = this.compressorFactory.createVersionSpecificCompressorIfNotExist(strategy, topicName, dictionary);
        }

        if (messageType.equals(ControlMessageType.END_OF_PUSH)) {
            Integer partitionKey = pubSubTopicPartition.getPartitionNumber();
            List<Long> tempHighWatermarks = this.currentVersionTempHighWatermarks.get(partitionKey);
            LOGGER.info("Obtain End of Push message and current local high watermarks: {}", tempHighWatermarks);
            if (this.currentVersionTempHighWatermarks.containsKey(partitionKey)) {
                this.currentVersionHighWatermarks.put(partitionKey, tempHighWatermarks);
            }
            switchToNewTopic(this.currentTopic, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
            return true;
        }

        if (messageType.equals(ControlMessageType.VERSION_SWAP)) {
            return handleVersionSwapControlMessage(controlMessage, pubSubTopicPartition, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);
        }
        return false;
    }

    /**
     * Turns the value bytes of a PUT message into a deserialized record, buffering chunks in the
     * in-memory storage engine when the store has chunking enabled.
     *
     * <p>With chunking enabled:
     * <ul>
     *   <li>a chunk is stored in the in-memory engine and {@code null} is returned;</li>
     *   <li>a chunk manifest is stored, the full value is assembled through the chunking adapter,
     *       and the partition's buffer is dropped; an assembly failure is logged and yields
     *       {@code null};</li>
     *   <li>any other record is decompressed and deserialized directly, after which the partition's
     *       buffer is dropped.</li>
     * </ul>
     * With chunking disabled the value is always decompressed and deserialized directly.
     *
     * @param pubSubTopicPartition topic-partition the record was read from
     * @param i schema id of the PUT message (compared against the chunk / manifest protocol versions)
     * @param bArr serialized key
     * @param byteBuffer value payload of the PUT message
     * @param j record offset, used for logging only
     * @param abstractAvroChunkingAdapter adapter used to assemble chunked values
     * @param lazy supplier of the deserializer for non-chunked values
     * @param i2 reader schema id handed to the chunking adapter
     * @param readOnlySchemaRepository schema repository handed to the chunking adapter
     * @return the deserialized record, or {@code null} when the record was only buffered or could not be assembled
     * @throws RuntimeException if decompressing or deserializing a non-chunked value fails
     */
    protected <T> T bufferAndAssembleRecordChangeEvent(PubSubTopicPartition pubSubTopicPartition, int i, byte[] bArr, ByteBuffer byteBuffer, long j, AbstractAvroChunkingAdapter<T> abstractAvroChunkingAdapter, Lazy<RecordDeserializer<T>> lazy, int i2, ReadOnlySchemaRepository readOnlySchemaRepository) {
        T t = null;
        // Only version-topic records use the push's compressor; everything else is read uncompressed.
        VeniceCompressor veniceCompressor = pubSubTopicPartition.getPubSubTopic().isVersionTopic() ? this.currentCompressor : NO_OP_COMPRESSOR;
        if (this.storeChunkingEnabled) {
            if (!this.inMemoryStorageEngine.containsPartition(pubSubTopicPartition.getPartitionNumber())) {
                this.inMemoryStorageEngine.addStoragePartition(pubSubTopicPartition.getPartitionNumber());
            }
            if (i == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()) {
                // A chunk on its own cannot produce a value yet; keep it until its manifest arrives.
                this.inMemoryStorageEngine.put(pubSubTopicPartition.getPartitionNumber(), bArr, ValueRecord.create(i, byteBuffer.array()).serialize());
                return null;
            }
            if (i == AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()) {
                this.inMemoryStorageEngine.put(pubSubTopicPartition.getPartitionNumber(), bArr, ValueRecord.create(i, byteBuffer.array()).serialize());
                try {
                    // NOTE(review): the sixth argument was decompiled as `(boolean) null`, which is not
                    // valid Java. A null of the adapter's value type is assumed here — confirm against
                    // the signature of AbstractAvroChunkingAdapter#get.
                    t = abstractAvroChunkingAdapter.get((AbstractStorageEngine) this.inMemoryStorageEngine, i2, pubSubTopicPartition.getPartitionNumber(), ByteBuffer.wrap(bArr), false, (T) null, (BinaryDecoder) null, (ReadResponse) null, veniceCompressor.getCompressionStrategy(), true, readOnlySchemaRepository, this.storeName, veniceCompressor);
                } catch (Exception e) {
                    // Best effort: skip the record, but keep the cause in the log for diagnosis.
                    LOGGER.warn("Encountered error assembling chunked record, this can happen when seeking between chunked records. Skipping offset {} on topic {}", Long.valueOf(j), pubSubTopicPartition.getPubSubTopic().getName(), e);
                }
            } else {
                try {
                    t = lazy.get().deserialize(veniceCompressor.decompress(byteBuffer));
                } catch (Exception e2) {
                    throw new RuntimeException(e2);
                }
            }
            // The buffered data for this partition is discarded once a value has been processed.
            this.inMemoryStorageEngine.dropPartition(pubSubTopicPartition.getPartitionNumber());
        } else {
            try {
                t = lazy.get().deserialize(veniceCompressor.decompress(byteBuffer));
            } catch (Exception e3) {
                throw new RuntimeException(e3);
            }
        }
        return t;
    }

    /**
     * Converts a non-control Kafka record into a change event message.
     *
     * <p>Only PUT messages are converted; every other message type yields an empty Optional. The
     * value is run through {@code bufferAndAssembleRecordChangeEvent}; when that returns
     * {@code null} (for example while chunks are still being buffered) the result is empty as well.
     * A converted record is finally dropped when its offset vector is filtered by
     * {@code filterRecordByVersionSwapHighWatermarks}.
     *
     * @param consumerRecord the Kafka record to convert
     * @param pubSubTopicPartition topic-partition the record was read from
     * @return the converted message, or an empty Optional when nothing should be emitted
     */
    protected Optional<PubSubMessage<K, ChangeEvent<V>, Long>> convertConsumerRecordToPubSubChangeEventMessage(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord, PubSubTopicPartition pubSubTopicPartition) {
        Lazy of;
        AbstractAvroChunkingAdapter<RecordChangeEvent> abstractAvroChunkingAdapter;
        int intValue;
        ReadOnlySchemaRepository readOnlySchemaRepository;
        List<Long> extractOffsetVectorFromMessage;
        Optional<PubSubMessage<K, ChangeEvent<V>, Long>> empty = Optional.empty();
        byte[] key = consumerRecord.key().getKey();
        if (MessageType.valueOf(consumerRecord.value()).equals(MessageType.PUT)) {
            Put put = (Put) consumerRecord.value().payloadUnion;
            if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
                // Version topic: the payload is a user value written with the schema named by the PUT.
                Schema valueSchema = this.schemaReader.getValueSchema(put.schemaId);
                of = Lazy.of(() -> {
                    return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(valueSchema, valueSchema);
                });
                abstractAvroChunkingAdapter = this.userEventChunkingAdapter;
                // NOTE(review): the reader schema id here is the RecordChangeEvent protocol version even
                // though the user-value adapter is selected (and the else branch uses the latest value
                // schema id with the RecordChangeEvent adapter) — confirm this pairing is intended.
                intValue = AvroProtocolDefinition.RECORD_CHANGE_EVENT.getCurrentProtocolVersion();
                readOnlySchemaRepository = this.readOnlySchemaRepository;
            } else {
                // Any other topic: the payload is deserialized as a RecordChangeEvent.
                of = Lazy.of(() -> {
                    return this.recordChangeDeserializer;
                });
                abstractAvroChunkingAdapter = this.recordChangeEventChunkingAdapter;
                intValue = this.schemaReader.getLatestValueSchemaId().intValue();
                readOnlySchemaRepository = this.recordChangeEventSchemaRepository;
            }
            Object bufferAndAssembleRecordChangeEvent = bufferAndAssembleRecordChangeEvent(pubSubTopicPartition, put.getSchemaId(), key, put.getPutValue(), consumerRecord.offset(), abstractAvroChunkingAdapter, of, intValue, readOnlySchemaRepository);
            if (bufferAndAssembleRecordChangeEvent == null) {
                // Nothing to emit for this record (see bufferAndAssembleRecordChangeEvent).
                return Optional.empty();
            }
            // Despite its name this holds the serialized key size plus the serialized value size.
            int serializedKeySize = consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize();
            if (bufferAndAssembleRecordChangeEvent instanceof RecordChangeEvent) {
                // Change-capture record: the offset vector is carried inside the event itself.
                RecordChangeEvent recordChangeEvent = (RecordChangeEvent) bufferAndAssembleRecordChangeEvent;
                extractOffsetVectorFromMessage = recordChangeEvent.replicationCheckpointVector;
                empty = Optional.of(convertChangeEventToPubSubMessage(recordChangeEvent, this.keyDeserializer.deserialize(key), pubSubTopicPartition, Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()), serializedKeySize));
            } else {
                // Plain value: the offset vector comes from the PUT's replication metadata and the
                // change event carries no previous value.
                extractOffsetVectorFromMessage = extractOffsetVectorFromMessage(put.getReplicationMetadataVersionId(), put.getReplicationMetadataPayload());
                empty = Optional.of(new ImmutablePubSubMessage(this.keyDeserializer.deserialize(key), new ChangeEvent(null, bufferAndAssembleRecordChangeEvent), pubSubTopicPartition, consumerRecord.offset(), consumerRecord.timestamp(), serializedKeySize));
            }
            // Drop records whose offset vector does not exceed the partition's high watermarks.
            if (filterRecordByVersionSwapHighWatermarks(extractOffsetVectorFromMessage, pubSubTopicPartition)) {
                empty = Optional.empty();
            }
        }
        return empty;
    }

    /**
     * Reads the replication checkpoint vector out of a replication metadata payload.
     *
     * @param i replication metadata schema id; values {@code <= 0} yield an empty list
     * @param byteBuffer serialized replication metadata record
     * @return the offsets stored in the record's replication checkpoint vector field
     */
    protected List<Long> extractOffsetVectorFromMessage(int i, ByteBuffer byteBuffer) {
        List<Long> offsets = new ArrayList<>();
        if (i <= 0) {
            return offsets;
        }
        String rmdSchemaStr = this.replicationMetadataSchemaRepository.getReplicationMetadataSchemaById(this.storeName, i).getSchemaStr();
        Schema rmdSchema = Schema.parse(rmdSchemaStr);
        Object decoded = SerializerDeserializerFactory.getAvroGenericDeserializer(rmdSchema).deserialize(byteBuffer);
        GenericRecord rmdRecord = (GenericRecord) decoded;
        GenericData.Array<?> checkpointVector = (GenericData.Array<?>) rmdRecord.get(RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD);
        for (Object offset : checkpointVector) {
            offsets.add((Long) offset);
        }
        return offsets;
    }

    /**
     * Handles a VERSION_SWAP control message: records the message's local high watermarks for the
     * partition, switches to the new serving version topic (plus {@code str}) and drops the
     * in-memory storage engine.
     *
     * @param controlMessage the control message to inspect
     * @param pubSubTopicPartition topic-partition the message was read from
     * @param str suffix appended to the new serving version topic name
     * @return {@code true} if the message was a VERSION_SWAP and was handled, {@code false} otherwise
     */
    protected boolean handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String str) {
        ControlMessageType messageType = ControlMessageType.valueOf(controlMessage);
        if (messageType.equals(ControlMessageType.VERSION_SWAP)) {
            VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion;
            LOGGER.info("Obtain version swap message: {} and versions swap high watermarks: {}", versionSwap, versionSwap.getLocalHighWatermarks());
            String newServingVersionTopic = versionSwap.newServingVersionTopic.toString();
            Integer partitionKey = pubSubTopicPartition.getPartitionNumber();
            this.currentVersionHighWatermarks.put(partitionKey, versionSwap.getLocalHighWatermarks());
            switchToNewTopic(newServingVersionTopic, str);
            // Buffered data belongs to the topic that was just left.
            this.inMemoryStorageEngine.drop();
            return true;
        }
        return false;
    }

    /**
     * Wraps a {@link RecordChangeEvent} into a pub-sub message carrying the deserialized previous
     * and current values.
     *
     * <p>A value is only deserialized when it is present and its schema id is positive; otherwise
     * the corresponding side of the change event is {@code null}. When a current value is
     * deserialized, the length of its backing array is recorded for the partition.
     *
     * @param recordChangeEvent the change event to convert
     * @param k deserialized key
     * @param pubSubTopicPartition topic-partition the record was read from
     * @param l record offset
     * @param l2 record timestamp
     * @param i payload size to report on the message
     * @return the pub-sub message for the change event
     */
    private PubSubMessage<K, ChangeEvent<V>, Long> convertChangeEventToPubSubMessage(RecordChangeEvent recordChangeEvent, K k, PubSubTopicPartition pubSubTopicPartition, Long l, Long l2, int i) {
        V currentValue = null;
        boolean hasCurrentValue = recordChangeEvent.currentValue != null && recordChangeEvent.currentValue.getSchemaId() > 0;
        if (hasCurrentValue) {
            int partition = pubSubTopicPartition.getPartitionNumber();
            this.currentValuePayloadSize[partition] = recordChangeEvent.currentValue.getValue().array().length;
            currentValue = deserializeValueFromBytes(recordChangeEvent.currentValue.getValue(), recordChangeEvent.currentValue.getSchemaId());
        }

        V previousValue = null;
        boolean hasPreviousValue = recordChangeEvent.previousValue != null && recordChangeEvent.previousValue.getSchemaId() > 0;
        if (hasPreviousValue) {
            previousValue = deserializeValueFromBytes(recordChangeEvent.previousValue.getValue(), recordChangeEvent.previousValue.getSchemaId());
        }

        long offset = l.longValue();
        long timestamp = l2.longValue();
        return new ImmutablePubSubMessage(k, new ChangeEvent(previousValue, currentValue), pubSubTopicPartition, offset, timestamp, i);
    }

    /**
     * Deserializes a value with the value schema registered under the given id, used as both writer
     * and reader schema.
     *
     * @param byteBuffer serialized value, may be {@code null}
     * @param i value schema id
     * @return the deserialized value, or {@code null} when {@code byteBuffer} is {@code null}
     */
    private V deserializeValueFromBytes(ByteBuffer byteBuffer, int i) {
        // The schema and deserializer are looked up before the null check, as in the original flow.
        Schema schema = this.schemaReader.getValueSchema(i);
        RecordDeserializer<V> valueDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema);
        return byteBuffer == null ? null : valueDeserializer.deserialize(byteBuffer);
    }

    /**
     * Decides whether a record should be dropped based on the partition's high watermarks.
     *
     * <p>A record is filtered only when high watermarks exist for the partition, the record's
     * offset vector is not longer than the watermark vector, and no component of the record's
     * vector exceeds the watermark at the same index.
     *
     * @param list the record's offset vector, may be {@code null}
     * @param pubSubTopicPartition topic-partition the record was read from
     * @return {@code true} if the record should be filtered out
     */
    private boolean filterRecordByVersionSwapHighWatermarks(List<Long> list, PubSubTopicPartition pubSubTopicPartition) {
        Integer partitionKey = pubSubTopicPartition.getPartitionNumber();
        if (list != null && this.currentVersionHighWatermarks.containsKey(partitionKey)) {
            List<Long> highWatermarks = this.currentVersionHighWatermarks.get(partitionKey);
            int vectorSize = list.size();
            if (vectorSize <= highWatermarks.size()) {
                int index = 0;
                while (index < vectorSize) {
                    // Any component beyond its watermark means the record is new and must be kept.
                    if (list.get(index) > highWatermarks.get(index)) {
                        return false;
                    }
                    index++;
                }
                return true;
            }
        }
        return false;
    }

    /**
     * Moves all currently subscribed partitions from the current topic to {@code str + str2} and
     * blocks until the new subscription has completed.
     *
     * @param str base name of the topic to switch to
     * @param str2 suffix appended to {@code str} to form the new topic name
     * @throws VeniceException if subscribing to the new topic fails or the wait is interrupted
     */
    protected void switchToNewTopic(String str, String str2) {
        String newTopicName = str + str2;
        // Work on a snapshot: unsubscribe() removes entries from subscribedPartitions, so handing it
        // the live set would modify the collection while it is being iterated.
        Set<Integer> partitionsToMove = new HashSet<>(this.subscribedPartitions);
        unsubscribe(partitionsToMove);
        this.currentTopic = newTopicName;
        try {
            subscribe(partitionsToMove).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new VeniceException("Subscribe to new topic:" + newTopicName + " is not successful, error: " + e, e);
        } catch (ExecutionException e) {
            throw new VeniceException("Subscribe to new topic:" + newTopicName + " is not successful, error: " + e, e);
        }
    }

    /**
     * Builds the {@link PubSubTopicPartition} matching the topic and partition of a Kafka record.
     *
     * @param consumerRecord the Kafka record
     * @return the topic-partition the record was read from
     */
    protected PubSubTopicPartition getPubSubTopicPartitionFromConsumerRecord(ConsumerRecord<KafkaKey, KafkaMessageEnvelope> consumerRecord) {
        String topicName = consumerRecord.topic();
        return new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(topicName), consumerRecord.partition());
    }

    /**
     * Unsubscribes from all partitions and closes the underlying Kafka consumer.
     *
     * <p>The Kafka consumer is closed even when unsubscribing fails, so the consumer is not leaked;
     * the unsubscribe failure is still propagated to the caller.
     */
    @Override // com.linkedin.davinci.consumer.VeniceChangelogConsumer
    public void close() {
        try {
            unsubscribeAll();
        } finally {
            this.kafkaConsumer.close();
        }
    }

    /**
     * Replaces the schema repository created in the constructor.
     *
     * <p>Note that {@code recordChangeEventSchemaRepository} was built around the original
     * repository in the constructor and is not updated by this setter.
     *
     * @param thinClientMetaStoreBasedRepository the repository to use from now on
     */
    protected void setReadOnlySchemaRepository(ThinClientMetaStoreBasedRepository thinClientMetaStoreBasedRepository) {
        this.readOnlySchemaRepository = thinClientMetaStoreBasedRepository;
    }
}
