package alluxio.job.plan.replicate;

import alluxio.AlluxioURI;
import alluxio.ClientContext;
import alluxio.client.block.AlluxioBlockStore;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.block.stream.TestBlockInStream;
import alluxio.client.block.stream.TestBlockOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.job.JobServerContext;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.util.SerializableVoid;
import alluxio.underfs.UfsManager;
import alluxio.util.io.BufferUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.FileInfo;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Tests for {@link ReplicateDefinition}: executor selection for a replicate job and the
 * copy of a single block performed by the replicate task.
 *
 * <p>{@link AlluxioBlockStore} and {@link BlockInStream} static factories are replaced through
 * PowerMock so that no real block worker is contacted.
 */
@PrepareForTest({AlluxioBlockStore.class, FileSystemContext.class, JobServerContext.class,
        BlockInStream.class})
@RunWith(PowerMockRunner.class)
public final class ReplicateDefinitionTest {
    private static final long TEST_BLOCK_ID = 1;
    private static final long TEST_BLOCK_SIZE = 512;
    /** Capacity of the buffer backing the test output stream. */
    private static final int MAX_BYTES = 1000;
    private static final WorkerNetAddress ADDRESS_1 =
            new WorkerNetAddress().setHost("host1").setDataPort(10);
    private static final WorkerNetAddress ADDRESS_2 =
            new WorkerNetAddress().setHost("host2").setDataPort(10);
    private static final WorkerNetAddress ADDRESS_3 =
            new WorkerNetAddress().setHost("host3").setDataPort(10);
    /** Address built from the local host name; used as the target of the output stream. */
    private static final WorkerNetAddress LOCAL_ADDRESS = new WorkerNetAddress()
            .setHost(NetworkAddressUtils.getLocalHostName(
                    (int) ServerConfiguration.getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS)))
            .setDataPort(10);
    private static final WorkerInfo WORKER_INFO_1 = new WorkerInfo().setAddress(ADDRESS_1);
    private static final WorkerInfo WORKER_INFO_2 = new WorkerInfo().setAddress(ADDRESS_2);
    private static final WorkerInfo WORKER_INFO_3 = new WorkerInfo().setAddress(ADDRESS_3);
    private static final String TEST_PATH = "/test";

    private FileSystemContext mMockFileSystemContext;
    private AlluxioBlockStore mMockBlockStore;
    private FileSystem mMockFileSystem;
    private JobServerContext mMockJobServerContext;
    private UfsManager mMockUfsManager;
    private BlockInfo mTestBlockInfo;
    private URIStatus mTestStatus;

    @Rule
    public final ExpectedException mThrown = ExpectedException.none();

    /**
     * Builds the mocked contexts and the block/file metadata shared by all tests.
     *
     * @throws Exception if setting up any mock fails
     */
    @Before
    public void before() throws Exception {
        mMockFileSystemContext = PowerMockito.mock(FileSystemContext.class);
        Mockito.when(mMockFileSystemContext.getClientContext())
                .thenReturn(ClientContext.create(ServerConfiguration.global()));
        Mockito.when(mMockFileSystemContext.getClusterConf())
                .thenReturn(ServerConfiguration.global());
        mMockBlockStore = PowerMockito.mock(AlluxioBlockStore.class);
        mMockFileSystem = Mockito.mock(FileSystem.class);
        mMockUfsManager = Mockito.mock(UfsManager.class);
        mMockJobServerContext =
                new JobServerContext(mMockFileSystem, mMockFileSystemContext, mMockUfsManager);

        // Route AlluxioBlockStore.create(...) for the mocked context to the mocked store.
        PowerMockito.mockStatic(AlluxioBlockStore.class);
        Mockito.when(AlluxioBlockStore.create(mMockFileSystemContext)).thenReturn(mMockBlockStore);

        mTestBlockInfo = new BlockInfo().setBlockId(TEST_BLOCK_ID).setLength(TEST_BLOCK_SIZE);
        Mockito.when(mMockBlockStore.getInfo(TEST_BLOCK_ID)).thenReturn(mTestBlockInfo);
        mTestStatus = new URIStatus(new FileInfo()
                .setPath(TEST_PATH)
                .setBlockIds(Lists.newArrayList(TEST_BLOCK_ID))
                .setPersisted(true)
                .setFileBlockInfos(
                        Lists.newArrayList(new FileBlockInfo().setBlockInfo(mTestBlockInfo))));
    }

    /**
     * Runs {@link ReplicateDefinition#selectExecutors} for the test block.
     *
     * @param i number of replicas passed to the {@link ReplicateConfig}
     * @param list workers offered to the definition as candidates
     * @return the executors chosen by the definition
     * @throws Exception if the selection fails
     */
    private Set<Pair<WorkerInfo, SerializableVoid>> selectExecutorsTestHelper(int i,
            List<WorkerInfo> list) throws Exception {
        ReplicateConfig config = new ReplicateConfig(TEST_PATH, TEST_BLOCK_ID, i);
        SelectExecutorsContext context =
                new SelectExecutorsContext(TEST_BLOCK_ID, mMockJobServerContext);
        return new ReplicateDefinition().selectExecutors(config, list, context);
    }

    /**
     * Stubs the file system and block store so every read returns {@code blockInStream} and a
     * write to {@link #LOCAL_ADDRESS} returns {@code blockOutStream}, then runs the replicate
     * task for the test block with one replica.
     *
     * @param list workers reported by the mocked file system context as cached workers
     * @param blockInStream stream handed out for every read of the block
     * @param blockOutStream stream handed out for writing the block to the local address
     * @throws Exception whatever the task under test throws
     */
    private void runTaskReplicateTestHelper(List<BlockWorkerInfo> list,
            BlockInStream blockInStream, BlockOutStream blockOutStream) throws Exception {
        Mockito.when(mMockFileSystem.getStatus(ArgumentMatchers.any(AlluxioURI.class)))
                .thenReturn(mTestStatus);
        Mockito.when(mMockFileSystemContext.getCachedWorkers()).thenReturn(list);

        // Every way of opening the block for reading yields the supplied input stream.
        Mockito.when(mMockBlockStore.getInStream(ArgumentMatchers.anyLong(),
                ArgumentMatchers.any(InStreamOptions.class))).thenReturn(blockInStream);
        Mockito.when(mMockBlockStore.getInStream(ArgumentMatchers.any(BlockInfo.class),
                ArgumentMatchers.any(InStreamOptions.class),
                ArgumentMatchers.any(Map.class))).thenReturn(blockInStream);
        PowerMockito.mockStatic(BlockInStream.class);
        Mockito.when(BlockInStream.create(
                ArgumentMatchers.any(FileSystemContext.class),
                ArgumentMatchers.any(BlockInfo.class),
                ArgumentMatchers.any(WorkerNetAddress.class),
                ArgumentMatchers.any(BlockInStream.BlockInStreamSource.class),
                ArgumentMatchers.any(InStreamOptions.class))).thenReturn(blockInStream);

        // Only a write of the exact test block to the local address is stubbed.
        Mockito.when(mMockBlockStore.getOutStream(
                ArgumentMatchers.eq(TEST_BLOCK_ID),
                ArgumentMatchers.eq(TEST_BLOCK_SIZE),
                ArgumentMatchers.eq(LOCAL_ADDRESS),
                ArgumentMatchers.any(OutStreamOptions.class))).thenReturn(blockOutStream);

        // The block is reported as already stored on worker 1.
        Mockito.when(mMockBlockStore.getInfo(TEST_BLOCK_ID)).thenReturn(
                mTestBlockInfo.setLocations(
                        Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1))));
        PowerMockito.mockStatic(AlluxioBlockStore.class);
        Mockito.when(AlluxioBlockStore.create(ArgumentMatchers.any(FileSystemContext.class)))
                .thenReturn(mMockBlockStore);

        ReplicateConfig config = new ReplicateConfig(TEST_PATH, TEST_BLOCK_ID, 1);
        RunTaskContext context =
                new RunTaskContext(TEST_BLOCK_ID, TEST_BLOCK_ID, mMockJobServerContext);
        new ReplicateDefinition().runTask(config, null, context);
    }

    /** With no existing block location and a single worker, that worker is selected. */
    @Test
    public void selectExecutorsOnlyOneWorkerAvailable() throws Exception {
        mTestBlockInfo.setLocations(Lists.newArrayList());
        Set<Pair<WorkerInfo, SerializableVoid>> result =
                selectExecutorsTestHelper(1, Lists.newArrayList(WORKER_INFO_1));
        Set<Pair<WorkerInfo, SerializableVoid>> expected = Sets.newHashSet();
        expected.add(new Pair<>(WORKER_INFO_1, null));
        Assert.assertEquals(expected, result);
    }

    /** With the block on worker 1 and workers 1 and 2 offered, only worker 2 is selected. */
    @Test
    public void selectExecutorsOnlyOneWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(
                Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1)));
        Set<Pair<WorkerInfo, SerializableVoid>> result =
                selectExecutorsTestHelper(1, Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2));
        Set<Pair<WorkerInfo, SerializableVoid>> expected = Sets.newHashSet();
        expected.add(new Pair<>(WORKER_INFO_2, null));
        Assert.assertEquals(expected, result);
    }

    /** With the block on worker 1 and two replicas requested, workers 2 and 3 are selected. */
    @Test
    public void selectExecutorsTwoWorkersValid() throws Exception {
        mTestBlockInfo.setLocations(
                Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1)));
        Set<Pair<WorkerInfo, SerializableVoid>> result = selectExecutorsTestHelper(2,
                Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2, WORKER_INFO_3));
        Set<Pair<WorkerInfo, SerializableVoid>> expected = Sets.newHashSet();
        expected.add(new Pair<>(WORKER_INFO_2, null));
        expected.add(new Pair<>(WORKER_INFO_3, null));
        Assert.assertEquals(expected, result);
    }

    /**
     * With the block on worker 1, three workers offered and one replica requested, exactly one
     * executor is selected; which of the candidates it is, is not asserted.
     */
    @Test
    public void selectExecutorsOneOutOFTwoWorkersValid() throws Exception {
        mTestBlockInfo.setLocations(
                Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1)));
        Set<Pair<WorkerInfo, SerializableVoid>> result = selectExecutorsTestHelper(1,
                Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2, WORKER_INFO_3));
        Assert.assertEquals(1, result.size());
        Assert.assertNull(result.iterator().next().getSecond());
    }

    /** With the block on worker 1 and only worker 1 offered, nothing is selected. */
    @Test
    public void selectExecutorsNoWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(
                Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1)));
        Set<Pair<WorkerInfo, SerializableVoid>> result =
                selectExecutorsTestHelper(1, Lists.newArrayList(WORKER_INFO_1));
        Assert.assertEquals(ImmutableSet.of(), result);
    }

    /**
     * With the block on worker 1, two replicas requested and only workers 1 and 2 offered,
     * only worker 2 is selected.
     */
    @Test
    public void selectExecutorsInsufficientWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(
                Lists.newArrayList(new BlockLocation().setWorkerAddress(ADDRESS_1)));
        Set<Pair<WorkerInfo, SerializableVoid>> result =
                selectExecutorsTestHelper(2, Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2));
        Set<Pair<WorkerInfo, SerializableVoid>> expected = Sets.newHashSet();
        expected.add(new Pair<>(WORKER_INFO_2, null));
        Assert.assertEquals(expected, result);
    }

    /** Running the task with an empty cached-worker list fails with a not-found error. */
    @Test
    public void runTaskNoBlockWorker() throws Exception {
        byte[] input = BufferUtils.getIncreasingByteArray(0, (int) TEST_BLOCK_SIZE);
        TestBlockInStream mockInStream = new TestBlockInStream(input, TEST_BLOCK_ID,
                input.length, false, BlockInStream.BlockInStreamSource.NODE_LOCAL);
        TestBlockOutStream mockOutStream =
                new TestBlockOutStream(ByteBuffer.allocate(MAX_BYTES), TEST_BLOCK_SIZE);
        mThrown.expect(NotFoundException.class);
        mThrown.expectMessage(
                ExceptionMessage.NO_LOCAL_BLOCK_WORKER_LOAD_TASK.getMessage(TEST_BLOCK_ID));
        runTaskReplicateTestHelper(Lists.newArrayList(), mockInStream, mockOutStream);
    }

    /**
     * Runs the task against a local worker for every combination of persisted/pinned file
     * status and checks that the whole block is read each time. The written bytes are only
     * compared with the input when the file is not persisted or is pinned.
     */
    @Test
    public void runTaskLocalBlockWorkerDifferentFileStatus() throws Exception {
        for (boolean persisted : new boolean[] {true, false}) {
            for (boolean pinned : new boolean[] {true, false}) {
                // A non-empty medium type set is what this test treats as "pinned".
                mTestStatus.getFileInfo().setPersisted(persisted)
                        .setMediumTypes(pinned ? Sets.newHashSet("MEM") : Collections.emptySet());
                byte[] input = BufferUtils.getIncreasingByteArray(0, (int) TEST_BLOCK_SIZE);
                TestBlockInStream mockInStream = new TestBlockInStream(input, TEST_BLOCK_ID,
                        input.length, false, BlockInStream.BlockInStreamSource.NODE_LOCAL);
                TestBlockOutStream mockOutStream =
                        new TestBlockOutStream(ByteBuffer.allocate(MAX_BYTES), TEST_BLOCK_SIZE);
                BlockWorkerInfo localBlockWorker =
                        new BlockWorkerInfo(LOCAL_ADDRESS, TEST_BLOCK_SIZE, 0L);
                runTaskReplicateTestHelper(Lists.newArrayList(localBlockWorker), mockInStream,
                        mockOutStream);
                Assert.assertEquals(TEST_BLOCK_SIZE, mockInStream.getBytesRead());
                if (!persisted || pinned) {
                    Assert.assertArrayEquals(
                            String.format("input-output mismatched: pinned=%s, persisted=%s",
                                    pinned, persisted),
                            input, mockOutStream.getWrittenData());
                }
            }
        }
    }

    /**
     * When reading the input stream throws an {@link IOException}, the task propagates it and
     * cancels the output stream.
     */
    @Test
    public void runTaskInputIOException() throws Exception {
        mTestStatus.getFileInfo().setMediumTypes(Sets.newHashSet("MEM"));
        BlockInStream mockInStream = Mockito.mock(BlockInStream.class);
        BlockOutStream mockOutStream = Mockito.mock(BlockOutStream.class);
        BlockWorkerInfo localBlockWorker =
                new BlockWorkerInfo(LOCAL_ADDRESS, TEST_BLOCK_SIZE, 0L);
        // Fail both read overloads so the failure does not depend on which one the task uses.
        Mockito.doThrow(new IOException("test")).when(mockInStream)
                .read(ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyInt(),
                        ArgumentMatchers.anyInt());
        Mockito.doThrow(new IOException("test")).when(mockInStream)
                .read(ArgumentMatchers.any(byte[].class));
        try {
            runTaskReplicateTestHelper(Lists.newArrayList(localBlockWorker), mockInStream,
                    mockOutStream);
            Assert.fail("Expected the task to throw and IOException");
        } catch (IOException e) {
            Assert.assertEquals("test", e.getMessage());
        }
        Mockito.verify(mockOutStream).cancel();
    }
}
