package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/operator/TestStreamingExchangeClientBuffer.class */
public class TestStreamingExchangeClientBuffer {
    private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0);
    private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0);
    private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0);
    private static final Slice PAGE_0 = Slices.utf8Slice("page0");
    private static final Slice PAGE_1 = Slices.utf8Slice("page-1");
    private static final Slice PAGE_2 = Slices.utf8Slice("page-_2");

    @Test
    public void testHappyPath() {
        StreamingExchangeClientBuffer streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
        try {
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertNull(streamingExchangeClientBuffer.pollPage());
            streamingExchangeClientBuffer.addTask(TASK_0);
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertNull(streamingExchangeClientBuffer.pollPage());
            streamingExchangeClientBuffer.addTask(TASK_1);
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertNull(streamingExchangeClientBuffer.pollPage());
            streamingExchangeClientBuffer.noMoreTasks();
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertNull(streamingExchangeClientBuffer.pollPage());
            streamingExchangeClientBuffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            Assert.assertEquals(streamingExchangeClientBuffer.getBufferedPageCount(), 1);
            Assert.assertEquals(streamingExchangeClientBuffer.getRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getRemainingCapacityInBytes(), DataSize.of(1L, DataSize.Unit.KILOBYTE).toBytes() - PAGE_0.getRetainedSize());
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertEquals(streamingExchangeClientBuffer.pollPage(), PAGE_0);
            Assert.assertEquals(streamingExchangeClientBuffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(streamingExchangeClientBuffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getRemainingCapacityInBytes(), DataSize.of(1L, DataSize.Unit.KILOBYTE).toBytes());
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            streamingExchangeClientBuffer.taskFinished(TASK_0);
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            streamingExchangeClientBuffer.addPages(TASK_1, ImmutableList.of(PAGE_1, PAGE_2));
            Assert.assertEquals(streamingExchangeClientBuffer.getBufferedPageCount(), 2);
            Assert.assertEquals(streamingExchangeClientBuffer.getRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getRemainingCapacityInBytes(), (DataSize.of(1L, DataSize.Unit.KILOBYTE).toBytes() - PAGE_1.getRetainedSize()) - PAGE_2.getRetainedSize());
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertEquals(streamingExchangeClientBuffer.pollPage(), PAGE_1);
            Assert.assertEquals(streamingExchangeClientBuffer.pollPage(), PAGE_2);
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            Assert.assertEquals(streamingExchangeClientBuffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(streamingExchangeClientBuffer.getMaxRetainedSizeInBytes(), PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize());
            Assert.assertEquals(streamingExchangeClientBuffer.getRemainingCapacityInBytes(), DataSize.of(1L, DataSize.Unit.KILOBYTE).toBytes());
            streamingExchangeClientBuffer.taskFinished(TASK_1);
            Assert.assertTrue(streamingExchangeClientBuffer.isFinished());
            Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
            streamingExchangeClientBuffer.close();
        } catch (Throwable th) {
            try {
                streamingExchangeClientBuffer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testClose() {
        StreamingExchangeClientBuffer streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
        streamingExchangeClientBuffer.addTask(TASK_0);
        streamingExchangeClientBuffer.addTask(TASK_1);
        Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
        Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
        Assert.assertNull(streamingExchangeClientBuffer.pollPage());
        streamingExchangeClientBuffer.close();
        Assert.assertTrue(streamingExchangeClientBuffer.isFinished());
        Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
        Assert.assertNull(streamingExchangeClientBuffer.pollPage());
    }

    @Test
    public void testIsFinished() {
        StreamingExchangeClientBuffer streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
        try {
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
            streamingExchangeClientBuffer.noMoreTasks();
            Assert.assertTrue(streamingExchangeClientBuffer.isFinished());
            Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
            streamingExchangeClientBuffer.close();
            streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
            try {
                Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
                streamingExchangeClientBuffer.addTask(TASK_0);
                streamingExchangeClientBuffer.noMoreTasks();
                Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
                streamingExchangeClientBuffer.taskFinished(TASK_0);
                Assert.assertTrue(streamingExchangeClientBuffer.isFinished());
                Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
                streamingExchangeClientBuffer.close();
                streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
                try {
                    Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                    Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
                    streamingExchangeClientBuffer.addTask(TASK_0);
                    Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                    Assert.assertFalse(streamingExchangeClientBuffer.isBlocked().isDone());
                    RuntimeException runtimeException = new RuntimeException();
                    streamingExchangeClientBuffer.taskFailed(TASK_0, runtimeException);
                    Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                    Assert.assertTrue(streamingExchangeClientBuffer.isFailed());
                    Assert.assertTrue(streamingExchangeClientBuffer.isBlocked().isDone());
                    Objects.requireNonNull(streamingExchangeClientBuffer);
                    Assertions.assertThatThrownBy(streamingExchangeClientBuffer::pollPage).isEqualTo(runtimeException);
                    streamingExchangeClientBuffer.close();
                } finally {
                    try {
                        streamingExchangeClientBuffer.close();
                    } catch (Throwable th) {
                        th.addSuppressed(th);
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testFutureCancellationDoesNotAffectOtherFutures() {
        StreamingExchangeClientBuffer streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
        try {
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            ListenableFuture isBlocked = streamingExchangeClientBuffer.isBlocked();
            ListenableFuture isBlocked2 = streamingExchangeClientBuffer.isBlocked();
            ListenableFuture isBlocked3 = streamingExchangeClientBuffer.isBlocked();
            Assert.assertFalse(isBlocked.isDone());
            Assert.assertFalse(isBlocked2.isDone());
            Assert.assertFalse(isBlocked3.isDone());
            isBlocked3.cancel(true);
            Assert.assertFalse(isBlocked.isDone());
            Assert.assertFalse(isBlocked2.isDone());
            streamingExchangeClientBuffer.noMoreTasks();
            Assert.assertTrue(streamingExchangeClientBuffer.isFinished());
            Assert.assertTrue(isBlocked.isDone());
            Assert.assertTrue(isBlocked2.isDone());
            streamingExchangeClientBuffer.close();
        } catch (Throwable th) {
            try {
                streamingExchangeClientBuffer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testRemoteTaskFailedError() {
        StreamingExchangeClientBuffer streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
        try {
            streamingExchangeClientBuffer.addTask(TASK_0);
            streamingExchangeClientBuffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            streamingExchangeClientBuffer.noMoreTasks();
            Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
            Assert.assertFalse(streamingExchangeClientBuffer.isFailed());
            Assert.assertNull(streamingExchangeClientBuffer.pollPage());
            streamingExchangeClientBuffer.close();
            streamingExchangeClientBuffer = new StreamingExchangeClientBuffer(MoreExecutors.directExecutor(), DataSize.of(1L, DataSize.Unit.KILOBYTE));
            try {
                streamingExchangeClientBuffer.addTask(TASK_0);
                streamingExchangeClientBuffer.noMoreTasks();
                streamingExchangeClientBuffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
                Assert.assertFalse(streamingExchangeClientBuffer.isFinished());
                Assert.assertFalse(streamingExchangeClientBuffer.isFailed());
                Assert.assertNull(streamingExchangeClientBuffer.pollPage());
                streamingExchangeClientBuffer.close();
            } finally {
            }
        } finally {
        }
    }
}
