package com.linkedin.venice.hadoop;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.InputDataInfoProvider;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.exceptions.VeniceInconsistentSchemaException;
import com.linkedin.venice.hadoop.exceptions.VeniceSchemaFieldNotFoundException;
import com.linkedin.venice.schema.vson.VsonAvroSchemaAdapter;
import com.linkedin.venice.schema.vson.VsonSchema;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/DefaultInputDataInfoProvider.class */
/**
 * Default {@link InputDataInfoProvider}: lists the files under an input directory through the Hadoop
 * {@link FileSystem} API, detects whether they are Avro or Vson (sequence) files, extracts the key and
 * value schemas, and, unless the mapper is configured to build the dictionary, verifies that every
 * file shares the schema of the first one.
 *
 * <p>Per-file checks run on a lazily created fixed-size thread pool; call {@link #close()} to shut it
 * down once the provider is no longer needed.
 */
public class DefaultInputDataInfoProvider implements InputDataInfoProvider {
    private static final Logger LOGGER = LogManager.getLogger(DefaultInputDataInfoProvider.class);

    /** Accepts every path whose name does not start with {@code _} or {@code .}. */
    public static final PathFilter PATH_FILTER = path -> {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    };

    /** Sequence-file metadata key holding the Vson key schema. */
    public static final String FILE_KEY_SCHEMA = "key.schema";
    /** Sequence-file metadata key holding the Vson value schema. */
    public static final String FILE_VALUE_SCHEMA = "value.schema";
    /** Property naming the key field inside each input record. */
    public static final String KEY_FIELD_PROP = "key.field";
    /** Property naming the value field inside each input record. */
    public static final String VALUE_FIELD_PROP = "value.field";
    public static final String COMPRESSION_DICTIONARY_SAMPLE_SIZE = "compression.dictionary.sample.size";
    /** 200 * 1024 * 1024 (= 209715200). */
    public static final int DEFAULT_COMPRESSION_DICTIONARY_SAMPLE_SIZE = 200 * 1024 * 1024;
    public static final String COMPRESSION_DICTIONARY_SIZE_LIMIT = "compression.dictionary.size.limit";
    /** Property controlling the size of the thread pool used for per-file operations. */
    public static final String HDFS_OPERATIONS_PARALLEL_THREAD_NUM = "hdfs.operations.parallel.thread.num";
    /** Multiplier applied to the summed input file lengths before the size is reported. */
    public static final long INPUT_DATA_SIZE_FACTOR = 2;

    private static final int DEFAULT_HDFS_OPERATIONS_PARALLEL_THREAD_NUM = 20;
    private static final String DEFAULT_AVRO_KEY_FIELD = "key";
    private static final String DEFAULT_AVRO_VALUE_FIELD = "value";
    /** For Vson input an empty field name means "use the whole key/value schema". */
    private static final String DEFAULT_VSON_FIELD = "";

    protected final VenicePushJob.StoreSetting storeSetting;
    protected final VenicePushJob.PushJobSetting pushJobSetting;
    /** Stays {@code null} until {@link #initZstdConfig(int)} has been called. */
    protected PushJobZstdConfig pushJobZstdConfig;
    protected final VeniceProperties props;
    protected final Lazy<ExecutorService> hdfsExecutorService;

    /**
     * @param storeSetting store-level settings; the compression strategy is read from it
     * @param pushJobSetting push-job settings (incremental push, dictionary-building mode, store name, ...)
     * @param props job properties; field names and the thread-pool size are read from it
     */
    public DefaultInputDataInfoProvider(VenicePushJob.StoreSetting storeSetting, VenicePushJob.PushJobSetting pushJobSetting, VeniceProperties props) {
        this.storeSetting = storeSetting;
        this.pushJobSetting = pushJobSetting;
        this.props = props;
        // The pool is only created if a parallel operation is actually requested.
        this.hdfsExecutorService = Lazy.of(() -> Executors.newFixedThreadPool(
                props.getInt(HDFS_OPERATIONS_PARALLEL_THREAD_NUM, DEFAULT_HDFS_OPERATIONS_PARALLEL_THREAD_NUM)));
    }

    /**
     * Inspects the files under {@code inputUri} and collects schema and size information about them.
     *
     * <p>The input is treated as Vson when the first file's sequence-file metadata contains both
     * {@link #FILE_KEY_SCHEMA} and {@link #FILE_VALUE_SCHEMA}; otherwise the format flag of
     * {@code PushJobSchemaInfo} is left at its initial value (presumably Avro - confirm against
     * {@code PushJobSchemaInfo}).
     *
     * <p>When {@code useMapperToBuildDict} is set, only the first file's header is read, so the file
     * lengths are never summed and the reported input size is 0.
     *
     * @param inputUri location of the input directory
     * @return information about the input files
     * @throws RuntimeException if the path does not exist or contains no file accepted by {@link #PATH_FILTER}
     * @throws VeniceException if a sub directory is found or a parallel check fails
     * @throws Exception on any other failure of the underlying file system or readers
     */
    @Override
    public InputDataInfoProvider.InputDataInfo validateInputAndGetInfo(String inputUri) throws Exception {
        long inputModificationTime = getInputLastModificationTime(inputUri);
        FileSystem fs = FileSystem.get(new Configuration());
        Path srcPath = new Path(inputUri);
        FileStatus[] fileStatuses = fs.listStatus(srcPath, PATH_FILTER);
        if (fileStatuses == null || fileStatuses.length == 0) {
            throw new RuntimeException("No data found at source path: " + srcPath);
        }

        if (!this.pushJobSetting.isIncrementalPush
                && !this.pushJobSetting.useMapperToBuildDict
                && this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
            LOGGER.info("Zstd compression enabled for {}", this.pushJobSetting.storeName);
            initZstdConfig(fileStatuses.length);
        }

        PushJobSchemaInfo schemaInfo = new PushJobSchemaInfo();
        FileStatus firstFile = fileStatuses[0];

        // Probe the first file as a sequence file: Vson schemas in its metadata identify Vson input.
        Map<String, String> fileMetadata = getMetadataFromSequenceFile(fs, firstFile.getPath(), false);
        if (fileMetadata.containsKey(FILE_KEY_SCHEMA) && fileMetadata.containsKey(FILE_VALUE_SCHEMA)) {
            schemaInfo.setAvro(false);
            schemaInfo.setVsonFileKeySchema(fileMetadata.get(FILE_KEY_SCHEMA));
            schemaInfo.setVsonFileValueSchema(fileMetadata.get(FILE_VALUE_SCHEMA));
        }

        // The reference schema is taken from the first entry, so it must be a regular file.
        ensureNotDirectory(firstFile);

        AtomicLong inputFileDataSize = new AtomicLong(0L);
        if (schemaInfo.isAvro()) {
            LOGGER.info("Detected Avro input format.");
            schemaInfo.setKeyField(this.props.getString(KEY_FIELD_PROP, DEFAULT_AVRO_KEY_FIELD));
            schemaInfo.setValueField(this.props.getString(VALUE_FIELD_PROP, DEFAULT_AVRO_VALUE_FIELD));
            if (this.pushJobSetting.useMapperToBuildDict) {
                schemaInfo.setAvroSchema(getAvroFileHeader(fs, firstFile.getPath(), false));
            } else {
                schemaInfo.setAvroSchema(checkAvroSchemaConsistency(fs, fileStatuses, inputFileDataSize));
            }
            Schema fileSchema = schemaInfo.getAvroSchema().getFirst();
            Schema storeSchema = schemaInfo.getAvroSchema().getSecond();
            schemaInfo.setFileSchemaString(fileSchema.toString());
            schemaInfo.setKeySchemaString(extractAvroSubSchema(storeSchema, schemaInfo.getKeyField()).toString());
            schemaInfo.setValueSchemaString(extractAvroSubSchema(storeSchema, schemaInfo.getValueField()).toString());
        } else {
            LOGGER.info("Detected Vson input format, will convert to Avro automatically.");
            schemaInfo.setKeyField(this.props.getString(KEY_FIELD_PROP, DEFAULT_VSON_FIELD));
            schemaInfo.setValueField(this.props.getString(VALUE_FIELD_PROP, DEFAULT_VSON_FIELD));
            if (this.pushJobSetting.useMapperToBuildDict) {
                schemaInfo.setVsonSchema(getVsonFileHeader(fs, firstFile.getPath(), false));
            } else {
                schemaInfo.setVsonSchema(checkVsonSchemaConsistency(fs, fileStatuses, inputFileDataSize));
            }
            // An empty field name selects the whole schema; otherwise the named record sub-type.
            VsonSchema vsonKeySchema = StringUtils.isEmpty(schemaInfo.getKeyField())
                    ? schemaInfo.getVsonSchema().getFirst()
                    : schemaInfo.getVsonSchema().getFirst().recordSubtype(schemaInfo.getKeyField());
            VsonSchema vsonValueSchema = StringUtils.isEmpty(schemaInfo.getValueField())
                    ? schemaInfo.getVsonSchema().getSecond()
                    : schemaInfo.getVsonSchema().getSecond().recordSubtype(schemaInfo.getValueField());
            schemaInfo.setKeySchemaString(VsonAvroSchemaAdapter.parse(vsonKeySchema.toString()).toString());
            schemaInfo.setValueSchemaString(VsonAvroSchemaAdapter.parse(vsonValueSchema.toString()).toString());
        }

        long reportedInputSize = inputFileDataSize.get() * INPUT_DATA_SIZE_FACTOR;
        boolean inputHasRecords = hasRecords(schemaInfo.isAvro(), fs, fileStatuses);
        return new InputDataInfoProvider.InputDataInfo(
                schemaInfo,
                reportedInputSize,
                fileStatuses.length,
                inputHasRecords,
                inputModificationTime,
                !this.pushJobSetting.useMapperToBuildDict);
    }

    /**
     * Returns {@code true} as soon as one of the given files yields at least one record.
     *
     * NOTE(review): the readers opened here are never closed; confirm whether the reader classes
     * expose a close method that should be invoked.
     */
    private boolean hasRecords(boolean isAvroFile, FileSystem fs, FileStatus[] fileStatuses) {
        for (FileStatus fileStatus : fileStatuses) {
            Path filePath = fileStatus.getPath();
            if ((isAvroFile ? getVeniceAvroRecordReader(fs, filePath) : getVeniceVsonRecordReader(fs, filePath)).iterator().hasNext()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the Zstd push-job configuration on first call; later calls are no-ops.
     *
     * @param numFiles number of input files, forwarded to {@code PushJobZstdConfig}
     */
    @Override
    public void initZstdConfig(int numFiles) {
        if (this.pushJobZstdConfig == null) {
            this.pushJobZstdConfig = new PushJobZstdConfig(this.props, numFiles);
        }
    }

    /**
     * Verifies in parallel that every file carries the same Vson key/value schemas as the first file,
     * adding each file's length to {@code inputFileDataSize} along the way.
     *
     * @return the key/value schema pair of the first file
     */
    private Pair<VsonSchema, VsonSchema> checkVsonSchemaConsistency(FileSystem fs, FileStatus[] fileStatuses, AtomicLong inputFileDataSize) {
        Pair<VsonSchema, VsonSchema> expectedSchema = getVsonFileHeader(fs, fileStatuses[0].getPath(), false);
        parallelExecuteHDFSOperation(fileStatuses, "checkVsonSchemaConsistency", fileStatus -> {
            ensureNotDirectory(fileStatus);
            inputFileDataSize.addAndGet(fileStatus.getLen());
            Pair<VsonSchema, VsonSchema> actualSchema = getVsonFileHeader(fs, fileStatus.getPath(), true);
            boolean sameKeySchema = expectedSchema.getFirst().equals(actualSchema.getFirst());
            boolean sameValueSchema = expectedSchema.getSecond().equals(actualSchema.getSecond());
            if (!sameKeySchema || !sameValueSchema) {
                throw new VeniceInconsistentSchemaException(String.format("Inconsistent file Vson schema found. File: %s.\n Expected key schema: %s.\nExpected value schema: %s.\n File key schema: %s.\n File value schema: %s.", fileStatus.getPath().getName(), expectedSchema.getFirst(), expectedSchema.getSecond(), actualSchema.getFirst(), actualSchema.getSecond()));
            }
        });
        return expectedSchema;
    }

    /**
     * Reads the Vson key/value schemas from the metadata of the sequence file at {@code path}.
     *
     * @param buildDictionaryIfNeeded forwarded to the metadata read; when {@code true} the file may
     *        also be handed to the Zstd training-sample loader
     * @throws VeniceException if either schema entry is missing from the file metadata
     */
    public Pair<VsonSchema, VsonSchema> getVsonFileHeader(FileSystem fs, Path path, boolean buildDictionaryIfNeeded) {
        Map<String, String> fileMetadata = getMetadataFromSequenceFile(fs, path, buildDictionaryIfNeeded);
        if (!fileMetadata.containsKey(FILE_KEY_SCHEMA) || !fileMetadata.containsKey(FILE_VALUE_SCHEMA)) {
            throw new VeniceException("Can't find Vson schema from file: " + path.getName());
        }
        return new Pair<>(VsonSchema.parse(fileMetadata.get(FILE_KEY_SCHEMA)), VsonSchema.parse(fileMetadata.get(FILE_VALUE_SCHEMA)));
    }

    /**
     * Runs {@code fileOperation} once per file on the shared executor and waits for all of them.
     *
     * @param operationName used only in the failure message
     * @throws VeniceException the task's own {@link VeniceException} if one was thrown, otherwise a
     *         wrapper around whatever failure occurred; also thrown if the executor is already shut down
     */
    private void parallelExecuteHDFSOperation(FileStatus[] fileStatuses, String operationName, Consumer<FileStatus> fileOperation) {
        ExecutorService executor = this.hdfsExecutorService.get();
        if (executor.isShutdown()) {
            throw new VeniceException("Unable to execute HDFS operations in parallel, the executor has already been shutdown");
        }
        CompletableFuture<?>[] pendingTasks = new CompletableFuture<?>[fileStatuses.length];
        for (int index = 0; index < fileStatuses.length; index++) {
            FileStatus fileStatus = fileStatuses[index];
            pendingTasks[index] = CompletableFuture.runAsync(() -> fileOperation.accept(fileStatus), executor);
        }
        try {
            CompletableFuture.allOf(pendingTasks).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new VeniceException("Failed to execute " + operationName + " in parallel", e);
        } catch (Exception e) {
            // Surface the task's own VeniceException (e.g. inconsistent schema) without extra wrapping.
            if (e.getCause() instanceof VeniceException) {
                throw (VeniceException) e.getCause();
            }
            throw new VeniceException("Failed to execute " + operationName + " in parallel", e);
        }
    }

    /**
     * Opens {@code path} with a Vson record reader and returns its metadata map. When
     * {@code buildDictionaryIfNeeded} is set and sample loading applies to this job, the reader is
     * first passed to {@code InputDataInfoProvider.loadZstdTrainingSamples}.
     */
    private Map<String, String> getMetadataFromSequenceFile(FileSystem fs, Path path, boolean buildDictionaryIfNeeded) {
        LOGGER.debug("path:{}", path.toUri().getPath());
        VeniceVsonRecordReader recordReader = getVeniceVsonRecordReader(fs, path);
        if (buildDictionaryIfNeeded && shouldLoadZstdTrainingSamples()) {
            InputDataInfoProvider.loadZstdTrainingSamples(recordReader, this.pushJobZstdConfig);
        }
        return recordReader.getMetadataMap();
    }

    private VeniceVsonRecordReader getVeniceVsonRecordReader(FileSystem fs, Path path) {
        return new VeniceVsonRecordReader((String) null, this.props.getString(KEY_FIELD_PROP, DEFAULT_VSON_FIELD), this.props.getString(VALUE_FIELD_PROP, DEFAULT_VSON_FIELD), fs, path);
    }

    /**
     * Returns the result of {@code trainSamples()} on the configured Zstd dictionary trainer.
     * {@link #initZstdConfig(int)} must have been called first, since {@code pushJobZstdConfig} is
     * dereferenced without a null check.
     */
    @Override
    public byte[] getZstdDictTrainSamples() {
        return this.pushJobZstdConfig.getZstdDictTrainer().trainSamples();
    }

    /**
     * Returns the schema of the field named {@code fieldName} in {@code origin}.
     *
     * @throws VeniceSchemaFieldNotFoundException if {@code origin} has no such field
     */
    @Override
    public Schema extractAvroSubSchema(Schema origin, String fieldName) {
        Schema.Field field = origin.getField(fieldName);
        if (field == null) {
            throw new VeniceSchemaFieldNotFoundException(fieldName, "Could not find field: " + fieldName + " from " + origin.toString());
        }
        return field.schema();
    }

    /**
     * Returns the modification time reported by the file system for {@code inputUri}.
     *
     * @throws RuntimeException if the path does not exist; the {@link FileNotFoundException} is kept as its cause
     * @throws IOException on any other file-system failure
     */
    @Override
    public long getInputLastModificationTime(String inputUri) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path srcPath = new Path(inputUri);
        try {
            return fs.getFileStatus(srcPath).getModificationTime();
        } catch (FileNotFoundException e) {
            throw new RuntimeException("No data found at source path: " + srcPath, e);
        }
    }

    /**
     * Verifies in parallel that every file carries the same (file schema, store schema) pair as the
     * first file, adding each file's length to {@code inputFileDataSize} along the way.
     *
     * @return the schema pair of the first file
     */
    private Pair<Schema, Schema> checkAvroSchemaConsistency(FileSystem fs, FileStatus[] fileStatuses, AtomicLong inputFileDataSize) {
        Pair<Schema, Schema> expectedSchema = getAvroFileHeader(fs, fileStatuses[0].getPath(), false);
        parallelExecuteHDFSOperation(fileStatuses, "checkAvroSchemaConsistency", fileStatus -> {
            ensureNotDirectory(fileStatus);
            inputFileDataSize.addAndGet(fileStatus.getLen());
            Pair<Schema, Schema> actualSchema = getAvroFileHeader(fs, fileStatus.getPath(), true);
            if (!expectedSchema.equals(actualSchema)) {
                throw new VeniceInconsistentSchemaException(String.format("Inconsistent file Avro schema found. File: %s.\n Expected file schema: %s.\n Real File schema: %s.", fileStatus.getPath().getName(), expectedSchema, actualSchema));
            }
        });
        return expectedSchema;
    }

    /**
     * Opens {@code path} with an Avro record reader and returns its (file schema, store schema) pair.
     *
     * @param buildDictionaryIfNeeded when {@code true} and sample loading applies to this job, the
     *        reader is first passed to {@code InputDataInfoProvider.loadZstdTrainingSamples}
     */
    public Pair<Schema, Schema> getAvroFileHeader(FileSystem fs, Path path, boolean buildDictionaryIfNeeded) {
        LOGGER.debug("path:{}", path.toUri().getPath());
        VeniceAvroRecordReader recordReader = getVeniceAvroRecordReader(fs, path);
        if (buildDictionaryIfNeeded && shouldLoadZstdTrainingSamples()) {
            InputDataInfoProvider.loadZstdTrainingSamples(recordReader, this.pushJobZstdConfig);
        }
        return new Pair<>(recordReader.getFileSchema(), recordReader.getStoreSchema());
    }

    private VeniceAvroRecordReader getVeniceAvroRecordReader(FileSystem fs, Path path) {
        return new VeniceAvroRecordReader(null, this.props.getString(KEY_FIELD_PROP, DEFAULT_AVRO_KEY_FIELD), this.props.getString(VALUE_FIELD_PROP, DEFAULT_AVRO_VALUE_FIELD), fs, path, this.pushJobSetting.etlValueSchemaTransformation);
    }

    /** Shuts down the executor used for parallel file operations, if it was ever created. */
    @Override
    public void close() {
        shutdownHdfsExecutorService();
    }

    /**
     * Training samples are loaded for non-incremental pushes when either the mapper builds the
     * dictionary or the store uses {@code ZSTD_WITH_DICT} compression.
     */
    private boolean shouldLoadZstdTrainingSamples() {
        if (this.pushJobSetting.isIncrementalPush) {
            return false;
        }
        return this.pushJobSetting.useMapperToBuildDict
                || this.storeSetting.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT;
    }

    /** Rejects sub directories: every entry of the input directory must be a regular file. */
    private static void ensureNotDirectory(FileStatus fileStatus) {
        if (fileStatus.isDirectory()) {
            throw new VeniceException("Input directory: " + fileStatus.getPath().getParent().getName() + " should not have sub directory: " + fileStatus.getPath().getName());
        }
    }

    /** Requests an immediate shutdown of the executor and waits up to 30 seconds for it to terminate. */
    private void shutdownHdfsExecutorService() {
        if (!this.hdfsExecutorService.isPresent()) {
            LOGGER.warn("No HDFS executor service to shutdown");
            return;
        }
        ExecutorService executor = this.hdfsExecutorService.get();
        executor.shutdownNow();
        try {
            if (!executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOGGER.warn("Unable to shutdown the executor service used for HDFS operations. The job may hang with leaked resources.");
            }
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while waiting for the HDFS executor service to terminate", e);
            Thread.currentThread().interrupt();
        }
    }
}
