package com.linkedin.alpini.router;

import com.linkedin.alpini.base.concurrency.AsyncFuture;
import com.linkedin.alpini.base.concurrency.AsyncFutureListener;
import com.linkedin.alpini.base.concurrency.AsyncPromise;
import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.NamedThreadFactory;
import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.alpini.base.concurrency.impl.CancelledAsyncFuture;
import com.linkedin.alpini.base.concurrency.impl.SuccessAsyncFuture;
import com.linkedin.alpini.base.misc.BasicRequest;
import com.linkedin.alpini.base.misc.ExceptionWithStatus;
import com.linkedin.alpini.base.misc.HeaderNames;
import com.linkedin.alpini.base.misc.Headers;
import com.linkedin.alpini.base.misc.Http2TooManyStreamsException;
import com.linkedin.alpini.base.misc.Metrics;
import com.linkedin.alpini.base.misc.Msg;
import com.linkedin.alpini.base.misc.Time;
import com.linkedin.alpini.base.misc.TimeValue;
import com.linkedin.alpini.netty4.misc.Http2Utils;
import com.linkedin.alpini.router.api.HostHealthMonitor;
import com.linkedin.alpini.router.api.MetricNames;
import com.linkedin.alpini.router.api.ResourcePath;
import com.linkedin.alpini.router.api.ResourcePathParser;
import com.linkedin.alpini.router.api.RouterException;
import com.linkedin.alpini.router.api.RouterTimeoutProcessor;
import com.linkedin.alpini.router.api.Scatter;
import com.linkedin.alpini.router.api.ScatterGatherHelper;
import com.linkedin.alpini.router.api.ScatterGatherRequest;
import com.linkedin.alpini.router.monitoring.ScatterGatherStats;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.concurrent.EventExecutor;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/linkedin/alpini/router/ScatterGatherRequestHandlerImpl.class */
public abstract class ScatterGatherRequestHandlerImpl<H, P extends ResourcePath<K>, K, R, CHC, BHS extends BasicRequest, HR, HRS, SCATTER_GATHER_HELPER extends ScatterGatherHelper<H, P, K, R, BHS, HR, HRS>> extends ScatterGatherRequestHandler<H, P, K, R> {

    @Nonnull
    private final SCATTER_GATHER_HELPER _scatterGatherHelper;
    private static final ThreadFactory CANCEL_FACTORY = new NamedThreadFactory("scatterGather-cancel");
    private static final Semaphore CANCEL_CONCURRENCY = new Semaphore(4);
    private static final ThreadLocal<Executor> CANCEL_EXECUTOR = ThreadLocal.withInitial(() -> {
        return new Executor() { // from class: com.linkedin.alpini.router.ScatterGatherRequestHandlerImpl.1
            private final Executor _executor = Executors.newSingleThreadExecutor(ScatterGatherRequestHandlerImpl.CANCEL_FACTORY);

            @Override // java.util.concurrent.Executor
            public void execute(@Nonnull Runnable runnable) {
                this._executor.execute(() -> {
                    ScatterGatherRequestHandlerImpl.CANCEL_CONCURRENCY.acquireUninterruptibly();
                    try {
                        runnable.run();
                        ScatterGatherRequestHandlerImpl.CANCEL_CONCURRENCY.release();
                    } catch (Throwable th) {
                        ScatterGatherRequestHandlerImpl.CANCEL_CONCURRENCY.release();
                        throw th;
                    }
                });
            }
        };
    });
    private static final AsyncFuture<?> COMPLETED = AsyncFuture.success(null);

    /* JADX INFO: Access modifiers changed from: protected */
    public ScatterGatherRequestHandlerImpl(@Nonnull SCATTER_GATHER_HELPER scatter_gather_helper, @Nonnull TimeoutProcessor timeoutProcessor) {
        super(timeoutProcessor);
        this._scatterGatherHelper = (SCATTER_GATHER_HELPER) Objects.requireNonNull(scatter_gather_helper, "scatterGatherHelper");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ScatterGatherRequestHandlerImpl(@Nonnull SCATTER_GATHER_HELPER scatter_gather_helper, @Nonnull RouterTimeoutProcessor routerTimeoutProcessor) {
        super(routerTimeoutProcessor);
        this._scatterGatherHelper = (SCATTER_GATHER_HELPER) Objects.requireNonNull(scatter_gather_helper, "scatterGatherHelper");
    }

    @Override // com.linkedin.alpini.router.ScatterGatherRequestHandler
    @Nonnull
    public final SCATTER_GATHER_HELPER getScatterGatherHelper() {
        return this._scatterGatherHelper;
    }

    public static <M extends Enum<M>> void setMetric(Metrics metrics, @Nonnull M m, @Nonnull Supplier<TimeValue> supplier) {
        if (metrics != null) {
            metrics.setMetric(m.name(), supplier.get());
        }
    }

    protected abstract Runnable timeout(CHC chc, Runnable runnable);

    /* JADX WARN: Multi-variable type inference failed */
    protected Runnable timeout(CHC chc, String str, AsyncPromise<Void> asyncPromise) {
        return timeout(chc, str, asyncPromise, null);
    }

    protected <T> Runnable timeout(CHC chc, String str, AsyncPromise<T> asyncPromise, T t) {
        return timeout(chc, () -> {
            LOG.debug(str);
            asyncPromise.setSuccess(t);
        });
    }

    protected abstract Executor executor(CHC chc);

    private Executor stageExecutor(CHC chc) {
        return stageExecutor(executor(chc));
    }

    private Executor stageExecutor(Executor executor) {
        if (!(executor instanceof EventExecutor)) {
            return executor;
        }
        EventExecutor eventExecutor = (EventExecutor) executor;
        return runnable -> {
            if (eventExecutor.inEventLoop()) {
                runnable.run();
            } else {
                eventExecutor.execute(runnable);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void setFailure(AsyncPromise<?> asyncPromise, Throwable th, String str) {
        if (asyncPromise.setFailure(th)) {
            return;
        }
        LOG.warn("{}", str, th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nonnull
    public AsyncFuture<HR> handler(@Nonnull CHC chc, @Nonnull BHS bhs) throws Exception {
        AsyncPromise deferred = AsyncFuture.deferred(false);
        try {
            LOG.debug("[{}] handler", bhs.getRequestId());
            Metrics initializeMetrics = this._scatterGatherHelper.initializeMetrics(bhs);
            CompletableFuture.completedFuture(retainRequest(bhs)).thenCompose(basicRequest -> {
                return handler0(chc, initializeMetrics, basicRequest);
            }).exceptionally(th -> {
                try {
                    HR handlerException = handlerException(bhs, initializeMetrics, th);
                    LOG.debug("after handlerException, released={}", Boolean.valueOf(releaseRequest(bhs)));
                    return handlerException;
                } catch (Throwable th) {
                    LOG.debug("after handlerException, released={}", Boolean.valueOf(releaseRequest(bhs)));
                    throw th;
                }
            }).whenComplete((obj, th2) -> {
                if (obj == 0 || !deferred.setSuccess(obj)) {
                    if (th2 != null) {
                        setFailure(deferred, unwrapCompletion(th2), "unexpected exception");
                        return;
                    }
                    if (obj != 0) {
                        releaseResponse(obj);
                    }
                    if (deferred.isDone()) {
                        return;
                    }
                    setFailure(deferred, new IllegalStateException(), "unexpected exception");
                }
            });
        } catch (Throwable th3) {
            setFailure(deferred, unwrapCompletion(th3), "unexpected exception");
        }
        return deferred;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Throwable unwrapCompletion(Throwable th) {
        while ((th instanceof CompletionException) && th.getCause() != null) {
            th = th.getCause();
        }
        return th;
    }

    private HR handlerException(@Nonnull BHS bhs, Metrics metrics, @Nonnull Throwable th) {
        LOG.debug("[{}] handlerException", bhs.getRequestId(), th);
        return (HR) this._scatterGatherHelper.aggregateResponse(bhs, metrics, Collections.singletonList(buildExceptionResponse(bhs, th)), this::buildResponse);
    }

    @Nonnull
    private P parseResourceUri(@Nonnull String str, @Nonnull BHS bhs) {
        try {
            return (P) this._scatterGatherHelper.parseResourceUri(str, bhs);
        } catch (RouterException e) {
            LOG.debug("Failed to parse URI: {}", str, e);
            throw new CompletionException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Nonnull
    public CompletionStage<Scatter<H, P, K>> scatter(@Nonnull String str, @Nonnull P p, @Nonnull Headers headers, @Nonnull HostHealthMonitor<H> hostHealthMonitor, Metrics metrics, String str2) {
        try {
            return getScatterGatherHelper().scatter(str, p, headers, hostHealthMonitor, metrics, str2);
        } catch (RouterException e) {
            LOG.debug("Exception in scatter", (Throwable) e);
            throw new CompletionException(e);
        }
    }

    @Nonnull
    private CompletionStage<HR> handler0(@Nonnull CHC chc, Metrics metrics, @Nonnull BHS bhs) {
        HostHealthMonitor<H> hostHealthMonitor;
        long nanoTime = Time.nanoTime();
        P parseResourceUri = parseResourceUri(bhs.getUri(), bhs);
        long nanoTime2 = Time.nanoTime();
        if (metrics != null) {
            metrics.setPath(parseResourceUri);
        }
        ScatterGatherStats scatterGatherStatsByPath = this._scatterGatherHelper.getScatterGatherStatsByPath(parseResourceUri);
        Objects.requireNonNull(scatterGatherStatsByPath);
        ScatterGatherStats.Delta delta = new ScatterGatherStats.Delta();
        delta.incrementTotalRequestsReceived();
        long requestTimestamp = bhs.getRequestTimestamp() + this._scatterGatherHelper.getRequestTimeout(bhs.getRequestHeaders());
        AsyncFuture<LongSupplier> longTailRetryMilliseconds = this._scatterGatherHelper.getLongTailRetryMilliseconds(parseResourceUri, bhs);
        long nanoTime3 = Time.nanoTime();
        if (this._scatterGatherHelper.isReqRedirectionAllowedForQuery() || !isQueryRequest(bhs.getUri())) {
            SCATTER_GATHER_HELPER scatter_gather_helper = this._scatterGatherHelper;
            Objects.requireNonNull(scatter_gather_helper);
            hostHealthMonitor = scatter_gather_helper::isHostHealthy;
        } else {
            hostHealthMonitor = (obj, str) -> {
                return true;
            };
        }
        return (CompletionStage<HR>) scatter(bhs.getMethodName(), parseResourceUri, bhs.getRequestHeaders(), hostHealthMonitor, metrics, null).thenComposeAsync(scatter -> {
            return handler1(chc, metrics, bhs, requestTimestamp, delta, longTailRetryMilliseconds, nanoTime2, nanoTime, nanoTime3, scatter);
        }, stageExecutor((ScatterGatherRequestHandlerImpl<H, P, K, R, CHC, BHS, HR, HRS, SCATTER_GATHER_HELPER>) chc));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Nonnull
    private CompletionStage<HR> handler1(@Nonnull CHC chc, Metrics metrics, @Nonnull BHS bhs, long j, @Nonnull ScatterGatherStats.Delta delta, @Nonnull AsyncFuture<LongSupplier> asyncFuture, long j2, long j3, long j4, @Nonnull Scatter<H, P, K> scatter) {
        AsyncPromise deferred;
        AsyncPromise deferred2;
        Runnable runnable;
        long nanoTime = Time.nanoTime();
        setMetric(metrics, MetricNames.ROUTER_PARSE_URI, () -> {
            return new TimeValue(j2 - j3, TimeUnit.NANOSECONDS);
        });
        setMetric(metrics, MetricNames.ROUTER_ROUTING_TIME, () -> {
            return new TimeValue(nanoTime - j4, TimeUnit.NANOSECONDS);
        });
        boolean z = !asyncFuture.isDone() || asyncFuture.isSuccess();
        long dispatchMinimumMillis = this._scatterGatherHelper.getDispatchMinimumMillis();
        long currentTimeMillis = j - Time.currentTimeMillis();
        long currentTimeMillis2 = Time.currentTimeMillis() - bhs.getRequestTimestamp();
        if (currentTimeMillis2 > 100) {
            LOG.debug("unexpected lag from receive to handler: {}, {}, requestTimeoutMillis: {}", bhs.getMethodName(), Long.valueOf(currentTimeMillis2), Long.valueOf(currentTimeMillis));
        }
        if (currentTimeMillis <= dispatchMinimumMillis) {
            LOG.debug("requestTimeoutMillis={} dispatchMinimumMillis={}, requestDeadline={}", Long.valueOf(currentTimeMillis), Long.valueOf(dispatchMinimumMillis), Long.valueOf(j));
            deferred = new SuccessAsyncFuture(null);
            deferred2 = CancelledAsyncFuture.getInstance();
            runnable = NOP;
        } else {
            deferred = AsyncFuture.deferred(true);
            deferred2 = AsyncFuture.deferred(true);
            RouterTimeoutProcessor.TimeoutFuture schedule = this._timeoutProcessor.schedule(timeout(chc, "request timeout", deferred), currentTimeMillis - dispatchMinimumMillis, TimeUnit.MILLISECONDS);
            Objects.requireNonNull(schedule);
            runnable = schedule::cancel;
        }
        Executor executor = executor(chc);
        ArrayList arrayList = new ArrayList(scatter.getOnlineRequestCount() + scatter.getOfflineRequestCount());
        appendErrorsForOfflinePartitions(bhs, scatter, arrayList);
        for (ScatterGatherRequest<H, K> scatterGatherRequest : scatter.getOnlineRequests()) {
            P pathFor = scatter.pathFor(scatterGatherRequest);
            AsyncPromise deferred3 = AsyncFuture.deferred(false);
            AsyncPromise deferred4 = AsyncFuture.deferred(false);
            arrayList.add(deferred4);
            AsyncPromise cancelledAsyncFuture = (!z || deferred.isDone()) ? CancelledAsyncFuture.getInstance() : AsyncFuture.deferred(false);
            if (AsyncFuture.getStatus(cancelledAsyncFuture) != AsyncFuture.Status.INFLIGHT || deferred.isDone()) {
                LOG.debug("handler1 before dispatch(...): retryableRequest: {}, timeoutFuture: {}, retryFuture: {}", Boolean.valueOf(z), AsyncFuture.getStatus(deferred), AsyncFuture.getStatus(cancelledAsyncFuture));
            }
            dispatch(scatter, scatterGatherRequest, pathFor, bhs, deferred3, deferred4, cancelledAsyncFuture, deferred, executor);
            if (!cancelledAsyncFuture.isCancelled()) {
                if (!asyncFuture.isDone() || (asyncFuture.isSuccess() && asyncFuture.getNow().getAsLong() >= 0)) {
                    deferred2.addListener(cancelledAsyncFuture);
                }
                deferred4.addListener(asyncFuture2 -> {
                    if (cancelledAsyncFuture.isDone()) {
                        return;
                    }
                    cancelledAsyncFuture.setSuccess(statusOf(200));
                });
                SCATTER_GATHER_HELPER scatter_gather_helper = this._scatterGatherHelper;
                Objects.requireNonNull(scatter_gather_helper);
                cancelledAsyncFuture.addListener(prepareRetry(deferred3, pathFor, bhs, scatter.getRole(), deferred4, deferred, executor, scatter_gather_helper::isHostHealthy, delta, metrics));
            }
        }
        delta.incrementFanoutRequestsSent(scatter.getOnlineRequestCount());
        releaseRequest(bhs);
        return gatherResponses(chc, bhs, arrayList, asyncFuture, metrics, delta, j, runnable, deferred2);
    }

    CompletableFuture<HR> gatherResponses(CHC chc, @Nonnull BHS bhs, List<AsyncFuture<List<HR>>> list, AsyncFuture<LongSupplier> asyncFuture, Metrics metrics, ScatterGatherStats.Delta delta, long j, Runnable runnable, AsyncPromise<HRS> asyncPromise) {
        LOG.debug("[{}] gatherResponses", bhs.getRequestId());
        AsyncFuture collect = AsyncFuture.collect(list, false);
        collect.addListener(asyncFuture2 -> {
            asyncPromise.setFailure(CancelledAsyncFuture.getInstance().getCause());
        });
        if (!collect.isDone() && !asyncPromise.isCancelled() && (!asyncFuture.isDone() || (asyncFuture.isSuccess() && asyncFuture.getNow().getAsLong() > 0))) {
            long longTailMinimumMillis = this._scatterGatherHelper.getLongTailMinimumMillis();
            asyncFuture.addListener(asyncFuture3 -> {
                if (!asyncFuture3.isSuccess() || collect.isDone()) {
                    return;
                }
                long asLong = ((LongSupplier) asyncFuture3.getNow()).getAsLong();
                if (asLong < 0) {
                    return;
                }
                long requestTimestamp = bhs.getRequestTimestamp() + Math.max(asLong, longTailMinimumMillis);
                if (requestTimestamp >= j - this._scatterGatherHelper.getDispatchMinimumMillis()) {
                    return;
                }
                RouterTimeoutProcessor.TimeoutFuture schedule = this._timeoutProcessor.schedule(timeout(chc, "long tail timeout", asyncPromise, gatewayTimeout()), Math.max(1L, requestTimestamp - Time.currentTimeMillis()), TimeUnit.MILLISECONDS);
                collect.addListener(asyncFuture3 -> {
                    Executor executor = CANCEL_EXECUTOR.get();
                    Objects.requireNonNull(schedule);
                    executor.execute(schedule::cancel);
                });
            });
        }
        CompletableFuture<HR> completableFuture = new CompletableFuture<>();
        long requestNanos = bhs.getRequestNanos();
        long nanoTime = Time.nanoTime();
        collect.whenCompleteAsync((list2, th) -> {
            try {
                try {
                    CANCEL_EXECUTOR.get().execute(runnable);
                    long nanoTime2 = Time.nanoTime();
                    setMetric(metrics, MetricNames.ROUTER_SERVER_TIME, () -> {
                        return new TimeValue(nanoTime2 - requestNanos, TimeUnit.NANOSECONDS);
                    });
                    setMetric(metrics, MetricNames.ROUTER_RESPONSE_WAIT_TIME, () -> {
                        return new TimeValue(nanoTime2 - nanoTime, TimeUnit.NANOSECONDS);
                    });
                    if (completableFuture.isDone()) {
                        LOG.debug("[{}] response discarded", bhs.getRequestId());
                        releaseResponses((List) collect.getNow());
                    } else if (th == null) {
                        responseComplete(completableFuture, this._scatterGatherHelper.aggregateResponse(bhs, metrics, list2, this::buildResponse));
                    } else {
                        responseComplete(completableFuture, handlerException(bhs, metrics, th));
                    }
                    delta.apply();
                } catch (Throwable th) {
                    if (completableFuture.isCompletedExceptionally()) {
                        LOG.warn("unhandled double exception: {} {}", bhs.getMethodName(), bhs.getUri(), th);
                        releaseResponses((List) collect.getNow());
                        delta.apply();
                        return;
                    }
                    if (completableFuture.isDone()) {
                        LOG.warn("unhandled exception: {} {}", bhs.getMethodName(), bhs.getUri(), th);
                        if (!completableFuture.isCompletedExceptionally()) {
                            delta.apply();
                            return;
                        }
                    } else {
                        responseComplete(completableFuture, handlerException(bhs, metrics, th));
                    }
                    releaseResponses((List) collect.getNow());
                    delta.apply();
                }
            } catch (Throwable th2) {
                delta.apply();
                throw th2;
            }
        }, stageExecutor((ScatterGatherRequestHandlerImpl<H, P, K, R, CHC, BHS, HR, HRS, SCATTER_GATHER_HELPER>) chc));
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void releaseResponses(List<HR> list) {
        if (list != null) {
            list.forEach(this::releaseResponse);
        }
    }

    private void responseComplete(CompletableFuture<HR> completableFuture, @Nonnull HR hr) {
        if (completableFuture.complete(hr)) {
            return;
        }
        releaseResponse(hr);
    }

    private boolean isQueryRequest(String str) {
        int indexOf = str.indexOf(63);
        if (indexOf < 0) {
            return false;
        }
        return str.regionMatches(indexOf, "?query=", 0, 7) || str.indexOf("&query=", indexOf) > 0;
    }

    protected abstract HRS statusOf(int i);

    protected abstract HRS multiStatus();

    protected abstract HRS badRequest();

    protected abstract HRS gatewayTimeout();

    protected abstract HRS tooManyRequests();

    protected abstract HRS serviceUnavailable();

    protected abstract HRS internalServerError();

    protected abstract boolean isSuccessStatus(HRS hrs);

    protected abstract boolean isRequestRetriable(P p, R r, HRS hrs);

    protected abstract boolean isServiceUnavailable(HRS hrs);

    protected abstract String getReasonPhrase(HRS hrs);

    /* JADX INFO: Access modifiers changed from: protected */
    @Nonnull
    public BHS prepareRequest(@Nonnull BHS bhs) {
        return retainRequest(bhs);
    }

    @Nonnull
    protected BHS retainRequest(@Nonnull BHS bhs) {
        return bhs;
    }

    protected boolean releaseRequest(BHS bhs) {
        return false;
    }

    protected boolean releaseResponse(HR hr) {
        return false;
    }

    protected boolean isLastAttempt(HRS hrs) {
        return !gatewayTimeout().equals(hrs);
    }

    @Nonnull
    protected AsyncFutureListener<HRS> prepareRetry(@Nonnull final AsyncFuture<H> asyncFuture, @Nonnull final P p, @Nonnull BHS bhs, @Nonnull final R r, @Nonnull final AsyncPromise<List<HR>> asyncPromise, @Nonnull final AsyncFuture<Void> asyncFuture2, @Nonnull final Executor executor, @Nonnull final HostHealthMonitor<H> hostHealthMonitor, @Nonnull final ScatterGatherStats.Delta delta, final Metrics metrics) {
        final BHS retainRequest = retainRequest(bhs);
        final Executor stageExecutor = stageExecutor(executor);
        return new AsyncFutureListener<HRS>() { // from class: com.linkedin.alpini.router.ScatterGatherRequestHandlerImpl.2
            @Override // com.linkedin.alpini.base.concurrency.AsyncFutureListener
            public void operationComplete(AsyncFuture<HRS> asyncFuture3) {
                CompletableFuture<Void> thenAcceptAsync = CompletableFuture.completedFuture(asyncFuture3).thenAcceptAsync(this::operationComplete0, executor);
                AsyncPromise asyncPromise2 = asyncPromise;
                thenAcceptAsync.exceptionally(th -> {
                    ScatterGatherRequestHandlerImpl.setFailure(asyncPromise2, th, "unhandled exception");
                    return null;
                });
            }

            /* JADX WARN: Multi-variable type inference failed */
            private void operationComplete0(AsyncFuture<HRS> asyncFuture3) {
                if (asyncPromise.isDone() || (asyncFuture3.isSuccess() && ScatterGatherRequestHandlerImpl.this.isSuccessStatus(asyncFuture3.getNow()))) {
                    ScatterGatherRequestHandlerImpl.this.releaseRequest(retainRequest);
                    return;
                }
                try {
                    prepareRetry(asyncFuture3);
                } catch (Throwable th) {
                    ScatterGatherRequestHandlerImpl.setFailure(asyncPromise, th, "Exception when trying to perform retry");
                }
            }

            private HRS extractResponseStatus(Throwable th) {
                HRS hrs = (HRS) ScatterGatherRequestHandlerImpl.this.internalServerError();
                if (th != null) {
                    Class<?> cls = hrs.getClass();
                    while (!(th instanceof ExceptionWithStatus)) {
                        th = th.getCause();
                        if (th == null) {
                        }
                    }
                    return (HRS) ((ExceptionWithStatus) th).status(cls).orElse(hrs);
                }
                return hrs;
            }

            /* JADX WARN: Multi-variable type inference failed */
            private void prepareRetry(AsyncFuture<HRS> asyncFuture3) {
                HostHealthMonitor hostHealthMonitor2;
                Object now = asyncFuture3.isSuccess() ? asyncFuture3.getNow() : extractResponseStatus(asyncFuture3.getCause());
                boolean isLastAttempt = ScatterGatherRequestHandlerImpl.this.isLastAttempt(now);
                ScatterGatherRequestHandler.LOG.debug("[{}] prepareRetry {} lastAttempt={}", retainRequest.getRequestId(), p.getLocation(), Boolean.valueOf(isLastAttempt));
                boolean isRequestRetriable = ScatterGatherRequestHandlerImpl.this.isRequestRetriable(p, r, now);
                p.setRetryRequest();
                if (ScatterGatherRequestHandlerImpl.this._scatterGatherHelper.isEnableRetryRequestAlwaysUseADifferentHost()) {
                    AsyncFuture asyncFuture4 = asyncFuture;
                    HostHealthMonitor hostHealthMonitor3 = hostHealthMonitor;
                    hostHealthMonitor2 = (obj, str) -> {
                        return !(asyncFuture4.isSuccess() && obj.equals(asyncFuture4.getNow())) && hostHealthMonitor3.isHostHealthy(obj, str);
                    };
                } else {
                    AsyncFuture asyncFuture5 = asyncFuture;
                    HostHealthMonitor hostHealthMonitor4 = hostHealthMonitor;
                    hostHealthMonitor2 = (obj2, str2) -> {
                        return (asyncFuture5.isSuccess() && obj2.equals(asyncFuture5.getNow())) || hostHealthMonitor4.isHostHealthy(obj2, str2);
                    };
                }
                HostHealthMonitor hostHealthMonitor5 = hostHealthMonitor2;
                String obj3 = asyncFuture.isSuccess() ? asyncFuture.getNow().toString() : null;
                ScatterGatherRequestHandler.LOG.debug("Prepare Retry for request with initial host: {}", obj3);
                CompletionStage scatter = ScatterGatherRequestHandlerImpl.this.scatter(retainRequest.getMethodName(), p, retainRequest.getRequestHeaders(), hostHealthMonitor5, metrics, obj3);
                AsyncPromise asyncPromise2 = asyncPromise;
                BasicRequest basicRequest = retainRequest;
                AsyncFuture asyncFuture6 = asyncFuture;
                AsyncFuture asyncFuture7 = asyncFuture2;
                ScatterGatherStats.Delta delta2 = delta;
                ResourcePath resourcePath = p;
                Executor executor2 = executor;
                Executor executor3 = stageExecutor;
                scatter.whenCompleteAsync((scatter2, th) -> {
                    if (th != null) {
                        if (isLastAttempt) {
                            ScatterGatherRequestHandlerImpl.setFailure(asyncPromise2, th, "error in scatter");
                        }
                        ScatterGatherRequestHandler.LOG.debug("Retry after scatter, released={}", Boolean.valueOf(ScatterGatherRequestHandlerImpl.this.releaseRequest(basicRequest)));
                        return;
                    }
                    boolean z = scatter2.getOnlineRequestCount() == 0 || !isRequestRetriable;
                    if (!z && asyncFuture6.isSuccess()) {
                        Iterator<ScatterGatherRequest<H, K>> it2 = scatter2.getOnlineRequests().iterator();
                        while (true) {
                            if (!it2.hasNext()) {
                                break;
                            }
                            ScatterGatherRequest<H, K> next = it2.next();
                            if (next.getHosts().size() == 1 && next.getHosts().contains(asyncFuture6.getNow())) {
                                z = true;
                                break;
                            }
                            next.removeHost(asyncFuture6.getNow());
                        }
                    }
                    if (asyncFuture7.isDone() && ScatterGatherRequestHandlerImpl.this._scatterGatherHelper.disableRetryOnTimeout()) {
                        ScatterGatherRequestHandler.LOG.debug("cannot retry: timeoutFuture: {}, retryFuture: {}", AsyncFuture.getStatus(asyncFuture7), now);
                        z = true;
                    }
                    if (z) {
                        CompletionStage completionStage = ScatterGatherRequestHandlerImpl.COMPLETED;
                        if (isLastAttempt) {
                            LinkedList linkedList = new LinkedList();
                            for (ScatterGatherRequest<H, K> scatterGatherRequest : scatter2.getOnlineRequests()) {
                                completionStage = completionStage.thenCompose(obj4 -> {
                                    return ScatterGatherRequestHandlerImpl.this.appendErrorForEveryKey(basicRequest, linkedList, now, ScatterGatherRequestHandlerImpl.this.getReasonPhrase(now), asyncFuture3.getCause(), scatter2.getPathParser(), scatterGatherRequest, scatter2.pathFor(scatterGatherRequest));
                                });
                            }
                            for (ScatterGatherRequest<H, K> scatterGatherRequest2 : scatter2.getOfflineRequests()) {
                                completionStage = completionStage.thenCompose(obj5 -> {
                                    return ScatterGatherRequestHandlerImpl.this.appendErrorForEveryKey(basicRequest, linkedList, ScatterGatherRequestHandlerImpl.this.serviceUnavailable(), ScatterGatherRequestHandlerImpl.this.getReasonPhrase(ScatterGatherRequestHandlerImpl.this.serviceUnavailable()), null, scatter2.getPathParser(), scatterGatherRequest2, scatter2.pathFor(scatterGatherRequest2));
                                });
                            }
                            completionStage.whenComplete((obj6, th) -> {
                                if (th != null) {
                                    ScatterGatherRequestHandlerImpl.setFailure(asyncPromise2, th, "failure building response");
                                }
                                if (asyncPromise2.setSuccess(linkedList)) {
                                    return;
                                }
                                ScatterGatherRequestHandlerImpl.this.releaseResponses(linkedList);
                            });
                        }
                        completionStage.whenComplete((obj7, th2) -> {
                            ScatterGatherRequestHandler.LOG.debug("cannotRetry released={}", Boolean.valueOf(ScatterGatherRequestHandlerImpl.this.releaseRequest(basicRequest)));
                        });
                        return;
                    }
                    ScatterGatherRequestHandlerImpl.this.incrementTotalRetries(delta2, now);
                    delta2.incrementTotalRetriedKeys(resourcePath.getPartitionKeys().size());
                    if (HttpResponseStatus.TOO_MANY_REQUESTS.equals(now)) {
                        ScatterGatherRequestHandler.LOG.info("Long tail retry on TOO_MANY_REQUESTS for initial request {}", basicRequest);
                        delta2.incrementTotalRetriesOn429();
                    }
                    ArrayList arrayList = new ArrayList(scatter2.getOnlineRequestCount() + scatter2.getOfflineRequestCount());
                    try {
                        ScatterGatherRequestHandlerImpl.this.appendErrorsForOfflinePartitions(basicRequest, scatter2, arrayList);
                        for (ScatterGatherRequest<H, K> scatterGatherRequest3 : scatter2.getOnlineRequests()) {
                            ResourcePath pathFor = scatter2.pathFor(scatterGatherRequest3);
                            pathFor.setRetryRequest();
                            AsyncPromise deferred = AsyncFuture.deferred(false);
                            AsyncPromise deferred2 = AsyncFuture.deferred(false);
                            arrayList.add(deferred2);
                            AsyncPromise cancelledAsyncFuture = CancelledAsyncFuture.getInstance();
                            if (asyncFuture7.isDone()) {
                                ScatterGatherRequestHandler.LOG.debug("retry before dispatch(...):, unexpected timeoutFuture: {}, retryFuture: {}", AsyncFuture.getStatus(asyncFuture7), AsyncFuture.getStatus(cancelledAsyncFuture));
                            }
                            ScatterGatherRequestHandlerImpl.this.dispatch(scatter2, scatterGatherRequest3, pathFor, basicRequest, deferred, deferred2, cancelledAsyncFuture, asyncFuture7, executor2);
                        }
                    } finally {
                        ScatterGatherRequestHandler.LOG.debug("Retry after dispatch, released={}", Boolean.valueOf(ScatterGatherRequestHandlerImpl.this.releaseRequest(basicRequest)));
                        AsyncFuture.collect(arrayList, false).whenCompleteAsync((list, th3) -> {
                            if (th3 != null) {
                                ScatterGatherRequestHandlerImpl.this.incrementTotalRetriesError(delta2, now);
                                if (isLastAttempt) {
                                    ScatterGatherRequestHandlerImpl.setFailure(asyncPromise2, th3, "Retry failure");
                                    return;
                                }
                                return;
                            }
                            boolean z2 = !list.isEmpty() && list.stream().allMatch(obj8 -> {
                                return ScatterGatherRequestHandlerImpl.this.isSuccessStatus(ScatterGatherRequestHandlerImpl.this.statusOf(ScatterGatherRequestHandlerImpl.this.getResponseCode(obj8)));
                            });
                            if (!z2) {
                                ScatterGatherRequestHandlerImpl.this.incrementTotalRetriesError(delta2, now);
                            }
                            if ((isLastAttempt || z2) && asyncPromise2.setSuccess(list)) {
                                if (z2) {
                                    ScatterGatherRequestHandlerImpl.this.incrementTotalRetriesWinner(delta2, now);
                                }
                            } else {
                                Stream stream = list.stream();
                                ScatterGatherRequestHandlerImpl scatterGatherRequestHandlerImpl = ScatterGatherRequestHandlerImpl.this;
                                long sum = stream.mapToInt(scatterGatherRequestHandlerImpl::getResponseReadable).sum();
                                ScatterGatherRequestHandlerImpl.this.releaseResponses(list);
                                delta2.incrementTotalRetriesDiscarded(sum);
                                ScatterGatherRequestHandler.LOG.debug("Long tail response discarded, contentBytes={}", Long.valueOf(sum));
                            }
                        }, executor3);
                    }
                }, stageExecutor);
            }
        };
    }

    protected void incrementTotalRetries(ScatterGatherStats.Delta delta, HRS hrs) {
        delta.incrementTotalRetries();
        if (isServiceUnavailable(hrs)) {
            delta.incrementTotalRetriesOn503();
        }
    }

    protected void incrementTotalRetriesError(ScatterGatherStats.Delta delta, HRS hrs) {
        delta.incrementTotalRetriesError();
        if (isServiceUnavailable(hrs)) {
            delta.incrementTotalRetriesOn503Error();
        }
    }

    protected void incrementTotalRetriesWinner(ScatterGatherStats.Delta delta, HRS hrs) {
        delta.incrementTotalRetriesWinner();
        if (isServiceUnavailable(hrs)) {
            delta.incrementTotalRetriesOn503Winner();
        }
    }

    protected abstract int getResponseCode(HR hr);

    protected abstract int getResponseReadable(HR hr);

    protected abstract boolean hasErrorInStorageNodeResponse(HR hr);

    protected abstract Headers getResponseHeaders(HR hr);

    protected abstract void setKeepAlive(HR hr, boolean z);

    @Nonnull
    protected abstract HR buildResponse(@Nonnull BHS bhs, Metrics metrics, @Nonnull List<HR> list);

    /* JADX WARN: Multi-variable type inference failed */
    protected void appendErrorsForOfflinePartitions(@Nonnull BHS bhs, @Nonnull Scatter<H, P, K> scatter, @Nonnull List<AsyncFuture<List<HR>>> list) {
        if (scatter.getOfflineRequestCount() > 0) {
            CompletionStage completionStage = COMPLETED;
            LinkedList linkedList = new LinkedList();
            for (ScatterGatherRequest<H, K> scatterGatherRequest : scatter.getOfflineRequests()) {
                completionStage = completionStage.thenCompose(obj -> {
                    return appendErrorForEveryKey(bhs, linkedList, serviceUnavailable(), "No replica is available for this partition", null, scatter.getPathParser(), scatterGatherRequest, scatter.pathFor(scatterGatherRequest));
                });
            }
            list.add(AsyncFuture.of(completionStage.thenApply(obj2 -> {
                return linkedList;
            }), false));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v22, types: [java.util.concurrent.CompletionStage] */
    protected CompletionStage<?> appendErrorForEveryKey(@Nonnull BHS bhs, @Nonnull List<HR> list, @Nonnull HRS hrs, String str, Throwable th, @Nonnull ResourcePathParser<P, K> resourcePathParser, @Nonnull ScatterGatherRequest<H, K> scatterGatherRequest, @Nonnull P p) {
        LOG.debug("appendErrorForEveryKey");
        AsyncFuture<?> asyncFuture = COMPLETED;
        Object parseRoles = this._scatterGatherHelper.parseRoles(bhs.getMethodName(), bhs.getRequestHeaders());
        StringBuilder sb = new StringBuilder(str);
        if (scatterGatherRequest.getPartitionKeys().isEmpty()) {
            HashMap hashMap = new HashMap();
            if (!scatterGatherRequest.getPartitionNamesToQuery().isEmpty()) {
                hashMap.put(HeaderNames.X_PARTITION, (String) scatterGatherRequest.getPartitionNamesToQuery().stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining(",")));
            }
            List<H> hosts = scatterGatherRequest.getHosts();
            if (hosts != null && !hosts.isEmpty()) {
                hashMap.put(HeaderNames.X_SERVED_BY, hosts.get(0).toString());
            }
            appendError(bhs, list, hrs, sb.append(", RoutingPolicy=").append(parseRoles).toString(), th, hashMap);
        } else {
            for (K k : scatterGatherRequest.getPartitionKeys()) {
                HashMap hashMap2 = new HashMap();
                asyncFuture = asyncFuture.thenApply(obj -> {
                    return resourcePathParser.substitutePartitionKey((ResourcePathParser) p, (ResourcePath) k);
                }).thenCompose(resourcePath -> {
                    hashMap2.put("Content-Location", resourcePath.getLocation());
                    return this._scatterGatherHelper.findPartitionName(resourcePath.getResourceName(), k);
                }).thenApply(str2 -> {
                    hashMap2.put(HeaderNames.X_PARTITION, str2);
                    sb.append(", PartitionName=").append(str2);
                    List<H> hosts2 = scatterGatherRequest.getHosts();
                    if (hosts2 == null || hosts2.isEmpty()) {
                        return null;
                    }
                    hashMap2.put(HeaderNames.X_SERVED_BY, hosts2.get(0).toString());
                    return null;
                }).exceptionally(th2 -> {
                    LOG.info("Exception in appendErrorForEveryKey, key={}", k, th2);
                    return null;
                }).thenApply(obj2 -> {
                    appendError(bhs, list, hrs, sb.append(", RoutingPolicy=").append(parseRoles).toString(), th, hashMap2);
                    return null;
                });
            }
        }
        return asyncFuture;
    }

    protected void appendError(@Nonnull BHS bhs, @Nonnull List<HR> list, @Nonnull HRS hrs, String str, Throwable th, @Nonnull Map<String, String> map) {
        LOG.debug("appendError");
        list.add(buildErrorResponse(bhs, hrs, str, unwrapCompletion(th), map));
    }

    @Override // com.linkedin.alpini.router.ScatterGatherRequestHandler
    protected abstract boolean isTooLongFrameException(Throwable th);

    @Nonnull
    protected HR buildExceptionResponse(@Nonnull BHS bhs, @Nonnull Throwable th) {
        HR buildErrorResponse;
        boolean z;
        LOG.debug("[{}] handlerException", bhs.getRequestId(), th);
        Throwable unwrapCompletion = unwrapCompletion(th);
        Map<String, String> emptyMap = Collections.emptyMap();
        if (unwrapCompletion instanceof ExceptionWithStatus) {
            ExceptionWithStatus exceptionWithStatus = (ExceptionWithStatus) unwrapCompletion;
            if (exceptionWithStatus.code() >= 500) {
                LOG.warn("RouterException 5XX exception caught", (Throwable) exceptionWithStatus);
            }
            buildErrorResponse = buildErrorResponse(bhs, statusOf(exceptionWithStatus.code()), exceptionWithStatus.getMessage(), exceptionWithStatus, emptyMap);
            z = (exceptionWithStatus instanceof RouterException) && ((RouterException) exceptionWithStatus).shouldCloseChannel();
        } else if (isTooLongFrameException(unwrapCompletion)) {
            buildErrorResponse = buildErrorResponse(bhs, badRequest(), unwrapCompletion.getMessage(), unwrapCompletion, emptyMap);
            z = true;
        } else if (unwrapCompletion instanceof URISyntaxException) {
            URISyntaxException uRISyntaxException = (URISyntaxException) unwrapCompletion;
            buildErrorResponse = buildErrorResponse(bhs, badRequest(), "Bad request path (" + uRISyntaxException.getInput() + "). " + uRISyntaxException.getMessage(), uRISyntaxException, emptyMap);
            z = false;
        } else if (Http2Utils.isTooManyActiveStreamsError(unwrapCompletion)) {
            buildErrorResponse = buildErrorResponse(bhs, serviceUnavailable(), null, Http2TooManyStreamsException.INSTANCE, emptyMap);
            z = false;
        } else {
            LOG.error("Unexpected exception caught in ScatterGatherRequestHandler.exceptionCaught. Sending error and closing client channel. ", unwrapCompletion);
            buildErrorResponse = buildErrorResponse(bhs, internalServerError(), null, unwrapCompletion, emptyMap);
            z = true;
        }
        if (z) {
            setKeepAlive(buildErrorResponse, false);
        }
        return buildErrorResponse;
    }

    @Nonnull
    protected abstract HR buildErrorResponse(@Nonnull BHS bhs, @Nonnull HRS hrs, String str, Throwable th, @Nonnull Map<String, String> map);

    protected final void dispatch(@Nonnull Scatter<H, P, K> scatter, @Nonnull ScatterGatherRequest<H, K> scatterGatherRequest, @Nonnull P p, @Nonnull BHS bhs, @Nonnull AsyncPromise<H> asyncPromise, @Nonnull AsyncPromise<List<HR>> asyncPromise2, @Nonnull AsyncPromise<HRS> asyncPromise3, @Nonnull AsyncFuture<Void> asyncFuture, @Nonnull Executor executor) {
        try {
            LOG.debug("[{}] dispatch partitions={}", bhs.getRequestId(), Msg.make(scatterGatherRequest.getPartitionsNames(), set -> {
                return String.join(",", set);
            }));
            this._scatterGatherHelper.dispatch(scatter, scatterGatherRequest, p, bhs, asyncPromise, asyncPromise2, asyncPromise3, asyncFuture, executor);
        } catch (Throwable th) {
            if (!asyncPromise2.isDone()) {
                List<HR> singletonList = Collections.singletonList(buildExceptionResponse(bhs, th));
                if (asyncPromise2.setSuccess(singletonList)) {
                    return;
                } else {
                    releaseResponses(singletonList);
                }
            }
            LOG.warn("uncaught exception: {} {}", bhs.getMethodName(), bhs.getUri(), th);
        }
    }
}
