package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputRDD;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;

/* loaded from: input_file:org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.class */
/**
 * Hadoop graph computer that executes an optional {@link VertexProgram} followed by any
 * registered {@link MapReduce} jobs on Apache Spark.
 *
 * <p>The computation is started by {@link #submit()}, which runs asynchronously and yields a
 * {@link ComputerResult} holding the output graph and an immutable snapshot of the memory.
 */
public final class SparkGraphComputer extends AbstractHadoopGraphComputer {

    /**
     * Creates a computer bound to the given graph.
     *
     * @param hadoopGraph the graph whose configuration drives the Spark job
     */
    public SparkGraphComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
    }

    /**
     * Validates the computer state, prepares the Apache/Hadoop configurations and then runs the
     * job asynchronously via {@link CompletableFuture#supplyAsync(java.util.function.Supplier)}.
     *
     * @return a future completing with the result of the computation
     * @throws IllegalStateException if the input location cannot be resolved on the file system;
     *         failures inside the asynchronous task (deleting the output location, instantiating
     *         the configured input/output RDD classes) surface through the returned future
     */
    public Future<ComputerResult> submit() {
        super.validateStatePriorToExecution();
        // NOTE(review): "m15configuration" looks like a decompiler-renamed accessor -- confirm the
        // real method name on HadoopGraph before changing it.
        HadoopConfiguration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.m15configuration());
        apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, Boolean.valueOf(this.persist.equals(GraphComputer.Persist.EDGES)));
        Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);

        // For file-based input formats, replace the configured input location with the path
        // reported by the file system and publish it to both configurations.
        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
            try {
                Path configuredInput = new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
                String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(configuredInput).getPath().toString();
                apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
                hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
            } catch (IOException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        return CompletableFuture.supplyAsync(() -> {
            long startTime = System.currentTimeMillis();

            // Remove any previous output so the job can write to the output location.
            String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, (String) null);
            if (null != outputLocation) {
                try {
                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
                } catch (IOException e) {
                    throw new IllegalStateException(e.getMessage(), e);
                }
            }

            // Mirror every Hadoop configuration entry into the Spark configuration.
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
            hadoopConfiguration.forEach(entry -> sparkConf.set(entry.getKey(), entry.getValue()));

            // try-with-resources guarantees the Spark context is closed on every exit path and
            // keeps the primary exception if close() itself fails.
            try (JavaSparkContext sparkContext = new JavaSparkContext(sparkConf)) {
                loadJars(sparkContext, hadoopConfiguration);

                InputRDD inputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class).newInstance();
                JavaPairRDD<Object, VertexWritable> graphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext).setName("graphRDD").cache();

                // Raw type: the payload type is declared by SparkExecutor and not visible here.
                JavaPairRDD viewIncomingRDD = null;
                SparkMemory memory = null;

                if (null != this.vertexProgram) {
                    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                    this.vertexProgram.setup(memory);
                    memory.broadcastMemory(sparkContext);

                    // Store the vertex program state and merge it into both configurations.
                    HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                    this.vertexProgram.storeState(vertexProgramConfiguration);
                    ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
                    ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);

                    // Iterate until the vertex program reports termination.
                    while (true) {
                        memory.setInTask(true);
                        viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
                        memory.setInTask(false);
                        if (this.vertexProgram.terminate(memory)) {
                            break;
                        }
                        memory.incrIteration();
                        memory.broadcastMemory(sparkContext);
                    }

                    // Write the graph through the configured output RDD unless nothing is persisted.
                    if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
                        OutputRDD outputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class).newInstance();
                        outputRDD.writeGraphRDD(apacheConfiguration, graphRDD);
                    }
                }

                MapMemory finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);

                if (!this.mapReducers.isEmpty()) {
                    String[] elementComputeKeys = this.vertexProgram == null
                            ? new String[0]
                            : (String[]) this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
                    JavaPairRDD mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
                    for (MapReduce mapReduce : this.mapReducers) {
                        // Each job gets its own copy of the configuration carrying its stored state.
                        HadoopConfiguration mapReduceConfiguration = new HadoopConfiguration(apacheConfiguration);
                        mapReduce.storeState(mapReduceConfiguration);
                        JavaPairRDD mapRDD = SparkExecutor.executeMap(mapReduceGraphRDD, mapReduce, mapReduceConfiguration).setName("mapRDD");
                        JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE)
                                ? SparkExecutor.executeReduce(mapRDD, mapReduce, mapReduceConfiguration).setName("reduceRDD")
                                : null;
                        // Save the reduce output when a reduce stage ran, otherwise the map output.
                        SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                    }
                }

                finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
            } catch (IllegalAccessException | InstantiationException e) {
                // Raised when the configured input/output RDD class cannot be instantiated.
                throw new IllegalStateException(e.getMessage(), e);
            }
        });
    }

    /**
     * Adds every {@code .jar} found in the directories listed in the
     * {@code HADOOP_GREMLIN_LIBS} environment variable to the Spark context, when the
     * jars-in-distributed-cache option is enabled (it defaults to {@code true}).
     *
     * <p>Missing configuration is never fatal: an unset variable or an entry that cannot be
     * listed as a directory only produces a warning.
     *
     * @param javaSparkContext the context the jars are registered with
     * @param configuration    the Hadoop configuration holding the distributed-cache flag
     */
    private void loadJars(JavaSparkContext javaSparkContext, Configuration configuration) {
        if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
            return;
        }
        String libs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
        if (null == libs) {
            this.logger.warn("HADOOP_GREMLIN_LIBS is not set -- proceeding regardless");
            return;
        }
        for (String location : libs.split(":")) {
            // listFiles() yields null for a missing path, a regular file or an I/O error, so a
            // single null check covers every "not a usable directory" case without an NPE.
            File[] entries = new File(location).listFiles();
            if (null == entries) {
                this.logger.warn(location + " does not reference a valid directory -- proceeding regardless");
                continue;
            }
            Stream.of(entries)
                    .filter(entry -> entry.getName().endsWith(Constants.DOT_JAR))
                    .forEach(jar -> javaSparkContext.addJar(jar.getAbsolutePath()));
        }
    }

    /**
     * Command-line entry point: loads the properties file named by the first argument, builds
     * the vertex program it describes, submits the job and waits for it to finish.
     *
     * @param strArr command-line arguments; the first element is the properties file location
     * @throws IllegalArgumentException if no properties file is supplied
     * @throws Exception if loading the configuration or running the job fails
     */
    public static void main(String[] strArr) throws Exception {
        if (null == strArr || strArr.length == 0) {
            throw new IllegalArgumentException("Usage: SparkGraphComputer <properties-file>");
        }
        PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(strArr[0]);
        // NOTE(review): the graph is opened twice (once for the computer, once for the vertex
        // program); kept as-is because sharing one instance may not be equivalent -- confirm.
        new SparkGraphComputer(HadoopGraph.open(propertiesConfiguration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(propertiesConfiguration), propertiesConfiguration)).submit().get();
    }
}
