package com.linkedin.venice.utils;

import com.linkedin.alpini.io.RateLimitedStream;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.StartOfPush;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/utils/DictionaryUtils.class */
/**
 * Static helpers that read the compression dictionary carried by the Start of Push control
 * message of a topic.
 */
public class DictionaryUtils {
    private static final Logger LOGGER = LogManager.getLogger(DictionaryUtils.class);

    /** Consumer property key whose value is overridden before the consumer is created. */
    private static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";

    /** Value stored under {@link #RECEIVE_BUFFER_CONFIG}: 1 MiB. */
    private static final int RECEIVE_BUFFER_BYTES = 1024 * 1024;

    /**
     * Copies the given properties and overrides the receive buffer size.
     *
     * @param veniceProperties base consumer configuration; it is not modified
     * @return a new {@link VeniceProperties} with {@code receive.buffer.bytes} set to 1 MiB
     */
    private static VeniceProperties getKafkaConsumerProps(VeniceProperties veniceProperties) {
        Properties properties = veniceProperties.toProperties();
        properties.put(RECEIVE_BUFFER_CONFIG, RECEIVE_BUFFER_BYTES);
        return new VeniceProperties(properties);
    }

    /**
     * Creates a short-lived consumer from the given properties, reads the dictionary from the
     * topic and closes the consumer again, whether the read succeeds or throws.
     *
     * @param str name of the topic to read from
     * @param veniceProperties consumer configuration used to build the consumer
     * @return the compression dictionary, or {@code null} when none could be read (see
     *     {@link #readDictionaryFromKafka(String, PubSubConsumerAdapter, PubSubTopicRepository)})
     */
    public static ByteBuffer readDictionaryFromKafka(String str, VeniceProperties veniceProperties) {
        ApacheKafkaConsumerAdapterFactory consumerFactory = new ApacheKafkaConsumerAdapterFactory();
        PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
        PubSubMessageDeserializer deserializer = new KafkaPubSubMessageDeserializer(
                new KafkaValueSerializer(),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                new LandFillObjectPool<>(KafkaMessageEnvelope::new));
        // try-with-resources closes the consumer on every path and records a failure of close()
        // as a suppressed exception instead of hiding the primary one.
        try (PubSubConsumerAdapter consumer =
                consumerFactory.create2(getKafkaConsumerProps(veniceProperties), false, deserializer, (String) null)) {
            return readDictionaryFromKafka(str, consumer, pubSubTopicRepository);
        }
    }

    /**
     * Subscribes the given consumer to partition 0 of the topic and polls until the Start of Push
     * control message is seen.
     *
     * <p>The method keeps polling for as long as neither a Start of Push nor a non-control message
     * has been consumed; it has no timeout of its own. The consumer is left subscribed and open;
     * closing it is the caller's responsibility.
     *
     * @param str name of the topic to read from
     * @param pubSubConsumerAdapter consumer used for subscribing and polling
     * @param pubSubTopicRepository repository used to resolve the topic name
     * @return the dictionary from the Start of Push message, or {@code null} when a non-control
     *     message is consumed first or when the Start of Push carries no (or an empty) dictionary
     */
    public static ByteBuffer readDictionaryFromKafka(String str, PubSubConsumerAdapter pubSubConsumerAdapter, PubSubTopicRepository pubSubTopicRepository) {
        LOGGER.info("Consuming from topic: {} till StartOfPush", str);
        PubSubTopicPartitionImpl topicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(str), 0);
        pubSubConsumerAdapter.subscribe(topicPartition, 0L);
        while (true) {
            // NOTE(review): the poll argument is presumably a timeout in milliseconds - confirm
            // against PubSubConsumerAdapter#poll.
            Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messages =
                    pubSubConsumerAdapter.poll(RateLimitedStream.DEFAULT_MAX_QUANTA_PER_SEC).get(topicPartition);
            if (messages == null) {
                // The poll result has no entry for this partition: nothing consumed yet, poll again.
                continue;
            }
            for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message : messages) {
                KafkaKey key = message.getKey();
                KafkaMessageEnvelope value = message.getValue();
                if (!key.isControlMessage()) {
                    LOGGER.error("Consumed non Control Message before Start of Push from topic partition: {}", message.getTopicPartition());
                    return null;
                }
                ControlMessage controlMessage = (ControlMessage) value.payloadUnion;
                ControlMessageType type = ControlMessageType.valueOf(controlMessage);
                LOGGER.info("Consumed ControlMessage: {} from topic partition: {}", type.name(), message.getTopicPartition());
                if (type != ControlMessageType.START_OF_PUSH) {
                    // Some other control message; keep looking for the Start of Push.
                    continue;
                }
                ByteBuffer dictionary = ((StartOfPush) controlMessage.controlMessageUnion).compressionDictionary;
                if (dictionary == null || !dictionary.hasRemaining()) {
                    LOGGER.warn("No dictionary present in Start of Push message from topic partition: {}", message.getTopicPartition());
                    return null;
                }
                return dictionary;
            }
            // Batch exhausted without a Start of Push: fall through and poll the next batch.
        }
    }
}
