package com.linkedin.venice;

import com.linkedin.alpini.io.RateLimitedStream;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.client.store.QueryTool;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/TopicMessageFinder.class */
/**
 * Utility that scans a single partition of a topic and logs every message whose key matches a given key.
 *
 * <p>The partition to scan is derived from the serialized key via {@link DefaultVenicePartitioner}; the
 * offset range to scan is derived from a start/end timestamp pair.
 */
public class TopicMessageFinder {
    private static final Logger LOGGER = LogManager.getLogger(TopicMessageFinder.class);

    /** Marker used when the topic is not a version topic and therefore carries no version number. */
    private static final int NO_VERSION = -1;

    /**
     * Locates and logs all messages for one key inside a time window of a topic.
     *
     * <p>Steps: resolve the store (and version, for version topics) from the topic name, fetch the key schema
     * and store metadata from the controller, serialize the key, pick the partition, translate the two
     * timestamps into offsets with a plain Kafka consumer, then hand over to
     * {@link #consume(PubSubConsumerAdapter, PubSubTopicPartition, long, long, long, byte[])}.
     *
     * <p>Side effect: the key/value deserializer entries of {@code properties} are overwritten with
     * {@link ByteArrayDeserializer}.
     *
     * @param controllerClient client used to fetch the key schema and the store metadata
     * @param properties consumer configuration; mutated as described above
     * @param str topic name (version topic or real-time topic)
     * @param str2 key to look for, in the textual form accepted by {@link #serializeKey(String, String)}
     * @param j start timestamp passed to {@code offsetsForTimes}
     * @param j2 end timestamp passed to {@code offsetsForTimes}
     * @param j3 minimum number of newly consumed messages between two progress log lines
     * @throws RuntimeException if the topic is a version topic and the store has no such version
     * @throws VeniceException if the resolved partition count is not positive
     */
    public static void find(ControllerClient controllerClient, Properties properties, String str, String str2, long j, long j2, long j3) {
        final String topic = str;
        final String keyString = str2;

        final String storeName;
        final int versionNumber;
        if (Version.isVersionTopic(topic)) {
            storeName = Version.parseStoreFromKafkaTopicName(topic);
            versionNumber = Version.parseVersionFromKafkaTopicName(topic);
        } else {
            storeName = Version.parseStoreFromRealTimeTopic(topic);
            versionNumber = NO_VERSION;
        }

        String keySchemaStr = controllerClient.getKeySchema(storeName).getSchemaStr();
        LOGGER.info("The key schema for store: {} : {}", storeName, keySchemaStr);
        StoreInfo store = controllerClient.getStore(storeName).getStore();
        int partitionCount = store.getPartitionCount();
        byte[] serializedKey = serializeKey(keyString, keySchemaStr);

        if (versionNumber != NO_VERSION) {
            if (!store.getVersion(versionNumber).isPresent()) {
                throw new RuntimeException("Couldn't find version: " + versionNumber + " from store: " + storeName);
            }
            Version version = store.getVersion(versionNumber).get();
            if (version.isChunkingEnabled()) {
                // With chunking enabled the lookup key is the chunking-suffixed form of the serialized key.
                serializedKey = new KeyWithChunkingSuffixSerializer().serializeNonChunkedKey(serializedKey);
            }
            // For a version topic the version-level partition count overrides the store-level one.
            partitionCount = version.getPartitionCount();
        }
        if (partitionCount <= 0) {
            throw new VeniceException("Invalid partition count: " + partitionCount);
        }
        LOGGER.info("Got partition count: {}", partitionCount);

        int partitionId = new DefaultVenicePartitioner().getPartitionId(serializedKey, partitionCount);
        LOGGER.info("Assigned partition: {} for key: {}", partitionId, keyString);

        TopicPartition kafkaPartition = new TopicPartition(topic, partitionId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        final long startOffset;
        final long endOffset;
        // The raw Kafka consumer is only needed for the timestamp -> offset lookup; close it before scanning.
        try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties)) {
            OffsetAndTimestamp startLookup =
                    kafkaConsumer.offsetsForTimes(Collections.singletonMap(kafkaPartition, j)).get(kafkaPartition);
            // No lookup result for the start timestamp: fall back to -1.
            startOffset = startLookup == null ? -1L : startLookup.offset();

            OffsetAndTimestamp endLookup =
                    kafkaConsumer.offsetsForTimes(Collections.singletonMap(kafkaPartition, j2)).get(kafkaPartition);
            // No lookup result for the end timestamp: scan up to the current end of the partition.
            endOffset = endLookup != null
                    ? endLookup.offset()
                    : kafkaConsumer.endOffsets(Collections.singletonList(kafkaPartition)).get(kafkaPartition);
        }
        LOGGER.info("Got start offset: {} and end offset: {} for the specified time range", startOffset, endOffset);

        PubSubConsumerAdapter pubSubConsumer = new ApacheKafkaConsumerAdapter(
                properties,
                new KafkaPubSubMessageDeserializer(
                        new OptimizedKafkaValueSerializer(),
                        new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                        new LandFillObjectPool<>(KafkaMessageEnvelope::new)));
        PubSubTopicPartition pubSubPartition =
                new PubSubTopicPartitionImpl(new PubSubTopicRepository().getTopic(topic), partitionId);
        // consume() takes ownership of the adapter and closes it.
        consume(pubSubConsumer, pubSubPartition, startOffset, endOffset, j3, serializedKey);
    }

    /**
     * Polls one topic partition and logs offset and value of each message whose key bytes equal {@code bArr}.
     *
     * <p>Scanning stops when a poll returns no messages or when a message with offset {@code >= j2} is seen.
     * The adapter is closed before this method returns, whether normally or exceptionally.
     *
     * @param pubSubConsumerAdapter consumer to read from; closed by this method
     * @param pubSubTopicPartition partition to subscribe to and read
     * @param j offset handed to {@code subscribe}
     * @param j2 exclusive end offset of the scan
     * @param j3 minimum number of newly consumed messages between two progress log lines
     * @param bArr serialized key to match against each message key
     */
    static void consume(PubSubConsumerAdapter pubSubConsumerAdapter, PubSubTopicPartition pubSubTopicPartition, long j, long j2, long j3, byte[] bArr) {
        try (PubSubConsumerAdapter consumer = pubSubConsumerAdapter) {
            long consumedCount = 0;
            long lastReportedCount = 0;
            // Kept across polls so a progress line always shows the last timestamp actually consumed.
            long lastConsumedTimestamp = 0;
            consumer.subscribe(pubSubTopicPartition, j);

            boolean reachedEnd = false;
            while (!reachedEnd) {
                // NOTE(review): the poll timeout is taken from an unrelated class' constant; this looks like
                // a constant-folding artifact of a numeric literal - confirm the intended timeout value.
                Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polled =
                        consumer.poll(RateLimitedStream.DEFAULT_MAX_QUANTA_PER_SEC);
                if (polled.isEmpty()) {
                    break;
                }
                for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: polled.get(pubSubTopicPartition)) {
                    if (message.getOffset() >= j2) {
                        reachedEnd = true;
                        break;
                    }
                    if (Arrays.equals(message.getKey().getKey(), bArr)) {
                        LOGGER.info("Offset: {}, Value: {}", message.getOffset(), message.getValue().toString());
                    }
                    lastConsumedTimestamp = message.getPubSubMessageTime();
                    consumedCount++;
                }
                if (consumedCount - lastReportedCount >= j3) {
                    LOGGER.info(
                            "Consumed {} messages from topic partition: {}, and last consumed timestamp: {}",
                            consumedCount,
                            pubSubTopicPartition,
                            new Date(lastConsumedTimestamp));
                    lastReportedCount = consumedCount;
                }
            }
        }
    }

    /**
     * Serializes a textual key with the Avro generic serializer for the given key schema.
     *
     * @param str textual key; quotes are stripped via {@link QueryTool#removeQuotes(String)} before conversion
     * @param str2 key schema as an Avro schema string
     * @return the serialized key bytes
     */
    public static byte[] serializeKey(String str, String str2) {
        Schema keySchema = AvroCompatibilityHelper.parse(str2);
        Object keyObject = QueryTool.convertKey(QueryTool.removeQuotes(str), keySchema);
        return SerializerDeserializerFactory.getAvroGenericSerializer(keySchema).serialize(keyObject);
    }
}
