package com.linkedin.venice.client.store.transport;

import com.linkedin.common.callback.Callback;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.data.ByteString;
import com.linkedin.r2.filter.R2Constants;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestException;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamException;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamRequestBuilder;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.entitystream.ByteStringWriter;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.authentication.ClientAuthenticationProvider;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.security.SSLFactory;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.xerces.impl.xs.SchemaSymbols;

/* loaded from: input_file:com/linkedin/venice/client/store/transport/D2TransportClient.class */
public class D2TransportClient extends TransportClient {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) D2TransportClient.class);
    private final D2Client d2Client;
    private final boolean privateD2Client;
    private String d2ServiceName;
    private ClientAuthenticationProvider authenticationProvider;

    /* loaded from: input_file:com/linkedin/venice/client/store/transport/D2TransportClient$D2TransportClientCallback.class */
    private static class D2TransportClientCallback extends TransportClientCallback implements Callback<RestResponse> {
        public D2TransportClientCallback(CompletableFuture<TransportClientResponse> completableFuture) {
            super(completableFuture);
        }

        @Override // com.linkedin.common.callback.Callback
        public void onError(Throwable th) {
            if (th instanceof RestException) {
                onSuccess(((RestException) th).getResponse());
            } else {
                D2TransportClient.LOGGER.error("", th);
                getValueFuture().completeExceptionally(new VeniceClientException(th));
            }
        }

        @Override // com.linkedin.common.callback.SuccessCallback
        public void onSuccess(RestResponse restResponse) {
            String header;
            int status = restResponse.getStatus();
            int i = -1;
            if (200 == status && (header = restResponse.getHeader(HttpConstants.VENICE_SCHEMA_ID)) != null) {
                i = Integer.parseInt(header);
            }
            CompressionStrategy compressionStrategy = CompressionStrategy.NO_OP;
            String header2 = restResponse.getHeader(HttpConstants.VENICE_COMPRESSION_STRATEGY);
            if (header2 != null) {
                compressionStrategy = CompressionStrategy.valueOf(Integer.parseInt(header2));
            }
            completeFuture(status, i, compressionStrategy, restResponse.getEntity().copyBytes());
        }
    }

    public D2TransportClient(String str, D2Client d2Client, ClientAuthenticationProvider clientAuthenticationProvider) {
        this.d2ServiceName = str;
        this.d2Client = d2Client;
        this.privateD2Client = false;
        this.authenticationProvider = clientAuthenticationProvider;
    }

    public D2TransportClient(String str, ClientConfig clientConfig) {
        D2ClientBuilder basePath = new D2ClientBuilder().setZkHosts(clientConfig.getVeniceURL()).setZkSessionTimeout(clientConfig.getD2ZkTimeout(), TimeUnit.MILLISECONDS).setZkStartupTimeout(clientConfig.getD2ZkTimeout(), TimeUnit.MILLISECONDS).setLbWaitTimeout(clientConfig.getD2ZkTimeout(), TimeUnit.MILLISECONDS).setBasePath(clientConfig.getD2BasePath());
        SSLFactory sslFactory = clientConfig.getSslFactory();
        if (sslFactory != null) {
            basePath.setIsSSLEnabled(sslFactory.isSslEnabled());
            basePath.setSSLContext(sslFactory.getSSLContext());
            basePath.setSSLParameters(sslFactory.getSSLParameters());
        }
        this.d2ServiceName = str;
        this.d2Client = basePath.build();
        this.privateD2Client = true;
        this.authenticationProvider = clientConfig.getAuthenticationProvider();
        D2ClientUtils.startClient(this.d2Client);
    }

    public String getServiceName() {
        return this.d2ServiceName;
    }

    public void setServiceName(String str) {
        this.d2ServiceName = str;
    }

    @Override // com.linkedin.venice.client.store.transport.TransportClient
    public CompletableFuture<TransportClientResponse> get(String str, Map<String, String> map) {
        RestRequest restGetRequest = getRestGetRequest(str, map);
        CompletableFuture<TransportClientResponse> completableFuture = new CompletableFuture<>();
        RequestContext requestContext = new RequestContext();
        requestContext.putLocalAttr(R2Constants.OPERATION, "get");
        restRequest(restGetRequest, requestContext, new D2TransportClientCallback(completableFuture));
        return completableFuture;
    }

    @Override // com.linkedin.venice.client.store.transport.TransportClient
    public CompletableFuture<TransportClientResponse> post(String str, Map<String, String> map, byte[] bArr) {
        RestRequest restPostRequest = getRestPostRequest(str, map, bArr);
        CompletableFuture<TransportClientResponse> completableFuture = new CompletableFuture<>();
        restRequest(restPostRequest, getRequestContextForPost(), new D2TransportClientCallback(completableFuture));
        return completableFuture;
    }

    private RequestContext getRequestContextForPost() {
        RequestContext requestContext = new RequestContext();
        requestContext.putLocalAttr(R2Constants.OPERATION, "batchget");
        return requestContext;
    }

    @Override // com.linkedin.venice.client.store.transport.TransportClient
    public void streamPost(String str, Map<String, String> map, byte[] bArr, final TransportClientStreamingCallback transportClientStreamingCallback, int i) {
        try {
            StreamRequestBuilder streamRequestBuilder = new StreamRequestBuilder(URI.create(getD2RequestUrl(str)));
            map.forEach((str2, str3) -> {
                streamRequestBuilder.addHeaderValue(str2, str3);
            });
            streamRequestBuilder.addHeaderValue(HttpConstants.VENICE_KEY_COUNT, Integer.toString(i));
            streamRequestBuilder.setMethod("POST");
            StreamRequest build = streamRequestBuilder.build(EntityStreams.newEntityStream(new ByteStringWriter(ByteString.unsafeWrap(bArr))));
            RequestContext requestContext = new RequestContext();
            requestContext.putLocalAttr(R2Constants.IS_FULL_REQUEST, true);
            streamRequest(build, requestContext, new Callback<StreamResponse>() { // from class: com.linkedin.venice.client.store.transport.D2TransportClient.1
                @Override // com.linkedin.common.callback.SuccessCallback
                public void onSuccess(StreamResponse streamResponse) {
                    transportClientStreamingCallback.onHeaderReceived(streamResponse.getHeaders());
                    streamResponse.getEntityStream().setReader(new Reader() { // from class: com.linkedin.venice.client.store.transport.D2TransportClient.1.1
                        private boolean isDone = false;
                        private ReadHandle rh;

                        @Override // com.linkedin.r2.message.stream.entitystream.Reader
                        public void onInit(ReadHandle readHandle) {
                            this.rh = readHandle;
                            readHandle.request(10);
                        }

                        @Override // com.linkedin.r2.message.stream.entitystream.Observer
                        public synchronized void onDataAvailable(ByteString byteString) {
                            if (this.isDone) {
                                D2TransportClient.LOGGER.warn("Received data after completion and data length: {}", Integer.valueOf(byteString.length()));
                            } else {
                                transportClientStreamingCallback.onDataReceived(byteString.asByteBuffer());
                                this.rh.request(1);
                            }
                        }

                        @Override // com.linkedin.r2.message.stream.entitystream.Observer
                        public synchronized void onDone() {
                            if (this.isDone) {
                                D2TransportClient.LOGGER.warn("onDone got invoked after completion");
                            } else {
                                transportClientStreamingCallback.onCompletion(Optional.empty());
                                this.isDone = true;
                            }
                        }

                        @Override // com.linkedin.r2.message.stream.entitystream.Observer
                        public synchronized void onError(Throwable th) {
                            if (this.isDone) {
                                D2TransportClient.LOGGER.warn("onError got invoked after completion");
                            } else {
                                transportClientStreamingCallback.onCompletion(Optional.of(new VeniceClientException(th)));
                            }
                        }
                    });
                }

                @Override // com.linkedin.common.callback.Callback
                public void onError(Throwable th) {
                    if (!(th instanceof StreamException)) {
                        transportClientStreamingCallback.onCompletion(Optional.of(new VeniceClientException(th)));
                        return;
                    }
                    StreamException streamException = (StreamException) th;
                    transportClientStreamingCallback.onCompletion(Optional.of(new VeniceClientHttpException(streamException.getResponse().toString(), streamException.getResponse().getStatus())));
                    ((StreamException) th).getResponse().getStatus();
                }
            });
        } catch (Throwable th) {
            transportClientStreamingCallback.onCompletion(Optional.of(new VeniceClientException("Received exception when sending out request", th)));
        }
    }

    private String getD2RequestUrl(String str) {
        return "d2://" + this.d2ServiceName + "/" + str;
    }

    private RestRequest getRestGetRequest(String str, Map<String, String> map) {
        return D2ClientUtils.createD2GetRequest(getD2RequestUrl(str), injectSecurityHeaders(map));
    }

    private Map<String, String> injectSecurityHeaders(Map<String, String> map) {
        if (this.authenticationProvider == null) {
            return map;
        }
        if (map == null) {
            return this.authenticationProvider.getHTTPAuthenticationHeaders();
        }
        HashMap hashMap = new HashMap(map);
        hashMap.putAll(this.authenticationProvider.getHTTPAuthenticationHeaders());
        return hashMap;
    }

    private RestRequest getRestPostRequest(String str, Map<String, String> map, byte[] bArr) {
        return D2ClientUtils.createD2PostRequest(getD2RequestUrl(str), injectSecurityHeaders(map), bArr);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void restRequest(final RestRequest restRequest, final RequestContext requestContext, final Callback<RestResponse> callback) {
        Callback<RestResponse> callback2 = new Callback<RestResponse>() { // from class: com.linkedin.venice.client.store.transport.D2TransportClient.2
            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                try {
                    if (th instanceof RestException) {
                        RestResponse response = ((RestException) th).getResponse();
                        if (response.getStatus() == 301) {
                            String header = response.getHeader("Location");
                            if (header != null) {
                                URI uri = new URI(header);
                                D2TransportClient.this.d2ServiceName = uri.getAuthority();
                                D2TransportClient.LOGGER.info("update d2ServiceName to {}", D2TransportClient.this.d2ServiceName);
                                D2TransportClient.this.d2Client.restRequest(restRequest.builder().setURI(uri).build(), requestContext.m2316clone(), callback);
                                return;
                            }
                            D2TransportClient.LOGGER.error("location header is null");
                        }
                    }
                    callback.onError(th);
                } catch (Exception e) {
                    D2TransportClient.LOGGER.error("cannot redirect request", (Throwable) e);
                    callback.onError(e);
                }
            }

            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(RestResponse restResponse) {
                callback.onSuccess(restResponse);
            }
        };
        this.d2Client.restRequest(((RestRequestBuilder) restRequest.builder().addHeaderValue(HttpConstants.VENICE_ALLOW_REDIRECT, SchemaSymbols.ATTVAL_TRUE_1)).build(), requestContext, callback2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void streamRequest(final StreamRequest streamRequest, final RequestContext requestContext, final Callback<StreamResponse> callback) {
        Callback<StreamResponse> callback2 = new Callback<StreamResponse>() { // from class: com.linkedin.venice.client.store.transport.D2TransportClient.3
            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                try {
                    if (th instanceof StreamException) {
                        StreamResponse response = ((StreamException) th).getResponse();
                        if (response.getStatus() == 301) {
                            String header = response.getHeader("Location");
                            if (header != null) {
                                URI uri = new URI(header);
                                D2TransportClient.this.d2ServiceName = uri.getAuthority();
                                D2TransportClient.LOGGER.info("update d2ServiceName to {}", D2TransportClient.this.d2ServiceName);
                                D2TransportClient.this.d2Client.streamRequest(streamRequest.builder().setURI(uri).build(streamRequest.getEntityStream()), requestContext.m2316clone(), callback);
                                return;
                            }
                            D2TransportClient.LOGGER.error("location header is null");
                        }
                    }
                    callback.onError(th);
                } catch (Exception e) {
                    D2TransportClient.LOGGER.error("cannot follow redirection", (Throwable) e);
                    callback.onError(e);
                }
            }

            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(StreamResponse streamResponse) {
                callback.onSuccess(streamResponse);
            }
        };
        this.d2Client.streamRequest(((StreamRequestBuilder) streamRequest.builder().addHeaderValue(HttpConstants.VENICE_ALLOW_REDIRECT, SchemaSymbols.ATTVAL_TRUE_1)).build(streamRequest.getEntityStream()), requestContext, callback2);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.privateD2Client) {
            D2ClientUtils.shutdownClient(this.d2Client);
        } else {
            LOGGER.info("This is a shared D2Client. TransportClient is not responsible to shut it down. Please do it manually.");
        }
    }

    public D2Client getD2Client() {
        return this.d2Client;
    }

    public String toString() {
        return getClass().getSimpleName() + "(d2ServiceName: " + this.d2ServiceName + ")";
    }
}
