package org.apache.kafka.server.log.remote.metadata.storage;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-storage-3.4.0.jar:org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.class */
public class ConsumerManager implements Closeable {
    public static final String COMMITTED_OFFSETS_FILE_NAME = "_rlmm_committed_offsets";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConsumerManager.class);
    private static final long CONSUME_RECHECK_INTERVAL_MS = 50;
    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
    private final Time time;
    private final ConsumerTask consumerTask;
    private final Thread consumerTaskThread;

    public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig topicBasedRemoteLogMetadataManagerConfig, RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner, Time time) {
        this.rlmmConfig = topicBasedRemoteLogMetadataManagerConfig;
        this.time = time;
        this.consumerTask = new ConsumerTask(new KafkaConsumer(topicBasedRemoteLogMetadataManagerConfig.consumerProperties()), remotePartitionMetadataEventHandler, remoteLogMetadataTopicPartitioner, new File(topicBasedRemoteLogMetadataManagerConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath(), time, 60000L);
        this.consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", this.consumerTask);
    }

    public void startConsumerThread() {
        try {
            this.consumerTaskThread.start();
            log.info("RLMM Consumer task thread is started");
        } catch (Exception e) {
            throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
        }
    }

    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws TimeoutException {
        waitTillConsumptionCatchesUp(recordMetadata, this.rlmmConfig.consumeWaitMs());
    }

    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, long j) throws TimeoutException {
        int partition = recordMetadata.partition();
        long min = Math.min(50L, j);
        if (!this.consumerTask.isPartitionAssigned(partition)) {
            throw new KafkaException("This consumer is not subscribed to the target partition " + partition + " on which message is produced.");
        }
        long offset = recordMetadata.offset();
        long milliseconds = this.time.milliseconds();
        while (true) {
            long longValue = this.consumerTask.receivedOffsetForPartition(partition).orElse(-1L).longValue();
            if (longValue >= offset) {
                return;
            }
            log.debug("Committed offset [{}] for partition [{}], but the target offset: [{}],  Sleeping for [{}] to retry again", Long.valueOf(offset), Integer.valueOf(partition), Long.valueOf(longValue), Long.valueOf(min));
            if (this.time.milliseconds() - milliseconds > j) {
                log.warn("Committed offset for partition:[{}] is : [{}], but the target offset: [{}] ", Integer.valueOf(partition), Long.valueOf(longValue), Long.valueOf(offset));
                throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
            }
            this.time.sleep(min);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Utils.closeQuietly(this.consumerTask, "ConsumerTask");
        try {
            this.consumerTaskThread.join();
        } catch (Exception e) {
            log.error("Encountered error while waiting for consumerTaskThread to finish.", (Throwable) e);
        }
    }

    public void addAssignmentsForPartitions(Set<TopicIdPartition> set) {
        this.consumerTask.addAssignmentsForPartitions(set);
    }

    public void removeAssignmentsForPartitions(Set<TopicIdPartition> set) {
        this.consumerTask.removeAssignmentsForPartitions(set);
    }

    public Optional<Long> receivedOffsetForPartition(int i) {
        return this.consumerTask.receivedOffsetForPartition(i);
    }
}
