package com.linkedin.venice.unit.kafka.consumer.poll;

import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.InMemoryKafkaMessage;
import com.linkedin.venice.utils.ByteUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/* loaded from: input_file:com/linkedin/venice/unit/kafka/consumer/poll/AbstractPollStrategy.class */
public abstract class AbstractPollStrategy implements PollStrategy {
    private static final int DEFAULT_MAX_MESSAGES_PER_POLL = 3;
    private final int maxMessagePerPoll;
    protected final boolean keepPollingWhenEmpty;

    public AbstractPollStrategy(boolean z) {
        this(z, DEFAULT_MAX_MESSAGES_PER_POLL);
    }

    public AbstractPollStrategy(boolean z, int i) {
        this.keepPollingWhenEmpty = z;
        this.maxMessagePerPoll = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract PubSubTopicPartitionOffset getNextPoll(Map<PubSubTopicPartition, Long> map);

    @Override // com.linkedin.venice.unit.kafka.consumer.poll.PollStrategy
    public synchronized Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> poll(InMemoryKafkaBroker inMemoryKafkaBroker, Map<PubSubTopicPartition, Long> map, long j) {
        HashMap hashMap = new HashMap();
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        while (i < this.maxMessagePerPoll && System.currentTimeMillis() < currentTimeMillis + j) {
            PubSubTopicPartitionOffset nextPoll = getNextPoll(map);
            if (nextPoll != null) {
                PubSubTopicPartition pubSubTopicPartition = nextPoll.getPubSubTopicPartition();
                long longValue = nextPoll.getOffset().longValue();
                String name = pubSubTopicPartition.getPubSubTopic().getName();
                int partitionNumber = pubSubTopicPartition.getPartitionNumber();
                long j2 = longValue + 1;
                Optional<InMemoryKafkaMessage> consume = inMemoryKafkaBroker.consume(name, partitionNumber, j2);
                if (consume.isPresent()) {
                    if (!AdminTopicUtils.isAdminTopic(name)) {
                        KafkaMessageEnvelope kafkaMessageEnvelope = consume.get().value;
                        if (!consume.get().key.isControlMessage() && MessageType.valueOf(kafkaMessageEnvelope) == MessageType.PUT && !consume.get().isPutValueChanged()) {
                            Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                            put.putValue = ByteUtils.enlargeByteBufferForIntHeader(put.putValue);
                            consume.get().putValueChanged();
                        }
                    }
                    ImmutablePubSubMessage immutablePubSubMessage = new ImmutablePubSubMessage(consume.get().key, consume.get().value, pubSubTopicPartition, j2, System.currentTimeMillis(), -1);
                    if (!hashMap.containsKey(pubSubTopicPartition)) {
                        hashMap.put(pubSubTopicPartition, new ArrayList());
                    }
                    ((List) hashMap.get(pubSubTopicPartition)).add(immutablePubSubMessage);
                    incrementOffset(map, pubSubTopicPartition, longValue);
                    i++;
                } else if (!this.keepPollingWhenEmpty) {
                    map.remove(pubSubTopicPartition);
                }
            } else if (!this.keepPollingWhenEmpty) {
                break;
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void incrementOffset(Map<PubSubTopicPartition, Long> map, PubSubTopicPartition pubSubTopicPartition, long j) {
        map.put(pubSubTopicPartition, Long.valueOf(j + 1));
    }
}
