package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.ssl.SslHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.net.SharedDefaultFileRegion;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/net/AsyncStreamingOutputPlus.class */
public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus implements StreamingDataOutputPlus {
    private static final Logger logger = LoggerFactory.getLogger(AsyncStreamingOutputPlus.class);
    private final BufferPool bufferPool;
    final int defaultLowWaterMark;
    final int defaultHighWaterMark;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.cassandra.net.AsyncStreamingOutputPlus$1Holder, reason: invalid class name */
    /* loaded from: input_file:org/apache/cassandra/net/AsyncStreamingOutputPlus$1Holder.class */
    public class C1Holder {
        ChannelPromise promise;
        ByteBuffer buffer;

        C1Holder() {
        }
    }

    public AsyncStreamingOutputPlus(Channel channel) {
        super(channel);
        this.bufferPool = BufferPools.forNetworking();
        WriteBufferWaterMark writeBufferWaterMark = channel.config().getWriteBufferWaterMark();
        this.defaultLowWaterMark = writeBufferWaterMark.low();
        this.defaultHighWaterMark = writeBufferWaterMark.high();
        allocateBuffer();
    }

    private void allocateBuffer() {
        this.buffer = this.bufferPool.getAtLeast(8192, BufferType.OFF_HEAP);
    }

    @Override // org.apache.cassandra.net.AsyncChannelOutputPlus, org.apache.cassandra.io.util.BufferedDataOutputStreamPlus
    protected void doFlush(int i) throws IOException {
        if (!this.channel.isOpen()) {
            throw new ClosedChannelException();
        }
        ByteBuffer byteBuffer = this.buffer;
        if (byteBuffer.position() == 0) {
            return;
        }
        byteBuffer.flip();
        this.channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(byteBuffer), beginFlush(byteBuffer.limit(), 0L, 2147483647L));
        allocateBuffer();
    }

    @Override // org.apache.cassandra.net.AsyncChannelOutputPlus, org.apache.cassandra.io.util.DataOutputPlus
    public long position() {
        return flushed() + this.buffer.position();
    }

    @Override // org.apache.cassandra.streaming.StreamingDataOutputPlus
    public int writeToChannel(StreamingDataOutputPlus.Write write, StreamingDataOutputPlus.RateLimiter rateLimiter) throws IOException {
        doFlush(0);
        C1Holder c1Holder = new C1Holder();
        try {
            write.write(i -> {
                if (c1Holder.buffer != null) {
                    throw new IllegalStateException("Can only allocate one ByteBuffer");
                }
                rateLimiter.acquire(i);
                c1Holder.promise = beginFlush(i, this.defaultLowWaterMark, this.defaultHighWaterMark);
                c1Holder.buffer = this.bufferPool.get(i, BufferType.OFF_HEAP);
                return c1Holder.buffer;
            });
            ByteBuffer byteBuffer = c1Holder.buffer;
            this.bufferPool.putUnusedPortion(byteBuffer);
            int limit = byteBuffer.limit();
            this.channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(byteBuffer), c1Holder.promise);
            return limit;
        } catch (Throwable th) {
            if (c1Holder.buffer != null) {
                this.bufferPool.put(c1Holder.buffer);
            }
            if (c1Holder.promise != null) {
                c1Holder.promise.tryFailure(th);
            }
            throw th;
        }
    }

    @Override // org.apache.cassandra.streaming.StreamingDataOutputPlus
    public long writeFileToChannel(FileChannel fileChannel, StreamingDataOutputPlus.RateLimiter rateLimiter) throws IOException {
        return this.channel.pipeline().get(SslHandler.class) != null ? writeFileToChannel(fileChannel, rateLimiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE) : writeFileToChannelZeroCopy(fileChannel, rateLimiter, 1048576, 1048576, 2097152);
    }

    @VisibleForTesting
    long writeFileToChannel(FileChannel fileChannel, StreamingDataOutputPlus.RateLimiter rateLimiter, int i) throws IOException {
        long size = fileChannel.size();
        long j = 0;
        while (j < size) {
            try {
                int min = (int) Math.min(i, size - j);
                long j2 = j;
                writeToChannel(bufferSupplier -> {
                    ByteBuffer byteBuffer = bufferSupplier.get(min);
                    long read = fileChannel.read(byteBuffer, j2);
                    if (read != min) {
                        throw new IOException(String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", Long.valueOf(read), Integer.valueOf(min)));
                    }
                    byteBuffer.flip();
                }, rateLimiter);
                if (logger.isTraceEnabled()) {
                    logger.trace("Writing {} bytes at position {} of {}", new Object[]{Integer.valueOf(min), Long.valueOf(j), Long.valueOf(size)});
                }
                j += min;
            } finally {
                fileChannel.close();
            }
        }
        return j;
    }

    @VisibleForTesting
    long writeFileToChannelZeroCopy(FileChannel fileChannel, StreamingDataOutputPlus.RateLimiter rateLimiter, int i, int i2, int i3) throws IOException {
        return !rateLimiter.isRateLimited() ? writeFileToChannelZeroCopyUnthrottled(fileChannel) : writeFileToChannelZeroCopyThrottled(fileChannel, rateLimiter, i, i2, i3);
    }

    private long writeFileToChannelZeroCopyUnthrottled(FileChannel fileChannel) throws IOException {
        long size = fileChannel.size();
        if (logger.isTraceEnabled()) {
            logger.trace("Writing {} bytes", Long.valueOf(size));
        }
        ChannelPromise beginFlush = beginFlush(size, 0L, size);
        this.channel.writeAndFlush(new DefaultFileRegion(fileChannel, 0L, size), beginFlush);
        return size;
    }

    private long writeFileToChannelZeroCopyThrottled(FileChannel fileChannel, StreamingDataOutputPlus.RateLimiter rateLimiter, int i, int i2, int i3) throws IOException {
        long size = fileChannel.size();
        long j = 0;
        SharedDefaultFileRegion.SharedFileChannel share = SharedDefaultFileRegion.share(fileChannel);
        while (j < size) {
            try {
                int min = (int) Math.min(i, size - j);
                rateLimiter.acquire(min);
                this.channel.writeAndFlush(new SharedDefaultFileRegion(share, j, min), beginFlush(min, i2, i3));
                if (logger.isTraceEnabled()) {
                    logger.trace("Writing {} bytes at position {} of {}", new Object[]{Integer.valueOf(min), Long.valueOf(j), Long.valueOf(size)});
                }
                j += min;
            } finally {
                share.release();
            }
        }
        return j;
    }

    @Override // org.apache.cassandra.net.AsyncChannelOutputPlus
    public void discard() {
        if (this.buffer != null) {
            this.bufferPool.put(this.buffer);
            this.buffer = null;
        }
    }
}
