package org.apache.giraph.comm.netty.handler;

import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

/* loaded from: input_file:org/apache/giraph/comm/netty/handler/RequestServerHandler.class */
public abstract class RequestServerHandler<R> extends SimpleChannelUpstreamHandler {
    public static final int RESPONSE_BYTES = 13;
    private static Time TIME = SystemTime.get();
    private static final Logger LOG = Logger.getLogger(RequestServerHandler.class);
    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
    private final boolean closeFirstRequest;
    private final WorkerRequestReservedMap workerRequestReservedMap;
    private final TaskInfo myTaskInfo;
    private long startProcessingNanoseconds = -1;

    /* loaded from: input_file:org/apache/giraph/comm/netty/handler/RequestServerHandler$Factory.class */
    public interface Factory {
        RequestServerHandler newHandler(WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, TaskInfo taskInfo);
    }

    public RequestServerHandler(WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, TaskInfo taskInfo) {
        this.workerRequestReservedMap = workerRequestReservedMap;
        this.closeFirstRequest = GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(immutableClassesGiraphConfiguration);
        this.myTaskInfo = taskInfo;
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("messageReceived: Got " + messageEvent.getMessage().getClass());
        }
        WritableRequest writableRequest = (WritableRequest) messageEvent.getMessage();
        if (this.closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
            LOG.info("messageReceived: Simulating closing channel on first request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId());
            ALREADY_CLOSED_FIRST_REQUEST = true;
            channelHandlerContext.getChannel().close();
            return;
        }
        int i = 1;
        if (this.workerRequestReservedMap.reserveRequest(Integer.valueOf(writableRequest.getClientId()), writableRequest.getRequestId())) {
            if (LOG.isDebugEnabled()) {
                this.startProcessingNanoseconds = TIME.getNanoseconds();
            }
            processRequest(writableRequest);
            if (LOG.isDebugEnabled()) {
                LOG.debug("messageReceived: Processing client " + writableRequest.getClientId() + ", requestId " + writableRequest.getRequestId() + ", " + writableRequest.getType() + " took " + Times.getNanosSince(TIME, this.startProcessingNanoseconds) + " ns");
            }
            i = 0;
        } else {
            LOG.info("messageReceived: Request id " + writableRequest.getRequestId() + " from client " + writableRequest.getClientId() + " was already processed, not processing again.");
        }
        ChannelBuffer directBuffer = ChannelBuffers.directBuffer(13);
        directBuffer.writeInt(this.myTaskInfo.getTaskId());
        directBuffer.writeLong(writableRequest.getRequestId());
        directBuffer.writeByte(i);
        messageEvent.getChannel().write(directBuffer);
    }

    public abstract void processRequest(R r);

    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("channelConnected: Connected the channel on " + channelHandlerContext.getChannel().getRemoteAddress());
        }
    }

    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("channelClosed: Closed the channel on " + channelHandlerContext.getChannel().getRemoteAddress() + " with event " + channelStateEvent);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        LOG.warn("exceptionCaught: Channel failed with remote address " + channelHandlerContext.getChannel().getRemoteAddress(), exceptionEvent.getCause());
    }
}
