package org.apache.giraph.comm.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.Thread;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/handler/RequestServerHandler.class */
/**
 * Base Netty inbound handler for requests arriving at a server.
 *
 * <p>For every inbound message the handler reserves the (client id, request id)
 * pair in the {@link WorkerRequestReservedMap}; only a successfully reserved
 * request is passed to {@code processRequest}. A fixed-size acknowledgement is
 * written back to the channel in both cases.
 *
 * @param <R> request type accepted by {@code processRequest}
 */
public abstract class RequestServerHandler<R> extends ChannelInboundHandlerAdapter {
    /** Size of the response: task id (int) + request id (long) + flow-control signal (int). */
    public static final int RESPONSE_BYTES = 16;
    /** Time source used to measure request processing duration. */
    private static final Time TIME = SystemTime.get();
    private static final Logger LOG = Logger.getLogger(RequestServerHandler.class);
    /** Set once the simulated "close on first request" has been performed. */
    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
    /** Flow control used to compute the response signal; not assigned by this class. */
    protected FlowControl flowControl;
    /** Whether the first request should be answered by closing the channel. */
    private final boolean closeFirstRequest;
    /** Tracks which (client id, request id) pairs have already been reserved. */
    private final WorkerRequestReservedMap workerRequestReservedMap;
    /** Task info whose task id is written into every response. */
    private final TaskInfo myTaskInfo;
    /** Receives every throwable reported through {@link #exceptionCaught}. */
    private final Thread.UncaughtExceptionHandler exceptionHandler;
    /** Value of the {@code NETTY_AUTO_READ} option read at construction time. */
    private final boolean nettyAutoRead;
    /** True until this handler has switched off channel auto-read (only when auto-read is disabled). */
    private final AtomicBoolean firstRead = new AtomicBoolean(true);

    /* loaded from: input_file:org/apache/giraph/comm/netty/handler/RequestServerHandler$Factory.class */
    /** Creates {@link RequestServerHandler} instances. */
    public interface Factory {
        /**
         * Creates a new handler.
         *
         * @param workerRequestReservedMap map used to reserve incoming requests
         * @param immutableClassesGiraphConfiguration configuration
         * @param taskInfo task info of the receiving task
         * @param uncaughtExceptionHandler handler notified about channel exceptions
         * @return the new handler
         */
        RequestServerHandler newHandler(WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, TaskInfo taskInfo, Thread.UncaughtExceptionHandler uncaughtExceptionHandler);

        /**
         * Supplies the flow control to the factory.
         *
         * @param flowControl flow control instance
         */
        void setFlowControl(FlowControl flowControl);
    }

    /**
     * Creates the handler and reads its options from the configuration.
     *
     * @param workerRequestReservedMap map used to reserve incoming requests
     * @param immutableClassesGiraphConfiguration configuration providing
     *        {@code NETTY_SIMULATE_FIRST_REQUEST_CLOSED} and {@code NETTY_AUTO_READ}
     * @param taskInfo task info whose id is sent in responses
     * @param uncaughtExceptionHandler handler notified about channel exceptions
     */
    public RequestServerHandler(WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, TaskInfo taskInfo, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.workerRequestReservedMap = workerRequestReservedMap;
        this.closeFirstRequest = GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(immutableClassesGiraphConfiguration);
        this.myTaskInfo = taskInfo;
        this.exceptionHandler = uncaughtExceptionHandler;
        this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(immutableClassesGiraphConfiguration);
    }

    /**
     * Handles one inbound request: optionally simulates a closed channel on the
     * first request, processes the request if it could be reserved, and writes
     * the acknowledgement.
     *
     * @param channelHandlerContext channel context
     * @param obj inbound message; must be a {@link WritableRequest}
     * @throws Exception propagated from request processing or response creation
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("messageReceived: Got " + obj.getClass());
        }
        final WritableRequest request = (WritableRequest) obj;

        if (this.closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
            LOG.info("messageReceived: Simulating closing channel on first request " + request.getRequestId() + " from " + request.getClientId());
            setAlreadyClosedFirstRequest();
            channelHandlerContext.close();
            return;
        }

        final AckSignalFlag ackSignalFlag;
        if (this.workerRequestReservedMap.reserveRequest(Integer.valueOf(request.getClientId()), request.getRequestId())) {
            // Sample the log level once so the start time and the log statement
            // always agree, even if the level changes while processing.
            final boolean debugEnabled = LOG.isDebugEnabled();
            final long startNanos = debugEnabled ? TIME.getNanoseconds() : -1L;
            // The pipeline only carries WritableRequest instances; subclasses bind R
            // to the concrete request type they expect.
            @SuppressWarnings("unchecked")
            final R typedRequest = (R) request;
            processRequest(typedRequest);
            if (debugEnabled) {
                LOG.debug("messageReceived: Processing client " + request.getClientId() + ", requestId " + request.getRequestId() + ", " + request.getType() + " took " + Times.getNanosSince(TIME, startNanos) + " ns");
            }
            ackSignalFlag = AckSignalFlag.NEW_REQUEST;
        } else {
            LOG.info("messageReceived: Request id " + request.getRequestId() + " from client " + request.getClientId() + " was already processed, not processing again.");
            ackSignalFlag = AckSignalFlag.DUPLICATE_REQUEST;
        }

        sendResponse(channelHandlerContext, request, ackSignalFlag);

        // When auto-read is disabled by configuration, switch it off on the channel
        // after the first request; further reads are requested explicitly in
        // channelReadComplete.
        if (!this.nettyAutoRead && this.firstRead.compareAndSet(true, false)) {
            channelHandlerContext.channel().config().setAutoRead(false);
        }
    }

    /**
     * Writes the {@link #RESPONSE_BYTES}-byte acknowledgement: task id, request
     * id and flow-control signal.
     *
     * @param channelHandlerContext channel context used to allocate and write
     * @param request request being acknowledged
     * @param ackSignalFlag whether the request was new or a duplicate
     */
    private void sendResponse(ChannelHandlerContext channelHandlerContext, WritableRequest request, AckSignalFlag ackSignalFlag) {
        // Gather every field before allocating so that a failure here cannot
        // leave an allocated buffer unreleased.
        final int taskId = this.myTaskInfo.getTaskId();
        final long requestId = request.getRequestId();
        final int signal = this.flowControl.calculateResponse(ackSignalFlag, request.getClientId());

        final ByteBuf buffer = channelHandlerContext.alloc().buffer(RESPONSE_BYTES);
        buffer.writeInt(taskId);
        buffer.writeLong(requestId);
        buffer.writeInt(signal);
        channelHandlerContext.write(buffer);
    }

    /**
     * Delegates to the superclass when auto-read is enabled; otherwise
     * explicitly requests the next read on the channel.
     *
     * @param channelHandlerContext channel context
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.nettyAutoRead) {
            super.channelReadComplete(channelHandlerContext);
        } else {
            channelHandlerContext.read();
        }
    }

    /** Records that the simulated first-request close has happened. */
    private static void setAlreadyClosedFirstRequest() {
        ALREADY_CLOSED_FIRST_REQUEST = true;
    }

    /**
     * Processes a request that was successfully reserved.
     *
     * @param r request to process
     */
    public abstract void processRequest(R r);

    /**
     * Logs the connection at debug level and forwards the event.
     *
     * @param channelHandlerContext channel context
     * @throws Exception declared by the Netty callback signature
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("channelActive: Connected the channel on " + channelHandlerContext.channel().remoteAddress());
        }
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Logs the disconnection at debug level and forwards the event.
     *
     * @param channelHandlerContext channel context
     * @throws Exception declared by the Netty callback signature
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("channelInactive: Closed the channel on " + channelHandlerContext.channel().remoteAddress());
        }
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Hands the throwable to the configured uncaught-exception handler together
     * with the current thread.
     *
     * @param channelHandlerContext channel context
     * @param th throwable raised in the pipeline
     * @throws Exception declared by the Netty callback signature
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.exceptionHandler.uncaughtException(Thread.currentThread(), th);
    }
}
