package com.linkedin.venice.fastclient;

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.streaming.VeniceResponseCompletableFuture;
import com.linkedin.venice.client.store.streaming.VeniceResponseMapImpl;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient.class */
public class RetriableAvroGenericStoreClient<K, V> extends DelegatingAvroStoreClient<K, V> {
    private final boolean longTailRetryEnabledForSingleGet;
    private final boolean longTailRetryEnabledForBatchGet;
    private final int longTailRetryThresholdForSingleGetInMicroSeconds;
    private final int longTailRetryThresholdForBatchGetInMicroSeconds;
    private TimeoutProcessor timeoutProcessor;
    private static final Logger LOGGER = LogManager.getLogger(RetriableAvroGenericStoreClient.class);

    /* loaded from: input_file:com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient$RetryRunnable.class */
    class RetryRunnable implements Runnable {
        private final GetRequestContext requestContext;
        private final RetryType retryType;
        private final Runnable retryTask;

        RetryRunnable(GetRequestContext getRequestContext, RetryType retryType, Runnable runnable) {
            this.requestContext = getRequestContext;
            this.retryType = retryType;
            this.retryTask = runnable;
        }

        @Override // java.lang.Runnable
        public void run() {
            switch (this.retryType) {
                case LONG_TAIL_RETRY:
                    this.requestContext.longTailRetryRequestTriggered = true;
                    break;
                case ERROR_RETRY:
                    this.requestContext.errorRetryRequestTriggered = true;
                    break;
                default:
                    throw new VeniceClientException("Unknown retry type: " + this.retryType);
            }
            this.retryTask.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/venice/fastclient/RetriableAvroGenericStoreClient$RetryType.class */
    public enum RetryType {
        LONG_TAIL_RETRY,
        ERROR_RETRY
    }

    public RetriableAvroGenericStoreClient(InternalAvroStoreClient<K, V> internalAvroStoreClient, ClientConfig clientConfig) {
        super(internalAvroStoreClient);
        if (!clientConfig.isLongTailRetryEnabledForSingleGet() && !clientConfig.isLongTailRetryEnabledForBatchGet()) {
            throw new VeniceException("Long tail retry is not enabled");
        }
        this.longTailRetryEnabledForSingleGet = clientConfig.isLongTailRetryEnabledForSingleGet();
        this.longTailRetryEnabledForBatchGet = clientConfig.isLongTailRetryEnabledForBatchGet();
        this.longTailRetryThresholdForSingleGetInMicroSeconds = clientConfig.getLongTailRetryThresholdForSingleGetInMicroSeconds();
        this.longTailRetryThresholdForBatchGetInMicroSeconds = clientConfig.getLongTailRetryThresholdForBatchGetInMicroSeconds();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.venice.fastclient.DelegatingAvroStoreClient, com.linkedin.venice.fastclient.InternalAvroStoreClient
    public CompletableFuture<V> get(GetRequestContext getRequestContext, K k) throws VeniceClientException {
        CompletableFuture<V> completableFuture = super.get(getRequestContext, k);
        if (!this.longTailRetryEnabledForSingleGet) {
            return completableFuture;
        }
        if (this.timeoutProcessor == null) {
            this.timeoutProcessor = getRequestContext.instanceHealthMonitor.getTimeoutProcessor();
        }
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture<V> completableFuture3 = new CompletableFuture<>();
        Runnable runnable = () -> {
            super.get(getRequestContext, k).whenComplete((obj, th) -> {
                if (th != null) {
                    completableFuture2.completeExceptionally(th);
                    return;
                }
                completableFuture2.complete(obj);
                if (completableFuture3.isDone()) {
                    return;
                }
                getRequestContext.retryWin = true;
                completableFuture3.complete(obj);
            });
        };
        TimeoutProcessor.TimeoutFuture schedule = this.timeoutProcessor.schedule(new RetryRunnable(getRequestContext, RetryType.LONG_TAIL_RETRY, runnable), this.longTailRetryThresholdForSingleGetInMicroSeconds, TimeUnit.MICROSECONDS);
        completableFuture.whenComplete((obj, th) -> {
            if (th != null) {
                if (schedule.isDone()) {
                    return;
                }
                schedule.cancel();
                new RetryRunnable(getRequestContext, RetryType.ERROR_RETRY, runnable).run();
                return;
            }
            if (!schedule.isDone()) {
                schedule.cancel();
            }
            if (completableFuture3.complete(obj)) {
                getRequestContext.retryWin = false;
            }
        });
        CompletableFuture.allOf(completableFuture, completableFuture2).whenComplete((r6, th2) -> {
            if (completableFuture.isCompletedExceptionally() && completableFuture2.isCompletedExceptionally()) {
                completableFuture3.completeExceptionally(th2);
            }
        });
        return completableFuture3;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.venice.fastclient.DelegatingAvroStoreClient, com.linkedin.venice.fastclient.InternalAvroStoreClient
    public CompletableFuture<Map<K, V>> batchGet(final BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set) throws VeniceClientException {
        final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        final VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
        final VeniceResponseCompletableFuture veniceResponseCompletableFuture = new VeniceResponseCompletableFuture(() -> {
            return new VeniceResponseMapImpl(veniceConcurrentHashMap, concurrentLinkedQueue, false);
        }, set.size(), Optional.empty());
        streamingBatchGet(batchGetRequestContext, set, new StreamingCallback<K, V>() { // from class: com.linkedin.venice.fastclient.RetriableAvroGenericStoreClient.1
            public void onRecordReceived(K k, V v) {
                if (v == null) {
                    concurrentLinkedQueue.add(k);
                } else {
                    veniceConcurrentHashMap.put(k, v);
                }
            }

            public void onCompletion(Optional<Exception> optional) {
                batchGetRequestContext.complete();
                if (optional.isPresent()) {
                    veniceResponseCompletableFuture.completeExceptionally(optional.get());
                } else {
                    veniceResponseCompletableFuture.complete(new VeniceResponseMapImpl(veniceConcurrentHashMap, concurrentLinkedQueue, true));
                }
            }
        });
        CompletableFuture<Map<K, V>> completableFuture = new CompletableFuture<>();
        veniceResponseCompletableFuture.whenComplete((veniceResponseMap, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            if (veniceResponseMap.isFullResponse()) {
                completableFuture.complete(veniceResponseMap);
            } else if (batchGetRequestContext.getPartialResponseException().isPresent()) {
                completableFuture.completeExceptionally(new VeniceClientException("Response was not complete", batchGetRequestContext.getPartialResponseException().get()));
            } else {
                completableFuture.completeExceptionally(new VeniceClientException("Response was not complete"));
            }
        });
        return completableFuture;
    }

    @Override // com.linkedin.venice.fastclient.DelegatingAvroStoreClient, com.linkedin.venice.fastclient.InternalAvroStoreClient
    public void streamingBatchGet(BatchGetRequestContext<K, V> batchGetRequestContext, Set<K> set, StreamingCallback<K, V> streamingCallback) throws VeniceClientException {
        if (!this.longTailRetryEnabledForBatchGet) {
            super.streamingBatchGet(batchGetRequestContext, set, streamingCallback);
            return;
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        AtomicReference<Exception> atomicReference = new AtomicReference<>();
        VeniceConcurrentHashMap<K, CompletableFuture<V>> veniceConcurrentHashMap = new VeniceConcurrentHashMap<>();
        for (K k : set) {
            CompletableFuture completableFuture2 = new CompletableFuture();
            completableFuture2.whenComplete((obj, th) -> {
                streamingCallback.onRecordReceived(k, obj);
            });
            veniceConcurrentHashMap.put(k, completableFuture2);
        }
        super.streamingBatchGet(batchGetRequestContext, set, getStreamingCallback(completableFuture, atomicReference, veniceConcurrentHashMap, batchGetRequestContext.numberOfKeysCompletedInOriginalRequest));
        if (this.timeoutProcessor == null) {
            this.timeoutProcessor = batchGetRequestContext.instanceHealthMonitor.getTimeoutProcessor();
        }
        TimeoutProcessor.TimeoutFuture schedule = this.timeoutProcessor.schedule(() -> {
            if (veniceConcurrentHashMap.isEmpty()) {
                LOGGER.debug("Retry triggered with no incomplete keys. Ignoring.");
                return;
            }
            batchGetRequestContext.longTailRetryTriggered = true;
            batchGetRequestContext.numberOfKeysSentInRetryRequest = veniceConcurrentHashMap.size();
            LOGGER.debug("Retrying {} incomplete keys ", Integer.valueOf(veniceConcurrentHashMap.size()));
            BatchGetRequestContext<K, V> batchGetRequestContext2 = new BatchGetRequestContext<>();
            batchGetRequestContext2.setRoutesForPartitionMapping(batchGetRequestContext.getRoutesForPartitionMapping());
            super.streamingBatchGet(batchGetRequestContext2, Collections.unmodifiableSet(veniceConcurrentHashMap.keySet()), getStreamingCallback(completableFuture, atomicReference, veniceConcurrentHashMap, batchGetRequestContext.numberOfKeysCompletedInRetryRequest));
        }, this.longTailRetryThresholdForBatchGetInMicroSeconds, TimeUnit.MICROSECONDS);
        completableFuture.whenComplete((r8, th2) -> {
            if (!schedule.isDone()) {
                schedule.cancel();
            }
            if (th2 == null) {
                streamingCallback.onCompletion(Optional.empty());
            } else {
                streamingCallback.onCompletion(Optional.of(new VeniceClientException("Request failed with exception ", th2)));
            }
        });
    }

    private StreamingCallback<K, V> getStreamingCallback(final CompletableFuture<Void> completableFuture, final AtomicReference<Exception> atomicReference, final VeniceConcurrentHashMap<K, CompletableFuture<V>> veniceConcurrentHashMap, final AtomicInteger atomicInteger) {
        return new StreamingCallback<K, V>() { // from class: com.linkedin.venice.fastclient.RetriableAvroGenericStoreClient.2
            public void onRecordReceived(K k, V v) {
                CompletableFuture completableFuture2 = (CompletableFuture) veniceConcurrentHashMap.remove(k);
                if (completableFuture2 != null) {
                    completableFuture2.complete(v);
                    atomicInteger.incrementAndGet();
                }
                if (!veniceConcurrentHashMap.isEmpty() || completableFuture.isDone()) {
                    return;
                }
                completableFuture.complete(null);
            }

            public void onCompletion(Optional<Exception> optional) {
                if (completableFuture.isDone()) {
                    return;
                }
                if (!optional.isPresent()) {
                    completableFuture.complete(null);
                } else {
                    if (atomicReference.compareAndSet(null, optional.get())) {
                        return;
                    }
                    completableFuture.completeExceptionally(optional.get());
                }
            }
        };
    }
}
