package com.linkedin.venice.fastclient;

import com.linkedin.restli.common.HttpStatus;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.streaming.VeniceResponseCompletableFuture;
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.streaming.VeniceResponseMapImpl;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.fastclient.BatchGetRequestContext;
import com.linkedin.venice.fastclient.meta.StoreMetadata;
import com.linkedin.venice.fastclient.transport.R2TransportClient;
import com.linkedin.venice.fastclient.transport.TransportClientResponseForRoute;
import com.linkedin.venice.read.protocol.request.router.MultiGetRouterRequestKeyV1;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.EncodingUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.avro.Schema;
import org.apache.avro.io.ByteBufferOptimizedBinaryDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/fastclient/DispatchingAvroGenericStoreClient.class */
public class DispatchingAvroGenericStoreClient<K, V> extends InternalAvroStoreClient<K, V> {
    /** Separator placed between the path segments of the storage request URIs built by this client. */
    private static final String URI_SEPARATOR = "/";
    /** Source of the store name, current version, partition ids, replica routes, schemas and compressors. */
    private final StoreMetadata metadata;
    /** Replica count requested for a single get: 2 when speculative query is enabled, otherwise 1. */
    private final int requiredReplicaCount;
    /** Client configuration supplied at construction time. */
    private final ClientConfig config;
    /** Transport used to issue the GET (single get) and POST (batch get) requests to the selected routes. */
    private final TransportClient transportClient;
    /** Executor on which single-get transport responses are handled (decompression and deserialization). */
    private final Executor deserializationExecutor;
    /** Key serializer; created by verifyMetadataInitialized() when it is still null. */
    private RecordSerializer<K> keySerializer;
    /** Serializer for the batch-get request keys; assigned in start(). */
    private RecordSerializer<MultiGetRouterRequestKeyV1> multiGetSerializer;
    /** Token sent as an "Authorization: Bearer ..." header when it is non-null and non-empty. */
    private final String token;
    private static final Logger LOGGER = LogManager.getLogger(DispatchingAvroGenericStoreClient.class);
    /** Fallback executor used when the client configuration does not provide a deserialization executor. */
    private static final Executor DESERIALIZATION_EXECUTOR = AbstractAvroStoreClient.getDefaultDeserializationExecutor();

    public DispatchingAvroGenericStoreClient(StoreMetadata storeMetadata, ClientConfig clientConfig) {
        this.metadata = storeMetadata;
        this.config = clientConfig;
        this.token = clientConfig.getToken();
        this.transportClient = new R2TransportClient(clientConfig.getR2Client());
        if (clientConfig.isSpeculativeQueryEnabled()) {
            this.requiredReplicaCount = 2;
        } else {
            this.requiredReplicaCount = 1;
        }
        this.deserializationExecutor = (Executor) Optional.ofNullable(clientConfig.getDeserializationExecutor()).orElse(DESERIALIZATION_EXECUTOR);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the store metadata this client was constructed with.
     *
     * @return the {@link StoreMetadata} instance backing this client
     */
    public StoreMetadata getStoreMetadata() {
        return metadata;
    }

    /**
     * Builds the request path for a single-key lookup and records the serialization latency, the
     * version and the partition id on the request context.
     *
     * @param getRequestContext context updated with serialization time, current version and partition id
     * @param k the key to look up
     * @return a path of the form {@code /storage/<store>_v<version>/<partition>/<base64 key>?f=b64}
     */
    private String composeURIForSingleGet(GetRequestContext getRequestContext, K k) {
        final int version = getCurrentVersion();
        final String resource = getResourceName(version);

        // Time only the key serialization itself.
        final long serializationStart = System.nanoTime();
        final byte[] keyBytes = keySerializer.serialize(k);
        getRequestContext.requestSerializationTime = getLatencyInNS(serializationStart);

        final int partition = metadata.getPartitionId(version, keyBytes);
        final String encodedKey = EncodingUtils.base64EncodeToString(keyBytes);
        getRequestContext.currentVersion = version;
        getRequestContext.partitionId = partition;

        return new StringBuilder("/storage/")
                .append(resource)
                .append(URI_SEPARATOR)
                .append(partition)
                .append(URI_SEPARATOR)
                .append(encodedKey)
                .append("?f=b64")
                .toString();
    }

    /**
     * Builds the request path shared by all routes of a batch get and records the version on the context.
     *
     * @param batchGetRequestContext context whose {@code currentVersion} is set to the version used
     * @return a path of the form {@code /storage/<store>_v<version>}
     */
    private String composeURIForBatchGetRequest(BatchGetRequestContext<K, V> batchGetRequestContext) {
        final int version = getCurrentVersion();
        final String resource = getResourceName(version);
        batchGetRequestContext.currentVersion = version;
        return URI_SEPARATOR + "storage" + URI_SEPARATOR + resource;
    }

    /**
     * Builds the versioned resource name for this store.
     *
     * @param i the store version
     * @return the store name followed by {@code _v} and the version number
     */
    private String getResourceName(int i) {
        return new StringBuilder()
                .append(metadata.getStoreName())
                .append("_v")
                .append(i)
                .toString();
    }

    /**
     * Reads the current store version from the metadata.
     *
     * @return the current version, always greater than zero
     * @throws VeniceClientException if the metadata reports a version that is zero or negative
     */
    private int getCurrentVersion() {
        final int version = metadata.getCurrentStoreVersion();
        if (version > 0) {
            return version;
        }
        throw new VeniceClientException("No available current version, please do a push first");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up a single key by sending the request to up to {@code requiredReplicaCount} replicas and
     * completing the returned future from the first response that gets handled.
     *
     * <p>The returned future completes with the deserialized value, with {@code null} when a replica
     * answers with a null transport response, and exceptionally when no replica is available, when no
     * request could be sent, when every transport request fails, or when decompression or
     * deserialization of the winning response throws.
     *
     * @param getRequestContext per-request state; timing fields, the route map and the request URI are updated here
     * @param k the key to look up
     * @return a future holding the value for the key
     * @throws VeniceClientException if the metadata is not ready or no current version is available
     */
    @Override // com.linkedin.venice.fastclient.InternalAvroStoreClient
    public CompletableFuture<V> get(GetRequestContext getRequestContext, K k) throws VeniceClientException {
        verifyMetadataInitialized();
        getRequestContext.instanceHealthMonitor = this.metadata.getInstanceHealthMonitor();
        // The URI (and with it the version and partition id) is only computed when the context does not carry one yet.
        if (getRequestContext.requestUri == null) {
            getRequestContext.requestUri = composeURIForSingleGet(getRequestContext, k);
        }
        String str = getRequestContext.requestUri;
        int i = getRequestContext.currentVersion;
        int i2 = getRequestContext.partitionId;
        CompletableFuture<V> completableFuture = new CompletableFuture<>();
        long nanoTime = System.nanoTime();
        // The routes already present in routeRequestMap are handed to the metadata; presumably so they are
        // excluded from the selection — TODO confirm against StoreMetadata.getReplicas.
        List<String> replicas = this.metadata.getReplicas(getRequestContext.requestId, i, i2, this.requiredReplicaCount, getRequestContext.routeRequestMap.keySet());
        if (replicas.isEmpty()) {
            getRequestContext.noAvailableReplica = true;
            completableFuture.completeExceptionally(new VeniceClientException("No available route for store: " + getStoreName() + ", version: " + i + ", partition: " + i2));
            return completableFuture;
        }
        // Set by the first response handler that gets to complete the result; later handlers only report health.
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        // NOTE(review): linkedList and completableFuture2 below are raw types (this looks like decompiler output);
        // the generic parameters probably need to be restored for the lambdas to type-check — TODO confirm.
        LinkedList linkedList = new LinkedList();
        getRequestContext.requestSentTimestampNS = System.nanoTime();
        for (String str2 : replicas) {
            CompletableFuture<HttpStatus> trackHealthBasedOnRequestToInstance = this.metadata.trackHealthBasedOnRequestToInstance(str2, i, i2);
            getRequestContext.routeRequestMap.put(str2, trackHealthBasedOnRequestToInstance);
            try {
                // The Authorization header is only sent when a non-empty token is configured.
                CompletableFuture completableFuture2 = this.transportClient.get(str2 + str, (this.token == null || this.token.isEmpty()) ? Collections.emptyMap() : Collections.singletonMap("Authorization", "Bearer " + this.token));
                linkedList.add(completableFuture2);
                completableFuture2.whenCompleteAsync((transportClientResponse, th) -> {
                    if (th != null) {
                        // Transport failure: only the health tracking future is completed here. The result future is
                        // failed by the allOf(...) handler below once every transport future has failed.
                        trackHealthBasedOnRequestToInstance.complete(th instanceof VeniceClientHttpException ? HttpStatus.fromCode(((VeniceClientHttpException) th).getHttpStatus()) : HttpStatus.S_503_SERVICE_UNAVAILABLE);
                        return;
                    }
                    if (transportClientResponse == null) {
                        // A null response is reported as 404 and yields a null value for the caller.
                        trackHealthBasedOnRequestToInstance.complete(HttpStatus.S_404_NOT_FOUND);
                        if (atomicBoolean.getAndSet(true)) {
                            return;
                        }
                        getRequestContext.requestSubmissionToResponseHandlingTime = LatencyUtils.getLatencyInMS(nanoTime);
                        completableFuture.complete(null);
                        return;
                    }
                    try {
                        trackHealthBasedOnRequestToInstance.complete(HttpStatus.S_200_OK);
                        // Only the first handler to flip the flag decompresses and deserializes the body.
                        if (!atomicBoolean.getAndSet(true)) {
                            getRequestContext.requestSubmissionToResponseHandlingTime = LatencyUtils.getLatencyInMS(nanoTime);
                            CompressionStrategy compressionStrategy = transportClientResponse.getCompressionStrategy();
                            long nanoTime2 = System.nanoTime();
                            ByteBuffer decompressRecord = decompressRecord(compressionStrategy, ByteBuffer.wrap(transportClientResponse.getBody()), getRequestContext.currentVersion, this.metadata.getCompressor(compressionStrategy, getRequestContext.currentVersion));
                            getRequestContext.decompressionTime = LatencyUtils.getLatencyInMS(nanoTime2);
                            long nanoTime3 = System.nanoTime();
                            Object tryToDeserialize = tryToDeserialize(getDataRecordDeserializer(transportClientResponse.getSchemaId()), decompressRecord, transportClientResponse.getSchemaId(), k);
                            getRequestContext.responseDeserializationTime = LatencyUtils.getLatencyInMS(nanoTime3);
                            getRequestContext.successRequestKeyCount.incrementAndGet();
                            completableFuture.complete(tryToDeserialize);
                        }
                    } catch (Exception e) {
                        // A failure while handling the response fails the result unless it is already done.
                        if (completableFuture.isDone()) {
                            return;
                        }
                        completableFuture.completeExceptionally(e);
                    }
                }, this.deserializationExecutor);
            } catch (Exception e) {
                // Sending to this route failed synchronously: report it as unavailable and try the next route.
                LOGGER.error("Received exception while sending request to route: {}", str2, e);
                trackHealthBasedOnRequestToInstance.complete(HttpStatus.S_503_SERVICE_UNAVAILABLE);
            }
        }
        if (linkedList.isEmpty()) {
            // No request could be sent to any of the selected replicas.
            completableFuture.completeExceptionally(new VeniceClientException("No available replica for store: " + getStoreName() + ", version: " + i + " and partition: " + i2));
        } else {
            // exceptionally(...) runs when at least one transport future failed; the result is failed only
            // when every transport future completed exceptionally.
            CompletableFuture.allOf((CompletableFuture[]) linkedList.toArray(new CompletableFuture[linkedList.size()])).exceptionally(th2 -> {
                boolean z = true;
                Iterator it = linkedList.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    if (!((CompletableFuture) it.next()).isCompletedExceptionally()) {
                        z = false;
                        break;
                    }
                }
                if (!z) {
                    return null;
                }
                getRequestContext.requestSubmissionToResponseHandlingTime = LatencyUtils.getLatencyInMS(nanoTime);
                completableFuture.completeExceptionally(th2);
                return null;
            });
        }
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up a set of keys and only succeeds when the underlying streaming batch get produced a
     * full response.
     *
     * @param batchGetRequestContext per-request state shared with the streaming batch get
     * @param set the keys to look up
     * @return a future holding the response map; it fails with the streaming failure, or with a
     *         {@link VeniceClientException} ("Response was not complete") when the response is partial
     * @throws VeniceClientException if the metadata is not ready
     */
    @Override // com.linkedin.venice.fastclient.InternalAvroStoreClient
    public CompletableFuture<Map<K, V>> batchGet(BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set) throws VeniceClientException {
        verifyMetadataInitialized();
        CompletableFuture<Map<K, V>> result = new CompletableFuture<>();
        streamingBatchGet(batchGetRequestContext, set).whenComplete((responseMap, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
                return;
            }
            if (responseMap.isFullResponse()) {
                result.complete(responseMap);
                return;
            }
            // Partial response: attach the recorded cause when the context has one.
            VeniceClientException incomplete = batchGetRequestContext.getPartialResponseException().isPresent()
                    ? new VeniceClientException("Response was not complete", batchGetRequestContext.getPartialResponseException().get())
                    : new VeniceClientException("Response was not complete");
            result.completeExceptionally(incomplete);
        });
        return result;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Future-returning variant of the streaming batch get: collects the records delivered by the
     * callback-based variant into a response map.
     *
     * <p>Keys delivered with a {@code null} value are collected in a separate queue; all others are
     * stored in the result map.
     *
     * @param batchGetRequestContext per-request state; {@code complete()} is invoked on it when the stream ends
     * @param set the keys to look up
     * @return a future completed with a full response map on success, or exceptionally with the
     *         exception reported by the stream
     * @throws VeniceClientException if the metadata is not ready
     */
    @Override // com.linkedin.venice.fastclient.InternalAvroStoreClient
    public CompletableFuture<VeniceResponseMap<K, V>> streamingBatchGet(final BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set) throws VeniceClientException {
        verifyMetadataInitialized();
        final ConcurrentLinkedQueue keysWithoutValue = new ConcurrentLinkedQueue();
        final VeniceConcurrentHashMap receivedRecords = new VeniceConcurrentHashMap();
        // The supplier builds a response map flagged as not full from whatever has been collected so far.
        final VeniceResponseCompletableFuture responseFuture = new VeniceResponseCompletableFuture(() -> {
            return new VeniceResponseMapImpl(receivedRecords, keysWithoutValue, false);
        }, set.size(), Optional.empty());
        streamingBatchGet(batchGetRequestContext, set, new StreamingCallback<K, V>() {
            public void onRecordReceived(K key, V value) {
                if (value != null) {
                    receivedRecords.put(key, value);
                } else {
                    keysWithoutValue.add(key);
                }
            }

            public void onCompletion(Optional<Exception> failure) {
                batchGetRequestContext.complete();
                if (!failure.isPresent()) {
                    responseFuture.complete(new VeniceResponseMapImpl(receivedRecords, keysWithoutValue, true));
                    return;
                }
                responseFuture.completeExceptionally(failure.get());
            }
        });
        return responseFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Callback-based streaming batch get: sends one request per route and reports records and the
     * final outcome through the given callback.
     *
     * <p>{@code onCompletion} is invoked once all route futures of the request context are done: with
     * an empty optional when they all succeeded, otherwise with a {@link VeniceClientException} whose
     * cause is the partial-response exception recorded on the context (when present) or the failure
     * reported by the combined route future.
     *
     * @param batchGetRequestContext per-request state holding the routes, keys and route futures
     * @param set the keys to look up
     * @param streamingCallback receiver of the individual records and of the completion signal
     */
    @Override // com.linkedin.venice.fastclient.InternalAvroStoreClient
    public void streamingBatchGet(BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set, StreamingCallback<K, V> streamingCallback) {
        verifyMetadataInitialized();
        streamingBatchGetInternal(batchGetRequestContext, set, (transportClientResponseForRoute, th) -> {
            transportRequestCompletionHandler(batchGetRequestContext, transportClientResponseForRoute, th, streamingCallback);
        });
        CompletableFuture.allOf((CompletableFuture[]) batchGetRequestContext.getAllRouteFutures().toArray(new CompletableFuture[0])).whenComplete((ignored, routeFailure) -> {
            if (routeFailure == null) {
                streamingCallback.onCompletion(Optional.empty());
                return;
            }
            // Prefer the exception recorded on the request context over the one from the combined future.
            // The local needs its own name: a lambda parameter cannot be redeclared inside the lambda body.
            Throwable cause = routeFailure;
            if (batchGetRequestContext.getPartialResponseException().isPresent()) {
                cause = batchGetRequestContext.getPartialResponseException().get();
            }
            streamingCallback.onCompletion(Optional.of(new VeniceClientException("At least one route did not complete", cause)));
        });
    }

    /**
     * Assigns every key to a route and posts one multi-get request per route.
     *
     * <p>Replicas are looked up once per partition. When a partition has no replica, the context is
     * flagged with {@code noAvailableReplica}, the error is logged and recorded as the partial-response
     * exception, and the key is not added to any route.
     *
     * @param batchGetRequestContext per-request state that collects keys per route and timing metrics
     * @param set the keys to look up
     * @param biConsumer invoked with the route-tagged transport response and the failure (if any) of each request
     */
    private void streamingBatchGetInternal(BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set, BiConsumer<TransportClientResponseForRoute, Throwable> biConsumer) {
        batchGetRequestContext.instanceHealthMonitor = metadata.getInstanceHealthMonitor();
        final String requestPath = composeURIForBatchGetRequest(batchGetRequestContext);
        final int version = batchGetRequestContext.currentVersion;

        // Phase 1: map each key to the replica(s) of its partition.
        final Map<Integer, List<String>> replicasByPartition = new HashMap<>();
        for (K key : set) {
            final int partition = metadata.getPartitionId(version, keySerializer.serialize(key));
            List<String> partitionReplicas = replicasByPartition.computeIfAbsent(partition, cachedPartition -> metadata.getReplicas(
                    batchGetRequestContext.requestId,
                    version,
                    partition,
                    1,
                    batchGetRequestContext.getRoutesForPartitionMapping().getOrDefault(cachedPartition, Collections.emptySet())));
            if (partitionReplicas.isEmpty()) {
                batchGetRequestContext.noAvailableReplica = true;
                String message = String.format("No route found for partitionId: %s, store: %s, version: %s", partition, getStoreName(), version);
                LOGGER.error(message);
                batchGetRequestContext.setPartialResponseException(new VeniceClientException(message));
            }
            for (String replica : partitionReplicas) {
                batchGetRequestContext.addKey(replica, key, partition);
            }
        }

        // Phase 2: send one POST per route carrying all keys assigned to it.
        for (String route : batchGetRequestContext.getRoutes()) {
            final String url = route + requestPath;
            final Map<String, String> headers = new HashMap<>();
            headers.put("X-VENICE-API-VERSION", Integer.toString(ReadAvroProtocolDefinition.MULTI_GET_ROUTER_REQUEST_V1.getProtocolVersion()));
            if (!(token == null || token.isEmpty())) {
                headers.put("Authorization", "Bearer " + token);
            }
            final long serializationStart = System.nanoTime();
            final byte[] payload = serializeMultiGetRequest(batchGetRequestContext.keysForRoutes(route));
            batchGetRequestContext.recordRequestSerializationTime(route, getLatencyInNS(serializationStart));
            batchGetRequestContext.recordRequestSentTimeStamp(route);
            transportClient.post(url, headers, payload).whenComplete((response, failure) -> {
                batchGetRequestContext.recordRequestSubmissionToResponseHandlingTime(route);
                biConsumer.accept(TransportClientResponseForRoute.fromTransportClientWithRoute(response, route), failure);
            });
        }
    }

    /**
     * Handles the outcome of one route's multi-get request: deserializes and decompresses every
     * returned record, streams it to the callback, reports a {@code null} value for every key of the
     * route that the response did not contain, and finally marks the route as complete.
     *
     * <p>A transport failure, or any exception thrown while processing the response, marks the route
     * as completed exceptionally. Without the latter the exception would only surface in the unused
     * future returned by {@code whenComplete} at the call site, and this method would leave the route
     * neither completed nor failed.
     *
     * @param batchGetRequestContext per-request state holding the keys per route and the timing metrics
     * @param transportClientResponseForRoute transport response tagged with the route it came from
     * @param th failure reported by the transport, or {@code null} on success
     * @param streamingCallback receiver of the individual records
     */
    private void transportRequestCompletionHandler(BatchGetRequestContext<K, V> batchGetRequestContext, TransportClientResponseForRoute transportClientResponseForRoute, Throwable th, StreamingCallback<K, V> streamingCallback) {
        if (th != null) {
            LOGGER.error("Exception received from transport. ExMsg: {}", th.getMessage());
            batchGetRequestContext.markCompleteExceptionally(transportClientResponseForRoute, th);
            return;
        }
        try {
            RecordDeserializer<MultiGetResponseRecordV1> multiGetResponseRecordDeserializer = getMultiGetResponseRecordDeserializer(transportClientResponseForRoute.getSchemaId());
            long deserializationStart = System.nanoTime();
            Iterable<MultiGetResponseRecordV1> records = multiGetResponseRecordDeserializer.deserializeObjects(new ByteBufferOptimizedBinaryDecoder(transportClientResponseForRoute.getBody()));
            batchGetRequestContext.recordRequestDeserializationTime(transportClientResponseForRoute.getRouteId(), getLatencyInNS(deserializationStart));
            RecordDeserializer<V> dataRecordDeserializer = getDataRecordDeserializer(transportClientResponseForRoute.getSchemaId());
            List<BatchGetRequestContext.KeyInfo<K>> keysForRoutes = batchGetRequestContext.keysForRoutes(transportClientResponseForRoute.getRouteId());
            // Indexes (positions in keysForRoutes) of the keys for which the response carried a record.
            Set<Integer> answeredKeyIndexes = new HashSet<>();
            LOGGER.debug("Response received for route {} -> {} ", transportClientResponseForRoute.getRouteId(), records);
            long totalDecompressionTimeNS = 0;
            VeniceCompressor compressor = this.metadata.getCompressor(transportClientResponseForRoute.getCompressionStrategy(), batchGetRequestContext.currentVersion);
            for (MultiGetResponseRecordV1 record : records) {
                long decompressionStart = System.nanoTime();
                ByteBuffer decompressed = decompressRecord(transportClientResponseForRoute.getCompressionStrategy(), record.value, batchGetRequestContext.currentVersion, compressor);
                totalDecompressionTimeNS += System.nanoTime() - decompressionStart;
                long recordDeserializationStart = System.nanoTime();
                V value = dataRecordDeserializer.deserialize(decompressed);
                batchGetRequestContext.recordRecordDeserializationTime(transportClientResponseForRoute.getRouteId(), getLatencyInNS(recordDeserializationStart));
                // keyIndex refers to the position of the key in the list that was sent to this route.
                BatchGetRequestContext.KeyInfo<K> keyInfo = keysForRoutes.get(record.keyIndex);
                answeredKeyIndexes.add(record.keyIndex);
                streamingCallback.onRecordReceived(keyInfo.getKey(), value);
            }
            batchGetRequestContext.recordDecompressionTime(transportClientResponseForRoute.getRouteId(), totalDecompressionTimeNS);
            // Keys of this route without a record in the response are reported with a null value.
            for (int i = 0; i < keysForRoutes.size(); i++) {
                if (!answeredKeyIndexes.contains(i)) {
                    streamingCallback.onRecordReceived(keysForRoutes.get(i).getKey(), null);
                }
            }
        } catch (Exception e) {
            // Fail the route explicitly, the same way a transport failure is reported above.
            LOGGER.error("Failed to process the response from route: {}", transportClientResponseForRoute.getRouteId(), e);
            batchGetRequestContext.markCompleteExceptionally(transportClientResponseForRoute, e);
            return;
        }
        batchGetRequestContext.markComplete(transportClientResponseForRoute);
    }

    /**
     * Returns the deserializer for the records of a multi-get response.
     *
     * @param i schema id of the response; not consulted by this implementation
     * @return a fast-avro specific deserializer for {@link MultiGetResponseRecordV1}
     */
    protected RecordDeserializer<MultiGetResponseRecordV1> getMultiGetResponseRecordDeserializer(int i) {
        final Schema responseRecordSchema = MultiGetResponseRecordV1.SCHEMA$;
        return FastSerializerDeserializerFactory.getFastAvroSpecificDeserializer(responseRecordSchema, MultiGetResponseRecordV1.class);
    }

    /**
     * Returns a value deserializer that reads data written with the given schema id into the latest
     * value schema of the store.
     *
     * @param i id of the schema the value was written with
     * @return the deserializer built from the writer schema and the latest value schema
     * @throws VeniceClientException if the latest value schema or the schema for the given id is unavailable
     */
    protected RecordDeserializer<V> getDataRecordDeserializer(int i) throws VeniceClientException {
        final Schema readerSchema = metadata.getLatestValueSchema();
        if (readerSchema == null) {
            String storeName = metadata.getStoreName();
            throw new VeniceClientException("Failed to get latest value schema for store: " + storeName);
        }
        final Schema writerSchema = metadata.getValueSchema(i);
        if (writerSchema == null) {
            String storeName = metadata.getStoreName();
            throw new VeniceClientException("Failed to get writer schema with id: " + i + " from store: " + storeName);
        }
        return getValueDeserializer(writerSchema, readerSchema);
    }

    /**
     * Creates the fast-avro generic deserializer for the given schema pair.
     *
     * @param schema first schema handed to the factory (the writer schema when called from getDataRecordDeserializer)
     * @param schema2 second schema handed to the factory (the latest value schema when called from getDataRecordDeserializer)
     * @return the deserializer produced by {@link FastSerializerDeserializerFactory}
     */
    protected RecordDeserializer<V> getValueDeserializer(Schema schema, Schema schema2) {
        RecordDeserializer<V> valueDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema2);
        return valueDeserializer;
    }

    /**
     * Deserializes a value by delegating to
     * {@code AbstractAvroStoreClient.tryToDeserializeWithVerboseLogging}, passing along the key, the key
     * serializer, the store metadata and this class's logger.
     *
     * @param recordDeserializer deserializer to apply
     * @param byteBuffer serialized value
     * @param i schema id of the value
     * @param k key the value belongs to
     * @return the deserialized value
     */
    private <T> T tryToDeserialize(RecordDeserializer<T> recordDeserializer, ByteBuffer byteBuffer, int i, K k) {
        return (T) AbstractAvroStoreClient.tryToDeserializeWithVerboseLogging(
                recordDeserializer,
                byteBuffer,
                i,
                k,
                keySerializer,
                metadata,
                LOGGER);
    }

    /**
     * Decompresses a record with the given compressor.
     *
     * <p>The missing-compressor check is performed outside the {@code try} block so that its exception
     * reaches the caller directly instead of being caught and re-wrapped as a decompression failure.
     *
     * @param compressionStrategy strategy reported for the record; only used in error messages
     * @param byteBuffer compressed record
     * @param i store version; only used in error messages
     * @param veniceCompressor compressor to use, must not be {@code null}
     * @return the decompressed record
     * @throws VeniceClientException if the compressor is {@code null} or decompression fails
     */
    private ByteBuffer decompressRecord(CompressionStrategy compressionStrategy, ByteBuffer byteBuffer, int i, VeniceCompressor veniceCompressor) {
        if (veniceCompressor == null) {
            throw new VeniceClientException(String.format("Expected to find compressor in metadata but found null, compressionStrategy:%s, store:%s, version:%d", compressionStrategy, getStoreName(), i));
        }
        try {
            return veniceCompressor.decompress(byteBuffer);
        } catch (Exception e) {
            throw new VeniceClientException(String.format("Unable to decompress the record, compressionStrategy:%s store:%s version:%d", compressionStrategy, getStoreName(), i), e);
        }
    }

    /**
     * Serializes the keys of one route into the body of a multi-get request.
     *
     * <p>Each key's {@code keyIndex} is its position in the given list.
     *
     * @param list keys (with their partition ids) assigned to one route
     * @return the serialized request body
     */
    private byte[] serializeMultiGetRequest(List<BatchGetRequestContext.KeyInfo<K>> list) {
        final List<MultiGetRouterRequestKeyV1> routerKeys = new ArrayList<>(list.size());
        for (int position = 0; position < list.size(); position++) {
            final BatchGetRequestContext.KeyInfo<K> info = list.get(position);
            final MultiGetRouterRequestKeyV1 routerKey = new MultiGetRouterRequestKeyV1();
            final byte[] serializedKey = keySerializer.serialize(info.getKey());
            routerKey.keyBytes = ByteBuffer.wrap(serializedKey);
            routerKey.keyIndex = position;
            routerKey.partitionId = info.getPartitionId();
            routerKeys.add(routerKey);
        }
        return multiGetSerializer.serializeObjects(routerKeys);
    }

    /**
     * Computes the time elapsed since the given {@link System#nanoTime()} reading.
     *
     * @param j start reading obtained from {@code System.nanoTime()}
     * @return elapsed nanoseconds
     */
    private long getLatencyInNS(long j) {
        final long now = System.nanoTime();
        return now - j;
    }

    /**
     * Ensures the store metadata is ready and creates the key serializer when it is still missing.
     *
     * @throws VeniceClientException if the metadata reports that it is not ready
     */
    private void verifyMetadataInitialized() throws VeniceClientException {
        if (metadata.isReady()) {
            // The key serializer is built from the key schema the first time it is found missing.
            if (keySerializer == null) {
                keySerializer = getKeySerializer(getKeySchema());
            }
            return;
        }
        throw new VeniceClientException(metadata.getStoreName() + " metadata is not ready, attempting to re-initialize");
    }

    /**
     * Starts the store metadata and creates the serializer used for multi-get request keys.
     *
     * @throws VeniceClientException declared for callers; presumably raised when the metadata fails to start — confirm
     */
    public void start() throws VeniceClientException {
        metadata.start();
        multiGetSerializer = FastSerializerDeserializerFactory.getAvroGenericSerializer(MultiGetRouterRequestKeyV1.SCHEMA$);
    }

    /**
     * Creates the serializer for keys of the given schema.
     *
     * @param schema key schema
     * @return an avro generic serializer obtained from {@link FastSerializerDeserializerFactory}
     */
    protected RecordSerializer getKeySerializer(Schema schema) {
        RecordSerializer serializerForKeys = FastSerializerDeserializerFactory.getAvroGenericSerializer(schema);
        return serializerForKeys;
    }

    /**
     * Closes the store metadata.
     *
     * @throws VeniceClientException wrapping any exception raised while closing the metadata
     */
    public void close() {
        try {
            metadata.close();
        } catch (Exception closeFailure) {
            throw new VeniceClientException("Failed to close store metadata", closeFailure);
        }
    }

    /**
     * Returns the name of the store as reported by the metadata.
     *
     * @return the store name
     */
    public String getStoreName() {
        return metadata.getStoreName();
    }

    /**
     * Returns the key schema of the store as reported by the metadata.
     *
     * @return the key schema
     */
    public Schema getKeySchema() {
        return metadata.getKeySchema();
    }

    /**
     * Returns the latest value schema of the store as reported by the metadata.
     *
     * @return the latest value schema
     */
    public Schema getLatestValueSchema() {
        return metadata.getLatestValueSchema();
    }
}
