package org.apache.giraph.comm.messages;

import com.google.common.collect.Iterators;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
import org.apache.giraph.utils.VertexIdIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.class */
public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, M extends Writable> extends SimpleMessageStore<I, M, ExtendedDataOutput> {

    /* loaded from: input_file:org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore$Factory.class */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
        private final CentralizedServiceWorker<I, ?, ?, M> service;
        private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;

        public Factory(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
            this.service = centralizedServiceWorker;
            this.config = immutableClassesGiraphConfiguration;
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public MessageStoreByPartition<I, M> newStore() {
            return new ByteArrayMessagesPerVertexStore(this.service, this.config);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore$MessagesIterable.class */
    public class MessagesIterable extends RepresentativeByteArrayIterable<M> {
        private MessagesIterable(byte[] bArr, int i, int i2) {
            super(ByteArrayMessagesPerVertexStore.this.config, bArr, i, i2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.giraph.utils.ByteArrayIterable
        public M createWritable() {
            return ByteArrayMessagesPerVertexStore.this.config.createMessageValue();
        }
    }

    /* loaded from: input_file:org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore$RepresentativeMessageIterator.class */
    private class RepresentativeMessageIterator extends RepresentativeByteArrayIterator<M> {
        public RepresentativeMessageIterator(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, byte[] bArr, int i, int i2) {
            super(immutableClassesGiraphConfiguration, bArr, i, i2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.giraph.utils.ByteArrayIterator
        public M createWritable() {
            return ByteArrayMessagesPerVertexStore.this.config.createMessageValue();
        }
    }

    public ByteArrayMessagesPerVertexStore(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
        super(centralizedServiceWorker, immutableClassesGiraphConfiguration);
    }

    private ExtendedDataOutput getExtendedDataOutput(ConcurrentMap<I, ExtendedDataOutput> concurrentMap, VertexIdIterator<I> vertexIdIterator) {
        ExtendedDataOutput extendedDataOutput = concurrentMap.get(vertexIdIterator.getCurrentVertexId());
        if (extendedDataOutput == null) {
            ExtendedDataOutput createExtendedDataOutput = this.config.createExtendedDataOutput();
            extendedDataOutput = concurrentMap.putIfAbsent(vertexIdIterator.releaseCurrentVertexId(), createExtendedDataOutput);
            if (extendedDataOutput == null) {
                extendedDataOutput = createExtendedDataOutput;
            }
        }
        return extendedDataOutput;
    }

    /* JADX WARN: Type inference failed for: r0v16, types: [org.apache.hadoop.io.Writable] */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void addPartitionMessages(int i, ByteArrayVertexIdMessages<I, M> byteArrayVertexIdMessages) throws IOException {
        ConcurrentMap<I, ExtendedDataOutput> orCreatePartitionMap = getOrCreatePartitionMap(i);
        ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator vertexIdMessageBytesIterator = byteArrayVertexIdMessages.getVertexIdMessageBytesIterator();
        if (vertexIdMessageBytesIterator != null) {
            while (vertexIdMessageBytesIterator.hasNext()) {
                vertexIdMessageBytesIterator.next();
                ExtendedDataOutput extendedDataOutput = getExtendedDataOutput(orCreatePartitionMap, vertexIdMessageBytesIterator);
                synchronized (extendedDataOutput) {
                    vertexIdMessageBytesIterator.writeCurrentMessageBytes(extendedDataOutput);
                }
            }
            return;
        }
        ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator vertexIdMessageIterator = byteArrayVertexIdMessages.getVertexIdMessageIterator();
        while (vertexIdMessageIterator.hasNext()) {
            vertexIdMessageIterator.next();
            ExtendedDataOutput extendedDataOutput2 = getExtendedDataOutput(orCreatePartitionMap, vertexIdMessageIterator);
            synchronized (extendedDataOutput2) {
                vertexIdMessageIterator.getCurrentMessage().write(extendedDataOutput2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public Iterable<M> getMessagesAsIterable(ExtendedDataOutput extendedDataOutput) {
        return new MessagesIterable(extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
    }

    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    protected int getNumberOfMessagesIn(ConcurrentMap<I, ExtendedDataOutput> concurrentMap) {
        int i = 0;
        for (ExtendedDataOutput extendedDataOutput : concurrentMap.values()) {
            i += Iterators.size(new RepresentativeMessageIterator(this.config, extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos()));
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public void writeMessages(ExtendedDataOutput extendedDataOutput, DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(extendedDataOutput.getPos());
        dataOutput.write(extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.giraph.comm.messages.SimpleMessageStore
    public ExtendedDataOutput readFieldsForMessages(DataInput dataInput) throws IOException {
        byte[] bArr = new byte[dataInput.readInt()];
        dataInput.readFully(bArr);
        return this.config.createExtendedDataOutput(bArr, 0);
    }

    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(CentralizedServiceWorker<I, ?, ?, M> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration) {
        return new Factory(centralizedServiceWorker, immutableClassesGiraphConfiguration);
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void addMessages(MessageStore<I, M> messageStore) throws IOException {
        if (!(messageStore instanceof ByteArrayMessagesPerVertexStore)) {
            throw new IllegalArgumentException("addMessages: Illegal argument " + messageStore.getClass());
        }
        for (Map.Entry entry : ((ByteArrayMessagesPerVertexStore) messageStore).map.entrySet()) {
            for (Map.Entry entry2 : ((ConcurrentMap) entry.getValue()).entrySet()) {
                ConcurrentMap<I, ExtendedDataOutput> orCreatePartitionMap = getOrCreatePartitionMap(((Integer) entry.getKey()).intValue());
                ExtendedDataOutput extendedDataOutput = (ExtendedDataOutput) orCreatePartitionMap.get(entry2.getKey());
                if (extendedDataOutput == null) {
                    ExtendedDataOutput createExtendedDataOutput = this.config.createExtendedDataOutput();
                    extendedDataOutput = (ExtendedDataOutput) orCreatePartitionMap.putIfAbsent(entry2.getKey(), createExtendedDataOutput);
                    if (extendedDataOutput == null) {
                        extendedDataOutput = createExtendedDataOutput;
                    }
                }
                extendedDataOutput.write(((ExtendedDataOutput) entry2.getValue()).getByteArray(), 0, ((ExtendedDataOutput) entry2.getValue()).getPos());
            }
        }
    }
}
