package org.apache.cassandra.db.streaming;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.BufferPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cassandra-all-4.0.1.jar:org/apache/cassandra/db/streaming/CassandraStreamWriter.class */
/**
 * Streams the requested sections of one SSTable data file to a peer.
 *
 * <p>The data file is read chunk by chunk. When a CRC component exists for the
 * SSTable, each chunk is passed to a checksum validator before being sent.
 * Every chunk is handed to {@link StreamCompressionSerializer} together with an
 * LZ4 compressor and written to the output channel under the session's rate
 * limiter.
 */
public class CassandraStreamWriter {
    /** Chunk size used when the SSTable has no CRC component to dictate one. */
    private static final int DEFAULT_CHUNK_SIZE = 65536;

    /**
     * Version argument passed to {@link StreamCompressionSerializer#serialize}.
     * NOTE(review): presumably the current messaging protocol version — confirm
     * and replace with the project's version constant if one is available.
     */
    private static final int SERIALIZATION_VERSION = 12;

    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class);

    protected final SSTableReader sstable;
    private final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
    protected final Collection<SSTableReader.PartitionPositionBounds> sections;
    protected final StreamManager.StreamRateLimiter limiter;
    protected final StreamSession session;
    private final long totalSize;

    /**
     * Creates a writer for the given SSTable.
     *
     * @param sstable the SSTable whose data file will be streamed
     * @param header  supplies the sections to stream and the total size reported as progress
     * @param session the owning stream session; its peer selects the rate limiter
     */
    public CassandraStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session) {
        this.session = session;
        this.sstable = sstable;
        this.sections = header.sections;
        this.limiter = StreamManager.getRateLimiter(session.peer);
        this.totalSize = header.size();
    }

    /**
     * Streams every section of the SSTable data file to the given output.
     *
     * <p>Progress is reported to the session after each chunk, and the output
     * is flushed after each section.
     *
     * @param dataOutputStreamPlus destination; must be an {@link AsyncStreamingOutputPlus}
     * @throws IOException if reading, validating or writing fails
     * @throws ClassCastException if the output is not an {@link AsyncStreamingOutputPlus}
     */
    public void write(DataOutputStreamPlus dataOutputStreamPlus) throws IOException {
        long total = totalSize();
        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}",
                     session.planId(), sstable.getFilename(), session.peer,
                     sstable.getSSTableMetadata().repairedAt, total);

        AsyncStreamingOutputPlus output = (AsyncStreamingOutputPlus) dataOutputStreamPlus;

        // try-with-resources closes the validator first, then the channel, and
        // records close() failures as suppressed exceptions rather than letting
        // them replace the original failure. A null validator is skipped.
        try (ChannelProxy proxy = sstable.getDataChannel().newChannel();
             DataIntegrityMetadata.ChecksumValidator validator =
                 new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                 ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
                 : null) {
            int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE : validator.chunkSize;
            long progress = 0;

            for (SSTableReader.PartitionPositionBounds section : sections) {
                // With a validator, reading starts at the position the validator
                // reports as the chunk start for the section's lower bound.
                long start = validator == null
                             ? section.lowerPosition
                             : validator.chunkStart(section.lowerPosition);
                // Bytes between the read start and the section start: read
                // (and validated) but not transferred.
                int transferOffset = (int) (section.lowerPosition - start);
                if (validator != null) {
                    validator.seek(start);
                }

                long length = section.upperPosition - start;
                long bytesRead = 0;
                while (bytesRead < length) {
                    int toTransfer = (int) Math.min(bufferSize, length - bytesRead);
                    long lastBytesRead = write(proxy, validator, output, start, transferOffset, toTransfer, bufferSize);
                    start += lastBytesRead;
                    bytesRead += lastBytesRead;
                    progress += lastBytesRead - transferOffset;
                    session.progress(sstable.descriptor.filenameFor(Component.DATA),
                                     ProgressInfo.Direction.OUT, progress, total);
                    // Only the first chunk of a section carries a leading offset.
                    transferOffset = 0;
                }

                output.flush();
            }

            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
                         session.planId(), sstable.getFilename(), session.peer,
                         FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(total));
        }
    }

    /**
     * Returns the total size reported as the denominator of streaming progress.
     *
     * @return the size taken from the stream header at construction time
     */
    protected long totalSize() {
        return totalSize;
    }

    /**
     * Reads one chunk from the data file, optionally validates it, and writes
     * the requested slice of it to the output.
     *
     * @param proxy          channel on the data file
     * @param validator      checksum validator, or {@code null} to skip validation
     * @param output         destination for the serialized chunk
     * @param start          file position at which to start reading
     * @param transferOffset position within the read chunk where the transferred slice begins
     * @param toTransfer     end position of the slice within the read chunk; also the return value
     * @param bufferSize     maximum number of bytes to read from the file
     * @return {@code toTransfer}
     * @throws IOException if reading, validating or writing fails
     */
    protected long write(ChannelProxy proxy, DataIntegrityMetadata.ChecksumValidator validator,
                         AsyncStreamingOutputPlus output, long start, int transferOffset,
                         int toTransfer, int bufferSize) throws IOException {
        // Never ask for more bytes than remain in the file after 'start'.
        int minReadable = (int) Math.min(bufferSize, proxy.size() - start);

        ByteBuffer buffer = BufferPools.forNetworking().get(minReadable, BufferType.OFF_HEAP);
        try {
            int readCount = proxy.read(buffer, start);
            assert readCount == minReadable
                : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes",
                                readCount, minReadable);
            buffer.flip();

            if (validator != null) {
                validator.validate(buffer);
                // NOTE(review): presumably validate() advances the buffer position,
                // so flip again to rewind it before slicing — confirm.
                buffer.flip();
            }

            // Restrict the buffer to [transferOffset, toTransfer). The previous
            // limit expression, transferOffset + (toTransfer - transferOffset),
            // is arithmetically the same as toTransfer.
            buffer.position(transferOffset);
            buffer.limit(toTransfer);
            output.writeToChannel(StreamCompressionSerializer.serialize(compressor, buffer, SERIALIZATION_VERSION), limiter);
        } finally {
            // Return the pooled buffer on both success and failure.
            BufferPools.forNetworking().put(buffer);
        }
        return toTransfer;
    }
}
