package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.KafkaConsumerServiceStats;
import com.linkedin.venice.exceptions.UnsubscribedTopicPartitionException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.class */
public class SharedKafkaConsumer implements PubSubConsumerAdapter {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) SharedKafkaConsumer.class);
    protected final PubSubConsumerAdapter delegate;
    private final KafkaConsumerServiceStats stats;
    private final Runnable assignmentChangeListener;
    private final UnsubscriptionListener unsubscriptionListener;
    private final AtomicInteger currentAssignmentSize;
    private final AtomicBoolean waitingForPoll;
    private final Time time;
    private final VeniceConcurrentHashMap<PubSubTopicPartition, PubSubTopic> subscribedTopicPartitionToVersionTopic;
    private Set<PubSubTopicPartition> currentAssignment;
    private volatile long pollTimes;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer$UnsubscriptionListener.class */
    public interface UnsubscriptionListener {
        void call(SharedKafkaConsumer sharedKafkaConsumer, PubSubTopicPartition pubSubTopicPartition);
    }

    public SharedKafkaConsumer(PubSubConsumerAdapter pubSubConsumerAdapter, KafkaConsumerServiceStats kafkaConsumerServiceStats, Runnable runnable, UnsubscriptionListener unsubscriptionListener) {
        this(pubSubConsumerAdapter, kafkaConsumerServiceStats, runnable, unsubscriptionListener, new SystemTime());
    }

    SharedKafkaConsumer(PubSubConsumerAdapter pubSubConsumerAdapter, KafkaConsumerServiceStats kafkaConsumerServiceStats, Runnable runnable, UnsubscriptionListener unsubscriptionListener, Time time) {
        this.waitingForPoll = new AtomicBoolean(false);
        this.subscribedTopicPartitionToVersionTopic = new VeniceConcurrentHashMap<>();
        this.pollTimes = 0L;
        this.delegate = pubSubConsumerAdapter;
        this.stats = kafkaConsumerServiceStats;
        this.assignmentChangeListener = runnable;
        this.unsubscriptionListener = unsubscriptionListener;
        this.time = time;
        this.currentAssignment = Collections.emptySet();
        this.currentAssignmentSize = new AtomicInteger(0);
    }

    protected synchronized void updateCurrentAssignment(Set<PubSubTopicPartition> set) {
        long currentTimeMillis = System.currentTimeMillis();
        this.currentAssignmentSize.set(set.size());
        this.currentAssignment = Collections.unmodifiableSet(set);
        this.assignmentChangeListener.run();
        this.stats.recordUpdateCurrentAssignmentLatency(LatencyUtils.getElapsedTimeInMs(currentTimeMillis));
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void subscribe(PubSubTopicPartition pubSubTopicPartition, long j) {
        throw new VeniceException(getClass().getSimpleName() + " does not support subscribe without specifying a version-topic.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void subscribe(PubSubTopic pubSubTopic, PubSubTopicPartition pubSubTopicPartition, long j) {
        long currentTimeMillis = System.currentTimeMillis();
        this.delegate.subscribe(pubSubTopicPartition, j);
        PubSubTopic put = this.subscribedTopicPartitionToVersionTopic.put(pubSubTopicPartition, pubSubTopic);
        if (put != null && !put.equals(pubSubTopic)) {
            throw new IllegalStateException("A shared consumer cannot be used to subscribe to the same topic-partition by different VTs! versionTopic: " + pubSubTopic + ", previousVersionTopic: " + put + ", topicPartitionToSubscribe: " + pubSubTopicPartition);
        }
        this.stats.recordDelegateSubscribeLatency(LatencyUtils.getElapsedTimeInMs(currentTimeMillis));
        updateCurrentAssignment(this.delegate.getAssignment());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
        unSubscribeAction(() -> {
            this.delegate.unSubscribe(pubSubTopicPartition);
            this.subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
            this.unsubscriptionListener.call(this, pubSubTopicPartition);
            return 1;
        });
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> set) {
        unSubscribeAction(() -> {
            this.delegate.batchUnsubscribe(set);
            Iterator it2 = set.iterator();
            while (it2.hasNext()) {
                PubSubTopicPartition pubSubTopicPartition = (PubSubTopicPartition) it2.next();
                this.subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
                this.unsubscriptionListener.call(this, pubSubTopicPartition);
            }
            return set.size();
        });
    }

    protected synchronized void unSubscribeAction(IntSupplier intSupplier) {
        long j = this.pollTimes;
        long currentTimeMillis = System.currentTimeMillis();
        LOGGER.info("Shared consumer {} unsubscribed {} partition(s) in {} ms.", getClass().getSimpleName(), Integer.valueOf(intSupplier.getAsInt()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        updateCurrentAssignment(this.delegate.getAssignment());
        waitAfterUnsubscribe(j);
    }

    protected void waitAfterUnsubscribe(long j) {
        long j2 = j + 1;
        this.waitingForPoll.set(true);
        long nanoseconds = (this.time.getNanoseconds() / 1000000) + 10000;
        while (j2 > this.pollTimes) {
            try {
                long nanoseconds2 = nanoseconds - (this.time.getNanoseconds() / 1000000);
                if (nanoseconds2 <= 0) {
                    break;
                } else {
                    wait(nanoseconds2);
                }
            } catch (InterruptedException e) {
                LOGGER.info("Wait for poll request in `unsubscribe` function got interrupted.");
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void resetOffset(PubSubTopicPartition pubSubTopicPartition) throws UnsubscribedTopicPartitionException {
        this.delegate.resetOffset(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter, java.lang.AutoCloseable, java.io.Closeable
    public synchronized void close() {
        this.delegate.close();
        updateCurrentAssignment(Collections.emptySet());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> poll(long j) {
        this.pollTimes++;
        if (this.waitingForPoll.get()) {
            this.waitingForPoll.set(false);
            notifyAll();
        }
        try {
            if (hasAnySubscription()) {
                return this.delegate.poll(j);
            }
            Thread.sleep(j);
            return Collections.emptyMap();
        } catch (InterruptedException e) {
            throw new VeniceException("Shared Consumer poll sleep got interrupted", e);
        }
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public boolean hasAnySubscription() {
        return !this.currentAssignment.isEmpty();
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition) {
        return this.currentAssignment.contains(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void pause(PubSubTopicPartition pubSubTopicPartition) {
        this.delegate.pause(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized void resume(PubSubTopicPartition pubSubTopicPartition) {
        this.delegate.resume(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public synchronized Set<PubSubTopicPartition> getAssignment() {
        return this.currentAssignment;
    }

    public int getAssignmentSize() {
        return this.currentAssignmentSize.get();
    }

    synchronized void setCurrentAssignment(Set<PubSubTopicPartition> set) {
        this.currentAssignment = set;
        this.currentAssignmentSize.set(set.size());
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public long getOffsetLag(PubSubTopicPartition pubSubTopicPartition) {
        return this.delegate.getOffsetLag(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
        return this.delegate.getLatestOffset(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long j, Duration duration) {
        throw new UnsupportedOperationException("offsetForTime is not supported in SharedKafkaConsumer");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long j) {
        throw new UnsupportedOperationException("offsetForTime is not supported in SharedKafkaConsumer");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long beginningOffset(PubSubTopicPartition pubSubTopicPartition, Duration duration) {
        throw new UnsupportedOperationException("beginningOffset is not supported in SharedKafkaConsumer");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Map<PubSubTopicPartition, Long> endOffsets(Collection<PubSubTopicPartition> collection, Duration duration) {
        throw new UnsupportedOperationException("endOffsets is not supported in SharedKafkaConsumer");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
        throw new UnsupportedOperationException("endOffset is not supported in SharedKafkaConsumer");
    }

    @Override // com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
    public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic pubSubTopic) {
        throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer");
    }
}
