package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.annotations.VisibleForTesting;
import org.spark-project.guava.collect.Lists;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.class */
public class ExternalShuffleBlockHandler extends RpcHandler {
    private final Logger logger;

    @VisibleForTesting
    final ExternalShuffleBlockResolver blockManager;
    private final OneForOneStreamManager streamManager;

    public ExternalShuffleBlockHandler(TransportConf transportConf, File file) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file));
    }

    @VisibleForTesting
    public ExternalShuffleBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver) {
        this.logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
        this.streamManager = oneForOneStreamManager;
        this.blockManager = externalShuffleBlockResolver;
    }

    public void receive(TransportClient transportClient, byte[] bArr, RpcResponseCallback rpcResponseCallback) {
        handleMessage(BlockTransferMessage.Decoder.fromByteArray(bArr), transportClient, rpcResponseCallback);
    }

    protected void handleMessage(BlockTransferMessage blockTransferMessage, TransportClient transportClient, RpcResponseCallback rpcResponseCallback) {
        if (!(blockTransferMessage instanceof OpenBlocks)) {
            if (!(blockTransferMessage instanceof RegisterExecutor)) {
                throw new UnsupportedOperationException("Unexpected message: " + blockTransferMessage);
            }
            RegisterExecutor registerExecutor = (RegisterExecutor) blockTransferMessage;
            checkAuth(transportClient, registerExecutor.appId);
            this.blockManager.registerExecutor(registerExecutor.appId, registerExecutor.execId, registerExecutor.executorInfo);
            rpcResponseCallback.onSuccess(new byte[0]);
            return;
        }
        OpenBlocks openBlocks = (OpenBlocks) blockTransferMessage;
        checkAuth(transportClient, openBlocks.appId);
        ArrayList newArrayList = Lists.newArrayList();
        for (String str : openBlocks.blockIds) {
            newArrayList.add(this.blockManager.getBlockData(openBlocks.appId, openBlocks.execId, str));
        }
        long registerStream = this.streamManager.registerStream(transportClient.getClientId(), newArrayList.iterator());
        this.logger.trace("Registered streamId {} with {} buffers", Long.valueOf(registerStream), Integer.valueOf(openBlocks.blockIds.length));
        rpcResponseCallback.onSuccess(new StreamHandle(registerStream, openBlocks.blockIds.length).toByteArray());
    }

    public StreamManager getStreamManager() {
        return this.streamManager;
    }

    public void applicationRemoved(String str, boolean z) {
        this.blockManager.applicationRemoved(str, z);
    }

    public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId, ExecutorShuffleInfo executorShuffleInfo) {
        this.blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorShuffleInfo);
    }

    public void close() {
        this.blockManager.close();
    }

    private void checkAuth(TransportClient transportClient, String str) {
        if (transportClient.getClientId() != null && !transportClient.getClientId().equals(str)) {
            throw new SecurityException(String.format("Client for %s not authorized for application %s.", transportClient.getClientId(), str));
        }
    }
}
