package org.apache.mahout.graph.linkanalysis;

import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.antlr.stringtemplate.language.ASTExpr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
import org.apache.mahout.graph.AdjacencyMatrixJob;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.Functions;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;

/* loaded from: input_file:org/apache/mahout/graph/linkanalysis/RandomWalk.class */
abstract class RandomWalk extends AbstractJob {
    static final String RANK_VECTOR = "rankVector";
    static final String NUM_VERTICES_PARAM = AdjacencyMatrixJob.class.getName() + ".numVertices";
    static final String STAYING_PROBABILITY_PARAM = AdjacencyMatrixJob.class.getName() + ".stayingProbability";

    /* loaded from: input_file:org/apache/mahout/graph/linkanalysis/RandomWalk$RankPerVertexMapper.class */
    public static class RankPerVertexMapper extends Mapper<IntWritable, IntWritable, IntWritable, DoubleWritable> {
        static final String RANK_PATH_PARAM = RankPerVertexMapper.class.getName() + ".pageRankPath";
        private Vector ranks;

        protected void setup(Mapper<IntWritable, IntWritable, IntWritable, DoubleWritable>.Context context) throws IOException, InterruptedException {
            Path path = new Path(context.getConfiguration().get(RANK_PATH_PARAM));
            FSDataInputStream fSDataInputStream = null;
            try {
                fSDataInputStream = FileSystem.get(path.toUri(), context.getConfiguration()).open(path);
                this.ranks = VectorWritable.readVector(fSDataInputStream);
                Closeables.closeQuietly(fSDataInputStream);
            } catch (Throwable th) {
                Closeables.closeQuietly(fSDataInputStream);
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void map(IntWritable intWritable, IntWritable intWritable2, Mapper.Context context) throws IOException, InterruptedException {
            context.write(intWritable2, new DoubleWritable(this.ranks.get(intWritable.get())));
        }
    }

    /* loaded from: input_file:org/apache/mahout/graph/linkanalysis/RandomWalk$TransposeMapper.class */
    static class TransposeMapper extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
        private int numVertices;
        private double stayingProbability;

        TransposeMapper() {
        }

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            this.stayingProbability = Double.parseDouble(context.getConfiguration().get(RandomWalk.STAYING_PROBABILITY_PARAM));
            this.numVertices = Integer.parseInt(context.getConfiguration().get(RandomWalk.NUM_VERTICES_PARAM));
        }

        protected void map(IntWritable intWritable, VectorWritable vectorWritable, Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context context) throws IOException, InterruptedException {
            int i = intWritable.get();
            Vector normalize = vectorWritable.get().normalize(1.0d);
            if (this.stayingProbability != 1.0d) {
                normalize.assign(Functions.MULT, this.stayingProbability);
            }
            Iterator<Vector.Element> iterateNonZero = normalize.iterateNonZero();
            while (iterateNonZero.hasNext()) {
                Vector.Element next = iterateNonZero.next();
                RandomAccessSparseVector randomAccessSparseVector = new RandomAccessSparseVector(this.numVertices, 1);
                randomAccessSparseVector.setQuick(i, next.get());
                intWritable.set(next.index());
                context.write(intWritable, new VectorWritable(randomAccessSparseVector));
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((IntWritable) obj, (VectorWritable) obj2, (Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context) context);
        }
    }

    protected abstract Vector createDampingVector(int i, double d);

    protected void addSpecificOptions() {
    }

    protected void evaluateSpecificOptions(Map<String, String> map) {
    }

    public final int run(String[] strArr) throws Exception {
        addOutputOption();
        addOption("vertices", (String) null, "a text file containing all vertices of the graph (one per line)", true);
        addOption("edges", (String) null, "edges of the graph", true);
        addOption("numIterations", ASTExpr.DEFAULT_ATTRIBUTE_NAME, "number of numIterations", String.valueOf(10));
        addOption("stayingProbability", "tp", "probability not to teleport to a random vertex", String.valueOf(0.85d));
        addSpecificOptions();
        Map<String, String> parseArguments = parseArguments(strArr);
        if (parseArguments == null) {
            return -1;
        }
        evaluateSpecificOptions(parseArguments);
        int parseInt = Integer.parseInt(parseArguments.get("--numIterations"));
        double parseDouble = Double.parseDouble(parseArguments.get("--stayingProbability"));
        Preconditions.checkArgument(parseInt > 0);
        Preconditions.checkArgument(parseDouble > 0.0d && parseDouble <= 1.0d);
        Path tempPath = getTempPath(AdjacencyMatrixJob.ADJACENCY_MATRIX);
        Path tempPath2 = getTempPath("transitionMatrix");
        Path tempPath3 = getTempPath(AdjacencyMatrixJob.VERTEX_INDEX);
        Path tempPath4 = getTempPath(AdjacencyMatrixJob.NUM_VERTICES);
        ToolRunner.run(getConf(), new AdjacencyMatrixJob(), new String[]{"--vertices", parseArguments.get("--vertices"), "--edges", parseArguments.get("--edges"), "--output", getTempPath().toString()});
        int readInt = HadoopUtil.readInt(tempPath4, getConf());
        Preconditions.checkArgument(readInt > 0);
        Job prepareJob = prepareJob(tempPath, tempPath2, TransposeMapper.class, IntWritable.class, VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
        prepareJob.setCombinerClass(MergeVectorsCombiner.class);
        prepareJob.getConfiguration().set(NUM_VERTICES_PARAM, String.valueOf(readInt));
        prepareJob.getConfiguration().set(STAYING_PROBABILITY_PARAM, String.valueOf(parseDouble));
        prepareJob.waitForCompletion(true);
        DistributedRowMatrix distributedRowMatrix = new DistributedRowMatrix(tempPath2, getTempPath(), readInt, readInt);
        distributedRowMatrix.setConf(getConf());
        Vector assign = new DenseVector(readInt).assign(1.0d / readInt);
        Vector createDampingVector = createDampingVector(readInt, parseDouble);
        while (true) {
            int i = parseInt;
            parseInt--;
            if (i <= 0) {
                persistVector(getConf(), getTempPath(RANK_VECTOR), assign);
                Job prepareJob2 = prepareJob(tempPath3, getOutputPath(), SequenceFileInputFormat.class, RankPerVertexMapper.class, LongWritable.class, DoubleWritable.class, TextOutputFormat.class);
                prepareJob2.getConfiguration().set(RankPerVertexMapper.RANK_PATH_PARAM, getTempPath(RANK_VECTOR).toString());
                prepareJob2.waitForCompletion(true);
                return 1;
            }
            assign = distributedRowMatrix.times(assign).plus(createDampingVector);
        }
    }

    static void persistVector(Configuration configuration, Path path, Vector vector) throws IOException {
        FSDataOutputStream fSDataOutputStream = null;
        try {
            fSDataOutputStream = FileSystem.get(path.toUri(), configuration).create(path, true);
            VectorWritable.writeVector(fSDataOutputStream, vector);
            Closeables.closeQuietly(fSDataOutputStream);
        } catch (Throwable th) {
            Closeables.closeQuietly(fSDataOutputStream);
            throw th;
        }
    }
}
