package com.linkedin.venice.router.streaming;

import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compute.protocol.response.ComputeResponseRecordV1;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1;
import com.linkedin.venice.router.api.VeniceResponseAggregator;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.router.streaming.VeniceChunkedWriteHandler;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.streaming.StreamingConstants;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GenericProgressiveFutureListener;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.xerces.impl.xs.SchemaSymbols;

/* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse.class */
public class VeniceChunkedResponse {
    private final String storeName;
    private final RequestType requestType;
    private final RouterStats<AggRouterHttpRequestStats> routerStats;
    private final ChannelHandlerContext ctx;
    private final VeniceChunkedWriteHandler chunkedWriteHandler;
    private final ChannelProgressivePromise writeFuture;
    private boolean responseCompleteCalled = false;
    private final AtomicLong totalBytesReceived = new AtomicLong(0);
    private final Queue<Chunk> chunksToWrite = new ConcurrentLinkedQueue();
    private final Queue<Chunk> chunksAwaitingCallback = new ConcurrentLinkedQueue();
    private volatile CompressionStrategy responseCompression = null;
    private static final HttpResponse RESPONSE_META_FOR_COMPUTE;
    public static final ByteBuf EMPTY_BYTE_BUF = new EmptyByteBuf(ByteBufAllocator.DEFAULT);
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) VeniceChunkedResponse.class);
    private static final RecordSerializer<StreamingFooterRecordV1> STREAMING_FOOTER_SERIALIZER = FastSerializerDeserializerFactory.getAvroGenericSerializer(StreamingFooterRecordV1.getClassSchema());
    private static final Map<CharSequence, CharSequence> EMPTY_MAP = new HashMap();
    private static final RecordSerializer<MultiGetResponseRecordV1> MULTI_GET_RESPONSE_SERIALIZER = FastSerializerDeserializerFactory.getAvroGenericSerializer(MultiGetResponseRecordV1.getClassSchema());
    private static final RecordSerializer<ComputeResponseRecordV1> COMPUTE_RESPONSE_SERIALIZER = FastSerializerDeserializerFactory.getAvroGenericSerializer(ComputeResponseRecordV1.getClassSchema());
    protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    private static final Map<CompressionStrategy, HttpResponse> RESPONSE_META_MAP_FOR_MULTI_GET = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$Chunk.class */
    public class Chunk {
        final CompletableFuture<Long> future = new CompletableFuture<>();
        final StreamingCallback<Long> callback;
        final ByteBuf buffer;
        final boolean isLast;
        final long bytesToBeWritten;
        final long writeCompleteThreshold;

        public Chunk(ByteBuf byteBuf, boolean z, StreamingCallback<Long> streamingCallback) {
            this.buffer = byteBuf;
            this.isLast = z;
            this.callback = streamingCallback;
            this.bytesToBeWritten = byteBuf.readableBytes();
            this.writeCompleteThreshold = VeniceChunkedResponse.this.totalBytesReceived.addAndGet(this.bytesToBeWritten);
        }

        void resolveChunk(Exception exc) {
            long j = 0;
            if (exc == null) {
                j = this.bytesToBeWritten;
                this.future.complete(Long.valueOf(this.bytesToBeWritten));
            } else {
                this.future.completeExceptionally(exc);
            }
            if (this.callback != null) {
                this.callback.onCompletion(Long.valueOf(j), exc);
            }
            if (this.buffer.refCnt() > 0) {
                ReferenceCountUtil.release(this.buffer);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$ChunkCallbackListener.class */
    public class ChunkCallbackListener implements GenericProgressiveFutureListener<ChannelProgressiveFuture> {
        private ChunkCallbackListener() {
        }

        @Override // io.netty.util.concurrent.GenericProgressiveFutureListener
        public void operationProgressed(ChannelProgressiveFuture channelProgressiveFuture, long j, long j2) {
            while (VeniceChunkedResponse.this.chunksAwaitingCallback.peek() != null && j >= ((Chunk) VeniceChunkedResponse.this.chunksAwaitingCallback.peek()).writeCompleteThreshold) {
                ((Chunk) VeniceChunkedResponse.this.chunksAwaitingCallback.poll()).resolveChunk(null);
            }
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelProgressiveFuture channelProgressiveFuture) {
            if (channelProgressiveFuture.isSuccess()) {
                VeniceChunkedResponse.this.cleanupChunks(null);
            } else {
                VeniceChunkedResponse.this.handleChannelWriteFailure(channelProgressiveFuture.cause(), true);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$ChunkDispenser.class */
    public class ChunkDispenser implements ChunkedInput<HttpContent> {
        private boolean sentLastChunk;
        private final AtomicInteger progress;

        private ChunkDispenser() {
            this.sentLastChunk = false;
            this.progress = new AtomicInteger(0);
        }

        @Override // io.netty.handler.stream.ChunkedInput
        public boolean isEndOfInput() throws Exception {
            return this.sentLastChunk;
        }

        @Override // io.netty.handler.stream.ChunkedInput
        public void close() throws Exception {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.netty.handler.stream.ChunkedInput
        public HttpContent readChunk(ChannelHandlerContext channelHandlerContext) throws Exception {
            return readChunk(channelHandlerContext.alloc());
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.netty.handler.stream.ChunkedInput
        public HttpContent readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
            DefaultHttpContent defaultHttpContent = null;
            Chunk chunk = (Chunk) VeniceChunkedResponse.this.chunksToWrite.poll();
            if (chunk != null) {
                this.progress.addAndGet(chunk.buffer.readableBytes());
                VeniceChunkedResponse.this.chunksAwaitingCallback.add(chunk);
                if (chunk.isLast) {
                    defaultHttpContent = new DefaultLastHttpContent(chunk.buffer);
                    this.sentLastChunk = true;
                } else {
                    defaultHttpContent = new DefaultHttpContent(chunk.buffer);
                }
            }
            return defaultHttpContent;
        }

        @Override // io.netty.handler.stream.ChunkedInput
        public long length() {
            return -1L;
        }

        @Override // io.netty.handler.stream.ChunkedInput
        public long progress() {
            return this.progress.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$CleanupCallback.class */
    public class CleanupCallback implements GenericFutureListener<ChannelFuture> {
        private final Exception exception;

        CleanupCallback(Exception exc) {
            this.exception = exc;
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            Throwable cause = channelFuture.cause() == null ? this.exception : channelFuture.cause();
            if (cause != null) {
                VeniceChunkedResponse.this.handleChannelWriteFailure(cause, false);
            } else {
                VeniceChunkedResponse.this.cleanupChunks(null);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$ResponseMetadataWriteListener.class */
    public class ResponseMetadataWriteListener implements GenericFutureListener<ChannelFuture> {
        private ResponseMetadataWriteListener() {
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                VeniceChunkedResponse.this.writeFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChunkCallbackListener());
                VeniceChunkedResponse.this.chunkedWriteHandler.write(VeniceChunkedResponse.this.ctx, new ChunkDispenser(), VeniceChunkedResponse.this.writeFuture);
                VeniceChunkedResponse.this.chunkedWriteHandler.resumeTransfer();
            } else {
                if (VeniceChunkedResponse.REDUNDANT_LOGGING_FILTER.isRedundantException("Received exception when sending response metadata")) {
                    VeniceChunkedResponse.LOGGER.error("Received exception when sending response metadata");
                } else {
                    VeniceChunkedResponse.LOGGER.error("Received exception when sending response metadata", channelFuture.cause());
                }
                VeniceChunkedResponse.this.handleChannelWriteFailure(channelFuture.cause(), true);
            }
        }
    }

    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$StreamingCallbackOnlyFreeResponseOnSuccess.class */
    private static class StreamingCallbackOnlyFreeResponseOnSuccess<T> implements StreamingCallback<T> {
        private final FullHttpResponse response;
        private final ChannelPromise promise;

        private StreamingCallbackOnlyFreeResponseOnSuccess(FullHttpResponse fullHttpResponse, ChannelPromise channelPromise) {
            this.response = fullHttpResponse;
            this.promise = channelPromise;
        }

        @Override // com.linkedin.venice.router.streaming.StreamingCallback
        public void onCompletion(T t, Exception exc) {
            if (exc != null) {
                this.promise.setFailure2((Throwable) exc);
            } else {
                this.response.release();
                this.promise.setSuccess();
            }
        }
    }

    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$StreamingResponseMeta.class */
    private static class StreamingResponseMeta extends DefaultHttpResponse {
        public StreamingResponseMeta(Map<CharSequence, String> map) {
            super(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            map.forEach((charSequence, str) -> {
                headers().set(charSequence, str);
            });
            HttpUtil.setTransferEncodingChunked(this, true);
            headers().set(HttpConstants.VENICE_STREAMING_RESPONSE, SchemaSymbols.ATTVAL_TRUE_1);
        }
    }

    /* loaded from: input_file:com/linkedin/venice/router/streaming/VeniceChunkedResponse$WriteMessageCallbackImpl.class */
    private class WriteMessageCallbackImpl implements VeniceChunkedWriteHandler.WriteMessageCallback {
        private WriteMessageCallbackImpl() {
        }

        @Override // com.linkedin.venice.router.streaming.VeniceChunkedWriteHandler.WriteMessageCallback
        public boolean whetherToSkipMessage(Object obj, ChannelPromise channelPromise) {
            if ((obj instanceof StreamingResponseMeta) || (obj instanceof ChunkDispenser)) {
                return false;
            }
            if (obj instanceof SuccessfulStreamingResponse) {
                VeniceChunkedResponse.this.finish(new StreamingCallbackOnlyFreeResponseOnSuccess((SuccessfulStreamingResponse) obj, channelPromise));
                return true;
            }
            if (!(obj instanceof FullHttpResponse)) {
                throw new VeniceException("Unexpected message type received: " + (obj == null ? "null" : obj.getClass()));
            }
            if (VeniceChunkedResponse.this.responseCompression == null) {
                return false;
            }
            FullHttpResponse fullHttpResponse = (FullHttpResponse) obj;
            HttpResponseStatus status = fullHttpResponse.status();
            if (status.equals(HttpResponseStatus.OK)) {
                throw new IllegalStateException("Unexpected response status: " + status + ", and only non 200 status is expected here");
            }
            VeniceChunkedResponse.this.finishWithError(fullHttpResponse, new StreamingCallbackOnlyFreeResponseOnSuccess(fullHttpResponse, channelPromise));
            return true;
        }
    }

    public VeniceChunkedResponse(String str, RequestType requestType, ChannelHandlerContext channelHandlerContext, VeniceChunkedWriteHandler veniceChunkedWriteHandler, RouterStats<AggRouterHttpRequestStats> routerStats) {
        this.storeName = str;
        this.requestType = requestType;
        this.routerStats = routerStats;
        if (!requestType.equals(RequestType.MULTI_GET_STREAMING) && !requestType.equals(RequestType.COMPUTE_STREAMING)) {
            throw new VeniceException("Unexpected request type for streaming: " + requestType + ", and currently only the following types are supported: [" + RequestType.MULTI_GET_STREAMING + ", " + RequestType.COMPUTE_STREAMING + "]");
        }
        this.ctx = channelHandlerContext;
        this.chunkedWriteHandler = veniceChunkedWriteHandler;
        this.writeFuture = channelHandlerContext.newProgressivePromise();
        this.chunkedWriteHandler.setWriteMessageCallback(new WriteMessageCallbackImpl());
    }

    public CompletableFuture<Long> write(ByteBuf byteBuf) {
        return write(byteBuf, CompressionStrategy.NO_OP, null);
    }

    public CompletableFuture<Long> write(ByteBuf byteBuf, CompressionStrategy compressionStrategy) {
        return write(byteBuf, compressionStrategy, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v27, types: [io.netty.channel.ChannelPromise] */
    public CompletableFuture<Long> write(ByteBuf byteBuf, CompressionStrategy compressionStrategy, StreamingCallback<Long> streamingCallback) {
        boolean z = false;
        if (this.responseCompression == null) {
            synchronized (this) {
                if (this.responseCompression == null) {
                    this.responseCompression = compressionStrategy;
                    z = true;
                }
            }
        }
        boolean equals = this.requestType.equals(RequestType.MULTI_GET_STREAMING);
        if (equals && !this.responseCompression.equals(compressionStrategy)) {
            LOGGER.error("Received inconsistent compression for the new write: {}, and previous compression: {}", compressionStrategy, this.responseCompression);
            return CompletableFuture.completedFuture(0L);
        }
        if (z) {
            this.chunkedWriteHandler.write(this.ctx, equals ? RESPONSE_META_MAP_FOR_MULTI_GET.get(compressionStrategy) : RESPONSE_META_FOR_COMPUTE, this.ctx.newPromise().addListener2((GenericFutureListener<? extends Future<? super Void>>) new ResponseMetadataWriteListener()));
            this.chunkedWriteHandler.resumeTransfer();
        }
        Chunk chunk = new Chunk(byteBuf, false, streamingCallback);
        if (!maybeAddChunk(chunk)) {
            return chunk.future;
        }
        if (this.ctx.channel().isOpen()) {
            this.chunkedWriteHandler.resumeTransfer();
        } else {
            this.writeFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) new CleanupCallback(new ClosedChannelException()));
        }
        return chunk.future;
    }

    private synchronized boolean maybeAddChunk(Chunk chunk) {
        if (this.responseCompleteCalled) {
            chunk.resolveChunk(null);
            return false;
        }
        this.chunksToWrite.add(chunk);
        if (!chunk.isLast) {
            return true;
        }
        this.responseCompleteCalled = true;
        return true;
    }

    private void reportResponseSize() {
        this.routerStats.getStatsByType(this.requestType).recordResponseSize(this.storeName, this.totalBytesReceived.get());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void finish(StreamingCallback<Long> streamingCallback) {
        if (!maybeAddChunk(new Chunk(EMPTY_BYTE_BUF, true, streamingCallback))) {
            LOGGER.error("Couldn't add last chunk to the chunk queue");
        } else {
            reportResponseSize();
            this.chunkedWriteHandler.resumeTransfer();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void finishWithError(FullHttpResponse fullHttpResponse, StreamingCallback<Long> streamingCallback) {
        ByteBuf wrappedBuffer;
        LOGGER.debug("Finishing the chunked transfer response with status: {}", fullHttpResponse.status());
        StreamingFooterRecordV1 streamingFooterRecordV1 = new StreamingFooterRecordV1();
        streamingFooterRecordV1.status = fullHttpResponse.status().code();
        streamingFooterRecordV1.detail = fullHttpResponse.content().nioBuffer();
        streamingFooterRecordV1.trailerHeaders = EMPTY_MAP;
        ByteBuffer wrap = ByteBuffer.wrap(STREAMING_FOOTER_SERIALIZER.serialize(streamingFooterRecordV1));
        if (this.requestType.equals(RequestType.MULTI_GET_STREAMING)) {
            MultiGetResponseRecordV1 multiGetResponseRecordV1 = new MultiGetResponseRecordV1();
            multiGetResponseRecordV1.keyIndex = StreamingConstants.KEY_ID_FOR_STREAMING_FOOTER;
            multiGetResponseRecordV1.value = wrap;
            multiGetResponseRecordV1.schemaId = StreamingConstants.STREAMING_FOOTER_SCHEMA_ID;
            wrappedBuffer = Unpooled.wrappedBuffer(MULTI_GET_RESPONSE_SERIALIZER.serialize(multiGetResponseRecordV1));
        } else {
            if (!this.requestType.equals(RequestType.COMPUTE_STREAMING)) {
                LOGGER.error("Received unsupported request type: {} for streaming response", this.requestType);
                return;
            }
            ComputeResponseRecordV1 computeResponseRecordV1 = new ComputeResponseRecordV1();
            computeResponseRecordV1.keyIndex = StreamingConstants.KEY_ID_FOR_STREAMING_FOOTER;
            computeResponseRecordV1.value = wrap;
            wrappedBuffer = Unpooled.wrappedBuffer(COMPUTE_RESPONSE_SERIALIZER.serialize(computeResponseRecordV1));
        }
        if (!maybeAddChunk(new Chunk(wrappedBuffer, true, streamingCallback))) {
            LOGGER.error("Couldn't add last chunk with error status: {} to the internal chunk queue", fullHttpResponse.status());
        } else {
            reportResponseSize();
            this.chunkedWriteHandler.resumeTransfer();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void handleChannelWriteFailure(Throwable th, boolean z) {
        Exception exc;
        if (REDUNDANT_LOGGING_FILTER.isRedundantException("Encountered a throwable on channel write failure on channel: ")) {
            LOGGER.error("{}{}", "Encountered a throwable on channel write failure on channel: ", this.ctx.channel());
        } else {
            LOGGER.error("{}{}", "Encountered a throwable on channel write failure on channel: ", this.ctx.channel(), th);
        }
        if (th instanceof Exception) {
            exc = (Exception) th;
        } else {
            exc = new IllegalStateException("Encountered a Throwable - " + th.getMessage());
            if (z) {
                this.ctx.fireExceptionCaught(th);
            }
        }
        if (!this.responseCompleteCalled) {
            this.responseCompleteCalled = true;
            LOGGER.debug("Finished responding to current request on channel: {} with exception: ", this.ctx.channel(), exc);
        }
        if (!this.writeFuture.isDone()) {
            this.writeFuture.setFailure2((Throwable) exc);
        }
        closeChannel();
        cleanupChunks(exc);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupChunks(Exception exc) {
        Chunk poll = this.chunksAwaitingCallback.poll();
        while (true) {
            Chunk chunk = poll;
            if (chunk == null) {
                break;
            }
            chunk.resolveChunk(exc);
            poll = this.chunksAwaitingCallback.poll();
        }
        Chunk poll2 = this.chunksToWrite.poll();
        while (true) {
            Chunk chunk2 = poll2;
            if (chunk2 == null) {
                return;
            }
            chunk2.resolveChunk(exc);
            poll2 = this.chunksToWrite.poll();
        }
    }

    private void closeChannel() {
        if (this.ctx.channel().isOpen()) {
            this.writeFuture.addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
        }
    }

    static {
        for (CompressionStrategy compressionStrategy : CompressionStrategy.values()) {
            HashMap hashMap = new HashMap(VeniceResponseAggregator.MULTI_GET_VALID_HEADER_MAP);
            hashMap.put(HttpConstants.VENICE_COMPRESSION_STRATEGY, Integer.toString(compressionStrategy.getValue()));
            RESPONSE_META_MAP_FOR_MULTI_GET.put(compressionStrategy, new StreamingResponseMeta(hashMap));
        }
        RESPONSE_META_FOR_COMPUTE = new StreamingResponseMeta(VeniceResponseAggregator.COMPUTE_VALID_HEADER_MAP);
    }
}
