package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;

import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.class */
public final class SparkGraphComputer implements GraphComputer {
    public static final Logger LOGGER = LoggerFactory.getLogger(SparkGraphComputer.class);
    protected final HadoopGraph hadoopGraph;
    private VertexProgram vertexProgram;
    protected final SparkConf configuration = new SparkConf();
    private boolean executed = false;
    private final Set<MapReduce> mapReducers = new HashSet();
    private Optional<GraphComputer.ResultGraph> resultGraph = Optional.empty();
    private Optional<GraphComputer.Persist> persist = Optional.empty();

    public SparkGraphComputer(HadoopGraph hadoopGraph) {
        this.hadoopGraph = hadoopGraph;
    }

    public GraphComputer isolation(GraphComputer.Isolation isolation) {
        if (isolation.equals(GraphComputer.Isolation.BSP)) {
            return this;
        }
        throw GraphComputer.Exceptions.isolationNotSupported(isolation);
    }

    public GraphComputer result(GraphComputer.ResultGraph resultGraph) {
        this.resultGraph = Optional.of(resultGraph);
        return this;
    }

    public GraphComputer persist(GraphComputer.Persist persist) {
        this.persist = Optional.of(persist);
        return this;
    }

    public GraphComputer program(VertexProgram vertexProgram) {
        this.vertexProgram = vertexProgram;
        return this;
    }

    public GraphComputer mapReduce(MapReduce mapReduce) {
        this.mapReducers.add(mapReduce);
        return this;
    }

    public String toString() {
        return StringFactory.graphComputerString(this);
    }

    public Future<ComputerResult> submit() {
        if (this.executed) {
            throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        }
        this.executed = true;
        if (null == this.vertexProgram && this.mapReducers.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (null != this.vertexProgram) {
            GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
        }
        if (!this.persist.isPresent()) {
            this.persist = Optional.of(null == this.vertexProgram ? GraphComputer.Persist.NOTHING : this.vertexProgram.getPreferredPersist());
        }
        if (!this.resultGraph.isPresent()) {
            this.resultGraph = Optional.of(null == this.vertexProgram ? GraphComputer.ResultGraph.ORIGINAL : this.vertexProgram.getPreferredResultGraph());
        }
        if (this.resultGraph.get().equals(GraphComputer.ResultGraph.ORIGINAL) && !this.persist.get().equals(GraphComputer.Persist.NOTHING)) {
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph.get(), this.persist.get());
        }
        HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(this.hadoopGraph.m18configuration());
        hadoopConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, Boolean.valueOf(this.persist.get().equals(GraphComputer.Persist.EDGES)));
        Configuration makeHadoopConfiguration = ConfUtil.makeHadoopConfiguration(hadoopConfiguration);
        return CompletableFuture.supplyAsync(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            Memory memory = null;
            SparkExecutor.deleteOutputLocation(makeHadoopConfiguration);
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + ((Object) (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram)) + "[" + this.mapReducers + "]");
            makeHadoopConfiguration.forEach(entry -> {
                sparkConf.set((String) entry.getKey(), (String) entry.getValue());
            });
            if (FileInputFormat.class.isAssignableFrom(makeHadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
                makeHadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, SparkExecutor.getInputLocation(makeHadoopConfiguration));
            }
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            Throwable th = null;
            try {
                try {
                    loadJars(javaSparkContext, makeHadoopConfiguration);
                    JavaPairRDD cache = javaSparkContext.newAPIHadoopRDD(makeHadoopConfiguration, makeHadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class), NullWritable.class, VertexWritable.class).mapToPair(tuple2 -> {
                        return new Tuple2(((VertexWritable) tuple2._2()).get().id(), new VertexWritable(((VertexWritable) tuple2._2()).get()));
                    }).reduceByKey((vertexWritable, vertexWritable2) -> {
                        return vertexWritable;
                    }).cache();
                    JavaPairRDD javaPairRDD = null;
                    if (null != this.vertexProgram) {
                        memory = new SparkMemory(this.vertexProgram, this.mapReducers, javaSparkContext);
                        this.vertexProgram.setup(memory);
                        memory.broadcastMemory(javaSparkContext);
                        HadoopConfiguration hadoopConfiguration2 = new HadoopConfiguration();
                        this.vertexProgram.storeState(hadoopConfiguration2);
                        ConfigurationUtils.copy(hadoopConfiguration2, hadoopConfiguration);
                        ConfUtil.mergeApacheIntoHadoopConfiguration(hadoopConfiguration2, makeHadoopConfiguration);
                        while (true) {
                            memory.setInTask(true);
                            javaPairRDD = SparkExecutor.executeVertexProgramIteration(cache, javaPairRDD, memory, hadoopConfiguration2);
                            memory.setInTask(false);
                            if (this.vertexProgram.terminate(memory)) {
                                break;
                            }
                            memory.incrIteration();
                            memory.broadcastMemory(javaSparkContext);
                        }
                        if (!this.persist.get().equals(GraphComputer.Persist.NOTHING)) {
                            SparkExecutor.saveGraphRDD(cache, makeHadoopConfiguration);
                        }
                    }
                    MapMemory mapMemory = null == memory ? new MapMemory() : new MapMemory(memory);
                    if (!this.mapReducers.isEmpty()) {
                        JavaPairRDD cache2 = SparkExecutor.prepareGraphRDDForMapReduce(cache, javaPairRDD).cache();
                        for (MapReduce mapReduce : this.mapReducers) {
                            HadoopConfiguration hadoopConfiguration3 = new HadoopConfiguration(hadoopConfiguration);
                            mapReduce.storeState(hadoopConfiguration3);
                            JavaPairRDD executeMap = SparkExecutor.executeMap(cache2, mapReduce, hadoopConfiguration3);
                            JavaPairRDD executeReduce = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(executeMap, mapReduce, hadoopConfiguration3) : null;
                            SparkExecutor.saveMapReduceRDD(null == executeReduce ? executeMap : executeReduce, mapReduce, mapMemory, makeHadoopConfiguration);
                        }
                    }
                    mapMemory.setRuntime(System.currentTimeMillis() - currentTimeMillis);
                    DefaultComputerResult defaultComputerResult = new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph.get(), this.persist.get()), mapMemory.asImmutable());
                    if (javaSparkContext != null) {
                        if (0 != 0) {
                            try {
                                javaSparkContext.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            javaSparkContext.close();
                        }
                    }
                    return defaultComputerResult;
                } finally {
                }
            } catch (Throwable th3) {
                if (javaSparkContext != null) {
                    if (th != null) {
                        try {
                            javaSparkContext.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        javaSparkContext.close();
                    }
                }
                throw th3;
            }
        });
    }

    private static void loadJars(JavaSparkContext javaSparkContext, Configuration configuration) {
        if (configuration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
            String str = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
            if (null == str) {
                LOGGER.warn("HADOOP_GREMLIN_LIBS is not set -- proceeding regardless");
                return;
            }
            for (String str2 : str.split(":")) {
                File file = new File(str2);
                if (file.exists()) {
                    Stream.of((Object[]) file.listFiles()).filter(file2 -> {
                        return file2.getName().endsWith(Constants.DOT_JAR);
                    }).forEach(file3 -> {
                        javaSparkContext.addJar(file3.getAbsolutePath());
                    });
                } else {
                    LOGGER.warn(str2 + " does not reference a valid directory -- proceeding regardless");
                }
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        PropertiesConfiguration propertiesConfiguration = new PropertiesConfiguration(strArr[0]);
        new SparkGraphComputer(HadoopGraph.open(propertiesConfiguration)).program(VertexProgram.createVertexProgram(propertiesConfiguration)).submit().get();
    }

    public GraphComputer.Features features() {
        return new GraphComputer.Features() { // from class: org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer.1
            public boolean supportsVertexAddition() {
                return false;
            }

            public boolean supportsVertexRemoval() {
                return false;
            }

            public boolean supportsVertexPropertyRemoval() {
                return false;
            }

            public boolean supportsEdgeAddition() {
                return false;
            }

            public boolean supportsEdgeRemoval() {
                return false;
            }

            public boolean supportsEdgePropertyAddition() {
                return false;
            }

            public boolean supportsEdgePropertyRemoval() {
                return false;
            }

            public boolean supportsIsolation(GraphComputer.Isolation isolation) {
                return isolation.equals(GraphComputer.Isolation.BSP);
            }

            public boolean supportsResultGraphPersistCombination(GraphComputer.ResultGraph resultGraph, GraphComputer.Persist persist) {
                return persist.equals(GraphComputer.Persist.NOTHING) || resultGraph.equals(GraphComputer.ResultGraph.NEW);
            }

            public boolean supportsDirectObjects() {
                return false;
            }
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1061245065:
                if (implMethodName.equals("lambda$null$98fa99ce$1")) {
                    z = false;
                    break;
                }
                break;
            case 1483038734:
                if (implMethodName.equals("lambda$null$37655630$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable;Lorg/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable;)Lorg/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable;")) {
                    return (vertexWritable, vertexWritable2) -> {
                        return vertexWritable;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lscala/Tuple2;")) {
                    return tuple2 -> {
                        return new Tuple2(((VertexWritable) tuple2._2()).get().id(), new VertexWritable(((VertexWritable) tuple2._2()).get()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
