package org.apache.giraph.comm.messages;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/OneMessagePerVertexStore.class */
/**
 * Message store that keeps a single message per vertex. Every incoming
 * message is folded into the vertex's current message with the configured
 * {@link Combiner}, so reading the messages of a vertex yields exactly one
 * element.
 *
 * @param <I> Vertex id type
 * @param <M> Message type
 */
public class OneMessagePerVertexStore<I extends WritableComparable, M extends Writable> extends SimpleMessageStore<I, M, M> {
    /** Creates the initial per-vertex message and folds incoming messages into it. */
    private final Combiner<I, M> combiner;

    /**
     * Factory for {@link OneMessagePerVertexStore}. The combiner is created
     * once, in the constructor, and handed to every store this factory builds.
     *
     * @param <I> Vertex id type
     * @param <M> Message type
     */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
        /** Service worker passed to every created store. */
        private final CentralizedServiceWorker<I, ?, ?, M> service;
        /** Configuration passed to every created store. */
        private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
        /** Combiner passed to every created store. */
        private final Combiner<I, M> combiner;

        /**
         * @param centralizedServiceWorker Service worker for the created stores
         * @param immutableClassesGiraphConfiguration Configuration; also used to create the combiner
         */
        public Factory(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
            this.service = centralizedServiceWorker;
            this.config = immutableClassesGiraphConfiguration;
            this.combiner = immutableClassesGiraphConfiguration.createCombiner();
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public MessageStoreByPartition<I, M> newStore() {
            // Explicit type arguments instead of a raw instantiation.
            return new OneMessagePerVertexStore<I, M>(this.service, this.combiner, this.config);
        }
    }

    /**
     * @param centralizedServiceWorker Service worker, forwarded to the superclass
     * @param combiner Combiner used to merge all messages addressed to one vertex
     * @param immutableClassesGiraphConfiguration Configuration, forwarded to the superclass
     */
    OneMessagePerVertexStore(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, Combiner<I, M> combiner, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
        super(centralizedServiceWorker, immutableClassesGiraphConfiguration);
        this.combiner = combiner;
    }

    /**
     * Returns the message currently stored for a vertex, inserting the
     * combiner's initial message first when the vertex has no entry yet.
     *
     * @param i Vertex id; becomes the map key when a new entry is inserted
     * @param concurrentMap Vertex-id-to-message map of one partition
     * @return The message instance that is stored in the map for the vertex
     */
    private M getOrCreateCurrentMessage(I i, ConcurrentMap<I, M> concurrentMap) {
        M current = concurrentMap.get(i);
        if (current == null) {
            M initial = this.combiner.createInitialMessage();
            current = concurrentMap.putIfAbsent(i, initial);
            if (current == null) {
                // No other thread inserted in the meantime: ours is the stored one.
                current = initial;
            }
        }
        return current;
    }

    /**
     * Combines one message into the message stored for the given vertex.
     *
     * @param i Vertex id
     * @param m Message to combine in
     * @param concurrentMap Vertex-id-to-message map of the target partition
     * @throws IOException Declared for callers; see {@link Combiner#combine}
     */
    private void addVertexMessageToPartition(I i, M m, ConcurrentMap<I, M> concurrentMap) throws IOException {
        M current = getOrCreateCurrentMessage(i, concurrentMap);
        // The combine call runs while holding the monitor of the stored message.
        synchronized (current) {
            this.combiner.combine(i, current, m);
        }
    }

    /**
     * Combines every (vertex id, message) pair of the given batch into the
     * map of the given partition.
     *
     * @param i Partition id
     * @param byteArrayVertexIdMessages Batch of vertex ids and messages
     * @throws IOException Declared by the overridden method
     */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void addPartitionMessages(int i, ByteArrayVertexIdMessages<I, M> byteArrayVertexIdMessages) throws IOException {
        ConcurrentMap<I, M> partitionMap = getOrCreatePartitionMap(i);
        ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator iterator = byteArrayVertexIdMessages.getVertexIdMessageIterator();
        while (iterator.hasNext()) {
            iterator.next();
            I vertexId = iterator.getCurrentVertexId();
            M current = partitionMap.get(vertexId);
            if (current == null) {
                M initial = this.combiner.createInitialMessage();
                // Deliberately not delegated to getOrCreateCurrentMessage: a new
                // key is inserted via releaseCurrentVertexId() rather than
                // getCurrentVertexId(). Presumably release hands ownership of
                // the id object to the caller so the iterator will not reuse it
                // -- confirm against ByteArrayVertexIdMessages.
                current = partitionMap.putIfAbsent(iterator.releaseCurrentVertexId(), initial);
                if (current == null) {
                    current = initial;
                }
            }
            synchronized (current) {
                this.combiner.combine(vertexId, current, iterator.getCurrentMessage());
            }
        }
    }

    /**
     * Wraps the single stored message as an iterable.
     *
     * @param m The stored message of a vertex
     * @return Immutable one-element iterable containing {@code m}
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public Iterable<M> getMessagesAsIterable(M m) {
        return Collections.singleton(m);
    }

    /**
     * @param concurrentMap Vertex-id-to-message map of one partition
     * @return Number of entries in the map (one message per vertex)
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    protected int getNumberOfMessagesIn(ConcurrentMap<I, M> concurrentMap) {
        return concurrentMap.size();
    }

    /**
     * Serializes the stored message of one vertex.
     *
     * @param m Message to write
     * @param dataOutput Output to write to
     * @throws IOException If writing fails
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public void writeMessages(M m, DataOutput dataOutput) throws IOException {
        m.write(dataOutput);
    }

    /**
     * Deserializes the stored message of one vertex into a message instance
     * obtained from the configuration.
     *
     * @param dataInput Input to read from
     * @return The message that was read
     * @throws IOException If reading fails
     */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public M readFieldsForMessages(DataInput dataInput) throws IOException {
        M message = this.config.createMessageValue();
        message.readFields(dataInput);
        return message;
    }

    /**
     * Combines all messages of another {@link OneMessagePerVertexStore} into
     * this store, partition by partition.
     *
     * @param messageStore Store to take the messages from
     * @throws IllegalArgumentException If {@code messageStore} is not a
     *         {@link OneMessagePerVertexStore}
     * @throws IOException Declared by the overridden method
     */
    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void addMessages(MessageStore<I, M> messageStore) throws IOException {
        if (!(messageStore instanceof OneMessagePerVertexStore)) {
            throw new IllegalArgumentException("addMessages: Illegal argument " + messageStore.getClass());
        }
        // instanceof only checks the raw class; the type arguments are taken
        // from the declared MessageStore<I, M> parameter type.
        @SuppressWarnings("unchecked")
        OneMessagePerVertexStore<I, M> other = (OneMessagePerVertexStore<I, M>) messageStore;
        for (Map.Entry<Integer, ConcurrentMap<I, M>> partitionEntry : other.map.entrySet()) {
            ConcurrentMap<I, M> targetPartition = getOrCreatePartitionMap(partitionEntry.getKey().intValue());
            for (Map.Entry<I, M> vertexEntry : partitionEntry.getValue().entrySet()) {
                addVertexMessageToPartition(vertexEntry.getKey(), vertexEntry.getValue(), targetPartition);
            }
        }
    }

    /**
     * Creates a factory for {@link OneMessagePerVertexStore} instances.
     *
     * @param centralizedServiceWorker Service worker for the created stores
     * @param immutableClassesGiraphConfiguration Configuration for the created stores
     * @param <I> Vertex id type
     * @param <M> Message type
     * @return The new factory
     */
    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
        return new Factory<I, M>(centralizedServiceWorker, immutableClassesGiraphConfiguration);
    }
}
