package org.apache.giraph.comm.messages;

import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/SimpleMessageStore.class */
/**
 * Abstract base class for message stores that keep their data in a two-level concurrent map:
 * partition id -&gt; (vertex id -&gt; per-vertex message container of type {@code T}).
 *
 * <p>Subclasses decide what the container {@code T} is and how it is iterated, counted,
 * written and read, by implementing the abstract methods declared here.
 *
 * @param <I> vertex id type
 * @param <M> message type
 * @param <T> type of the container that holds the messages stored for a single vertex
 */
public abstract class SimpleMessageStore<I extends WritableComparable, M extends Writable, T> implements MessageStore<I, M> {
    /** Factory for message values; stored for use by subclasses. */
    protected final MessageValueFactory<M> messageValueFactory;
    /** Worker service, used here to resolve which partition owns a vertex. */
    protected final CentralizedServiceWorker<I, ?, ?> service;
    /** Partition id -&gt; (vertex id -&gt; message container). */
    protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
    /** Configuration, used here to create vertex ids and to size the concurrent maps. */
    protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;

    /**
     * Creates an empty store.
     *
     * @param messageValueFactory factory for message values
     * @param centralizedServiceWorker service used to look up vertex partition owners
     * @param immutableClassesGiraphConfiguration configuration supplying vertex id creation and
     *        the concurrency level used for the backing maps
     */
    public SimpleMessageStore(MessageValueFactory<M> messageValueFactory, CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        this.messageValueFactory = messageValueFactory;
        this.service = centralizedServiceWorker;
        this.config = immutableClassesGiraphConfiguration;
        this.map = newConcurrentMap(immutableClassesGiraphConfiguration.getNettyServerExecutionConcurrency());
    }

    /**
     * Builds an empty concurrent map tuned for the given concurrency level.
     *
     * @param concurrencyLevel value passed to {@link MapMaker#concurrencyLevel(int)}
     * @param <K> key type
     * @param <V> value type
     * @return a new, empty concurrent map
     */
    private static <K, V> ConcurrentMap<K, V> newConcurrentMap(int concurrencyLevel) {
        return new MapMaker().concurrencyLevel(concurrencyLevel).makeMap();
    }

    /**
     * Exposes the messages held in a per-vertex container as an iterable.
     *
     * <p>Decompiler note: this method was originally declared {@code protected}.
     *
     * @param t per-vertex message container
     * @return the messages contained in {@code t}
     */
    public abstract Iterable<M> getMessagesAsIterable(T t);

    /**
     * Counts the messages stored in one partition's map.
     *
     * @param concurrentMap vertex id -&gt; message container map of a single partition
     * @return number of messages held in that map
     */
    protected abstract int getNumberOfMessagesIn(ConcurrentMap<I, T> concurrentMap);

    /**
     * Serializes one per-vertex message container.
     *
     * @param t container to write
     * @param dataOutput destination
     * @throws IOException if writing fails
     */
    protected abstract void writeMessages(T t, DataOutput dataOutput) throws IOException;

    /**
     * Deserializes one per-vertex message container; counterpart of
     * {@link #writeMessages(Object, DataOutput)}.
     *
     * @param dataInput source
     * @return the container that was read
     * @throws IOException if reading fails
     */
    protected abstract T readFieldsForMessages(DataInput dataInput) throws IOException;

    /**
     * Resolves the id of the partition that owns the given vertex, as reported by the service.
     *
     * <p>Decompiler note: this method was originally declared {@code protected}.
     *
     * @param i vertex id
     * @return id of the owning partition
     */
    public int getPartitionId(I i) {
        return this.service.getVertexPartitionOwner(i).getPartitionId();
    }

    /**
     * Returns the vertex map of a partition, creating and registering an empty one if the
     * partition has none yet.
     *
     * <p>Decompiler note: this method was originally declared {@code protected}.
     *
     * @param i partition id
     * @return the map registered for the partition (never {@code null})
     */
    public ConcurrentMap<I, T> getOrCreatePartitionMap(int i) {
        ConcurrentMap<I, T> existing = this.map.get(i);
        if (existing != null) {
            return existing;
        }
        ConcurrentMap<I, T> created = newConcurrentMap(this.config.getNettyServerExecutionConcurrency());
        // Another thread may have registered a map in the meantime; if so, use theirs.
        ConcurrentMap<I, T> raced = this.map.putIfAbsent(i, created);
        return raced != null ? raced : created;
    }

    /** Does nothing in this base implementation. */
    @Override
    public void finalizeStore() {
    }

    /**
     * Lists the vertices of a partition that currently have an entry in this store.
     *
     * @param i partition id
     * @return the key set of the partition's map, or an empty list if the partition has no map
     */
    @Override
    public Iterable<I> getPartitionDestinationVertices(int i) {
        ConcurrentMap<I, T> partitionMap = this.map.get(i);
        if (partitionMap == null) {
            return Collections.<I>emptyList();
        }
        return partitionMap.keySet();
    }

    /**
     * Tells whether the store holds an entry for the given vertex.
     *
     * @param i vertex id
     * @return {@code true} if the owning partition has a map and that map contains the vertex
     */
    @Override
    public boolean hasMessagesForVertex(I i) {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(i));
        return partitionMap != null && partitionMap.containsKey(i);
    }

    /**
     * Returns the messages stored for a vertex.
     *
     * @param i vertex id
     * @return the vertex's messages, or an empty list if nothing is stored for it
     */
    @Override
    public Iterable<M> getVertexMessages(I i) {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(i));
        if (partitionMap == null) {
            return Collections.<M>emptyList();
        }
        T messages = partitionMap.get(i);
        if (messages == null) {
            return Collections.<M>emptyList();
        }
        return getMessagesAsIterable(messages);
    }

    /**
     * Serializes one partition. Layout: a boolean telling whether the partition has a map;
     * if it does, the number of entries as an int, followed by each vertex id and its messages.
     *
     * @param dataOutput destination
     * @param i partition id
     * @throws IOException if writing fails
     */
    @Override
    public void writePartition(DataOutput dataOutput, int i) throws IOException {
        ConcurrentMap<I, T> partitionMap = this.map.get(i);
        dataOutput.writeBoolean(partitionMap != null);
        if (partitionMap == null) {
            return;
        }
        // NOTE(review): the entry count is taken before iterating; presumably the partition is
        // not modified while it is being written - confirm with callers.
        dataOutput.writeInt(partitionMap.size());
        for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
            entry.getKey().write(dataOutput);
            writeMessages(entry.getValue(), dataOutput);
        }
    }

    /**
     * Reads a partition in the layout produced by {@link #writePartition(DataOutput, int)} and
     * registers it under the given id, replacing any map already registered for that id.
     * If the leading boolean is {@code false}, nothing else is read and the store is unchanged.
     *
     * @param dataInput source
     * @param i partition id
     * @throws IOException if reading fails
     */
    @Override
    public void readFieldsForPartition(DataInput dataInput, int i) throws IOException {
        if (!dataInput.readBoolean()) {
            return;
        }
        ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
        int numVertices = dataInput.readInt();
        for (int v = 0; v < numVertices; v++) {
            I vertexId = this.config.createVertexId();
            vertexId.readFields(dataInput);
            partitionMap.put(vertexId, readFieldsForMessages(dataInput));
        }
        this.map.put(i, partitionMap);
    }

    /**
     * Removes the entry stored for a vertex, if any.
     *
     * @param i vertex id
     */
    @Override
    public void clearVertexMessages(I i) {
        ConcurrentMap<I, T> partitionMap = this.map.get(getPartitionId(i));
        if (partitionMap != null) {
            partitionMap.remove(i);
        }
    }

    /**
     * Removes the whole map registered for a partition, if any.
     *
     * @param i partition id
     */
    @Override
    public void clearPartition(int i) {
        this.map.remove(i);
    }

    /**
     * Tells whether a partition has at least one vertex entry.
     *
     * @param i partition id
     * @return {@code true} if the partition has a map and that map is not empty
     */
    @Override
    public boolean hasMessagesForPartition(int i) {
        ConcurrentMap<I, T> partitionMap = this.map.get(i);
        return partitionMap != null && !partitionMap.isEmpty();
    }

    /** Removes the maps of all partitions. */
    @Override
    public void clearAll() {
        this.map.clear();
    }
}
