package org.apache.giraph.comm;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

@NotThreadSafe
/* loaded from: input_file:org/apache/giraph/comm/SendOneMessageToManyCache.class */
public class SendOneMessageToManyCache<I extends WritableComparable, M extends Writable> extends SendMessageCache<I, M> {
    private static final Logger LOG = Logger.getLogger(SendOneMessageToManyCache.class);
    private final ByteArrayOneMessageToManyIds<I, M>[] msgVidsCache;
    private final int[] msgVidsSizes;
    private final ExtendedDataOutput[] idSerializer;
    private final int[] idCounter;
    private final int[] firstPartitionMap;
    private final WorkerInfo[] workerInfoList;

    public SendOneMessageToManyCache(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, CentralizedServiceWorker<?, ?, ?> centralizedServiceWorker, NettyWorkerClientRequestProcessor<I, ?, ?> nettyWorkerClientRequestProcessor, int i) {
        super(immutableClassesGiraphConfiguration, centralizedServiceWorker, nettyWorkerClientRequestProcessor, i);
        int numWorkers = getNumWorkers();
        this.msgVidsCache = new ByteArrayOneMessageToManyIds[numWorkers];
        this.msgVidsSizes = new int[numWorkers];
        this.idSerializer = new ExtendedDataOutput[numWorkers];
        for (int i2 = 0; i2 < this.idSerializer.length; i2++) {
            int sendWorkerInitialBufferSize = getSendWorkerInitialBufferSize(i2);
            if (sendWorkerInitialBufferSize > 0) {
                this.idSerializer[i2] = immutableClassesGiraphConfiguration.createExtendedDataOutput(sendWorkerInitialBufferSize);
            }
        }
        this.idCounter = new int[numWorkers];
        this.firstPartitionMap = new int[numWorkers];
        this.workerInfoList = new WorkerInfo[numWorkers];
        for (WorkerInfo workerInfo : centralizedServiceWorker.getWorkerInfoList()) {
            this.workerInfoList[workerInfo.getTaskId()] = workerInfo;
        }
    }

    private void resetIdSerializers() {
        for (int i = 0; i < this.idSerializer.length; i++) {
            if (this.idSerializer[i] != null) {
                this.idSerializer[i].reset();
            }
        }
    }

    private void resetIdCounter() {
        Arrays.fill(this.idCounter, 0);
    }

    private int addOneToManyMessage(WorkerInfo workerInfo, byte[] bArr, int i, int i2, M m) {
        ByteArrayOneMessageToManyIds<I, M> byteArrayOneMessageToManyIds = this.msgVidsCache[workerInfo.getTaskId()];
        if (byteArrayOneMessageToManyIds == null) {
            byteArrayOneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(getConf().createOutgoingMessageValueFactory());
            byteArrayOneMessageToManyIds.setConf(getConf());
            byteArrayOneMessageToManyIds.initialize(getSendWorkerInitialBufferSize(workerInfo.getTaskId()));
            this.msgVidsCache[workerInfo.getTaskId()] = byteArrayOneMessageToManyIds;
        }
        byteArrayOneMessageToManyIds.add(bArr, i, i2, m);
        this.msgVidsSizes[workerInfo.getTaskId()] = byteArrayOneMessageToManyIds.getSize();
        return this.msgVidsSizes[workerInfo.getTaskId()];
    }

    private ByteArrayOneMessageToManyIds<I, M> removeWorkerMsgVids(WorkerInfo workerInfo) {
        ByteArrayOneMessageToManyIds<I, M> byteArrayOneMessageToManyIds = this.msgVidsCache[workerInfo.getTaskId()];
        if (byteArrayOneMessageToManyIds != null) {
            this.msgVidsCache[workerInfo.getTaskId()] = null;
            this.msgVidsSizes[workerInfo.getTaskId()] = 0;
        }
        return byteArrayOneMessageToManyIds;
    }

    private PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>> removeAllMsgVids() {
        PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>> pairList = new PairList<>();
        pairList.initialize(this.msgVidsCache.length);
        for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
            ByteArrayOneMessageToManyIds<I, M> removeWorkerMsgVids = removeWorkerMsgVids(workerInfo);
            if (removeWorkerMsgVids != null && !removeWorkerMsgVids.isEmpty()) {
                pairList.add(workerInfo, removeWorkerMsgVids);
            }
        }
        return pairList;
    }

    @Override // org.apache.giraph.comm.SendMessageCache
    public void sendMessageToAllRequest(Iterator<I> it, M m) {
        resetIdSerializers();
        resetIdCounter();
        while (it.hasNext()) {
            I next = it.next();
            PartitionOwner vertexPartitionOwner = getServiceWorker().getVertexPartitionOwner(next);
            int taskId = vertexPartitionOwner.getWorkerInfo().getTaskId();
            try {
                next.write(this.idSerializer[taskId]);
                int[] iArr = this.idCounter;
                iArr[taskId] = iArr[taskId] + 1;
                if (this.idCounter[taskId] == 1) {
                    this.firstPartitionMap[taskId] = vertexPartitionOwner.getPartitionId();
                }
            } catch (IOException e) {
                throw new IllegalStateException("Failed to serialize the target vertex id.");
            }
        }
        for (int i = 0; i < this.idCounter.length; i++) {
            if (this.idCounter[i] == 1) {
                int addMessage = addMessage(this.workerInfoList[i], this.firstPartitionMap[i], this.idSerializer[i].getByteArray(), this.idSerializer[i].getPos(), m);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("sendMessageToAllRequest: Send bytes (" + m.toString() + ") to one target in  worker " + this.workerInfoList[i]);
                }
                this.totalMsgsSentInSuperstep++;
                if (addMessage >= this.maxMessagesSizePerWorker) {
                    SendWorkerMessagesRequest sendWorkerMessagesRequest = new SendWorkerMessagesRequest(removeWorkerMessages(this.workerInfoList[i]));
                    this.totalMsgBytesSentInSuperstep += sendWorkerMessagesRequest.getSerializedSize();
                    this.clientProcessor.doRequest(this.workerInfoList[i], sendWorkerMessagesRequest);
                    getServiceWorker().getGraphTaskManager().notifySentMessages();
                }
            } else if (this.idCounter[i] > 1) {
                int addOneToManyMessage = addOneToManyMessage(this.workerInfoList[i], this.idSerializer[i].getByteArray(), this.idSerializer[i].getPos(), this.idCounter[i], m);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("sendMessageToAllRequest: Send bytes (" + m.toString() + ") to all targets in worker" + this.workerInfoList[i]);
                }
                this.totalMsgsSentInSuperstep += this.idCounter[i];
                if (addOneToManyMessage >= this.maxMessagesSizePerWorker) {
                    SendWorkerOneMessageToManyRequest sendWorkerOneMessageToManyRequest = new SendWorkerOneMessageToManyRequest(removeWorkerMsgVids(this.workerInfoList[i]), getConf());
                    this.totalMsgBytesSentInSuperstep += sendWorkerOneMessageToManyRequest.getSerializedSize();
                    this.clientProcessor.doRequest(this.workerInfoList[i], sendWorkerOneMessageToManyRequest);
                    getServiceWorker().getGraphTaskManager().notifySentMessages();
                }
            }
        }
    }

    @Override // org.apache.giraph.comm.SendMessageCache
    public void flush() {
        super.flush();
        PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>.Iterator iterator = removeAllMsgVids().getIterator();
        while (iterator.hasNext()) {
            iterator.next();
            SendWorkerOneMessageToManyRequest sendWorkerOneMessageToManyRequest = new SendWorkerOneMessageToManyRequest(iterator.getCurrentSecond(), getConf());
            this.totalMsgBytesSentInSuperstep += sendWorkerOneMessageToManyRequest.getSerializedSize();
            this.clientProcessor.doRequest(iterator.getCurrentFirst(), sendWorkerOneMessageToManyRequest);
        }
    }
}
