/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportRequestHandler;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TransportRequestHandlerSuite {
    @Test
    public void handleFetchRequestAndStreamRequest() throws Exception {
        NoOpRpcHandler rpcHandler = new NoOpRpcHandler();
        OneForOneStreamManager streamManager = (OneForOneStreamManager)rpcHandler.getStreamManager();
        Channel channel = (Channel)Mockito.mock(Channel.class);
        ArrayList responseAndPromisePairs = new ArrayList();
        Mockito.when((Object)channel.writeAndFlush(Mockito.any())).thenAnswer(invocationOnMock0 -> {
            Object response = invocationOnMock0.getArguments()[0];
            ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
            responseAndPromisePairs.add(ImmutablePair.of((Object)response, (Object)((Object)channelFuture)));
            return channelFuture;
        });
        ArrayList<TestManagedBuffer> managedBuffers = new ArrayList<TestManagedBuffer>();
        managedBuffers.add(new TestManagedBuffer(10));
        managedBuffers.add(new TestManagedBuffer(20));
        managedBuffers.add(new TestManagedBuffer(30));
        managedBuffers.add(new TestManagedBuffer(40));
        long streamId = streamManager.registerStream("test-app", managedBuffers.iterator());
        streamManager.registerChannel(channel, streamId);
        TransportClient reverseClient = (TransportClient)Mockito.mock(TransportClient.class);
        TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient, (RpcHandler)rpcHandler, Long.valueOf(2L));
        ChunkFetchRequest request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
        requestHandler.handle((RequestMessage)request0);
        assert (responseAndPromisePairs.size() == 1);
        assert (((Pair)responseAndPromisePairs.get(0)).getLeft() instanceof ChunkFetchSuccess);
        assert (((ChunkFetchSuccess)((Pair)responseAndPromisePairs.get(0)).getLeft()).body() == managedBuffers.get(0));
        ChunkFetchRequest request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1));
        requestHandler.handle((RequestMessage)request1);
        assert (responseAndPromisePairs.size() == 2);
        assert (((Pair)responseAndPromisePairs.get(1)).getLeft() instanceof ChunkFetchSuccess);
        assert (((ChunkFetchSuccess)((Pair)responseAndPromisePairs.get(1)).getLeft()).body() == managedBuffers.get(1));
        ((ExtendedChannelPromise)((Object)((Pair)responseAndPromisePairs.get(0)).getRight())).finish(true);
        StreamRequest request2 = new StreamRequest(String.format("%d_%d", streamId, 2));
        requestHandler.handle((RequestMessage)request2);
        assert (responseAndPromisePairs.size() == 3);
        assert (((Pair)responseAndPromisePairs.get(2)).getLeft() instanceof StreamResponse);
        assert (((StreamResponse)((Pair)responseAndPromisePairs.get(2)).getLeft()).body() == managedBuffers.get(2));
        StreamRequest request3 = new StreamRequest(String.format("%d_%d", streamId, 3));
        requestHandler.handle((RequestMessage)request3);
        ((Channel)Mockito.verify((Object)channel, (VerificationMode)Mockito.times((int)1))).close();
        assert (responseAndPromisePairs.size() == 3);
    }

    private class ExtendedChannelPromise
    extends DefaultChannelPromise {
        private List<GenericFutureListener<Future<Void>>> listeners;
        private boolean success;

        ExtendedChannelPromise(Channel channel) {
            super(channel);
            this.listeners = new ArrayList<GenericFutureListener<Future<Void>>>();
            this.success = false;
        }

        public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            GenericFutureListener<? extends Future<? super Void>> gfListener = listener;
            this.listeners.add(gfListener);
            return super.addListener(listener);
        }

        public boolean isSuccess() {
            return this.success;
        }

        public void finish(boolean success) {
            this.success = success;
            this.listeners.forEach(listener -> {
                try {
                    listener.operationComplete((Future)this);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
        }
    }
}

