package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.message.KvStateRequest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateServerHandler.class */
public class KvStateServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
    private final KvStateRegistry registry;
    private final ExecutorService queryExecutor;
    private final KvStateRequestStats stats;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateServerHandler$AsyncKvStateQueryTask.class */
    public static class AsyncKvStateQueryTask implements Runnable {
        private final ChannelHandlerContext ctx;
        private final KvStateRequest request;
        private final InternalKvState<?> kvState;
        private final KvStateRequestStats stats;
        private final long creationNanos = System.nanoTime();

        /* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateServerHandler$AsyncKvStateQueryTask$QueryResultWriteListener.class */
        private class QueryResultWriteListener implements ChannelFutureListener {
            private QueryResultWriteListener() {
            }

            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                long convert = TimeUnit.MILLISECONDS.convert(System.nanoTime() - AsyncKvStateQueryTask.this.creationNanos, TimeUnit.NANOSECONDS);
                if (channelFuture.isSuccess()) {
                    AsyncKvStateQueryTask.this.stats.reportSuccessfulRequest(convert);
                    return;
                }
                if (KvStateServerHandler.LOG.isDebugEnabled()) {
                    KvStateServerHandler.LOG.debug("Query " + AsyncKvStateQueryTask.this.request + " failed after " + convert + " ms", channelFuture.cause());
                }
                AsyncKvStateQueryTask.this.stats.reportFailedRequest();
            }
        }

        public AsyncKvStateQueryTask(ChannelHandlerContext channelHandlerContext, KvStateRequest kvStateRequest, InternalKvState<?> internalKvState, KvStateRequestStats kvStateRequestStats) {
            this.ctx = (ChannelHandlerContext) Objects.requireNonNull(channelHandlerContext, "Channel handler context");
            this.request = (KvStateRequest) Objects.requireNonNull(kvStateRequest, "State query");
            this.kvState = (InternalKvState) Objects.requireNonNull(internalKvState, "KvState");
            this.stats = (KvStateRequestStats) Objects.requireNonNull(kvStateRequestStats, "State query stats");
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            try {
                try {
                    if (!this.ctx.channel().isActive()) {
                        if (0 == 0) {
                            this.stats.reportFailedRequest();
                            return;
                        }
                        return;
                    }
                    byte[] serializedValue = this.kvState.getSerializedValue(this.request.getSerializedKeyAndNamespace());
                    if (serializedValue != null) {
                        ByteBuf serializeKvStateRequestResult = KvStateRequestSerializer.serializeKvStateRequestResult(this.ctx.alloc(), this.request.getRequestId(), serializedValue);
                        int writeBufferHighWaterMark = this.ctx.channel().config().getWriteBufferHighWaterMark();
                        (serializeKvStateRequestResult.readableBytes() <= writeBufferHighWaterMark ? this.ctx.writeAndFlush(serializeKvStateRequestResult) : this.ctx.writeAndFlush(new ChunkedByteBuf(serializeKvStateRequestResult, writeBufferHighWaterMark))).addListener(new QueryResultWriteListener());
                        z = true;
                    } else {
                        this.ctx.writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestFailure(this.ctx.alloc(), this.request.getRequestId(), new UnknownKeyOrNamespace()));
                    }
                    if (z) {
                        return;
                    }
                    this.stats.reportFailedRequest();
                } catch (Throwable th) {
                    try {
                        this.ctx.writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestFailure(this.ctx.alloc(), this.request.getRequestId(), new RuntimeException("Failed to query state backend for query " + this.request.getRequestId() + ". Caused by: " + ExceptionUtils.stringifyException(th))));
                    } catch (IOException e) {
                        KvStateServerHandler.LOG.error("Failed to respond with the error after failed to query state backend", e);
                    }
                    if (0 == 0) {
                        this.stats.reportFailedRequest();
                    }
                }
            } catch (Throwable th2) {
                if (0 == 0) {
                    this.stats.reportFailedRequest();
                }
                throw th2;
            }
        }

        public String toString() {
            return "AsyncKvStateQueryTask{, request=" + this.request + ", creationNanos=" + this.creationNanos + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KvStateServerHandler(KvStateRegistry kvStateRegistry, ExecutorService executorService, KvStateRequestStats kvStateRequestStats) {
        this.registry = (KvStateRegistry) Objects.requireNonNull(kvStateRegistry, "KvStateRegistry");
        this.queryExecutor = (ExecutorService) Objects.requireNonNull(executorService, "Query thread pool");
        this.stats = (KvStateRequestStats) Objects.requireNonNull(kvStateRequestStats, "KvStateRequestStats");
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.stats.reportActiveConnection();
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.stats.reportInactiveConnection();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        ByteBuf serializeServerFailure;
        KvStateRequest kvStateRequest = null;
        try {
            try {
                ByteBuf byteBuf = (ByteBuf) obj;
                KvStateRequestType deserializeHeader = KvStateRequestSerializer.deserializeHeader(byteBuf);
                if (deserializeHeader == KvStateRequestType.REQUEST) {
                    KvStateRequest deserializeKvStateRequest = KvStateRequestSerializer.deserializeKvStateRequest(byteBuf);
                    this.stats.reportRequest();
                    InternalKvState<?> kvState = this.registry.getKvState(deserializeKvStateRequest.getKvStateId());
                    if (kvState != null) {
                        this.queryExecutor.submit(new AsyncKvStateQueryTask(channelHandlerContext, deserializeKvStateRequest, kvState, this.stats));
                    } else {
                        channelHandlerContext.writeAndFlush(KvStateRequestSerializer.serializeKvStateRequestFailure(channelHandlerContext.alloc(), deserializeKvStateRequest.getRequestId(), new UnknownKvStateID(deserializeKvStateRequest.getKvStateId())));
                        this.stats.reportFailedRequest();
                    }
                } else {
                    channelHandlerContext.writeAndFlush(KvStateRequestSerializer.serializeServerFailure(channelHandlerContext.alloc(), new IllegalArgumentException("Unexpected message type " + deserializeHeader + ". KvStateServerHandler expects " + KvStateRequestType.REQUEST + " messages.")));
                }
                ReferenceCountUtil.release(obj);
            } catch (Throwable th) {
                String stringifyException = ExceptionUtils.stringifyException(th);
                if (0 != 0) {
                    serializeServerFailure = KvStateRequestSerializer.serializeKvStateRequestFailure(channelHandlerContext.alloc(), kvStateRequest.getRequestId(), new RuntimeException("Failed to handle incoming request with ID " + kvStateRequest.getRequestId() + ". Caused by: " + stringifyException));
                    this.stats.reportFailedRequest();
                } else {
                    serializeServerFailure = KvStateRequestSerializer.serializeServerFailure(channelHandlerContext.alloc(), new RuntimeException("Failed to handle incoming message. Caused by: " + stringifyException));
                }
                channelHandlerContext.writeAndFlush(serializeServerFailure);
                ReferenceCountUtil.release(obj);
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.release(obj);
            throw th2;
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        channelHandlerContext.writeAndFlush(KvStateRequestSerializer.serializeServerFailure(channelHandlerContext.alloc(), new RuntimeException("Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(th)))).addListener(ChannelFutureListener.CLOSE);
    }
}
