package org.apache.giraph.comm.messages;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStore.class */
public class DiskBackedMessageStore<I extends WritableComparable, M extends Writable> implements FlushableMessageStore<I, M> {
    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
    private final MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
    private volatile ConcurrentNavigableMap<I, ExtendedDataOutput> inMemoryMessages = new ConcurrentSkipListMap();
    private final AtomicInteger numberOfMessagesInMemory = new AtomicInteger(0);
    private final Set<I> destinationVertices = Collections.newSetFromMap(Maps.newConcurrentMap());
    private final Collection<BasicMessageStore<I, M>> fileStores = Lists.newArrayList();

    /* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStore$Factory.class */
    private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, FlushableMessageStore<I, M>> {
        private final ImmutableClassesGiraphConfiguration config;
        private final MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;

        public Factory(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, BasicMessageStore<I, M>> messageStoreFactory) {
            this.config = immutableClassesGiraphConfiguration;
            this.fileStoreFactory = messageStoreFactory;
        }

        @Override // org.apache.giraph.comm.messages.MessageStoreFactory
        public FlushableMessageStore<I, M> newStore() {
            return new DiskBackedMessageStore(this.config, this.fileStoreFactory);
        }
    }

    /* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStore$MessageIterable.class */
    private class MessageIterable extends RepresentativeByteArrayIterable<M> {
        public MessageIterable(byte[] bArr, int i, int i2) {
            super(DiskBackedMessageStore.this.config, bArr, i, i2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.giraph.utils.ByteArrayIterable
        public M createWritable() {
            return (M) DiskBackedMessageStore.this.config.createMessageValue();
        }
    }

    /* loaded from: input_file:org/apache/giraph/comm/messages/DiskBackedMessageStore$TemporaryMessageStore.class */
    private class TemporaryMessageStore implements MessageStore<I, M> {
        private final ConcurrentNavigableMap<I, ExtendedDataOutput> temporaryMessages;

        private TemporaryMessageStore(ConcurrentNavigableMap<I, ExtendedDataOutput> concurrentNavigableMap) {
            this.temporaryMessages = concurrentNavigableMap;
        }

        @Override // org.apache.giraph.comm.messages.MessageStore
        public int getNumberOfMessages() {
            throw new IllegalAccessError("getNumberOfMessages: Not supported");
        }

        @Override // org.apache.giraph.comm.messages.MessageStore
        public boolean hasMessagesForVertex(I i) {
            return this.temporaryMessages.containsKey(i);
        }

        @Override // org.apache.giraph.comm.messages.MessageStore
        public Iterable<I> getDestinationVertices() {
            return this.temporaryMessages.keySet();
        }

        @Override // org.apache.giraph.comm.messages.BasicMessageStore
        public void addMessages(MessageStore<I, M> messageStore) throws IOException {
            throw new IllegalAccessError("addMessages: Not supported");
        }

        @Override // org.apache.giraph.comm.messages.BasicMessageStore
        public Iterable<M> getVertexMessages(I i) throws IOException {
            ExtendedDataOutput extendedDataOutput = (ExtendedDataOutput) this.temporaryMessages.get(i);
            if (extendedDataOutput == null) {
                extendedDataOutput = DiskBackedMessageStore.this.config.createExtendedDataOutput();
            }
            return new MessageIterable(extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
        }

        @Override // org.apache.giraph.comm.messages.BasicMessageStore
        public void clearVertexMessages(I i) throws IOException {
            this.temporaryMessages.remove(i);
        }

        @Override // org.apache.giraph.comm.messages.BasicMessageStore
        public void clearAll() throws IOException {
            this.temporaryMessages.clear();
        }

        public void write(DataOutput dataOutput) throws IOException {
            throw new IllegalAccessError("write: Not supported");
        }

        public void readFields(DataInput dataInput) throws IOException {
            throw new IllegalAccessError("readFields: Not supported");
        }
    }

    public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, BasicMessageStore<I, M>> messageStoreFactory) {
        this.config = immutableClassesGiraphConfiguration;
        this.fileStoreFactory = messageStoreFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean addVertexMessages(I i, Iterable<M> iterable) throws IOException {
        boolean z = false;
        this.destinationVertices.add(i);
        this.rwLock.readLock().lock();
        try {
            ExtendedDataOutput extendedDataOutput = (ExtendedDataOutput) this.inMemoryMessages.get(i);
            if (extendedDataOutput == null) {
                ExtendedDataOutput createExtendedDataOutput = this.config.createExtendedDataOutput();
                extendedDataOutput = this.inMemoryMessages.putIfAbsent(i, createExtendedDataOutput);
                if (extendedDataOutput == null) {
                    z = true;
                    extendedDataOutput = createExtendedDataOutput;
                }
            }
            synchronized (extendedDataOutput) {
                Iterator<M> it = iterable.iterator();
                while (it.hasNext()) {
                    it.next().write(extendedDataOutput);
                    this.numberOfMessagesInMemory.getAndIncrement();
                }
            }
            return z;
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void addMessages(MessageStore<I, M> messageStore) throws IOException {
        for (I i : messageStore.getDestinationVertices()) {
            addVertexMessages(i, messageStore.getVertexMessages(i));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v16, types: [java.lang.Iterable] */
    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public Iterable<M> getVertexMessages(I i) throws IOException {
        ExtendedDataOutput extendedDataOutput = (ExtendedDataOutput) this.inMemoryMessages.get(i);
        if (extendedDataOutput == null) {
            extendedDataOutput = this.config.createExtendedDataOutput();
        }
        MessageIterable messageIterable = new MessageIterable(extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
        Iterator<BasicMessageStore<I, M>> it = this.fileStores.iterator();
        while (it.hasNext()) {
            messageIterable = Iterables.concat(messageIterable, it.next().getVertexMessages(i));
        }
        return messageIterable;
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public int getNumberOfMessages() {
        return this.numberOfMessagesInMemory.get();
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public boolean hasMessagesForVertex(I i) {
        return this.destinationVertices.contains(i);
    }

    @Override // org.apache.giraph.comm.messages.MessageStore
    public Iterable<I> getDestinationVertices() {
        return this.destinationVertices;
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearVertexMessages(I i) throws IOException {
        this.inMemoryMessages.remove(i);
    }

    @Override // org.apache.giraph.comm.messages.BasicMessageStore
    public void clearAll() throws IOException {
        this.inMemoryMessages.clear();
        this.destinationVertices.clear();
        Iterator<BasicMessageStore<I, M>> it = this.fileStores.iterator();
        while (it.hasNext()) {
            it.next().clearAll();
        }
        this.fileStores.clear();
    }

    @Override // org.apache.giraph.comm.messages.FlushableMessageStore
    public void flush() throws IOException {
        this.rwLock.writeLock().lock();
        try {
            ConcurrentNavigableMap<I, ExtendedDataOutput> concurrentNavigableMap = this.inMemoryMessages;
            this.inMemoryMessages = new ConcurrentSkipListMap();
            this.numberOfMessagesInMemory.set(0);
            this.rwLock.writeLock().unlock();
            BasicMessageStore<I, M> newStore = this.fileStoreFactory.newStore();
            newStore.addMessages(new TemporaryMessageStore(concurrentNavigableMap));
            synchronized (this.fileStores) {
                this.fileStores.add(newStore);
            }
        } catch (Throwable th) {
            this.rwLock.writeLock().unlock();
            throw th;
        }
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.destinationVertices.size());
        Iterator<I> it = this.destinationVertices.iterator();
        while (it.hasNext()) {
            it.next().write(dataOutput);
        }
        dataOutput.writeInt(this.numberOfMessagesInMemory.get());
        dataOutput.writeInt(this.inMemoryMessages.size());
        for (Map.Entry entry : this.inMemoryMessages.entrySet()) {
            ((WritableComparable) entry.getKey()).write(dataOutput);
            dataOutput.writeInt(((ExtendedDataOutput) entry.getValue()).getPos());
            dataOutput.write(((ExtendedDataOutput) entry.getValue()).getByteArray(), 0, ((ExtendedDataOutput) entry.getValue()).getPos());
        }
        dataOutput.writeInt(this.fileStores.size());
        Iterator<BasicMessageStore<I, M>> it2 = this.fileStores.iterator();
        while (it2.hasNext()) {
            it2.next().write(dataOutput);
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            I createVertexId = this.config.createVertexId();
            createVertexId.readFields(dataInput);
            this.destinationVertices.add(createVertexId);
        }
        this.numberOfMessagesInMemory.set(dataInput.readInt());
        int readInt2 = dataInput.readInt();
        for (int i2 = 0; i2 < readInt2; i2++) {
            I createVertexId2 = this.config.createVertexId();
            createVertexId2.readFields(dataInput);
            int readInt3 = dataInput.readInt();
            this.inMemoryMessages.put(createVertexId2, this.config.createExtendedDataOutput(new byte[readInt3], readInt3));
        }
        int readInt4 = dataInput.readInt();
        for (int i3 = 0; i3 < readInt4; i3++) {
            BasicMessageStore<I, M> newStore = this.fileStoreFactory.newStore();
            newStore.readFields(dataInput);
            this.fileStores.add(newStore);
        }
    }

    public static <I extends WritableComparable, M extends Writable> MessageStoreFactory<I, M, FlushableMessageStore<I, M>> newFactory(ImmutableClassesGiraphConfiguration<I, ?, ?, M> immutableClassesGiraphConfiguration, MessageStoreFactory<I, M, BasicMessageStore<I, M>> messageStoreFactory) {
        return new Factory(immutableClassesGiraphConfiguration, messageStoreFactory);
    }
}
