package org.apache.giraph.comm.messages;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.class */
public class DiskBackedMessageStoreByPartition<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements MessageStoreByPartition<I, M> {
    private final CentralizedServiceWorker<I, V, E, M> service;
    private final int maxNumberOfMessagesInMemory;
    private final MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;
    private final ConcurrentMap<Integer, FlushableMessageStore<I, M>> partitionMessageStores = Maps.newConcurrentMap();

    /* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition$Factory.class */
    private static class Factory<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
        private final CentralizedServiceWorker<I, V, E, M> service;
        private final int maxMessagesInMemory;
        private final MessageStoreFactory<I, M, FlushableMessageStore<I, M>> fileStoreFactory;

        public Factory(CentralizedServiceWorker<I, V, E, M> centralizedServiceWorker, int i, MessageStoreFactory<I, M, FlushableMessageStore<I, M>> messageStoreFactory) {
            this.service = centralizedServiceWorker;
            this.maxMessagesInMemory = i;
            this.fileStoreFactory = messageStoreFactory;
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public MessageStoreByPartition<I, M> newStore() {
            return new DiskBackedMessageStoreByPartition(this.service, this.maxMessagesInMemory, this.fileStoreFactory);
        }
    }

    public DiskBackedMessageStoreByPartition(CentralizedServiceWorker<I, V, E, M> centralizedServiceWorker, int i, MessageStoreFactory<I, M, FlushableMessageStore<I, M>> messageStoreFactory) {
        this.service = centralizedServiceWorker;
        this.maxNumberOfMessagesInMemory = i;
        this.fileStoreFactory = messageStoreFactory;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void addPartitionMessages(int i, ByteArrayVertexIdMessages<I, M> byteArrayVertexIdMessages) throws IOException {
        FlushableMessageStore<I, M> messageStore = getMessageStore(i);
        if (!(messageStore instanceof DiskBackedMessageStore)) {
            throw new IllegalStateException("addPartitionMessages: Doesn't support class " + messageStore.getClass());
        }
        DiskBackedMessageStore diskBackedMessageStore = (DiskBackedMessageStore) messageStore;
        ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator vertexIdMessageIterator = byteArrayVertexIdMessages.getVertexIdMessageIterator();
        while (vertexIdMessageIterator.hasNext()) {
            vertexIdMessageIterator.next();
            if (diskBackedMessageStore.addVertexMessages(vertexIdMessageIterator.getCurrentVertexId(), Collections.singleton(vertexIdMessageIterator.getCurrentMessage()))) {
                vertexIdMessageIterator.releaseCurrentVertexId();
            }
        }
        checkMemory();
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void addMessages(MessageStore<I, M> messageStore) throws IOException {
        for (I i : messageStore.getDestinationVertices()) {
            FlushableMessageStore<I, M> messageStore2 = getMessageStore((DiskBackedMessageStoreByPartition<I, V, E, M>) i);
            if (!(messageStore2 instanceof DiskBackedMessageStore)) {
                throw new IllegalStateException("addMessages: Doesn't support class " + messageStore2.getClass());
            }
            ((DiskBackedMessageStore) messageStore2).addVertexMessages(i, messageStore.getVertexMessages(i));
        }
        checkMemory();
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public Iterable<M> getVertexMessages(I i) throws IOException {
        return hasMessagesForVertex(i) ? getMessageStore((DiskBackedMessageStoreByPartition<I, V, E, M>) i).getVertexMessages(i) : EmptyIterable.get();
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public int getNumberOfMessages() {
        int i = 0;
        Iterator<FlushableMessageStore<I, M>> it = this.partitionMessageStores.values().iterator();
        while (it.hasNext()) {
            i += it.next().getNumberOfMessages();
        }
        return i;
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean hasMessagesForVertex(I i) {
        return getMessageStore((DiskBackedMessageStoreByPartition<I, V, E, M>) i).hasMessagesForVertex(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public Iterable<I> getDestinationVertices() {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<FlushableMessageStore<I, M>> it = this.partitionMessageStores.values().iterator();
        while (it.hasNext()) {
            Iterables.addAll(newArrayList, it.next().getDestinationVertices());
        }
        return newArrayList;
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public Iterable<I> getPartitionDestinationVertices(int i) {
        FlushableMessageStore<I, M> flushableMessageStore = this.partitionMessageStores.get(Integer.valueOf(i));
        return flushableMessageStore == null ? Collections.emptyList() : flushableMessageStore.getDestinationVertices();
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearVertexMessages(I i) throws IOException {
        if (hasMessagesForVertex(i)) {
            getMessageStore((DiskBackedMessageStoreByPartition<I, V, E, M>) i).clearVertexMessages(i);
        }
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void clearPartition(int i) throws IOException {
        FlushableMessageStore<I, M> flushableMessageStore = this.partitionMessageStores.get(Integer.valueOf(i));
        if (flushableMessageStore != null) {
            flushableMessageStore.clearAll();
        }
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearAll() throws IOException {
        Iterator<FlushableMessageStore<I, M>> it = this.partitionMessageStores.values().iterator();
        while (it.hasNext()) {
            it.next().clearAll();
        }
        this.partitionMessageStores.clear();
    }

    private void checkMemory() throws IOException {
        while (memoryFull()) {
            flushOnePartition();
        }
    }

    private boolean memoryFull() {
        int i = 0;
        Iterator<FlushableMessageStore<I, M>> it = this.partitionMessageStores.values().iterator();
        while (it.hasNext()) {
            i += it.next().getNumberOfMessages();
        }
        return i > this.maxNumberOfMessagesInMemory;
    }

    private void flushOnePartition() throws IOException {
        int i = 0;
        FlushableMessageStore<I, M> flushableMessageStore = null;
        for (FlushableMessageStore<I, M> flushableMessageStore2 : this.partitionMessageStores.values()) {
            int numberOfMessages = flushableMessageStore2.getNumberOfMessages();
            if (numberOfMessages > i) {
                i = numberOfMessages;
                flushableMessageStore = flushableMessageStore2;
            }
        }
        if (flushableMessageStore != null) {
            flushableMessageStore.flush();
        }
    }

    private FlushableMessageStore<I, M> getMessageStore(I i) {
        return getMessageStore(this.service.getVertexPartitionOwner(i).getPartitionId());
    }

    private FlushableMessageStore<I, M> getMessageStore(int i) {
        FlushableMessageStore<I, M> flushableMessageStore = this.partitionMessageStores.get(Integer.valueOf(i));
        if (flushableMessageStore != null) {
            return flushableMessageStore;
        }
        FlushableMessageStore<I, M> newStore = this.fileStoreFactory.newStore();
        FlushableMessageStore<I, M> putIfAbsent = this.partitionMessageStores.putIfAbsent(Integer.valueOf(i), newStore);
        return putIfAbsent == null ? newStore : putIfAbsent;
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void writePartition(DataOutput dataOutput, int i) throws IOException {
        FlushableMessageStore<I, M> flushableMessageStore = this.partitionMessageStores.get(Integer.valueOf(i));
        dataOutput.writeBoolean(flushableMessageStore != null);
        if (flushableMessageStore != null) {
            flushableMessageStore.write(dataOutput);
        }
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.partitionMessageStores.size());
        for (Map.Entry<Integer, FlushableMessageStore<I, M>> entry : this.partitionMessageStores.entrySet()) {
            dataOutput.writeInt(entry.getKey().intValue());
            entry.getValue().write(dataOutput);
        }
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreByPartition
    public void readFieldsForPartition(DataInput dataInput, int i) throws IOException {
        if (dataInput.readBoolean()) {
            FlushableMessageStore<I, M> newStore = this.fileStoreFactory.newStore();
            newStore.readFields(dataInput);
            this.partitionMessageStores.put(Integer.valueOf(i), newStore);
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            int readInt2 = dataInput.readInt();
            FlushableMessageStore<I, M> newStore = this.fileStoreFactory.newStore();
            newStore.readFields(dataInput);
            this.partitionMessageStores.put(Integer.valueOf(readInt2), newStore);
        }
    }

    public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(CentralizedServiceWorker<I, V, E, M> centralizedServiceWorker, int i, MessageStoreFactory<I, M, FlushableMessageStore<I, M>> messageStoreFactory) {
        return new Factory(centralizedServiceWorker, i, messageStoreFactory);
    }
}
