package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/SeparatedStoreBufferService.class */
public class SeparatedStoreBufferService extends AbstractStoreBufferService {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) SeparatedStoreBufferService.class);
    protected final StoreBufferService sortedServiceDelegate;
    protected final StoreBufferService unsortedServiceDelegate;
    private final int sortedPoolSize;
    private final int unsortedPoolSize;
    private final Map<PubSubTopic, Boolean> topicToSortedIngestionMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SeparatedStoreBufferService(VeniceServerConfig veniceServerConfig) {
        this(veniceServerConfig.getDrainerPoolSizeSortedInput(), veniceServerConfig.getDrainerPoolSizeUnsortedInput(), new StoreBufferService(veniceServerConfig.getDrainerPoolSizeSortedInput(), veniceServerConfig.getStoreWriterBufferMemoryCapacity(), veniceServerConfig.getStoreWriterBufferNotifyDelta(), veniceServerConfig.isStoreWriterBufferAfterLeaderLogicEnabled()), new StoreBufferService(veniceServerConfig.getDrainerPoolSizeUnsortedInput(), veniceServerConfig.getStoreWriterBufferMemoryCapacity(), veniceServerConfig.getStoreWriterBufferNotifyDelta(), veniceServerConfig.isStoreWriterBufferAfterLeaderLogicEnabled()));
        LOGGER.info("Created separated store buffer service with {} sorted drainers and {} unsorted drainers queues with capacity of {}", Integer.valueOf(this.sortedPoolSize), Integer.valueOf(this.unsortedPoolSize), Long.valueOf(veniceServerConfig.getStoreWriterBufferMemoryCapacity()));
    }

    SeparatedStoreBufferService(int i, int i2, StoreBufferService storeBufferService, StoreBufferService storeBufferService2) {
        this.topicToSortedIngestionMode = new VeniceConcurrentHashMap();
        this.sortedPoolSize = i;
        this.unsortedPoolSize = i2;
        this.sortedServiceDelegate = storeBufferService;
        this.unsortedServiceDelegate = storeBufferService2;
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public void putConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException {
        PartitionConsumptionState partitionConsumptionState = storeIngestionTask.getPartitionConsumptionState(i);
        boolean z = false;
        if (partitionConsumptionState != null) {
            z = partitionConsumptionState.isDeferredWrite();
            Boolean bool = this.topicToSortedIngestionMode.get(pubSubMessage.getTopicPartition().getPubSubTopic());
            if (bool == null) {
                this.topicToSortedIngestionMode.put(pubSubMessage.getTopicPartition().getPubSubTopic(), Boolean.valueOf(z));
            } else if (bool.booleanValue() != z) {
                LOGGER.info("Switching drainer buffer for topic {} to use {}", pubSubMessage.getTopicPartition().getPubSubTopic().getName(), z ? "sorted queue." : "unsorted queue.");
                drainBufferedRecordsFromTopicPartition(pubSubMessage.getTopicPartition().getPartitionNumber() != i ? new PubSubTopicPartitionImpl(pubSubMessage.getTopicPartition().getPubSubTopic(), i) : pubSubMessage.getTopicPartition());
                this.topicToSortedIngestionMode.put(pubSubMessage.getTopicPartition().getPubSubTopic(), Boolean.valueOf(z));
            }
        }
        (z ? this.sortedServiceDelegate : this.unsortedServiceDelegate).putConsumerRecord(pubSubMessage, storeIngestionTask, leaderProducedRecordContext, i, str, j);
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition pubSubTopicPartition) throws InterruptedException {
        this.sortedServiceDelegate.drainBufferedRecordsFromTopicPartition(pubSubTopicPartition);
        this.unsortedServiceDelegate.drainBufferedRecordsFromTopicPartition(pubSubTopicPartition);
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public boolean startInner() throws Exception {
        this.sortedServiceDelegate.startInner();
        this.unsortedServiceDelegate.startInner();
        return true;
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public void stopInner() throws Exception {
        this.sortedServiceDelegate.stopInner();
        this.unsortedServiceDelegate.stopInner();
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public int getDrainerCount() {
        return this.unsortedServiceDelegate.getDrainerCount() + this.sortedServiceDelegate.getDrainerCount();
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getDrainerQueueMemoryUsage(int i) {
        return i < this.sortedPoolSize ? this.sortedServiceDelegate.getDrainerQueueMemoryUsage(i) : this.unsortedServiceDelegate.getDrainerQueueMemoryUsage(i - this.sortedPoolSize);
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getTotalMemoryUsage() {
        return this.unsortedServiceDelegate.getTotalMemoryUsage() + this.sortedServiceDelegate.getTotalMemoryUsage();
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getTotalRemainingMemory() {
        return this.unsortedServiceDelegate.getTotalRemainingMemory() + this.sortedServiceDelegate.getTotalRemainingMemory();
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getMaxMemoryUsagePerDrainer() {
        return this.unsortedServiceDelegate.getMaxMemoryUsagePerDrainer() + this.sortedServiceDelegate.getMaxMemoryUsagePerDrainer();
    }

    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getMinMemoryUsagePerDrainer() {
        return this.sortedServiceDelegate.getMinMemoryUsagePerDrainer() + this.unsortedServiceDelegate.getMinMemoryUsagePerDrainer();
    }
}
