package org.apache.giraph.comm.netty;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.util.PercentGauge;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendEdgeCache;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.comm.requests.SendWorkerEdgesRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.class */
/**
 * Netty-backed {@link WorkerClientRequestProcessor}.
 *
 * <p>Outgoing vertices, messages, edges and mutations are accumulated in
 * per-destination caches and turned into requests once a cache reaches its
 * configured limit (or when {@link #flush()} is called). A request whose
 * destination is this worker is applied directly to the local
 * {@link ServerData}; every other request is handed to the
 * {@link WorkerClient}.
 *
 * <p>The caches and the message counter are plain, unsynchronized fields.
 * NOTE(review): presumably one instance is used per sending thread — confirm
 * against callers.
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge value type
 * @param <M> message type
 */
public class NettyWorkerClientRequestProcessor<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements WorkerClientRequestProcessor<I, V, E, M> {
    private static final Logger LOG = Logger.getLogger(NettyWorkerClientRequestProcessor.class);

    /** Cache of vertices grouped into partitions, keyed by partition owner. */
    private final SendPartitionCache<I, V, E, M> sendPartitionCache;
    /** Cache of outgoing messages, grouped by destination worker. */
    private final SendMessageCache<I, M> sendMessageCache;
    /** Cache of outgoing edges, grouped by destination worker. */
    private final SendEdgeCache<I, E> sendEdgeCache;
    /** Client used for requests that target another worker. */
    private final WorkerClient<I, V, E, M> workerClient;
    /** Cached message size at which a worker's messages are sent. */
    private final int maxMessagesSizePerWorker;
    /** Cached edge size at which a worker's edges are sent. */
    private final int maxEdgesSizePerWorker;
    /** Mutation count at which a partition's mutations are sent. */
    private final int maxMutationsPerPartition;
    private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
    private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
    /** Local server data; target of requests addressed to this worker. */
    private final ServerData<I, V, E, M> serverData;
    /** Counts requests applied locally. */
    private final Counter localRequests;
    /** Counts requests sent through the worker client. */
    private final Counter remoteRequests;
    /** Cache of outgoing graph mutations, grouped by partition id. */
    private final SendMutationsCache<I, V, E, M> sendMutationsCache = new SendMutationsCache<>();
    /** Messages passed to {@link #sendMessageRequest} since the last reset. */
    private long totalMsgsSentInSuperstep = 0;

    /**
     * Creates the processor, its caches and its per-superstep metrics.
     *
     * @param context mapper context, passed on to the partition cache
     * @param immutableClassesGiraphConfiguration configuration supplying the request size limits
     * @param centralizedServiceWorker service worker supplying the worker client, the server data
     *        and partition ownership lookups
     */
    public NettyWorkerClientRequestProcessor(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E, M> immutableClassesGiraphConfiguration, CentralizedServiceWorker<I, V, E, M> centralizedServiceWorker) {
        this.workerClient = centralizedServiceWorker.getWorkerClient();
        this.configuration = immutableClassesGiraphConfiguration;
        this.sendPartitionCache = new SendPartitionCache<>(context, immutableClassesGiraphConfiguration);
        this.sendMessageCache = new SendMessageCache<>(immutableClassesGiraphConfiguration, centralizedServiceWorker);
        this.sendEdgeCache = new SendEdgeCache<>(immutableClassesGiraphConfiguration, centralizedServiceWorker);
        this.maxMessagesSizePerWorker = GiraphConstants.MAX_MSG_REQUEST_SIZE.get(immutableClassesGiraphConfiguration);
        this.maxEdgesSizePerWorker = GiraphConstants.MAX_EDGE_REQUEST_SIZE.get(immutableClassesGiraphConfiguration);
        this.maxMutationsPerPartition = GiraphConstants.MAX_MUTATIONS_PER_REQUEST.get(immutableClassesGiraphConfiguration);
        this.serviceWorker = centralizedServiceWorker;
        this.serverData = centralizedServiceWorker.getServerData();

        SuperstepMetricsRegistry perSuperstep = GiraphMetrics.get().perSuperstep();
        this.localRequests = perSuperstep.getCounter(MetricNames.LOCAL_REQUESTS);
        this.remoteRequests = perSuperstep.getCounter(MetricNames.REMOTE_REQUESTS);

        // The gauge returned by the registry (not necessarily the instance created here)
        // is the one used as the denominator below.
        final Gauge<Long> totalRequests = perSuperstep.getGauge(MetricNames.TOTAL_REQUESTS, new Gauge<Long>() {
            @Override
            public Long value() {
                return localRequests.count() + remoteRequests.count();
            }
        });
        perSuperstep.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
            @Override
            protected double getNumerator() {
                return localRequests.count();
            }

            @Override
            protected double getDenominator() {
                return totalRequests.value().longValue();
            }
        });
    }

    /**
     * Caches a message for its destination vertex and sends the destination worker's
     * cached messages once their size reaches the per-worker limit.
     *
     * @param i destination vertex id
     * @param m message to send
     * @return {@code true} if a request was issued, {@code false} if the message was only cached
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public boolean sendMessageRequest(I i, M m) {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(i);
        WorkerInfo workerInfo = owner.getWorkerInfo();
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("sendMessageRequest: Send bytes (" + m.toString() + ") to " + i + " on worker " + workerInfo);
        }
        this.totalMsgsSentInSuperstep++;
        int cachedSize = this.sendMessageCache.addMessage(workerInfo, partitionId, i, m);
        if (cachedSize < this.maxMessagesSizePerWorker) {
            return false;
        }
        doRequest(workerInfo, new SendWorkerMessagesRequest(this.sendMessageCache.removeWorkerMessages(workerInfo)));
        return true;
    }

    /**
     * Sends a whole partition to a worker, followed by the partition's current messages
     * unless the superstep is {@code -1}.
     *
     * @param workerInfo destination worker
     * @param partition partition to send
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void sendPartitionRequest(WorkerInfo workerInfo, Partition<I, V, E, M> partition) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("sendVertexRequest: Sending to " + workerInfo + ", with partition " + partition);
        }
        doRequest(workerInfo, new SendVertexRequest(partition));
        // NOTE(review): -1 presumably denotes the input superstep, where no messages exist yet — confirm.
        if (this.serviceWorker.getSuperstep() != -1) {
            sendPartitionMessages(workerInfo, partition);
        }
    }

    /**
     * Sends the current messages of every destination vertex in the partition, splitting
     * them into several requests whenever a batch grows past the per-worker message limit.
     *
     * @param workerInfo destination worker
     * @param partition partition whose current messages are sent
     * @throws IllegalStateException if reading or serializing a message fails with an
     *         {@link IOException}
     */
    private void sendPartitionMessages(WorkerInfo workerInfo, Partition<I, V, E, M> partition) {
        int partitionId = partition.getId();
        MessageStoreByPartition<I, M> messageStore = this.serverData.getCurrentMessageStore();
        ByteArrayVertexIdMessages<I, M> batch = newVertexIdMessages();
        for (I vertexId : messageStore.getPartitionDestinationVertices(partitionId)) {
            try {
                for (M message : messageStore.getVertexMessages(vertexId)) {
                    batch.add(vertexId, message);
                }
                // The limit is checked per vertex, so all messages of one vertex share a request.
                if (batch.getSize() > this.maxMessagesSizePerWorker) {
                    doRequest(workerInfo, new SendPartitionCurrentMessagesRequest(partitionId, batch));
                    batch = newVertexIdMessages();
                }
            } catch (IOException e) {
                throw new IllegalStateException("sendVertexRequest: Got IOException ", e);
            }
        }
        if (!batch.isEmpty()) {
            doRequest(workerInfo, new SendPartitionCurrentMessagesRequest(partitionId, batch));
        }
    }

    /**
     * Creates an empty, configured and initialized message batch.
     *
     * @return a fresh batch ready to accept messages
     */
    private ByteArrayVertexIdMessages<I, M> newVertexIdMessages() {
        ByteArrayVertexIdMessages<I, M> batch = new ByteArrayVertexIdMessages<I, M>();
        batch.setConf(this.configuration);
        batch.initialize();
        return batch;
    }

    /**
     * Adds a vertex to the partition cache; if the cache hands back a partition, that
     * partition is sent to its owner.
     *
     * @param partitionOwner owner of the partition the vertex belongs to
     * @param vertex vertex to send
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void sendVertexRequest(PartitionOwner partitionOwner, Vertex<I, V, E, M> vertex) {
        Partition<I, V, E, M> readyPartition = this.sendPartitionCache.addVertex(partitionOwner, vertex);
        if (readyPartition != null) {
            sendPartitionRequest(partitionOwner.getWorkerInfo(), readyPartition);
        }
    }

    /**
     * Caches an "add edge" mutation and sends the partition's mutations if the limit is reached.
     *
     * @param i id of the vertex the edge is added to
     * @param edge edge to add
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void addEdgeRequest(I i, Edge<I, E> edge) throws IOException {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(i);
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " + i + " with partition " + partitionId);
        }
        int mutationCount = this.sendMutationsCache.addEdgeMutation(partitionId, i, edge);
        sendMutationsRequestIfFull(partitionId, owner, mutationCount);
    }

    /**
     * Caches an edge for its source vertex and sends the destination worker's cached
     * edges once their size reaches the per-worker limit.
     *
     * @param i source vertex id
     * @param edge edge to send
     * @return {@code true} if a request was issued, {@code false} if the edge was only cached
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public boolean sendEdgeRequest(I i, Edge<I, E> edge) throws IOException {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(i);
        WorkerInfo workerInfo = owner.getWorkerInfo();
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("sendEdgeRequest: Send bytes (" + edge.toString() + ") to " + i + " on worker " + workerInfo);
        }
        int cachedSize = this.sendEdgeCache.addEdge(workerInfo, partitionId, i, edge);
        if (cachedSize < this.maxEdgesSizePerWorker) {
            return false;
        }
        doRequest(workerInfo, new SendWorkerEdgesRequest(this.sendEdgeCache.removeWorkerEdges(workerInfo)));
        return true;
    }

    /**
     * Sends and clears the cached mutations of a partition once their count reaches the limit.
     *
     * @param partitionId partition whose mutations may be sent
     * @param partitionOwner owner of that partition
     * @param mutationCount mutation count reported by the cache after the latest addition
     */
    private void sendMutationsRequestIfFull(int partitionId, PartitionOwner partitionOwner, int mutationCount) {
        if (mutationCount >= this.maxMutationsPerPartition) {
            doRequest(partitionOwner.getWorkerInfo(), new SendPartitionMutationsRequest(partitionId, this.sendMutationsCache.removePartitionMutations(partitionId)));
        }
    }

    /**
     * Caches a "remove edge" mutation and sends the partition's mutations if the limit is reached.
     *
     * @param i id of the vertex the edge is removed from
     * @param i2 id of the edge's target vertex
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void removeEdgesRequest(I i, I i2) throws IOException {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(i);
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("removeEdgesRequest: Removing edge " + i2 + " for index " + i + " with partition " + partitionId);
        }
        int mutationCount = this.sendMutationsCache.removeEdgeMutation(partitionId, i, i2);
        sendMutationsRequestIfFull(partitionId, owner, mutationCount);
    }

    /**
     * Caches an "add vertex" mutation and sends the partition's mutations if the limit is reached.
     *
     * @param vertex vertex to add
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(vertex.getId());
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("addVertexRequest: Sending vertex " + vertex + " to partition " + partitionId);
        }
        int mutationCount = this.sendMutationsCache.addVertexMutation(partitionId, vertex);
        sendMutationsRequestIfFull(partitionId, owner, mutationCount);
    }

    /**
     * Caches a "remove vertex" mutation and sends the partition's mutations if the limit is reached.
     *
     * @param i id of the vertex to remove
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void removeVertexRequest(I i) throws IOException {
        PartitionOwner owner = this.serviceWorker.getVertexPartitionOwner(i);
        int partitionId = owner.getPartitionId();
        if (LOG.isTraceEnabled()) {
            LOG.trace("removeVertexRequest: Removing vertex index " + i + " from partition " + partitionId);
        }
        int mutationCount = this.sendMutationsCache.removeVertexMutation(partitionId, i);
        sendMutationsRequestIfFull(partitionId, owner, mutationCount);
    }

    /**
     * Sends everything still held in the caches: partitions first, then messages, edges
     * and finally mutations.
     *
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public void flush() throws IOException {
        for (Map.Entry<PartitionOwner, Partition<I, V, E, M>> cached : this.sendPartitionCache.getOwnerPartitionMap().entrySet()) {
            sendPartitionRequest(cached.getKey().getWorkerInfo(), cached.getValue());
        }
        this.sendPartitionCache.clear();

        PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator messages = this.sendMessageCache.removeAllMessages().getIterator();
        while (messages.hasNext()) {
            messages.next();
            doRequest(messages.getCurrentFirst(), new SendWorkerMessagesRequest(messages.getCurrentSecond()));
        }

        PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdEdges<I, E>>>.Iterator edges = this.sendEdgeCache.removeAllEdges().getIterator();
        while (edges.hasNext()) {
            edges.next();
            doRequest(edges.getCurrentFirst(), new SendWorkerEdgesRequest(edges.getCurrentSecond()));
        }

        for (Map.Entry<Integer, Map<I, VertexMutations<I, V, E, M>>> cached : this.sendMutationsCache.removeAllPartitionMutations().entrySet()) {
            Map<I, VertexMutations<I, V, E, M>> mutations = cached.getValue();
            // The cache is keyed by partition id only, so the destination worker is looked up
            // through the first mutated vertex of the partition.
            I anyVertexId = mutations.keySet().iterator().next();
            WorkerInfo destination = this.serviceWorker.getVertexPartitionOwner(anyVertexId).getWorkerInfo();
            doRequest(destination, new SendPartitionMutationsRequest(cached.getKey(), mutations));
        }
    }

    /**
     * Returns the number of messages passed to {@link #sendMessageRequest} since the last
     * call and resets the counter to zero.
     *
     * @return message count accumulated since the previous reset
     */
    @Override // org.apache.giraph.comm.WorkerClientRequestProcessor
    public long resetMessageCount() {
        long sent = this.totalMsgsSentInSuperstep;
        this.totalMsgsSentInSuperstep = 0L;
        return sent;
    }

    /**
     * Dispatches a request: applied directly to the local server data when the destination
     * is this worker, otherwise sent through the worker client. The matching request
     * counter is incremented either way.
     *
     * @param workerInfo destination worker
     * @param writableRequest request to dispatch; must also be a {@link WorkerRequest} when
     *        the destination is this worker
     */
    private void doRequest(WorkerInfo workerInfo, WritableRequest writableRequest) {
        if (this.serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId()) {
            ((WorkerRequest) writableRequest).doRequest(this.serverData);
            this.localRequests.inc();
        } else {
            this.workerClient.sendWritableRequest(workerInfo.getTaskId(), writableRequest);
            this.remoteRequests.inc();
        }
    }
}
