package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.common.Measurable;
import com.linkedin.venice.exceptions.VeniceChecksumException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.PartitionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService.class */
public class StoreBufferService extends AbstractStoreBufferService {
    /** Logger for service-level events (drain progress, per-drainer timing reports). */
    private static final Logger LOGGER = LogManager.getLogger(StoreBufferService.class);
    /** Number of drainers; one queue is created per drainer in the constructor. */
    private final int drainerNum;
    /** Pool that runs the drainers; assigned in {@code startInner()}, hence not final. */
    private ExecutorService executorService;
    /** Capacity handed to every queue; also the base for the 80% "busy" threshold in the usage report. */
    private final long bufferCapacityPerDrainer;
    /** Strategy for records that carry a leader context: either queue them or process them inline. */
    private final RecordHandler leaderRecordHandler;
    /** Drainers in index order; populated by {@code startInner()}. */
    private final List<StoreBufferDrainer> drainerList = new ArrayList<>();
    /** Queues in index order; element {@code k} is consumed by drainer {@code k}. */
    private final List<MemoryBoundBlockingQueue<QueueNode>> blockingQueueArr = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.davinci.kafka.consumer.StoreBufferService$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$1.class */
    /**
     * Compiler-generated lookup table that maps {@link MessageType} ordinals to the case indices
     * used by the switch in {@code QueueNode}: {@code PUT -> 1}, {@code UPDATE -> 2}, everything
     * else stays {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType;

        static {
            int[] caseIndexByOrdinal = new int[MessageType.values().length];
            // Each entry is guarded separately so a constant missing at runtime only loses its own case.
            try {
                caseIndexByOrdinal[MessageType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseIndexByOrdinal[MessageType.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType = caseIndexByOrdinal;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$FakePubSubMessage.class */
    public static class FakePubSubMessage implements PubSubMessage {
        private final PubSubTopicPartition topicPartition;

        FakePubSubMessage(PubSubTopicPartition pubSubTopicPartition) {
            this.topicPartition = (PubSubTopicPartition) Objects.requireNonNull(pubSubTopicPartition);
        }

        public Object getKey() {
            return null;
        }

        public Object getValue() {
            return null;
        }

        public PubSubTopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        public Object getOffset() {
            return null;
        }

        public long getPubSubMessageTime() {
            return 0L;
        }

        public int getPayloadSize() {
            return 0;
        }
    }

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$FollowerQueueNode.class */
    /**
     * Queue node for records enqueued without a leader context. In addition to the base node
     * state it holds the future that the drainer completes once the record has been processed.
     */
    private static class FollowerQueueNode extends QueueNode {
        private final CompletableFuture<Void> queuedRecordPersistedFuture;

        /**
         * @param pubSubMessage the consumed record
         * @param storeIngestionTask the task that will process the record
         * @param str Kafka URL forwarded to the base node
         * @param j "before processing" timestamp (ns) forwarded to the base node
         * @param completableFuture future exposed through {@link #getQueuedRecordPersistedFuture()}
         */
        public FollowerQueueNode(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, String str, long j, CompletableFuture<Void> completableFuture) {
            super(pubSubMessage, storeIngestionTask, str, j);
            queuedRecordPersistedFuture = completableFuture;
        }

        /** @return the future supplied at construction */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public CompletableFuture<Void> getQueuedRecordPersistedFuture() {
            return queuedRecordPersistedFuture;
        }

        /** Equality is inherited unchanged; the extra field does not participate. */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public boolean equals(Object obj) {
            return super.equals(obj);
        }

        /** Hash code is inherited unchanged, matching {@link #equals(Object)}. */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public int hashCode() {
            return super.hashCode();
        }
    }

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$LeaderQueueNode.class */
    /**
     * Queue node for records enqueued together with a {@link LeaderProducedRecordContext}.
     */
    private static class LeaderQueueNode extends QueueNode {
        private final LeaderProducedRecordContext leaderProducedRecordContext;

        /**
         * @param pubSubMessage the consumed record
         * @param storeIngestionTask the task that will process the record
         * @param str Kafka URL forwarded to the base node
         * @param j "before processing" timestamp (ns) forwarded to the base node
         * @param leaderProducedRecordContext context exposed through {@link #getLeaderProducedRecordContext()}
         */
        public LeaderQueueNode(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, String str, long j, LeaderProducedRecordContext leaderProducedRecordContext) {
            super(pubSubMessage, storeIngestionTask, str, j);
            this.leaderProducedRecordContext = leaderProducedRecordContext;
        }

        /** @return the leader context supplied at construction */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public LeaderProducedRecordContext getLeaderProducedRecordContext() {
            return leaderProducedRecordContext;
        }

        /** Equality is inherited unchanged; the extra field does not participate. */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public boolean equals(Object obj) {
            return super.equals(obj);
        }

        /** Hash code is inherited unchanged, matching {@link #equals(Object)}. */
        @Override // com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode
        public int hashCode() {
            return super.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$QueueNode.class */
    public static class QueueNode implements Measurable {
        private static final int QUEUE_NODE_OVERHEAD_IN_BYTE = 256;
        private final PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord;
        private final StoreIngestionTask ingestionTask;
        private final String kafkaUrl;
        private final long beforeProcessingRecordTimestampNs;

        public QueueNode(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, String str, long j) {
            this.consumerRecord = pubSubMessage;
            this.ingestionTask = storeIngestionTask;
            this.kafkaUrl = str;
            this.beforeProcessingRecordTimestampNs = j;
        }

        public PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getConsumerRecord() {
            return this.consumerRecord;
        }

        public StoreIngestionTask getIngestionTask() {
            return this.ingestionTask;
        }

        public LeaderProducedRecordContext getLeaderProducedRecordContext() {
            return null;
        }

        public CompletableFuture<Void> getQueuedRecordPersistedFuture() {
            return null;
        }

        public String getKafkaUrl() {
            return this.kafkaUrl;
        }

        public long getBeforeProcessingRecordTimestampNs() {
            return this.beforeProcessingRecordTimestampNs;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj != null && getClass() == obj.getClass()) {
                return this.consumerRecord.getTopicPartition().equals(((QueueNode) obj).consumerRecord.getTopicPartition());
            }
            return false;
        }

        public int hashCode() {
            return this.consumerRecord.hashCode();
        }

        public int getSize() {
            return ((KafkaKey) this.consumerRecord.getKey()).getEstimatedObjectSizeOnHeap() + getEstimateOfMessageEnvelopeSizeOnHeap((KafkaMessageEnvelope) this.consumerRecord.getValue()) + QUEUE_NODE_OVERHEAD_IN_BYTE;
        }

        private int getEstimateOfMessageEnvelopeSizeOnHeap(KafkaMessageEnvelope kafkaMessageEnvelope) {
            switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$kafka$protocol$enums$MessageType[MessageType.valueOf(kafkaMessageEnvelope).ordinal()]) {
                case 1:
                    Put put = (Put) kafkaMessageEnvelope.payloadUnion;
                    int capacity = put.putValue.capacity();
                    if (put.replicationMetadataPayload != null && put.replicationMetadataPayload.array() != put.putValue.array()) {
                        capacity += put.replicationMetadataPayload.capacity();
                    }
                    return capacity + 100;
                case 2:
                    return ((Update) kafkaMessageEnvelope.payloadUnion).updateValue.capacity() + 100;
                default:
                    return 100;
            }
        }

        public String toString() {
            return this.consumerRecord.toString();
        }
    }

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$RecordHandler.class */
    /**
     * Strategy for handling a record that arrives with a leader context. The service binds it to
     * either {@code queueLeaderRecord} (enqueue for a drainer) or {@code processRecord} (process on
     * the calling thread), so it must remain a single-method interface.
     */
    @FunctionalInterface
    private interface RecordHandler {
        /**
         * @param pubSubMessage the consumed record
         * @param storeIngestionTask the task that processes the record
         * @param leaderProducedRecordContext leader context accompanying the record
         * @param i partition index used for drainer selection and passed on to processing
         * @param str Kafka URL the record was consumed from
         * @param j "before processing" timestamp in nanoseconds
         * @throws InterruptedException if the handling thread is interrupted while blocked
         */
        void handle(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StoreBufferService$StoreBufferDrainer.class */
    /**
     * Worker that takes {@link QueueNode}s from one queue and processes them one at a time,
     * accumulating per topic-partition processing time in {@code topicToTimeSpent}.
     *
     * <p>Lifecycle: the loop runs while {@code isRunning} is true. It ends when {@link #stop()} is
     * called, when the thread is interrupted, or when a non-{@link Exception} throwable (an
     * {@code Error}) is raised. In every case {@code isRunning} is cleared on exit so that callers
     * checking the flag can tell that this drainer is no longer consuming its queue.
     */
    public static class StoreBufferDrainer implements Runnable {
        private static final Logger LOGGER = LogManager.getLogger(StoreBufferDrainer.class);
        /** Maximum number of characters of a record's string form included in an error log. */
        private static final int MAX_LOGGED_RECORD_LENGTH = 1024;
        private final BlockingQueue<QueueNode> blockingQueue;
        private final int drainerIndex;
        private final AtomicBoolean isRunning = new AtomicBoolean(true);
        private final ConcurrentMap<PubSubTopicPartition, Long> topicToTimeSpent = new ConcurrentHashMap<>();

        /**
         * @param blockingQueue the queue this drainer consumes
         * @param i index of this drainer, used in log messages
         */
        public StoreBufferDrainer(BlockingQueue<QueueNode> blockingQueue, int i) {
            this.blockingQueue = blockingQueue;
            this.drainerIndex = i;
        }

        /** Asks the loop to stop; takes effect the next time the loop condition is evaluated. */
        public void stop() {
            this.isRunning.set(false);
        }

        /**
         * Drains the queue until stopped. A failure that is an {@link Exception} is logged, reported
         * to the ingestion task and to the record's futures, and the loop continues. An
         * {@link InterruptedException} or an {@code Error} ends the loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            LOGGER.info("Starting StoreBufferDrainer Thread for drainer: {}....", this.drainerIndex);
            boolean interrupted = false;
            try {
                while (this.isRunning.get()) {
                    // Per-record state is reset on every iteration so that a failure raised before
                    // a new node is unpacked is never attributed to the previous record.
                    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord = null;
                    LeaderProducedRecordContext leaderContext = null;
                    StoreIngestionTask ingestionTask = null;
                    CompletableFuture<Void> persistedFuture = null;
                    try {
                        QueueNode node = this.blockingQueue.take();
                        consumerRecord = node.getConsumerRecord();
                        leaderContext = node.getLeaderProducedRecordContext();
                        ingestionTask = node.getIngestionTask();
                        persistedFuture = node.getQueuedRecordPersistedFuture();
                        long startMs = System.currentTimeMillis();
                        StoreBufferService.processRecord(consumerRecord, ingestionTask, leaderContext, PartitionUtils.getSubPartition(consumerRecord.getTopicPartition(), ingestionTask.getAmplificationFactor()), node.getKafkaUrl(), node.getBeforeProcessingRecordTimestampNs());
                        if (persistedFuture != null) {
                            persistedFuture.complete(null);
                        }
                        long elapsedMs = System.currentTimeMillis() - startMs;
                        this.topicToTimeSpent.merge(consumerRecord.getTopicPartition(), elapsedMs, Long::sum);
                    } catch (InterruptedException interruption) {
                        LOGGER.error("Drainer {} received InterruptedException, will exit", this.drainerIndex);
                        interrupted = true;
                        break;
                    } catch (Exception failure) {
                        LOGGER.error(describeFailure(consumerRecord), failure);
                        reportFailure(failure, consumerRecord, ingestionTask, leaderContext, persistedFuture);
                    } catch (Throwable fatal) {
                        // Errors are logged but not recovered from; the drainer exits.
                        LOGGER.error(describeFailure(consumerRecord), fatal);
                        break;
                    }
                }
            } finally {
                // Publish that this drainer is gone, whatever the reason for leaving the loop.
                this.isRunning.set(false);
                LOGGER.info("Current StoreBufferDrainer {} stopped", this.drainerIndex);
                if (interrupted) {
                    // Restore the flag last so the owning executor can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Builds the error-log message, including the record's string form capped at 1024 characters. */
        private String describeFailure(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord) {
            StringBuilder message = new StringBuilder().append("Drainer ").append(this.drainerIndex);
            if (consumerRecord == null) {
                return message.append(" received throwable: ").toString();
            }
            String recordText = consumerRecord.toString();
            if (recordText.length() > MAX_LOGGED_RECORD_LENGTH) {
                recordText = recordText.substring(0, MAX_LOGGED_RECORD_LENGTH);
                message.append(" received throwable while processing consumer record (truncated at 1024 characters): ");
            } else {
                message.append(" received throwable while processing consumer record: ");
            }
            return message.append(recordText).toString();
        }

        /** Propagates a processing failure to the ingestion task and to whichever futures the record carries. */
        private void reportFailure(Exception failure, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderContext, CompletableFuture<Void> persistedFuture) {
            if (ingestionTask != null) {
                try {
                    ingestionTask.setIngestionException(consumerRecord.getTopicPartition().getPartitionNumber(), failure);
                } catch (VeniceException e) {
                    // Could not record the failure against the partition; record it task-wide instead.
                    ingestionTask.setLastStoreIngestionException(e);
                }
                if (failure instanceof VeniceChecksumException) {
                    ingestionTask.recordChecksumVerificationFailure();
                }
            }
            if (leaderContext != null) {
                leaderContext.completePersistedToDBFuture(failure);
            }
            if (persistedFuture != null) {
                persistedFuture.completeExceptionally(failure);
            }
        }
    }

    public StoreBufferService(int i, long j, long j2, boolean z) {
        this.drainerNum = i;
        this.bufferCapacityPerDrainer = j;
        for (int i2 = 0; i2 < i; i2++) {
            this.blockingQueueArr.add(new MemoryBoundBlockingQueue<>(j, j2));
        }
        this.leaderRecordHandler = z ? this::queueLeaderRecord : StoreBufferService::processRecord;
    }

    /**
     * Looks up the queue that buffers the given record.
     *
     * @param pubSubMessage record whose topic selects the queue
     * @param i partition index combined with the topic hash
     * @return the queue at the index computed by {@link #getDrainerIndexForConsumerRecord}
     */
    protected MemoryBoundBlockingQueue<QueueNode> getDrainerForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i) {
        int queueIndex = getDrainerIndexForConsumerRecord(pubSubMessage, i);
        return this.blockingQueueArr.get(queueIndex);
    }

    /**
     * Computes the drainer index for a record from its topic's hash code and a partition index.
     *
     * @param pubSubMessage record whose topic is hashed
     * @param i partition index added to the halved, absolute topic hash
     * @return a non-negative index smaller than the number of drainers
     */
    protected int getDrainerIndexForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, int i) {
        int topicHash = pubSubMessage.getTopicPartition().getPubSubTopic().hashCode();
        // Halving first keeps Math.abs away from Integer.MIN_VALUE, whose absolute value is still negative.
        int topicBucket = Math.abs(topicHash / 2);
        int remainder = (topicBucket + i) % this.drainerNum;
        return Math.abs(remainder);
    }

    /**
     * Buffers a consumed record for asynchronous processing.
     *
     * <p>A record with a leader context is delegated to the configured leader record handler.
     * Otherwise it is enqueued as a follower node with a fresh future, and that future is stored
     * on the partition's consumption state (when one exists) as the last queued record's
     * persisted-future.
     *
     * @param pubSubMessage the consumed record
     * @param storeIngestionTask the task that will process the record
     * @param leaderProducedRecordContext leader context, or {@code null} for a follower record
     * @param i partition index used to pick the drainer queue
     * @param str Kafka URL the record was consumed from
     * @param j "before processing" timestamp in nanoseconds
     * @throws InterruptedException if interrupted while waiting for room in the queue
     */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public void putConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException {
        if (leaderProducedRecordContext != null) {
            this.leaderRecordHandler.handle(pubSubMessage, storeIngestionTask, leaderProducedRecordContext, i, str, j);
            return;
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        // The node is passed as a QueueNode; no cast to the queue type is needed (or valid).
        QueueNode followerNode = new FollowerQueueNode(pubSubMessage, storeIngestionTask, str, j, completableFuture);
        getDrainerForConsumerRecord(pubSubMessage, i).put(followerNode);
        PartitionConsumptionState partitionConsumptionState = storeIngestionTask.getPartitionConsumptionState(pubSubMessage.getTopicPartition().getPartitionNumber());
        if (partitionConsumptionState != null) {
            partitionConsumptionState.setLastQueuedRecordPersistedFuture(completableFuture);
        }
    }

    /**
     * Enqueues a record that carries a leader context so a drainer processes it later.
     *
     * @param pubSubMessage the consumed record
     * @param storeIngestionTask the task that will process the record
     * @param leaderProducedRecordContext leader context stored on the node
     * @param i partition index used to pick the drainer queue
     * @param str Kafka URL the record was consumed from
     * @param j "before processing" timestamp in nanoseconds
     * @throws InterruptedException if interrupted while waiting for room in the queue
     */
    private void queueLeaderRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException {
        // The node is passed as a QueueNode; no cast to the queue type is needed (or valid).
        QueueNode leaderNode = new LeaderQueueNode(pubSubMessage, storeIngestionTask, str, j, leaderProducedRecordContext);
        getDrainerForConsumerRecord(pubSubMessage, i).put(leaderNode);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes one record through its ingestion task and, when a leader context is present,
     * completes that context's persisted-to-DB future with {@code null}.
     *
     * @param pubSubMessage the record to process
     * @param storeIngestionTask the task that processes it
     * @param leaderProducedRecordContext leader context, or {@code null}
     * @param i partition index passed to the ingestion task
     * @param str Kafka URL passed to the ingestion task
     * @param j "before processing" timestamp (ns) passed to the ingestion task
     * @throws InterruptedException if the ingestion task is interrupted
     */
    public static void processRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage, StoreIngestionTask storeIngestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int i, String str, long j) throws InterruptedException {
        storeIngestionTask.processConsumerRecord(pubSubMessage, leaderProducedRecordContext, i, str, j);
        if (leaderProducedRecordContext == null) {
            return;
        }
        leaderProducedRecordContext.completePersistedToDBFuture(null);
    }

    /**
     * Waits until the drainer queue no longer holds records for the given topic-partition,
     * polling up to 1000 times with a 50 ms pause between polls.
     *
     * @param pubSubTopicPartition the topic-partition to wait for
     * @throws InterruptedException if interrupted while pausing between polls
     */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition pubSubTopicPartition) throws InterruptedException {
        final int maxPolls = 1000;
        final int pauseBetweenPollsMs = 50;
        internalDrainBufferedRecordsFromTopicPartition(pubSubTopicPartition, maxPolls, pauseBetweenPollsMs);
    }

    /**
     * Polls the responsible drainer queue until it contains no node equal to a probe node built
     * for the given topic-partition.
     *
     * @param pubSubTopicPartition the topic-partition to wait for
     * @param i maximum number of polls
     * @param i2 pause between polls, in milliseconds
     * @throws VeniceException if the responsible drainer is not running, or if the queue still
     *         contains matching records after {@code i} polls
     * @throws InterruptedException if interrupted while pausing
     */
    protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition pubSubTopicPartition, int i, int i2) throws InterruptedException {
        FakePubSubMessage probeRecord = new FakePubSubMessage(pubSubTopicPartition);
        int drainerIndex = getDrainerIndexForConsumerRecord(probeRecord, pubSubTopicPartition.getPartitionNumber());
        MemoryBoundBlockingQueue<QueueNode> queue = this.blockingQueueArr.get(drainerIndex);
        boolean drainerAlive = this.drainerList.get(drainerIndex).isRunning.get();
        if (!drainerAlive) {
            throw new VeniceException("Drainer thread " + drainerIndex + " has stopped running, cannot drain the topic " + pubSubTopicPartition.getPubSubTopic().getName());
        }
        // The probe carries only the topic-partition; it is used for the contains() lookup and never enqueued.
        QueueNode probeNode = new QueueNode(probeRecord, null, "dummyKafkaUrl", 0L);
        for (int poll = 0; poll < i; poll++) {
            if (!queue.contains(probeNode)) {
                LOGGER.info("The blocking queue of store writer thread: {} doesn't contain any record for: {}", drainerIndex, pubSubTopicPartition);
                return;
            }
            Thread.sleep(i2);
        }
        String failureMessage = "There are still some records left in the blocking queue of store writer thread: " + drainerIndex + " for topic: " + pubSubTopicPartition.getPubSubTopic().getName() + " partition after retry for " + i + " times";
        LOGGER.error(failureMessage);
        throw new VeniceException(failureMessage);
    }

    /**
     * Starts one drainer per queue on a fixed-size pool of daemon threads named "Store-writer".
     * The pool is shut down right after submission so that it accepts no further tasks; the
     * drainers that were already submitted keep running.
     *
     * @return always {@code true}
     */
    public boolean startInner() {
        ExecutorService pool = Executors.newFixedThreadPool(this.drainerNum, new DaemonThreadFactory("Store-writer"));
        this.executorService = pool;
        for (int index = 0; index < this.drainerNum; index++) {
            MemoryBoundBlockingQueue<QueueNode> queue = this.blockingQueueArr.get(index);
            StoreBufferDrainer drainer = new StoreBufferDrainer(queue, index);
            pool.submit(drainer);
            this.drainerList.add(drainer);
        }
        pool.shutdown();
        return true;
    }

    /**
     * Signals every drainer to stop, then interrupts the pool's threads and waits up to
     * 10 seconds for them to terminate. Does nothing beyond signalling if the pool was never created.
     *
     * @throws Exception if interrupted while waiting for termination
     */
    public void stopInner() throws Exception {
        for (StoreBufferDrainer drainer : this.drainerList) {
            drainer.stop();
        }
        ExecutorService pool = this.executorService;
        if (pool == null) {
            return;
        }
        pool.shutdownNow();
        pool.awaitTermination(10L, TimeUnit.SECONDS);
    }

    /** @return the number of drainer queues held by this service */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public int getDrainerCount() {
        int queueCount = this.blockingQueueArr.size();
        return queueCount;
    }

    /**
     * @param i index of the drainer queue
     * @return the memory usage reported by that queue
     */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getDrainerQueueMemoryUsage(int i) {
        MemoryBoundBlockingQueue<QueueNode> queue = this.blockingQueueArr.get(i);
        return queue.getMemoryUsage();
    }

    /** @return the sum of the memory usage reported by every drainer queue */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getTotalMemoryUsage() {
        long totalUsage = 0L;
        for (MemoryBoundBlockingQueue<QueueNode> queue : this.blockingQueueArr) {
            totalUsage += queue.getMemoryUsage();
        }
        return totalUsage;
    }

    /** @return the sum of the remaining capacity, in bytes, reported by every drainer queue */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getTotalRemainingMemory() {
        long totalRemaining = 0L;
        for (MemoryBoundBlockingQueue<QueueNode> queue : this.blockingQueueArr) {
            totalRemaining += queue.remainingMemoryCapacityInByte();
        }
        return totalRemaining;
    }

    /**
     * Returns the largest memory usage among the drainer queues.
     *
     * <p>Side effects: when at least one queue is above 80% of the per-drainer capacity, the
     * slowest topic-partitions of every drainer are logged (top 5 for a drainer whose own queue
     * is above the threshold, top 1 otherwise). Every drainer's time-spent map is cleared on
     * each call, whether or not anything was logged.
     *
     * <p>Drainers exist only after {@code startInner()} has run; before that there is nothing to
     * report or clear, so only the maximum is computed.
     *
     * @return the maximum queue memory usage, or {@code 0} when there are no queues
     */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getMaxMemoryUsagePerDrainer() {
        final double busyThreshold = 0.8d * this.bufferCapacityPerDrainer;
        long maxUsage = 0L;
        boolean anyQueueBusy = false;
        for (MemoryBoundBlockingQueue<QueueNode> queue : this.blockingQueueArr) {
            long usage = queue.getMemoryUsage();
            maxUsage = Math.max(maxUsage, usage);
            if (usage > busyThreshold) {
                anyQueueBusy = true;
            }
        }
        int startedDrainers = Math.min(this.blockingQueueArr.size(), this.drainerList.size());
        for (int i = 0; i < startedDrainers; i++) {
            StoreBufferDrainer drainer = this.drainerList.get(i);
            if (anyQueueBusy) {
                int entriesToLog = this.blockingQueueArr.get(i).getMemoryUsage() > busyThreshold ? 5 : 1;
                final int drainerIndex = i;
                drainer.topicToTimeSpent.entrySet()
                    .stream()
                    .sorted(Map.Entry.<PubSubTopicPartition, Long>comparingByValue(Comparator.reverseOrder()))
                    .limit(entriesToLog)
                    .forEachOrdered(entry -> LOGGER.info("In drainer number {}, time spent on {} : {} ms", drainerIndex, entry.getKey(), entry.getValue()));
            }
            drainer.topicToTimeSpent.clear();
        }
        return maxUsage;
    }

    /**
     * @param i index of the drainer
     * @return that drainer's live map of accumulated processing time (ms) per topic-partition
     */
    Map<PubSubTopicPartition, Long> getTopicToTimeSpentMap(int i) {
        StoreBufferDrainer drainer = this.drainerList.get(i);
        return drainer.topicToTimeSpent;
    }

    /**
     * @return the smallest memory usage among the drainer queues, or {@link Long#MAX_VALUE}
     *         when there are no queues
     */
    @Override // com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
    public long getMinMemoryUsagePerDrainer() {
        long minUsage = Long.MAX_VALUE;
        for (MemoryBoundBlockingQueue<QueueNode> queue : this.blockingQueueArr) {
            minUsage = Math.min(minUsage, queue.getMemoryUsage());
        }
        return minUsage;
    }
}
