package com.linkedin.davinci.store.rocksdb;

import com.linkedin.venice.exceptions.VeniceChecksumException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.validation.checksum.CheckSum;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.rocksdb.SstFileWriter;

/* loaded from: input_file:com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriter.class */
/**
 * Writes key/value records into a sequence of numbered temporary SST files for one store partition
 * and later hands the non-empty files to RocksDB via {@code ingestExternalFile}.
 *
 * <p>Typical call order, as implied by the guards in this class: {@link #open(Map, Optional)},
 * any number of {@link #put(byte[], ByteBuffer)} calls interleaved with {@link #sync()}, then
 * {@link #close()} and {@link #ingestSSTFiles(RocksDB, List)}.
 *
 * <p>The number of the last finished SST file is exposed through {@link #sync()} as checkpoint
 * info, and can be fed back into {@link #open(Map, Optional)} to resume after a restart.
 */
public class RocksDBSstFileWriter {
    private static final Logger LOGGER = LogManager.getLogger(RocksDBSstFileWriter.class);

    /** Per-thread direct buffers reused by {@link #put(byte[], ByteBuffer)} when buffer reuse is enabled. */
    private static final ThreadLocal<ReusableObjects> threadLocalReusableObjects =
            ThreadLocal.withInitial(ReusableObjects::new);

    /** Checkpoint key under which the last finished SST file number of the value data is stored. */
    protected static final String ROCKSDB_LAST_FINISHED_SST_FILE_NO = "rocksdb_last_finished_sst_file_no";

    /** Checkpoint key under which the last finished SST file number of the RMD data is stored. */
    protected static final String ROCKSDB_LAST_FINISHED_RMD_SST_FILE_NO = "rocksdb_last_finished_rmd_sst_file_no";

    /** Position of the replication-metadata column family in the handle list given to {@link #ingestSSTFiles}. */
    protected static final int REPLICATION_METADATA_COLUMN_FAMILY_INDEX = 1;

    /** Writer for the SST file currently being built; {@code null} before {@link #open} and after {@link #close}. */
    private SstFileWriter currentSSTFileWriter;
    private String fullPathForTempSSTFileDir;
    private Optional<Supplier<byte[]>> expectedChecksumSupplier = Optional.empty();
    private final String storeName;
    private final int partitionId;
    private final EnvOptions envOptions;
    private final Options options;
    private final boolean isRMD;
    private final RocksDBServerConfig rocksDBServerConfig;
    /** Checkpoint key used by this instance; depends on {@link #isRMD}. */
    private final String lastCheckPointedSSTFileNum;
    /** Number of the last SST file that was finished, or -1 when none has been finished yet. */
    private int lastFinishedSSTFileNo = -1;
    private int currentSSTFileNo = 0;
    private long recordNumInCurrentSSTFile = 0;

    /** Holder for the direct key/value buffers that are reused across {@code put} calls on one thread. */
    public static class ReusableObjects {
        /** Initial capacity: 1 MiB; replaced by a larger buffer when a bigger key is written. */
        public ByteBuffer directKeyBuffer = ByteBuffer.allocateDirect(1024 * 1024);
        /** Initial capacity: 1 MiB + 128 bytes; replaced by a larger buffer when a bigger value is written. */
        public ByteBuffer directValueBuffer = ByteBuffer.allocateDirect(1024 * 1024 + 128);
    }

    /**
     * @param storeName store name, used in log and error messages
     * @param partitionId partition id, used in log and error messages
     * @param unusedArg accepted for signature compatibility; not read by this class
     * @param envOptions RocksDB env options passed to every {@link SstFileWriter}
     * @param options RocksDB options passed to every {@link SstFileWriter} and {@link SstFileReader}
     * @param fullPathForTempSSTFileDir directory that holds the temporary SST files
     * @param isRMD whether this writer produces files for the replication-metadata column family
     * @param rocksDBServerConfig config consulted for the buffer-reuse setting in {@link #put}
     */
    public RocksDBSstFileWriter(
            String storeName,
            int partitionId,
            String unusedArg,
            EnvOptions envOptions,
            Options options,
            String fullPathForTempSSTFileDir,
            boolean isRMD,
            RocksDBServerConfig rocksDBServerConfig) {
        this.storeName = storeName;
        this.partitionId = partitionId;
        this.envOptions = envOptions;
        this.options = options;
        this.fullPathForTempSSTFileDir = fullPathForTempSSTFileDir;
        this.isRMD = isRMD;
        this.lastCheckPointedSSTFileNum =
                isRMD ? ROCKSDB_LAST_FINISHED_RMD_SST_FILE_NO : ROCKSDB_LAST_FINISHED_SST_FILE_NO;
        this.rocksDBServerConfig = rocksDBServerConfig;
    }

    /**
     * Appends one record to the SST file currently being built.
     *
     * <p>The position of {@code value} is left unchanged in the buffer-reuse path (it is marked and
     * reset around the copy).
     *
     * @param key record key
     * @param value record value; its remaining bytes are written
     * @throws VeniceException if no writer is open
     * @throws RocksDBException if the underlying {@link SstFileWriter} rejects the record
     */
    public void put(byte[] key, ByteBuffer value) throws RocksDBException {
        if (currentSSTFileWriter == null) {
            throw new VeniceException("currentSSTFileWriter is null for store: " + storeName + ", partition id: "
                    + partitionId + ", 'beginBatchWrite' should be invoked before any write");
        }
        if (rocksDBServerConfig.isPutReuseByteBufferEnabled()) {
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();

            // Grow the per-thread key buffer only when the key does not fit.
            if (key.length > reusableObjects.directKeyBuffer.capacity()) {
                reusableObjects.directKeyBuffer = ByteBuffer.allocateDirect(key.length);
            }
            reusableObjects.directKeyBuffer.clear();
            reusableObjects.directKeyBuffer.put(key);
            reusableObjects.directKeyBuffer.flip();

            // Same for the value buffer.
            if (value.remaining() > reusableObjects.directValueBuffer.capacity()) {
                reusableObjects.directValueBuffer = ByteBuffer.allocateDirect(value.remaining());
            }
            reusableObjects.directValueBuffer.clear();
            // Copying advances the source position; mark/reset restores it for the caller.
            value.mark();
            reusableObjects.directValueBuffer.put(value);
            value.reset();
            reusableObjects.directValueBuffer.flip();

            currentSSTFileWriter.put(reusableObjects.directKeyBuffer, reusableObjects.directValueBuffer);
        } else {
            currentSSTFileWriter.put(key, ByteUtils.extractByteArray(value));
        }
        recordNumInCurrentSSTFile++;
    }

    /**
     * Prepares the temporary SST directory and opens a writer for the next SST file.
     *
     * <p>When {@code checkpointedInfo} contains this writer's checkpoint key, all files up to the
     * checkpointed number must exist, files with a higher number are deleted, and writing resumes
     * at the following number. Otherwise writing starts at file number 0.
     *
     * @param checkpointedInfo checkpoint map, as previously returned by {@link #sync()}
     * @param expectedChecksumSupplier optional supplier of the checksum each finished file must match
     * @throws VeniceException if the directory cannot be created, the checkpoint is inconsistent
     *         with the files on disk, or the new SST file cannot be opened
     */
    public void open(Map<String, String> checkpointedInfo, Optional<Supplier<byte[]>> expectedChecksumSupplier) {
        LOGGER.info(
                "'beginBatchWrite' got invoked for RocksDB store: {}, partition: {} with checkpointed info: {} ",
                storeName,
                partitionId,
                checkpointedInfo);

        File tempSSTFileDir = new File(fullPathForTempSSTFileDir);
        // isDirectory() covers the case where the directory appeared between exists() and mkdirs().
        if (!tempSSTFileDir.exists() && !tempSSTFileDir.mkdirs() && !tempSSTFileDir.isDirectory()) {
            throw new VeniceException(
                    "Failed to create temp sst file directory: " + tempSSTFileDir.getAbsolutePath());
        }

        if (checkpointedInfo.containsKey(lastCheckPointedSSTFileNum)) {
            lastFinishedSSTFileNo = Integer.parseInt(checkpointedInfo.get(lastCheckPointedSSTFileNum));
            LOGGER.info(
                    "Received last finished sst file no: {} for store: {}, partition id: {}",
                    lastFinishedSSTFileNo,
                    storeName,
                    partitionId);
            if (lastFinishedSSTFileNo < 0) {
                throw new VeniceException(
                        "Last finished sst file no: " + lastFinishedSSTFileNo + " shouldn't be negative");
            }
            makeSureAllPreviousSSTFilesBeforeCheckpointingExist();
            removeSSTFilesAfterCheckpointing();
            currentSSTFileNo = lastFinishedSSTFileNo + 1;
        } else {
            LOGGER.info(
                    "No checkpointed info for store: {}, partition id: {} so RocksDB will start building sst file from beginning",
                    storeName,
                    partitionId);
            lastFinishedSSTFileNo = -1;
            currentSSTFileNo = 0;
        }

        // Release a writer left over from an earlier open(); its reference is about to be replaced.
        close();

        String fullPathForCurrentSSTFile = composeFullPathForSSTFile(currentSSTFileNo);
        SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options);
        try {
            sstFileWriter.open(fullPathForCurrentSSTFile);
        } catch (RocksDBException e) {
            // Do not keep (or leak) a writer that never got a file attached.
            sstFileWriter.close();
            throw new VeniceException(
                    "Failed to open file: " + fullPathForCurrentSSTFile + " with SstFileWriter", e);
        }
        currentSSTFileWriter = sstFileWriter;
        recordNumInCurrentSSTFile = 0L;
        this.expectedChecksumSupplier = expectedChecksumSupplier;
    }

    /**
     * Closes the current {@link SstFileWriter}, if any. Safe to call repeatedly; after this call
     * {@link #put} fails with a {@link VeniceException} until {@link #open} is invoked again.
     */
    public void close() {
        if (currentSSTFileWriter != null) {
            currentSSTFileWriter.close();
            currentSSTFileWriter = null;
        }
    }

    /**
     * Finishes the current SST file if it contains records, opens the next one, and optionally
     * verifies the finished file against the expected checksum (never for RMD files).
     *
     * @return checkpoint info: the last finished SST file number under this writer's checkpoint
     *         key, or an empty map when no file has been finished yet
     * @throws VeniceChecksumException if checksum verification of the finished file fails
     * @throws VeniceException if records are pending but no writer is open, or RocksDB fails to
     *         finish/open a file
     */
    public Map<String, String> sync() {
        if (recordNumInCurrentSSTFile > 0 && currentSSTFileWriter == null) {
            throw new VeniceException("currentSSTFileWriter is null for store: " + storeName + ", partition id: "
                    + partitionId + ", cannot sync " + recordNumInCurrentSSTFile + " pending records");
        }
        try {
            // Only a file that received records is finished; an empty one is kept open and reused.
            if (recordNumInCurrentSSTFile > 0) {
                currentSSTFileWriter.finish();
                lastFinishedSSTFileNo = currentSSTFileNo;
                currentSSTFileNo++;
                String lastFinishedSSTFilePath = composeFullPathForSSTFile(lastFinishedSSTFileNo);
                String currentSSTFilePath = composeFullPathForSSTFile(currentSSTFileNo);
                currentSSTFileWriter.open(currentSSTFilePath);
                LOGGER.info(
                        "Sync gets invoked for store: {}, partition id: {}, last finished sst file: {} current sst file: {}",
                        storeName,
                        partitionId,
                        lastFinishedSSTFilePath,
                        currentSSTFilePath);

                long recordNumInLastSSTFile = recordNumInCurrentSSTFile;
                recordNumInCurrentSSTFile = 0L;

                if (!isRMD && expectedChecksumSupplier.isPresent()) {
                    byte[] expectedChecksum = expectedChecksumSupplier.get().get();
                    long startTimeInMs = System.currentTimeMillis();
                    if (!verifyChecksum(lastFinishedSSTFilePath, recordNumInLastSSTFile, expectedChecksum)) {
                        throw new VeniceChecksumException(
                                "verifyChecksum: failure. last sstFile checksum didn't match for store: " + storeName
                                        + ", partition: " + partitionId + ", sstFile: " + lastFinishedSSTFilePath
                                        + ", records: " + recordNumInLastSSTFile + ", latency(ms): "
                                        + LatencyUtils.getElapsedTimeInMs(startTimeInMs));
                    }
                }
            } else {
                LOGGER.warn(
                        "Sync gets invoked for store: {}, partition id: {}, but the last sst file: {} is empty",
                        storeName,
                        partitionId,
                        composeFullPathForSSTFile(currentSSTFileNo));
            }

            Map<String, String> checkpointingInfo = new HashMap<>();
            if (lastFinishedSSTFileNo >= 0) {
                checkpointingInfo.put(lastCheckPointedSSTFileNum, Integer.toString(lastFinishedSSTFileNo));
            }
            return checkpointingInfo;
        } catch (RocksDBException e) {
            throw new VeniceException("Failed to sync SstFileWriter", e);
        }
    }

    /** Deletes every temporary SST file whose number is greater than the last finished one. */
    private void removeSSTFilesAfterCheckpointing() {
        File tempSSTFileDir = new File(fullPathForTempSSTFileDir);
        String[] sstFileNames = tempSSTFileDir.list((dir, name) -> RocksDBUtils.isTempSSTFile(name));
        if (sstFileNames == null) {
            throw new VeniceException("Failed to list sst files in " + tempSSTFileDir.getAbsolutePath());
        }
        for (String sstFileName : sstFileNames) {
            if (RocksDBUtils.extractTempSSTFileNo(sstFileName) <= lastFinishedSSTFileNo) {
                continue;
            }
            String fullPath = fullPathForTempSSTFileDir + File.separator + sstFileName;
            if (!new File(fullPath).delete()) {
                throw new VeniceException("Failed to delete file: " + fullPath);
            }
        }
    }

    /** Verifies that SST files 0..lastFinishedSSTFileNo are all present on disk. */
    private void makeSureAllPreviousSSTFilesBeforeCheckpointingExist() {
        if (lastFinishedSSTFileNo < 0) {
            LOGGER.info("Since last finished sst file no is negative, there is nothing to verify");
            return;
        }
        for (int fileNo = 0; fileNo <= lastFinishedSSTFileNo; fileNo++) {
            String fullPath = composeFullPathForSSTFile(fileNo);
            if (!new File(fullPath).exists()) {
                throw new VeniceException("SST File: " + fullPath
                        + " doesn't exist, but last finished sst file no is: " + lastFinishedSSTFileNo);
            }
        }
    }

    /** Builds the full path of the temporary SST file with the given number. */
    private String composeFullPathForSSTFile(int sstFileNo) {
        return fullPathForTempSSTFileDir + File.separator + RocksDBUtils.composeTempSSTFileName(sstFileNo);
    }

    /**
     * Re-reads a finished SST file and compares its record count and the MD5 over all keys and
     * values (in iteration order) with the expected values.
     *
     * @param sstFilePath full path of the finished SST file
     * @param expectedRecordCount number of records that were written to the file
     * @param expectedChecksum checksum the file content must produce
     * @return {@code true} when both record count and checksum match
     * @throws VeniceChecksumException if the file cannot be read or checksummed
     */
    private boolean verifyChecksum(String sstFilePath, long expectedRecordCount, byte[] expectedChecksum) {
        // try-with-resources guarantees the native reader/options are released on every path,
        // including when an exception is thrown.
        try (SstFileReader sstFileReader = new SstFileReader(options);
                ReadOptions readOptions = new ReadOptions()) {
            sstFileReader.open(sstFilePath);
            readOptions.setVerifyChecksums(false);
            readOptions.setFillCache(false);

            long actualRecordCount = sstFileReader.getTableProperties().getNumEntries();
            if (actualRecordCount != expectedRecordCount) {
                LOGGER.error(
                        "verifyChecksum: failure. SSTFile record count does not match expected: {} actual: {}",
                        expectedRecordCount,
                        actualRecordCount);
                return false;
            }

            CheckSum sstFileChecksum = CheckSum.getInstance(CheckSumType.MD5)
                    .orElseThrow(() -> new IllegalStateException("MD5 checksum implementation is unavailable"));
            long iteratedRecordCount = 0;
            // The iterator is scoped tighter than the reader so it is closed first.
            try (SstFileReaderIterator iterator = sstFileReader.newIterator(readOptions)) {
                for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                    sstFileChecksum.update(iterator.key());
                    sstFileChecksum.update(iterator.value());
                    iteratedRecordCount++;
                }
            }

            byte[] actualChecksum = sstFileChecksum.getCheckSum();
            boolean checksumMatches = Arrays.equals(actualChecksum, expectedChecksum);
            if (!checksumMatches) {
                LOGGER.error(
                        "Checksum mismatch in SSTFile. recordCount: {} expectedChecksum: {}, actualChecksum: {}",
                        iteratedRecordCount,
                        ByteUtils.toHexString(expectedChecksum),
                        ByteUtils.toHexString(actualChecksum));
            }
            return checksumMatches;
        } catch (Exception e) {
            throw new VeniceChecksumException("Checksum mismatch in SST files.", e);
        }
    }

    /**
     * Checks that no non-empty temporary SST file is left in the temporary directory.
     *
     * @return {@code true} when the directory holds no non-empty temporary SST file
     */
    public boolean validateBatchIngestion() {
        for (String sstFilePath : getTemporarySSTFilePaths()) {
            if (new File(sstFilePath).length() != 0) {
                LOGGER.error("Non-empty sst found when validating batch ingestion: {}", sstFilePath);
                return false;
            }
        }
        return true;
    }

    /**
     * Ingests all non-empty temporary SST files into the given database, moving the files rather
     * than copying them. RMD writers ingest into the replication-metadata column family; others
     * ingest into the default column family.
     *
     * @param rocksDB target database
     * @param columnFamilyHandleList column family handles of {@code rocksDB}; only read when this
     *        is an RMD writer, at index {@link #REPLICATION_METADATA_COLUMN_FAMILY_INDEX}
     * @throws VeniceException if RocksDB rejects the ingestion
     */
    public void ingestSSTFiles(RocksDB rocksDB, List<ColumnFamilyHandle> columnFamilyHandleList) {
        List<String> sstFilePaths = getTemporarySSTFilePaths();
        if (sstFilePaths.isEmpty()) {
            LOGGER.info(
                    "No valid sst file found, so will skip the sst file ingestion for store: {}, partition: {}",
                    storeName,
                    partitionId);
            return;
        }
        LOGGER.info(
                "Start ingesting to store: {}, partition id: {} from files: {}",
                storeName,
                partitionId,
                sstFilePaths);
        // try-with-resources releases the native options object even when ingestion throws.
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            ingestOptions.setMoveFiles(true);
            if (isRMD) {
                rocksDB.ingestExternalFile(
                        columnFamilyHandleList.get(REPLICATION_METADATA_COLUMN_FAMILY_INDEX),
                        sstFilePaths,
                        ingestOptions);
            } else {
                rocksDB.ingestExternalFile(sstFilePaths, ingestOptions);
            }
            LOGGER.info(
                    "Finished ingestion to store: {}, partition id: {} from files: {}",
                    storeName,
                    partitionId,
                    sstFilePaths);
        } catch (RocksDBException e) {
            throw new VeniceException("Received exception during RocksDB#ingestExternalFile", e);
        }
    }

    /**
     * Lists the full paths of all non-empty temporary SST files.
     *
     * @return the paths, or an empty list when the directory cannot be listed
     */
    private List<String> getTemporarySSTFilePaths() {
        File tempSSTFileDir = new File(fullPathForTempSSTFileDir);
        String[] sstFileNames = tempSSTFileDir
                .list((dir, name) -> RocksDBUtils.isTempSSTFile(name) && new File(dir, name).length() > 0);
        List<String> sstFilePaths = new ArrayList<>();
        if (sstFileNames == null) {
            return sstFilePaths;
        }
        for (String sstFileName : sstFileNames) {
            sstFilePaths.add(tempSSTFileDir.getPath() + File.separator + sstFileName);
        }
        return sstFilePaths;
    }
}
