package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.TaskId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:io/trino/operator/DeduplicationExchangeClientBuffer.class */
public class DeduplicationExchangeClientBuffer implements ExchangeClientBuffer {
    private static final Logger log = Logger.get(DeduplicationExchangeClientBuffer.class);
    private final Executor executor;
    private final long bufferCapacityInBytes;
    private final RetryPolicy retryPolicy;

    @GuardedBy("this")
    private boolean noMoreTasks;

    @GuardedBy("this")
    private boolean inputFinished;

    @GuardedBy("this")
    private Throwable failure;

    @GuardedBy("this")
    private Iterator<Slice> pagesIterator;

    @GuardedBy("this")
    private volatile long bufferRetainedSizeInBytes;

    @GuardedBy("this")
    private volatile long maxBufferRetainedSizeInBytes;

    @GuardedBy("this")
    private int maxAttemptId;

    @GuardedBy("this")
    private boolean closed;
    private final SettableFuture<Void> blocked = SettableFuture.create();

    @GuardedBy("this")
    private final Set<TaskId> allTasks = new HashSet();

    @GuardedBy("this")
    private final Set<TaskId> successfulTasks = new HashSet();

    @GuardedBy("this")
    private final Map<TaskId, Throwable> failedTasks = new HashMap();

    @GuardedBy("this")
    private final ListMultimap<TaskId, Slice> pageBuffer = LinkedListMultimap.create();

    public DeduplicationExchangeClientBuffer(Executor executor, DataSize dataSize, RetryPolicy retryPolicy) {
        this.executor = (Executor) Objects.requireNonNull(executor, "executor is null");
        this.bufferCapacityInBytes = ((DataSize) Objects.requireNonNull(dataSize, "bufferCapacity is null")).toBytes();
        Objects.requireNonNull(retryPolicy, "retryPolicy is null");
        Preconditions.checkArgument(retryPolicy == RetryPolicy.QUERY, "retryPolicy is expected to be QUERY: %s", retryPolicy);
        this.retryPolicy = retryPolicy;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public ListenableFuture<Void> isBlocked() {
        return Futures.nonCancellationPropagating(this.blocked);
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized Slice pollPage() {
        throwIfFailed();
        if (this.closed || !this.inputFinished) {
            return null;
        }
        if (this.pagesIterator == null) {
            this.pagesIterator = this.pageBuffer.values().iterator();
        }
        if (!this.pagesIterator.hasNext()) {
            return null;
        }
        Slice next = this.pagesIterator.next();
        this.pagesIterator.remove();
        this.bufferRetainedSizeInBytes -= next.getRetainedSize();
        return next;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized void addTask(TaskId taskId) {
        if (this.closed) {
            return;
        }
        Preconditions.checkState(!this.noMoreTasks, "no more tasks expected");
        Preconditions.checkState(this.allTasks.add(taskId), "task already registered: %s", taskId);
        if (taskId.getAttemptId() > this.maxAttemptId) {
            this.maxAttemptId = taskId.getAttemptId();
            if (this.retryPolicy == RetryPolicy.QUERY) {
                removePagesForPreviousAttempts(taskId.getAttemptId());
            }
        }
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized void addPages(TaskId taskId, List<Slice> list) {
        if (this.closed) {
            return;
        }
        Preconditions.checkState(this.allTasks.contains(taskId), "task is not registered: %s", taskId);
        Preconditions.checkState(!this.successfulTasks.contains(taskId), "task is finished: %s", taskId);
        Preconditions.checkState(!this.failedTasks.containsKey(taskId), "task is failed: %s", taskId);
        if (this.failure != null) {
            return;
        }
        if (this.retryPolicy != RetryPolicy.QUERY || taskId.getAttemptId() >= this.maxAttemptId) {
            long j = 0;
            Iterator<Slice> it = list.iterator();
            while (it.hasNext()) {
                j += it.next().getRetainedSize();
            }
            this.bufferRetainedSizeInBytes += j;
            if (this.bufferRetainedSizeInBytes <= this.bufferCapacityInBytes) {
                this.maxBufferRetainedSizeInBytes = Math.max(this.maxBufferRetainedSizeInBytes, this.bufferRetainedSizeInBytes);
                this.pageBuffer.putAll(taskId, list);
            } else {
                this.failure = new TrinoException(StandardErrorCode.NOT_SUPPORTED, "Retries for queries with large result set currently unsupported");
                this.pageBuffer.clear();
                this.bufferRetainedSizeInBytes = 0L;
                unblock(this.blocked);
            }
        }
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized void taskFinished(TaskId taskId) {
        if (this.closed) {
            return;
        }
        Preconditions.checkState(this.allTasks.contains(taskId), "task is not registered: %s", taskId);
        Preconditions.checkState(!this.failedTasks.containsKey(taskId), "task is failed: %s", taskId);
        Preconditions.checkState(this.successfulTasks.add(taskId), "task is finished: %s", taskId);
        if (this.retryPolicy == RetryPolicy.TASK) {
            throw new UnsupportedOperationException("task level retry policy is unsupported");
        }
        checkInputFinished();
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized void taskFailed(TaskId taskId, Throwable th) {
        if (this.closed) {
            return;
        }
        Preconditions.checkState(this.allTasks.contains(taskId), "task is not registered: %s", taskId);
        Preconditions.checkState(!this.successfulTasks.contains(taskId), "task is finished: %s", taskId);
        Preconditions.checkState(this.failedTasks.put(taskId, th) == null, "task is already failed: %s", taskId);
        checkInputFinished();
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized void noMoreTasks() {
        if (this.closed) {
            return;
        }
        this.noMoreTasks = true;
        checkInputFinished();
    }

    private synchronized void checkInputFinished() {
        if (this.failure == null && !this.inputFinished && this.noMoreTasks) {
            if (this.allTasks.isEmpty()) {
                this.inputFinished = true;
                unblock(this.blocked);
                return;
            }
            switch (this.retryPolicy) {
                case TASK:
                    throw new UnsupportedOperationException("task level retry policy is unsupported");
                case QUERY:
                    if (this.successfulTasks.containsAll((Set) this.allTasks.stream().filter(taskId -> {
                        return taskId.getAttemptId() == this.maxAttemptId;
                    }).collect(ImmutableSet.toImmutableSet()))) {
                        removePagesForPreviousAttempts(this.maxAttemptId);
                        this.inputFinished = true;
                        unblock(this.blocked);
                        return;
                    }
                    TrinoException trinoException = null;
                    for (Map.Entry<TaskId, Throwable> entry : this.failedTasks.entrySet()) {
                        TaskId key = entry.getKey();
                        TrinoException trinoException2 = (Throwable) entry.getValue();
                        if (key.getAttemptId() == this.maxAttemptId) {
                            if ((trinoException2 instanceof TrinoException) && StandardErrorCode.REMOTE_TASK_FAILED.toErrorCode().equals(trinoException2.getErrorCode())) {
                                log.debug("Task failure discovered while fetching task results: %s", new Object[]{key});
                            } else if (trinoException == null) {
                                trinoException = trinoException2;
                            } else if (trinoException != trinoException2) {
                                trinoException.addSuppressed(trinoException2);
                            }
                        }
                    }
                    if (trinoException != null) {
                        this.pageBuffer.clear();
                        this.bufferRetainedSizeInBytes = 0L;
                        this.failure = trinoException;
                        unblock(this.blocked);
                        return;
                    }
                    return;
                default:
                    throw new UnsupportedOperationException("unexpected retry policy: " + this.retryPolicy);
            }
        }
    }

    private synchronized void removePagesForPreviousAttempts(int i) {
        long j = 0;
        Iterator it = this.pageBuffer.entries().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            if (((TaskId) entry.getKey()).getAttemptId() < i) {
                j += ((Slice) entry.getValue()).getRetainedSize();
                it.remove();
            }
        }
        this.bufferRetainedSizeInBytes -= j;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized boolean isFinished() {
        return this.failure == null && (this.closed || (this.inputFinished && this.pageBuffer.isEmpty()));
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized boolean isFailed() {
        return this.failure != null;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public long getRemainingCapacityInBytes() {
        return Math.max(this.bufferCapacityInBytes - this.bufferRetainedSizeInBytes, 0L);
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public long getRetainedSizeInBytes() {
        return this.bufferRetainedSizeInBytes;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public long getMaxRetainedSizeInBytes() {
        return this.maxBufferRetainedSizeInBytes;
    }

    @Override // io.trino.operator.ExchangeClientBuffer
    public synchronized int getBufferedPageCount() {
        return this.pageBuffer.size();
    }

    @Override // io.trino.operator.ExchangeClientBuffer, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.pageBuffer.clear();
        this.bufferRetainedSizeInBytes = 0L;
        unblock(this.blocked);
    }

    private synchronized void throwIfFailed() {
        if (this.failure != null) {
            Throwables.throwIfUnchecked(this.failure);
            throw new RuntimeException(this.failure);
        }
    }

    private void unblock(SettableFuture<Void> settableFuture) {
        this.executor.execute(() -> {
            settableFuture.set((Object) null);
        });
    }
}
