package org.apache.giraph.edge;

import com.google.common.collect.MapMaker;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/edge/EdgeStore.class */
public class EdgeStore<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {
    private static final Logger LOG = Logger.getLogger(EdgeStore.class);
    private CentralizedServiceWorker<I, V, E, M> service;
    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
    private Progressable progressable;
    private ConcurrentMap<Integer, ConcurrentMap<I, OutEdges<I, E>>> transientEdges;
    private boolean reuseEdgeObjects;
    private boolean useInputOutEdges;

    public EdgeStore(CentralizedServiceWorker<I, V, E, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, V, E, M> immutableClassesGiraphConfiguration, Progressable progressable) {
        this.service = centralizedServiceWorker;
        this.configuration = immutableClassesGiraphConfiguration;
        this.progressable = progressable;
        this.transientEdges = new MapMaker().concurrencyLevel(immutableClassesGiraphConfiguration.getNettyServerExecutionConcurrency()).makeMap();
        this.reuseEdgeObjects = immutableClassesGiraphConfiguration.reuseEdgeObjects();
        this.useInputOutEdges = immutableClassesGiraphConfiguration.useInputOutEdges();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void addPartitionEdges(int i, ByteArrayVertexIdEdges<I, E> byteArrayVertexIdEdges) {
        ConcurrentMap<I, OutEdges<I, E>> concurrentMap = this.transientEdges.get(Integer.valueOf(i));
        if (concurrentMap == null) {
            ConcurrentMap<I, OutEdges<I, E>> makeMap = new MapMaker().concurrencyLevel(this.configuration.getNettyServerExecutionConcurrency()).makeMap();
            concurrentMap = this.transientEdges.putIfAbsent(Integer.valueOf(i), makeMap);
            if (concurrentMap == null) {
                concurrentMap = makeMap;
            }
        }
        ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator = byteArrayVertexIdEdges.getVertexIdEdgeIterator();
        while (vertexIdEdgeIterator.hasNext()) {
            vertexIdEdgeIterator.next();
            Object currentVertexId = vertexIdEdgeIterator.getCurrentVertexId();
            Edge currentEdge = this.reuseEdgeObjects ? vertexIdEdgeIterator.getCurrentEdge() : vertexIdEdgeIterator.releaseCurrentEdge();
            OutEdges<I, E> outEdges = concurrentMap.get(currentVertexId);
            if (outEdges == null) {
                OutEdges<I, E> createAndInitializeInputOutEdges = this.configuration.createAndInitializeInputOutEdges();
                outEdges = (OutEdges) concurrentMap.putIfAbsent(currentVertexId, createAndInitializeInputOutEdges);
                if (outEdges == null) {
                    outEdges = createAndInitializeInputOutEdges;
                    vertexIdEdgeIterator.releaseCurrentVertexId();
                }
            }
            synchronized (outEdges) {
                outEdges.add(currentEdge);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OutEdges<I, E> convertInputToComputeEdges(OutEdges<I, E> outEdges) {
        if (!this.useInputOutEdges) {
            return outEdges;
        }
        OutEdges<I, E> createAndInitializeOutEdges = this.configuration.createAndInitializeOutEdges(outEdges.size());
        Iterator<Edge<I, E>> it = outEdges.iterator();
        while (it.hasNext()) {
            createAndInitializeOutEdges.add((Edge) it.next());
        }
        return createAndInitializeOutEdges;
    }

    public void moveEdgesToVertices() {
        if (this.transientEdges.isEmpty()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("moveEdgesToVertices: No edges to move");
                return;
            }
            return;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
        }
        final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(this.transientEdges.size());
        arrayBlockingQueue.addAll(this.transientEdges.keySet());
        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.edge.EdgeStore.1
            @Override // org.apache.giraph.utils.CallableFactory
            public Callable<Void> newCallable(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.edge.EdgeStore.1.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        while (true) {
                            Integer num = (Integer) arrayBlockingQueue.poll();
                            if (num == null) {
                                return null;
                            }
                            Partition<I, V, E, M> partition = EdgeStore.this.service.getPartitionStore().getPartition(num);
                            ConcurrentMap concurrentMap = (ConcurrentMap) EdgeStore.this.transientEdges.remove(num);
                            for (WritableComparable writableComparable : concurrentMap.keySet()) {
                                OutEdges convertInputToComputeEdges = EdgeStore.this.convertInputToComputeEdges((OutEdges) concurrentMap.remove(writableComparable));
                                Vertex vertex = partition.getVertex(writableComparable);
                                if (vertex == null) {
                                    Vertex createVertex = EdgeStore.this.configuration.createVertex();
                                    createVertex.initialize(writableComparable, EdgeStore.this.configuration.createVertexValue(), convertInputToComputeEdges);
                                    partition.putVertex(createVertex);
                                } else {
                                    vertex.setEdges(convertInputToComputeEdges);
                                    partition.saveVertex(vertex);
                                }
                            }
                            EdgeStore.this.service.getPartitionStore().putPartition(partition);
                        }
                    }
                };
            }
        }, this.configuration.getNumInputSplitsThreads(), "move-edges-%d", this.progressable);
        this.transientEdges.clear();
        if (LOG.isInfoEnabled()) {
            LOG.info("moveEdgesToVertices: Finished moving incoming edges to vertices.");
        }
    }
}
