package com.linkedin.venice.hadoop.schema;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.schema.rmd.RmdVersionId;
import com.linkedin.venice.utils.Utils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@NotThreadsafe
/* loaded from: input_file:com/linkedin/venice/hadoop/schema/HDFSRmdSchemaSource.class */
public class HDFSRmdSchemaSource implements RmdSchemaSource, AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) HDFSRmdSchemaSource.class);
    private static final String UNDERSCORE = "_";
    private static final String SEPARATOR = "/";
    private final String storeName;
    private final FileSystem fs;
    private final Path schemaDir;

    public HDFSRmdSchemaSource(String str, String str2) throws IOException {
        this.fs = FileSystem.get(new Configuration());
        this.schemaDir = new Path(str);
        if (!this.fs.exists(this.schemaDir)) {
            this.fs.mkdirs(this.schemaDir);
        }
        this.storeName = str2;
    }

    public HDFSRmdSchemaSource(String str) throws IOException {
        this(str, null);
    }

    public String getPath() {
        return this.schemaDir.toString();
    }

    public void loadRmdSchemasOnDisk(ControllerClient controllerClient) throws IOException, IllegalStateException {
        LOGGER.info("Starting caching RMD schemas for {} at {}", this.storeName, this.schemaDir.toString());
        for (MultiSchemaResponse.Schema schema : controllerClient.getAllReplicationMetadataSchemas(this.storeName).getSchemas()) {
            Path path = new Path(this.schemaDir + "/" + schema.getId() + "_" + schema.getRmdValueSchemaId());
            if (this.fs.exists(path)) {
                throw new IllegalStateException(String.format("The schema path %s already exists.", path));
            }
            FSDataOutputStream create = this.fs.create(path);
            try {
                OutputStreamWriter outputStreamWriter = new OutputStreamWriter(create, StandardCharsets.UTF_8);
                try {
                    outputStreamWriter.write(schema.getSchemaStr() + "\n");
                    outputStreamWriter.flush();
                    outputStreamWriter.close();
                    if (create != null) {
                        create.close();
                    }
                    LOGGER.info("Finished writing RMD schema with id {} and RMD value schema id {} onto disk", Integer.valueOf(schema.getId()), Integer.valueOf(schema.getRmdValueSchemaId()));
                } finally {
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Override // com.linkedin.venice.hadoop.schema.RmdSchemaSource
    public Map<RmdVersionId, Schema> fetchSchemas() throws IOException {
        HashMap hashMap = new HashMap();
        FileStatus[] listStatus = this.fs.listStatus(this.schemaDir);
        LOGGER.info("Starting fetching RMD schemas at {}", this.schemaDir.toString());
        for (FileStatus fileStatus : listStatus) {
            Path path = fileStatus.getPath();
            FSDataInputStream open = this.fs.open(path);
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8));
                try {
                    RmdVersionId parseIdsFromPath = parseIdsFromPath(path);
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        throw new RuntimeException(String.format("Failed to load RMD schema at the path %s", path));
                    }
                    hashMap.put(parseIdsFromPath, AvroCompatibilityHelper.parse(readLine));
                    bufferedReader.close();
                    if (open != null) {
                        open.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (open != null) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        return hashMap;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        Utils.closeQuietlyWithErrorLogged(this.fs);
    }

    private RmdVersionId parseIdsFromPath(Path path) {
        String[] split = path.getName().split("_");
        return new RmdVersionId(Integer.parseInt(split[1]), Integer.parseInt(split[0]));
    }
}
