package org.apache.giraph.comm.messages;

import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/SimpleMessageStore.class */
/**
 * Base class for message stores that keep all messages in memory, grouped
 * first by partition id and then by destination vertex id.
 *
 * <p>The outer map goes from partition id to a per-partition map; each
 * per-partition map goes from a destination vertex id to a container of type
 * {@code T} holding the messages for that vertex. Subclasses decide what
 * {@code T} is and how it is iterated, counted and (de)serialized.
 *
 * @param <I> vertex id type
 * @param <M> message type
 * @param <T> per-vertex message container type
 */
public abstract class SimpleMessageStore<I extends WritableComparable, M extends Writable, T> implements MessageStoreByPartition<I, M> {
    /** Service worker, used to look up which partition owns a vertex. */
    protected final CentralizedServiceWorker<I, ?, ?, M> service;
    /** Partition id -> (destination vertex id -> message container). */
    protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
    /** Configuration, used to create vertex ids and to size the maps. */
    protected final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;

    /**
     * Creates an empty store.
     *
     * @param centralizedServiceWorker service worker used for partition lookups
     * @param immutableClassesGiraphConfiguration configuration; its Netty server
     *        execution concurrency is used as the concurrency level of the
     *        partition map
     */
    public SimpleMessageStore(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
        this.service = centralizedServiceWorker;
        this.config = immutableClassesGiraphConfiguration;
        this.map = new MapMaker()
            .concurrencyLevel(immutableClassesGiraphConfiguration.getNettyServerExecutionConcurrency())
            .makeMap();
    }

    /**
     * Exposes the messages held in a per-vertex container as an iterable.
     *
     * @param messages container stored for a single vertex
     * @return the messages in that container
     */
    protected abstract Iterable<M> getMessagesAsIterable(T messages);

    /**
     * Counts the messages stored in one partition map. The results for all
     * partitions are summed by {@link #getNumberOfMessages()}.
     *
     * @param partitionMap map from vertex id to message container
     * @return number of messages in the given partition map
     */
    protected abstract int getNumberOfMessagesIn(ConcurrentMap<I, T> partitionMap);

    /**
     * Serializes the message container of a single vertex.
     *
     * @param messages container to write
     * @param out output to write to
     * @throws IOException if writing fails
     */
    protected abstract void writeMessages(T messages, DataOutput out) throws IOException;

    /**
     * Deserializes the message container of a single vertex; the counterpart
     * of {@link #writeMessages(Object, DataOutput)}.
     *
     * @param in input to read from
     * @return the container that was read
     * @throws IOException if reading fails
     */
    protected abstract T readFieldsForMessages(DataInput in) throws IOException;

    /**
     * Looks up the id of the partition that owns a vertex.
     *
     * @param vertexId vertex id
     * @return id of the partition reported by the service as owner of the vertex
     */
    protected int getPartitionId(I vertexId) {
        return this.service.getVertexPartitionOwner(vertexId).getPartitionId();
    }

    /**
     * Returns the map for the given partition, creating and registering an
     * empty one if none exists yet.
     *
     * <p>Registration uses {@code putIfAbsent}, so when several threads race to
     * create the same partition map they all end up with the single instance
     * that was actually stored.
     *
     * @param partitionId partition id
     * @return the map registered for that partition, never {@code null}
     */
    public ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
        ConcurrentMap<I, T> existing = this.map.get(partitionId);
        if (existing != null) {
            return existing;
        }
        ConcurrentMap<I, T> created = new MapMaker()
            .concurrencyLevel(this.config.getNettyServerExecutionConcurrency())
            .makeMap();
        // Another thread may have registered a map in the meantime; prefer theirs.
        ConcurrentMap<I, T> winner = this.map.putIfAbsent(partitionId, created);
        return winner != null ? winner : created;
    }

    /**
     * Returns the ids of the vertices in a partition that have an entry in the
     * store.
     *
     * @param partitionId partition id
     * @return the key set of the partition map (a live view), or an empty list
     *         if the partition has no map
     */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public Iterable<I> getPartitionDestinationVertices(int partitionId) {
        ConcurrentMap<I, T> partitionMap = this.map.get(partitionId);
        if (partitionMap == null) {
            return Collections.<I>emptyList();
        }
        return partitionMap.keySet();
    }

    /**
     * Tells whether the store has an entry for the given vertex.
     *
     * @param vertexId vertex id
     * @return {@code true} if the owning partition has a map containing the id
     */
    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean hasMessagesForVertex(I vertexId) {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(vertexId));
        return partitionMap != null && partitionMap.containsKey(vertexId);
    }

    /**
     * Collects the ids of all vertices that have an entry in any partition.
     *
     * @return a newly built list (a copy, not a view) of destination vertex ids
     */
    @Override // org.apache.giraph.comm.messages.MessageStore
    public Iterable<I> getDestinationVertices() {
        ArrayList<I> destinations = Lists.newArrayList();
        for (ConcurrentMap<I, T> partitionMap : this.map.values()) {
            destinations.addAll(partitionMap.keySet());
        }
        return destinations;
    }

    /**
     * Returns the messages stored for a vertex.
     *
     * @param vertexId vertex id
     * @return the messages for the vertex, or an empty list if the owning
     *         partition has no map or the map has no entry for the vertex
     * @throws IOException declared by the interface; not thrown by this code itself
     */
    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public Iterable<M> getVertexMessages(I vertexId) throws IOException {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(vertexId));
        if (partitionMap == null) {
            return Collections.<M>emptyList();
        }
        T messages = partitionMap.get(vertexId);
        if (messages == null) {
            return Collections.<M>emptyList();
        }
        return getMessagesAsIterable(messages);
    }

    /**
     * Counts the messages in the whole store.
     *
     * @return sum of {@link #getNumberOfMessagesIn(ConcurrentMap)} over all
     *         partition maps
     */
    @Override // org.apache.giraph.comm.messages.MessageStore
    public int getNumberOfMessages() {
        int total = 0;
        for (ConcurrentMap<I, T> partitionMap : this.map.values()) {
            total += getNumberOfMessagesIn(partitionMap);
        }
        return total;
    }

    /**
     * Serializes one partition.
     *
     * <p>Layout: a boolean telling whether the partition has a map; if it does,
     * the number of entries followed by, for each entry, the vertex id and its
     * message container (via {@link #writeMessages(Object, DataOutput)}).
     *
     * @param out output to write to
     * @param partitionId partition id
     * @throws IOException if writing fails
     */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void writePartition(DataOutput out, int partitionId) throws IOException {
        ConcurrentMap<I, T> partitionMap = this.map.get(partitionId);
        boolean present = partitionMap != null;
        out.writeBoolean(present);
        if (!present) {
            return;
        }
        // NOTE(review): the entry count and the iteration below are not taken
        // atomically; presumably the map is not modified while it is being
        // written - confirm against callers.
        out.writeInt(partitionMap.size());
        for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
            entry.getKey().write(out);
            writeMessages(entry.getValue(), out);
        }
    }

    /**
     * Serializes the whole store: the number of partitions, then for each
     * partition its id followed by the output of
     * {@link #writePartition(DataOutput, int)}.
     *
     * @param out output to write to
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.map.size());
        for (Integer partitionId : this.map.keySet()) {
            out.writeInt(partitionId);
            writePartition(out, partitionId);
        }
    }

    /**
     * Deserializes one partition in the layout produced by
     * {@link #writePartition(DataOutput, int)}.
     *
     * <p>If the leading boolean is {@code false} nothing else is read and the
     * store is left untouched; otherwise the map that is read replaces whatever
     * was stored for the partition.
     *
     * @param in input to read from
     * @param partitionId id under which the partition is stored
     * @throws IOException if reading fails
     */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
        if (!in.readBoolean()) {
            return;
        }
        ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
        int numVertices = in.readInt();
        for (int n = 0; n < numVertices; n++) {
            I vertexId = this.config.createVertexId();
            vertexId.readFields(in);
            partitionMap.put(vertexId, readFieldsForMessages(in));
        }
        this.map.put(partitionId, partitionMap);
    }

    /**
     * Deserializes a store in the layout produced by {@link #write(DataOutput)}.
     * Partitions already present and not mentioned in the input are kept.
     *
     * @param in input to read from
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        int numPartitions = in.readInt();
        for (int n = 0; n < numPartitions; n++) {
            int partitionId = in.readInt();
            readFieldsForPartition(in, partitionId);
        }
    }

    /**
     * Removes the entry for a vertex, if there is one.
     *
     * @param vertexId vertex id
     * @throws IOException declared by the interface; not thrown by this code itself
     */
    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearVertexMessages(I vertexId) throws IOException {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(vertexId));
        if (partitionMap != null) {
            partitionMap.remove(vertexId);
        }
    }

    /**
     * Removes the whole map of a partition, if there is one.
     *
     * @param partitionId partition id
     * @throws IOException declared by the interface; not thrown by this code itself
     */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void clearPartition(int partitionId) throws IOException {
        // Box explicitly so the call unambiguously targets Map.remove(Object).
        this.map.remove(Integer.valueOf(partitionId));
    }

    /**
     * Removes every partition map from the store.
     *
     * @throws IOException declared by the interface; not thrown by this code itself
     */
    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearAll() throws IOException {
        this.map.clear();
    }
}
