package org.apache.giraph.comm.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.util.ReferenceCountUtil;
import java.io.DataInput;
import java.io.IOException;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.SaslNettyClient;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.SaslCompleteRequest;
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/handler/SaslClientHandler.class */
/**
 * Client-side Netty inbound handler that drives the SASL handshake.
 *
 * <p>Each inbound message is decoded into a {@link WritableRequest}. A
 * {@link SaslCompleteRequest} ends the handshake: the thread waiting on the
 * client's "authenticated" monitor is notified, this handler removes itself
 * from the pipeline and the length-field-based frame decoder is swapped for a
 * fixed-length one. Any other message is treated as a
 * {@link SaslTokenMessageRequest}; the channel's {@link SaslNettyClient}
 * computes a response token which, when non-null, is written back to the server.
 */
public class SaslClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = Logger.getLogger(SaslClientHandler.class);

    /**
     * Frame length given to the {@link FixedLengthFrameDecoder} installed once
     * the handshake has completed.
     */
    // NOTE(review): presumably the size of the server's fixed-length responses — confirm against the server side.
    private static final int POST_SASL_FRAME_BYTES = 16;

    /** Configuration passed to reflectively created request objects. */
    private final Configuration conf;

    /**
     * Creates the handler.
     *
     * @param configuration configuration handed to {@link ReflectionUtils#newInstance}
     *                      when inbound requests are instantiated
     */
    public SaslClientHandler(Configuration configuration) {
        this.conf = configuration;
    }

    /**
     * Handles one inbound SASL message from the server.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   raw inbound message; must be a {@link ByteBuf}
     * @throws Exception if no {@link SaslNettyClient} is attached to the channel, if the
     *                   server reports completion while the local client is not complete,
     *                   or if decoding fails
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        WritableRequest decode = decode(channelHandlerContext, obj);
        // The SASL client is stored as a channel attribute under NettyClient.SASL.
        SaslNettyClient saslNettyClient = (SaslNettyClient) channelHandlerContext.attr(NettyClient.SASL).get();
        if (saslNettyClient == null) {
            throw new Exception("handleUpstream: saslNettyClient was unexpectedly null for channel: " + channelHandlerContext.channel());
        }
        if (decode.getClass() == SaslCompleteRequest.class) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("handleUpstream: Server has sent us the SaslComplete message. Allowing normal work to proceed.");
            }
            // Wake up whoever is waiting on the "authenticated" monitor.
            synchronized (saslNettyClient.getAuthenticated()) {
                saslNettyClient.getAuthenticated().notify();
            }
            if (!saslNettyClient.isComplete()) {
                String notAuthenticated = "handleUpstream: Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                LOG.error(notAuthenticated);
                throw new Exception(notAuthenticated);
            }
            // Handshake finished: drop this handler and switch the pipeline to fixed-length framing.
            channelHandlerContext.pipeline().remove(this);
            channelHandlerContext.pipeline().replace("length-field-based-frame-decoder", "fixed-length-frame-decoder", new FixedLengthFrameDecoder(POST_SASL_FRAME_BYTES));
            return;
        }
        SaslTokenMessageRequest saslTokenMessageRequest = (SaslTokenMessageRequest) decode;
        if (LOG.isDebugEnabled()) {
            LOG.debug("handleUpstream: Responding to server's token of length: " + saslTokenMessageRequest.getSaslToken().length);
        }
        byte[] saslResponse = saslNettyClient.saslResponse(saslTokenMessageRequest);
        if (saslResponse != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("handleUpstream: Response to server token has length:" + saslResponse.length);
            }
            channelHandlerContext.channel().writeAndFlush(new SaslTokenMessageRequest(saslResponse));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("handleUpstream: Response to server is null: authentication should now be complete.");
        }
        if (saslNettyClient.isComplete()) {
            return;
        }
        LOG.warn("handleUpstream: Generated a null response, but authentication is not complete.");
    }

    /**
     * Decodes an inbound buffer into a {@link WritableRequest}.
     *
     * <p>The first byte selects the {@link RequestType} by ordinal; the remaining bytes are
     * passed to the new request's {@code readFields}. The buffer is released before this
     * method returns, whether decoding succeeds or fails.
     *
     * @param channelHandlerContext context of the channel, used for log and error messages
     * @param obj                   inbound message; must be a {@link ByteBuf}
     * @return the request instantiated for the received type
     * @throws IllegalStateException if {@code obj} is not a {@link ByteBuf} or the type byte
     *                               does not map to a known {@link RequestType}
     * @throws Exception             if the type byte cannot be read
     */
    protected WritableRequest decode(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof ByteBuf)) {
            throw new IllegalStateException("decode: Got illegal message " + obj);
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        try {
            DataInput byteBufInputStream = new ByteBufInputStream(byteBuf);
            // The type byte comes off the wire, so validate it before indexing the enum table.
            int typeOrdinal = byteBufInputStream.readByte();
            RequestType[] knownTypes = RequestType.values();
            if (typeOrdinal < 0 || typeOrdinal >= knownTypes.length) {
                throw new IllegalStateException("decode: Got unknown request type ordinal " + typeOrdinal
                        + " from server:" + channelHandlerContext.channel().remoteAddress());
            }
            RequestType requestType = knownTypes[typeOrdinal];
            if (LOG.isDebugEnabled()) {
                LOG.debug("decode: Got a response of type " + requestType + " from server:" + channelHandlerContext.channel().remoteAddress());
            }
            WritableRequest writableRequest = (WritableRequest) ReflectionUtils.newInstance(requestType.getRequestClass(), this.conf);
            try {
                writableRequest.readFields(byteBufInputStream);
            } catch (IOException e) {
                // NOTE(review): the request is still returned after a failed read, as before;
                // consider rethrowing so a partially read token is never processed.
                LOG.error("decode: Exception when trying to read server response: " + e, e);
            }
            return writableRequest;
        } finally {
            // Always drop our reference so a failed decode cannot leak the buffer.
            ReferenceCountUtil.release(byteBuf);
        }
    }
}
