package io.prestosql.spiller;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.OutputStreamSliceOutput;
import io.prestosql.execution.buffer.PagesSerde;
import io.prestosql.execution.buffer.PagesSerdeUtil;
import io.prestosql.execution.buffer.SerializedPage;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.operator.SpillContext;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.StandardErrorCode;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:io/prestosql/spiller/FileSingleStreamSpiller.class */
public class FileSingleStreamSpiller implements SingleStreamSpiller {

    @VisibleForTesting
    static final int BUFFER_SIZE = 4096;
    private final FileHolder targetFile;
    private final PagesSerde serde;
    private final SpillerStats spillerStats;
    private final SpillContext localSpillContext;
    private final LocalMemoryContext memoryContext;
    private final ListeningExecutorService executor;
    private long spilledPagesInMemorySize;
    private final Closer closer = Closer.create();
    private boolean writable = true;
    private ListenableFuture<?> spillInProgress = Futures.immediateFuture((Object) null);

    public FileSingleStreamSpiller(PagesSerde pagesSerde, ListeningExecutorService listeningExecutorService, Path path, SpillerStats spillerStats, SpillContext spillContext, LocalMemoryContext localMemoryContext, Optional<SpillCipher> optional) {
        this.serde = (PagesSerde) Objects.requireNonNull(pagesSerde, "serde is null");
        this.executor = (ListeningExecutorService) Objects.requireNonNull(listeningExecutorService, "executor is null");
        this.spillerStats = (SpillerStats) Objects.requireNonNull(spillerStats, "spillerStats is null");
        this.localSpillContext = spillContext.newLocalSpillContext();
        this.memoryContext = (LocalMemoryContext) Objects.requireNonNull(localMemoryContext, "memoryContext is null");
        if (((Optional) Objects.requireNonNull(optional, "spillCipher is null")).isPresent()) {
            Closer closer = this.closer;
            SpillCipher spillCipher = optional.get();
            Objects.requireNonNull(spillCipher);
            closer.register(spillCipher::close);
        }
        this.memoryContext.setBytes(4096L);
        try {
            this.targetFile = (FileHolder) this.closer.register(new FileHolder(Files.createTempFile(path, "spill", ".bin", new FileAttribute[0])));
        } catch (IOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to create spill file", e);
        }
    }

    @Override // io.prestosql.spiller.SingleStreamSpiller
    public ListenableFuture<?> spill(Iterator<Page> it) {
        Objects.requireNonNull(it, "pageIterator is null");
        checkNoSpillInProgress();
        this.spillInProgress = this.executor.submit(() -> {
            writePages(it);
        });
        return this.spillInProgress;
    }

    @Override // io.prestosql.spiller.SingleStreamSpiller
    public long getSpilledPagesInMemorySize() {
        return this.spilledPagesInMemorySize;
    }

    @Override // io.prestosql.spiller.SingleStreamSpiller
    public Iterator<Page> getSpilledPages() {
        checkNoSpillInProgress();
        return readPages();
    }

    @Override // io.prestosql.spiller.SingleStreamSpiller
    public ListenableFuture<List<Page>> getAllSpilledPages() {
        return this.executor.submit(() -> {
            return ImmutableList.copyOf(getSpilledPages());
        });
    }

    private void writePages(Iterator<Page> it) {
        Preconditions.checkState(this.writable, "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent");
        try {
            OutputStreamSliceOutput outputStreamSliceOutput = new OutputStreamSliceOutput(this.targetFile.newOutputStream(StandardOpenOption.APPEND), BUFFER_SIZE);
            while (it.hasNext()) {
                try {
                    Page next = it.next();
                    this.spilledPagesInMemorySize += next.getSizeInBytes();
                    SerializedPage serialize = this.serde.serialize(next);
                    long sizeInBytes = serialize.getSizeInBytes();
                    this.localSpillContext.updateBytes(sizeInBytes);
                    this.spillerStats.addToTotalSpilledBytes(sizeInBytes);
                    PagesSerdeUtil.writeSerializedPage(outputStreamSliceOutput, serialize);
                } finally {
                }
            }
            outputStreamSliceOutput.close();
        } catch (IOException | UncheckedIOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to spill pages", e);
        }
    }

    private Iterator<Page> readPages() {
        Preconditions.checkState(this.writable, "Repeated reads are disallowed to prevent potential resource leaks");
        this.writable = false;
        try {
            InputStream inputStream = (InputStream) this.closer.register(this.targetFile.newInputStream(new OpenOption[0]));
            return closeWhenExhausted(PagesSerdeUtil.readPages(this.serde, new InputStreamSliceInput(inputStream, BUFFER_SIZE)), inputStream);
        } catch (IOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);
        }
    }

    @Override // io.prestosql.spiller.SingleStreamSpiller, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closer.register(this.localSpillContext);
        this.closer.register(() -> {
            this.memoryContext.setBytes(0L);
        });
        try {
            this.closer.close();
        } catch (IOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to close spiller", e);
        }
    }

    private void checkNoSpillInProgress() {
        Preconditions.checkState(this.spillInProgress.isDone(), "spill in progress");
    }

    private static <T> Iterator<T> closeWhenExhausted(final Iterator<T> it, final Closeable closeable) {
        Objects.requireNonNull(it, "iterator is null");
        Objects.requireNonNull(closeable, "resource is null");
        return new AbstractIterator<T>() { // from class: io.prestosql.spiller.FileSingleStreamSpiller.1
            protected T computeNext() {
                if (it.hasNext()) {
                    return (T) it.next();
                }
                try {
                    closeable.close();
                    return (T) endOfData();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }
}
