package com.linkedin.r2.netty.client;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.MultiCallback;
import com.linkedin.common.util.None;
import com.linkedin.r2.filter.R2Constants;
import com.linkedin.r2.message.Messages;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamRequestBuilder;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.message.timing.TimingImportance;
import com.linkedin.r2.message.timing.TimingKey;
import com.linkedin.r2.netty.callback.StreamExecutionCallback;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.common.NettyClientState;
import com.linkedin.r2.netty.common.ShutdownTimeoutException;
import com.linkedin.r2.netty.common.StreamingTimeout;
import com.linkedin.r2.netty.common.UnknownSchemeException;
import com.linkedin.r2.netty.handler.common.SslHandshakeTimingHandler;
import com.linkedin.r2.transport.common.MessageType;
import com.linkedin.r2.transport.common.WireAttributeHelper;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.AsyncPool;
import com.linkedin.r2.transport.http.client.InvokedOnceTransportCallback;
import com.linkedin.r2.transport.http.client.common.ChannelPoolManager;
import com.linkedin.r2.transport.http.client.common.ssl.SslSessionValidator;
import com.linkedin.r2.transport.http.common.HttpBridge;
import com.linkedin.r2.transport.http.common.HttpProtocolVersion;
import com.linkedin.r2.util.Cancellable;
import com.linkedin.r2.util.RequestTimeoutUtil;
import com.linkedin.r2.util.Timeout;
import com.linkedin.util.ArgumentUtil;
import com.linkedin.util.clock.Clock;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/r2/netty/client/HttpNettyClient.class */
public class HttpNettyClient implements TransportClient {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HttpNettyClient.class);
    private static final TimingKey TIMING_KEY = TimingKey.registerNewKey("dns_resolution_new", TimingImportance.LOW);
    private static final String HTTP_SCHEME = HttpScheme.HTTP.toString();
    private static final String HTTPS_SCHEME = HttpScheme.HTTPS.toString();
    private static final int HTTP_DEFAULT_PORT = 80;
    private static final int HTTPS_DEFAULT_PORT = 443;
    private static final int DEFAULT_STREAMING_TIMEOUT = -1;
    private final EventLoopGroup _eventLoopGroup;
    private final ScheduledExecutorService _scheduler;
    private final ExecutorService _callbackExecutor;
    private final ChannelPoolManager _channelPoolManager;
    private final ChannelPoolManager _sslChannelPoolManager;
    private final Clock _clock;
    private final HttpProtocolVersion _protocolVersion;
    private final long _requestTimeout;
    private final long _streamingTimeout;
    private final long _shutdownTimeout;
    private final AtomicReference<NettyClientState> _state;
    private final Set<TransportCallback<StreamResponse>> _userCallbacks = ConcurrentHashMap.newKeySet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/r2/netty/client/HttpNettyClient$ChannelPoolGetCallback.class */
    public class ChannelPoolGetCallback implements Callback<Channel> {
        private final AsyncPool<Channel> _pool;
        private final Request _request;
        private final RequestContext _requestContext;
        private final TransportCallback<StreamResponse> _callback;
        private final Timeout<None> _timeout;
        private final long _resolvedRequestTimeout;
        private final long _streamingTimeout;

        ChannelPoolGetCallback(AsyncPool<Channel> asyncPool, Request request, RequestContext requestContext, TransportCallback<StreamResponse> transportCallback, Timeout<None> timeout, long j, long j2) {
            this._pool = asyncPool;
            this._request = request;
            this._requestContext = requestContext;
            this._callback = transportCallback;
            this._timeout = timeout;
            this._resolvedRequestTimeout = j;
            this._streamingTimeout = j2;
        }

        @Override // com.linkedin.common.callback.SuccessCallback
        public void onSuccess(Channel channel) {
            this._timeout.getItem();
            channel.attr(NettyChannelAttributes.CHANNEL_POOL).set(this._pool);
            channel.attr(NettyChannelAttributes.RESPONSE_CALLBACK).set(SslHandshakeTimingHandler.getSslTimingCallback(channel, this._requestContext, this._callback));
            channel.attr(NettyChannelAttributes.SSL_SESSION_VALIDATOR).set((SslSessionValidator) this._requestContext.getLocalAttr(R2Constants.REQUESTED_SSL_SESSION_VALIDATOR));
            NettyClientState nettyClientState = (NettyClientState) HttpNettyClient.this._state.get();
            if (nettyClientState == NettyClientState.REQUESTS_STOPPING || nettyClientState == NettyClientState.SHUTDOWN) {
                channel.pipeline().fireExceptionCaught((Throwable) new ShutdownTimeoutException("Operation did not complete before shutdown"));
                return;
            }
            ScheduledFuture schedule = HttpNettyClient.this._scheduler.schedule(() -> {
                return channel.pipeline().fireExceptionCaught((Throwable) new TimeoutException("Exceeded request timeout of " + this._resolvedRequestTimeout + "ms"));
            }, this._resolvedRequestTimeout, TimeUnit.MILLISECONDS);
            if (isStreamingTimeoutEnabled()) {
                channel.attr(NettyChannelAttributes.STREAMING_TIMEOUT_FUTURE).set(new StreamingTimeout(HttpNettyClient.this._scheduler, this._streamingTimeout, channel, HttpNettyClient.this._clock));
            }
            channel.attr(NettyChannelAttributes.TIMEOUT_FUTURE).set(schedule);
            channel.writeAndFlush(this._request).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        }

        private boolean isStreamingTimeoutEnabled() {
            return this._streamingTimeout > -1;
        }

        @Override // com.linkedin.common.callback.Callback
        public void onError(Throwable th) {
            this._callback.onResponse(TransportResponseImpl.error(th));
        }
    }

    public HttpNettyClient(EventLoopGroup eventLoopGroup, ScheduledExecutorService scheduledExecutorService, ExecutorService executorService, ChannelPoolManager channelPoolManager, ChannelPoolManager channelPoolManager2, HttpProtocolVersion httpProtocolVersion, Clock clock, long j, long j2, long j3) {
        ArgumentUtil.notNull(eventLoopGroup, "eventLoopGroup");
        ArgumentUtil.notNull(scheduledExecutorService, "scheduler");
        ArgumentUtil.notNull(executorService, "callbackExecutor");
        ArgumentUtil.notNull(channelPoolManager, "channelPoolManager");
        ArgumentUtil.notNull(channelPoolManager2, "sslChannelPoolManager");
        ArgumentUtil.notNull(clock, "clock");
        ArgumentUtil.checkArgument(j >= 0, "requestTimeout");
        ArgumentUtil.checkArgument(j2 >= -1, "streamingTimeout");
        ArgumentUtil.checkArgument(j3 >= 0, "shutdownTimeout");
        j2 = j2 >= j ? -1L : j2;
        this._eventLoopGroup = eventLoopGroup;
        this._scheduler = scheduledExecutorService;
        this._callbackExecutor = executorService;
        this._channelPoolManager = channelPoolManager;
        this._sslChannelPoolManager = channelPoolManager2;
        this._clock = clock;
        this._protocolVersion = httpProtocolVersion;
        this._requestTimeout = j;
        this._streamingTimeout = j2;
        this._shutdownTimeout = j3;
        this._state = new AtomicReference<>(NettyClientState.RUNNING);
    }

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void restRequest(RestRequest restRequest, RequestContext requestContext, Map<String, String> map, TransportCallback<RestResponse> transportCallback) {
        sendRequest(restRequest, requestContext, map, Messages.toStreamTransportCallback(transportCallback));
    }

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void streamRequest(StreamRequest streamRequest, RequestContext requestContext, Map<String, String> map, TransportCallback<StreamResponse> transportCallback) {
        if (isFullRequest(requestContext)) {
            sendStreamRequestAsRestRequest(streamRequest, requestContext, map, transportCallback);
        } else {
            sendRequest(streamRequest, requestContext, map, transportCallback);
        }
    }

    @Override // com.linkedin.r2.transport.common.bridge.client.TransportClient
    public void shutdown(final Callback<None> callback) {
        LOG.info("Shutdown requested");
        if (this._state.compareAndSet(NettyClientState.RUNNING, NettyClientState.SHUTTING_DOWN)) {
            LOG.info("Shutting down");
            MultiCallback multiCallback = new MultiCallback(new Callback<None>() { // from class: com.linkedin.r2.netty.client.HttpNettyClient.1
                private void releaseCallbacks() {
                    HttpNettyClient.this._userCallbacks.forEach(transportCallback -> {
                        transportCallback.onResponse(TransportResponseImpl.error(new TimeoutException("Operation did not complete before shutdown")));
                    });
                }

                @Override // com.linkedin.common.callback.Callback
                public void onError(Throwable th) {
                    releaseCallbacks();
                    callback.onError(th);
                }

                @Override // com.linkedin.common.callback.SuccessCallback
                public void onSuccess(None none) {
                    releaseCallbacks();
                    callback.onSuccess(none);
                }
            }, 2);
            this._channelPoolManager.shutdown(multiCallback, () -> {
                this._state.set(NettyClientState.REQUESTS_STOPPING);
            }, () -> {
                this._state.set(NettyClientState.SHUTDOWN);
            }, this._shutdownTimeout);
            this._sslChannelPoolManager.shutdown(multiCallback, () -> {
                this._state.set(NettyClientState.REQUESTS_STOPPING);
            }, () -> {
                this._state.set(NettyClientState.SHUTDOWN);
            }, this._shutdownTimeout);
        } else {
            callback.onError(new IllegalStateException("Shutdown has already been requested."));
        }
        TimingKey.unregisterKey(TIMING_KEY);
    }

    private void sendStreamRequestAsRestRequest(StreamRequest streamRequest, final RequestContext requestContext, final Map<String, String> map, final TransportCallback<StreamResponse> transportCallback) {
        Messages.toRestRequest(streamRequest, new Callback<RestRequest>() { // from class: com.linkedin.r2.netty.client.HttpNettyClient.2
            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                transportCallback.onResponse(TransportResponseImpl.error(th));
            }

            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(RestRequest restRequest) {
                HttpNettyClient.this.sendRequest(restRequest, requestContext, map, transportCallback);
            }
        });
    }

    private static boolean isFullRequest(RequestContext requestContext) {
        Object localAttr = requestContext.getLocalAttr(R2Constants.IS_FULL_REQUEST);
        return localAttr != null && ((Boolean) localAttr).booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendRequest(Request request, RequestContext requestContext, Map<String, String> map, TransportCallback<StreamResponse> transportCallback) {
        StreamRequest buildRequestWithWireAttributes;
        TransportCallback<StreamResponse> decorateUserCallback = decorateUserCallback(request, transportCallback);
        if (this._state.get() != NettyClientState.RUNNING) {
            decorateUserCallback.onResponse(TransportResponseImpl.error(new IllegalStateException("Client is not running")));
            return;
        }
        long resolveRequestTimeout = resolveRequestTimeout(requestContext, this._requestTimeout);
        Timeout timeout = new Timeout(this._scheduler, resolveRequestTimeout, TimeUnit.MILLISECONDS, None.none());
        timeout.addTimeoutTask(() -> {
            decorateUserCallback.onResponse(TransportResponseImpl.error(new TimeoutException("Exceeded request timeout of " + resolveRequestTimeout + "ms")));
        });
        try {
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
            SocketAddress resolveAddress = resolveAddress(request, requestContext);
            TimingContextUtil.markTiming(requestContext, TIMING_KEY);
            if (request instanceof StreamRequest) {
                buildRequestWithWireAttributes = buildRequestWithWireAttributes((StreamRequest) request, map);
            } else {
                MessageType.setMessageType(MessageType.Type.REST, map);
                buildRequestWithWireAttributes = buildRequestWithWireAttributes((RestRequest) request, map);
            }
            try {
                AsyncPool<Channel> poolForAddress = getChannelPoolManagerPerRequest(buildRequestWithWireAttributes).getPoolForAddress(resolveAddress);
                requestContext.putLocalAttr(R2Constants.HTTP_PROTOCOL_VERSION, this._protocolVersion);
                Cancellable cancellable = poolForAddress.get(new ChannelPoolGetCallback(poolForAddress, buildRequestWithWireAttributes, requestContext, decorateUserCallback, timeout, resolveRequestTimeout, this._streamingTimeout));
                if (cancellable != null) {
                    cancellable.getClass();
                    timeout.addTimeoutTask(cancellable::cancel);
                }
            } catch (IllegalStateException e) {
                decorateUserCallback.onResponse(TransportResponseImpl.error(e));
            }
        } catch (Exception e2) {
            decorateUserCallback.onResponse(TransportResponseImpl.error(e2));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private StreamRequest buildRequestWithWireAttributes(StreamRequest streamRequest, Map<String, String> map) {
        return ((StreamRequestBuilder) streamRequest.builder().overwriteHeaders(WireAttributeHelper.toWireAttributes(map))).build(streamRequest.getEntityStream());
    }

    /* JADX WARN: Multi-variable type inference failed */
    private RestRequest buildRequestWithWireAttributes(RestRequest restRequest, Map<String, String> map) {
        return ((RestRequestBuilder) new RestRequestBuilder(restRequest).overwriteHeaders(WireAttributeHelper.toWireAttributes(map))).build();
    }

    private TransportCallback<StreamResponse> decorateUserCallback(Request request, TransportCallback<StreamResponse> transportCallback) {
        return getShutdownAwareCallback(getExecutionCallback(HttpBridge.streamToHttpCallback(transportCallback, request)));
    }

    private TransportCallback<StreamResponse> getExecutionCallback(TransportCallback<StreamResponse> transportCallback) {
        return new StreamExecutionCallback(this._callbackExecutor, transportCallback);
    }

    private TransportCallback<StreamResponse> getShutdownAwareCallback(TransportCallback<StreamResponse> transportCallback) {
        InvokedOnceTransportCallback invokedOnceTransportCallback = new InvokedOnceTransportCallback(transportCallback);
        this._userCallbacks.add(invokedOnceTransportCallback);
        return transportResponse -> {
            this._userCallbacks.remove(invokedOnceTransportCallback);
            invokedOnceTransportCallback.onResponse(transportResponse);
        };
    }

    private ChannelPoolManager getChannelPoolManagerPerRequest(Request request) {
        return isSslRequest(request) ? this._sslChannelPoolManager : this._channelPoolManager;
    }

    private static boolean isSslRequest(Request request) {
        return HTTPS_SCHEME.equals(request.getURI().getScheme());
    }

    public static long resolveRequestTimeout(RequestContext requestContext, long j) {
        long j2 = j;
        Number number = (Number) requestContext.getLocalAttr(R2Constants.REQUEST_TIMEOUT);
        if (number != null) {
            j2 = number.longValue();
        }
        Double d = (Double) requestContext.getLocalAttr(R2Constants.PREEMPTIVE_TIMEOUT_RATE);
        if (d != null) {
            j2 = RequestTimeoutUtil.applyPreemptiveTimeoutRate(j2, d.doubleValue());
        }
        return j2;
    }

    public static SocketAddress resolveAddress(Request request, RequestContext requestContext) throws UnknownHostException, UnknownSchemeException {
        URI uri = request.getURI();
        String scheme = uri.getScheme();
        if (!HTTP_SCHEME.equalsIgnoreCase(scheme) && !HTTPS_SCHEME.equalsIgnoreCase(scheme)) {
            throw new UnknownSchemeException("Unknown scheme: " + scheme + " (only http/https is supported)");
        }
        String host = uri.getHost();
        int port = uri.getPort();
        if (port == -1) {
            port = HTTP_SCHEME.equalsIgnoreCase(scheme) ? 80 : 443;
        }
        InetAddress byName = InetAddress.getByName(host);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(byName, port);
        requestContext.putLocalAttr(R2Constants.REMOTE_SERVER_ADDR, byName.getHostAddress());
        requestContext.putLocalAttr(R2Constants.REMOTE_SERVER_PORT, Integer.valueOf(port));
        return inetSocketAddress;
    }
}
