package com.linkedin.venice.client.store;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.stats.ClientStats;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.streaming.TrackingStreamingCallback;
import com.linkedin.venice.client.store.streaming.VeniceResponseCompletableFuture;
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.streaming.VeniceResponseMapImpl;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/client/store/StatTrackingStoreClient.class */
/**
 * Store client decorator that records request metrics around the client it delegates to.
 *
 * <p>A separate {@link ClientStats} instance is kept per request type (single get, multi get,
 * multi get streaming, compute, compute streaming) plus one for raw reads issued through
 * {@link #getRaw(String)}, which are reported under the store name suffixed with
 * {@code "_schema_reader"}.
 *
 * @param <K> key type of the store
 * @param <V> value type of the store
 */
public class StatTrackingStoreClient<K, V> extends DelegatingStoreClient<K, V> {
    private static final Logger LOGGER = LogManager.getLogger(StatTrackingStoreClient.class);
    private static final String STAT_VENICE_CLIENT_NAME = "venice_client";
    private static final String STAT_SCHEMA_READER = "schema_reader";

    private final ClientStats singleGetStats;
    private final ClientStats multiGetStats;
    private final ClientStats multiGetStreamingStats;
    private final ClientStats schemaReaderStats;
    private final ClientStats computeStats;
    private final ClientStats computeStreamingStats;

    /**
     * Streaming callback wrapper that reports time-to-first-record, time-to-Nth-percentile-record
     * and completion metrics to a {@link ClientStats} instance.
     *
     * @param <K> key type delivered to the wrapped callback
     * @param <V> value type delivered to the wrapped callback
     */
    public static class StatTrackingStreamingCallback<K, V> extends TrackingStreamingCallback<K, V> {
        private final ClientStats stats;
        private final int keyCntForP50;
        private final int keyCntForP90;
        private final int keyCntForP95;
        private final int keyCntForP99;
        private final long preRequestTimeInNS;
        private final AtomicInteger receivedKeyCnt = new AtomicInteger(0);

        /**
         * @param streamingCallback callback handed to the {@link TrackingStreamingCallback} superclass
         * @param clientStats stats instance that receives every metric recorded by this callback
         * @param keyCnt number of keys in the request; used to derive the percentile thresholds
         * @param preRequestTimeInNS request start timestamp passed to
         *     {@link LatencyUtils#getLatencyInMS(long)} whenever a latency is recorded
         */
        public StatTrackingStreamingCallback(
                StreamingCallback<K, V> streamingCallback,
                ClientStats clientStats,
                int keyCnt,
                long preRequestTimeInNS) {
            super(streamingCallback);
            this.stats = clientStats;
            this.keyCntForP50 = percentileKeyCount(keyCnt, 50);
            this.keyCntForP90 = percentileKeyCount(keyCnt, 90);
            this.keyCntForP95 = percentileKeyCount(keyCnt, 95);
            this.keyCntForP99 = percentileKeyCount(keyCnt, 99);
            this.preRequestTimeInNS = preRequestTimeInNS;
        }

        /**
         * Returns {@code keyCnt * percent / 100} using integer division. The product is computed
         * as a {@code long} so that very large key counts cannot overflow {@code int}.
         */
        private static int percentileKeyCount(int keyCnt, int percent) {
            return (int) ((long) keyCnt * percent / 100);
        }

        @Override
        public ClientStats getStats() {
            return this.stats;
        }

        /**
         * Counts one more deserialized record and records the elapsed time when the running count
         * is 1 or equals one of the percentile thresholds.
         *
         * <p>The checks are deliberately independent: for small requests several thresholds are
         * the same number and must all fire on the same record. A threshold that rounded down to
         * 0 never fires, because the running count starts at 1.
         */
        @Override
        public void onRecordDeserialized() {
            int received = this.receivedKeyCnt.incrementAndGet();
            if (received == 1) {
                this.stats.recordStreamingResponseTimeToReceiveFirstRecord(LatencyUtils.getLatencyInMS(this.preRequestTimeInNS));
            }
            if (received == this.keyCntForP50) {
                this.stats.recordStreamingResponseTimeToReceive50PctRecord(LatencyUtils.getLatencyInMS(this.preRequestTimeInNS));
            }
            if (received == this.keyCntForP90) {
                this.stats.recordStreamingResponseTimeToReceive90PctRecord(LatencyUtils.getLatencyInMS(this.preRequestTimeInNS));
            }
            if (received == this.keyCntForP95) {
                this.stats.recordStreamingResponseTimeToReceive95PctRecord(LatencyUtils.getLatencyInMS(this.preRequestTimeInNS));
            }
            if (received == this.keyCntForP99) {
                this.stats.recordStreamingResponseTimeToReceive99PctRecord(LatencyUtils.getLatencyInMS(this.preRequestTimeInNS));
            }
        }

        /**
         * Forwards the completion outcome to
         * {@link StatTrackingStoreClient#handleMetricTrackingForStreamingCallback}.
         */
        @Override
        public void onDeserializationCompletion(Optional<VeniceClientException> exception, int resultCnt, int duplicateEntryCnt) {
            StatTrackingStoreClient.handleMetricTrackingForStreamingCallback(
                    this.stats, this.preRequestTimeInNS, exception, resultCnt, duplicateEntryCnt);
        }
    }

    /**
     * Wraps {@code internalAvroStoreClient} and creates one {@link ClientStats} per tracked request type.
     *
     * @param internalAvroStoreClient client that all requests are delegated to
     * @param clientConfig supplies the metrics repository (may return {@code null}) and is passed
     *     through to {@link ClientStats#getClientStats}
     */
    public StatTrackingStoreClient(InternalAvroStoreClient<K, V> internalAvroStoreClient, ClientConfig clientConfig) {
        super(internalAvroStoreClient);
        // Only build the fallback repository when the config does not provide one.
        MetricsRepository configuredRepository = clientConfig.getMetricsRepository();
        MetricsRepository metricsRepository = configuredRepository != null
                ? configuredRepository
                : TehutiUtils.getMetricsRepository(STAT_VENICE_CLIENT_NAME);
        String storeName = internalAvroStoreClient.getStoreName();
        this.singleGetStats = ClientStats.getClientStats(metricsRepository, storeName, RequestType.SINGLE_GET, clientConfig);
        this.multiGetStats = ClientStats.getClientStats(metricsRepository, storeName, RequestType.MULTI_GET, clientConfig);
        this.multiGetStreamingStats = ClientStats.getClientStats(metricsRepository, storeName, RequestType.MULTI_GET_STREAMING, clientConfig);
        this.schemaReaderStats = ClientStats.getClientStats(metricsRepository, storeName + "_" + STAT_SCHEMA_READER, RequestType.SINGLE_GET, clientConfig);
        this.computeStats = ClientStats.getClientStats(metricsRepository, storeName, RequestType.COMPUTE, clientConfig);
        this.computeStreamingStats = ClientStats.getClientStats(metricsRepository, storeName, RequestType.COMPUTE_STREAMING, clientConfig);
    }

    /**
     * Performs a single-key read, recording a key count of 1 and the outcome against the
     * single-get stats. The returned future is additionally wrapped by
     * {@link AppTimeOutTrackingCompletableFuture#track}.
     */
    @Override
    public CompletableFuture<V> get(K key) {
        long startTimeInNS = System.nanoTime();
        CompletableFuture<V> innerFuture = super.get(key, Optional.of(this.singleGetStats), startTimeInNS);
        this.singleGetStats.recordRequestKeyCount(1);
        return AppTimeOutTrackingCompletableFuture.track(
                innerFuture.handle(getStatCallback(this.singleGetStats, startTimeInNS)), this.singleGetStats);
    }

    /**
     * Performs a raw read, recording a key count of 1 and the outcome against the schema-reader
     * stats. Unlike {@link #get(Object)}, the returned future is not wrapped for app-timeout tracking.
     */
    @Override
    public CompletableFuture<byte[]> getRaw(String requestPath) {
        long startTimeInNS = System.nanoTime();
        CompletableFuture<byte[]> innerFuture = super.getRaw(requestPath, Optional.of(this.schemaReaderStats), startTimeInNS);
        this.schemaReaderStats.recordRequestKeyCount(1);
        return innerFuture.handle(getStatCallback(this.schemaReaderStats, startTimeInNS));
    }

    /**
     * Batch read implemented on top of {@link #streamingBatchGet(Set)}.
     *
     * @return a future that completes with the response map when the streaming response is full,
     *     and completes exceptionally when the streaming request fails or the response is partial
     */
    @Override
    public CompletableFuture<Map<K, V>> batchGet(Set<K> keys) throws VeniceClientException {
        CompletableFuture<Map<K, V>> resultFuture = new CompletableFuture<>();
        streamingBatchGet(keys).whenComplete((response, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else if (response.isFullResponse()) {
                resultFuture.complete(response);
            } else {
                resultFuture.completeExceptionally(new VeniceClientException("Received partial response, returned entry count: " + response.getTotalEntryCount() + ", and key count: " + keys.size()));
            }
        });
        return AppTimeOutTrackingCompletableFuture.track(resultFuture, this.multiGetStreamingStats);
    }

    /**
     * Streaming batch read that collects records into a {@link VeniceResponseMap}.
     *
     * <p>Records with a non-null value are stored in the result map; keys received with a
     * {@code null} value are collected separately. On successful completion the response is
     * flagged as full only when the number of collected entries equals {@code keys.size()}.
     */
    @Override
    public CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(final Set<K> keys) throws VeniceClientException {
        final VeniceConcurrentHashMap<K, V> resultMap = new VeniceConcurrentHashMap<>(keys.size());
        final ConcurrentLinkedQueue<K> nonExistingKeys = new ConcurrentLinkedQueue<>();
        final VeniceResponseCompletableFuture<VeniceResponseMap<K, V>> resultFuture = new VeniceResponseCompletableFuture<>(
                () -> new VeniceResponseMapImpl<>(resultMap, nonExistingKeys, false),
                keys.size(),
                Optional.of(this.multiGetStreamingStats));
        streamingBatchGet(keys, new StreamingCallback<K, V>() {
            @Override
            public void onRecordReceived(K key, V value) {
                if (value != null) {
                    resultMap.put(key, value);
                } else {
                    nonExistingKeys.add(key);
                }
            }

            @Override
            public void onCompletion(Optional<Exception> exception) {
                if (exception.isPresent()) {
                    resultFuture.completeExceptionally(exception.get());
                } else {
                    boolean isFullResponse = resultMap.size() + nonExistingKeys.size() == keys.size();
                    resultFuture.complete(new VeniceResponseMapImpl<>(resultMap, nonExistingKeys, isFullResponse));
                }
            }
        });
        return resultFuture;
    }

    /**
     * Records one retry against the stats that belong to {@code requestType}.
     * Request types without a dedicated stats instance (including {@code null}) are ignored.
     */
    public void recordRetryCount(RequestType requestType) {
        if (requestType == RequestType.SINGLE_GET) {
            this.singleGetStats.recordRequestRetryCount();
        } else if (requestType == RequestType.MULTI_GET) {
            this.multiGetStats.recordRequestRetryCount();
        } else if (requestType == RequestType.MULTI_GET_STREAMING) {
            this.multiGetStreamingStats.recordRequestRetryCount();
        } else if (requestType == RequestType.COMPUTE) {
            this.computeStats.recordRequestRetryCount();
        } else if (requestType == RequestType.COMPUTE_STREAMING) {
            this.computeStreamingStats.recordRequestRetryCount();
        }
    }

    /**
     * Records the requested key count against the multi-get streaming stats and delegates,
     * wrapping {@code callback} in a {@link StatTrackingStreamingCallback}.
     */
    @Override
    public void streamingBatchGet(Set<K> keys, StreamingCallback<K, V> callback) throws VeniceClientException {
        long startTimeInNS = System.nanoTime();
        this.multiGetStreamingStats.recordRequestKeyCount(keys.size());
        super.streamingBatchGet(
                keys,
                new StatTrackingStreamingCallback<>(callback, this.multiGetStreamingStats, keys.size(), startTimeInNS));
    }

    /**
     * Records the requested key count against the compute streaming stats and delegates,
     * wrapping {@code callback} in a {@link StatTrackingStreamingCallback} that measures latency
     * from {@code preRequestTimeInNS}.
     */
    @Override
    public void compute(
            ComputeRequestWrapper computeRequestWrapper,
            Set<K> keys,
            Schema resultSchema,
            StreamingCallback<K, ComputeGenericRecord> callback,
            long preRequestTimeInNS) throws VeniceClientException {
        this.computeStreamingStats.recordRequestKeyCount(keys.size());
        super.compute(
                computeRequestWrapper,
                keys,
                resultSchema,
                new StatTrackingStreamingCallback<>(callback, this.computeStreamingStats, keys.size(), preRequestTimeInNS),
                preRequestTimeInNS);
    }

    /**
     * Same as the five-argument {@code compute} overload, additionally passing the caller-provided
     * encoder and output stream through to the delegate.
     */
    @Override
    public void compute(
            ComputeRequestWrapper computeRequestWrapper,
            Set<K> keys,
            Schema resultSchema,
            StreamingCallback<K, ComputeGenericRecord> callback,
            long preRequestTimeInNS,
            BinaryEncoder reusedEncoder,
            ByteArrayOutputStream reusedOutputStream) throws VeniceClientException {
        this.computeStreamingStats.recordRequestKeyCount(keys.size());
        super.compute(
                computeRequestWrapper,
                keys,
                resultSchema,
                new StatTrackingStreamingCallback<>(callback, this.computeStreamingStats, keys.size(), preRequestTimeInNS),
                preRequestTimeInNS,
                reusedEncoder,
                reusedOutputStream);
    }

    /**
     * Records the outcome of a finished streaming request.
     *
     * <p>With an exception present the request is counted as unhealthy; HTTP exceptions also
     * record their status code, every other exception is logged. Without an exception the
     * request is counted as healthy. The success and duplicate key counts are recorded in both cases.
     *
     * @param clientStats stats instance to record into
     * @param startTimeInNS request start timestamp used to compute the latency
     * @param veniceException failure reported for the request, if any
     * @param resultCnt value recorded as the success request key count
     * @param duplicateEntryCnt value recorded as the success duplicate request key count
     */
    public static void handleMetricTrackingForStreamingCallback(
            ClientStats clientStats,
            long startTimeInNS,
            Optional<VeniceClientException> veniceException,
            int resultCnt,
            int duplicateEntryCnt) {
        double latencyInMS = LatencyUtils.getLatencyInMS(startTimeInNS);
        if (veniceException.isPresent()) {
            VeniceClientException exception = veniceException.get();
            clientStats.recordUnhealthyRequest();
            clientStats.recordUnhealthyLatency(latencyInMS);
            if (exception instanceof VeniceClientHttpException) {
                clientStats.recordHttpRequest(((VeniceClientHttpException) exception).getHttpStatus());
            } else {
                LOGGER.error("Received exception in streaming callback", exception);
            }
        } else {
            emitRequestHealthyMetrics(clientStats, latencyInMS);
        }
        clientStats.recordSuccessRequestKeyCount(resultCnt);
        clientStats.recordSuccessDuplicateRequestKeyCount(duplicateEntryCnt);
    }

    /**
     * Creates a compute request builder, handing the delegate the compute stats, the compute
     * streaming stats, this client and the current {@link System#nanoTime()}.
     */
    @Override
    public ComputeRequestBuilder<K> compute() throws VeniceClientException {
        return super.compute(Optional.of(this.computeStats), Optional.of(this.computeStreamingStats), this, System.nanoTime());
    }

    /** Records one healthy request together with its latency. */
    private static void emitRequestHealthyMetrics(ClientStats clientStats, double latencyInMS) {
        clientStats.recordHealthyRequest();
        clientStats.recordHealthyLatency(latencyInMS);
    }

    /**
     * Builds a completion handler, suitable for {@link CompletableFuture#handle}, that records
     * the outcome of a request and passes the value through unchanged.
     *
     * <p>On failure it records unhealthy metrics (plus the HTTP status for
     * {@link VeniceClientHttpException}) and rethrows via
     * {@link #handleStoreExceptionInternally(Throwable)}. On success it records healthy metrics
     * and a success key count of 0 for {@code null}, the map size for a {@link Map}, and 1 otherwise.
     *
     * @param clientStats stats instance to record into
     * @param startTimeInNS request start timestamp used to compute the latency
     * @param <T> type of the value produced by the tracked future
     */
    public static <T> BiFunction<? super T, Throwable, ? extends T> getStatCallback(ClientStats clientStats, long startTimeInNS) {
        return (T value, Throwable throwable) -> {
            double latencyInMS = LatencyUtils.getLatencyInMS(startTimeInNS);
            if (throwable != null) {
                clientStats.recordUnhealthyRequest();
                clientStats.recordUnhealthyLatency(latencyInMS);
                if (throwable instanceof VeniceClientHttpException) {
                    clientStats.recordHttpRequest(((VeniceClientHttpException) throwable).getHttpStatus());
                }
                // Always throws for a non-null throwable, so the healthy metrics below are
                // never recorded for a failed request.
                handleStoreExceptionInternally(throwable);
            }
            emitRequestHealthyMetrics(clientStats, latencyInMS);
            if (value == null) {
                clientStats.recordSuccessRequestKeyCount(0);
            } else if (value instanceof Map) {
                clientStats.recordSuccessRequestKeyCount(((Map<?, ?>) value).size());
            } else {
                clientStats.recordSuccessRequestKeyCount(1);
            }
            return value;
        };
    }

    /**
     * Rethrows {@code throwable} as an unchecked exception; does nothing for {@code null}.
     *
     * <p>{@link CompletionException} and {@link VeniceClientException} are rethrown as-is; any
     * other throwable is wrapped in a new {@link VeniceClientException} with it as the cause.
     */
    public static void handleStoreExceptionInternally(Throwable throwable) {
        if (throwable == null) {
            return;
        }
        if (throwable instanceof CompletionException) {
            throw (CompletionException) throwable;
        }
        if (throwable instanceof VeniceClientException) {
            throw (VeniceClientException) throwable;
        }
        throw new VeniceClientException(throwable);
    }

    /** Returns {@code <SimpleClassName>(storeName: <store name>)}. */
    @Override
    public String toString() {
        return getClass().getSimpleName() + "(storeName: " + getStoreName() + ")";
    }
}
