package org.apache.giraph.worker;

import com.yammer.metrics.core.Meter;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MeterDesc;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/giraph/worker/InputSplitsCallable.class */
/**
 * Base {@link Callable} that repeatedly reserves input splits through an
 * {@link InputSplitsHandler}, loads each one via the subclass-provided
 * {@link #readInputSplit(InputSplit, GraphState)}, and returns the accumulated
 * {@link VertexEdgeCount}.
 *
 * <p>Each instance creates its own {@link WorkerClientRequestProcessor} and its own
 * {@link GraphState} copy wired to that processor; the processor is flushed once all
 * splits have been consumed.
 *
 * @param <I> vertex id type
 * @param <V> vertex value type
 * @param <E> edge value type
 * @param <M> message type
 */
public abstract class InputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements Callable<VertexEdgeCount> {
    private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
    private static final Time TIME = SystemTime.get();
    /** Job configuration, also used to resolve and instantiate input split classes. */
    protected final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
    /** Mapper context, used to report progress while loading. */
    protected final Mapper<?, ?, ?, ?>.Context context;
    /** Per-callable graph state bound to {@link #workerClientRequestProcessor}. */
    private final GraphState<I, V, E, M> graphState;
    /** Request processor owned by this callable; flushed at the end of {@link #call()}. */
    private final WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor;
    /** Hands out input split paths and records which ones are finished. */
    private final InputSplitsHandler splitsHandler;
    /** ZooKeeper handle from which serialized input splits are read. */
    private final ZooKeeperExt zooKeeperExt;
    /** Timestamp taken at construction; basis for the throughput log line in {@link #call()}. */
    private final long startNanos = TIME.getNanoseconds();
    /** Whether the serialized split data is prefixed with an extra string (see {@link #getInputSplit}). */
    private final boolean useLocality;

    /**
     * Creates a callable with its own request processor and graph state.
     *
     * @param context mapper context used for progress reporting
     * @param graphState template state; superstep, totals and task manager are copied from it
     * @param immutableClassesGiraphConfiguration job configuration
     * @param bspServiceWorker worker service handed to the request processor
     * @param inputSplitsHandler source of input split paths to reserve
     * @param zooKeeperExt ZooKeeper handle used to fetch split data
     */
    public InputSplitsCallable(Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState, ImmutableClassesGiraphConfiguration<I, V, E, M> immutableClassesGiraphConfiguration, BspServiceWorker<I, V, E, M> bspServiceWorker, InputSplitsHandler inputSplitsHandler, ZooKeeperExt zooKeeperExt) {
        this.zooKeeperExt = zooKeeperExt;
        this.context = context;
        this.workerClientRequestProcessor = new NettyWorkerClientRequestProcessor<>(context, immutableClassesGiraphConfiguration, bspServiceWorker);
        this.graphState = new GraphState<>(graphState.getSuperstep(), graphState.getTotalNumVertices(), graphState.getTotalNumEdges(), context, graphState.getGraphTaskManager(), this.workerClientRequestProcessor, null);
        this.useLocality = immutableClassesGiraphConfiguration.useInputSplitLocality();
        this.splitsHandler = inputSplitsHandler;
        this.configuration = immutableClassesGiraphConfiguration;
    }

    /**
     * @return the per-job meter registered under {@link MeterDesc#EDGES_LOADED}
     */
    public static Meter getTotalEdgesLoadedMeter() {
        return GiraphMetrics.get().perJobRequired().getMeter(MeterDesc.EDGES_LOADED);
    }

    /**
     * @return the per-job meter registered under {@link MeterDesc#VERTICES_LOADED}
     */
    public static Meter getTotalVerticesLoadedMeter() {
        return GiraphMetrics.get().perJobRequired().getMeter(MeterDesc.VERTICES_LOADED);
    }

    /**
     * Loads the contents of a single input split.
     *
     * @param inputSplit the deserialized split to read
     * @param graphState graph state owned by this callable
     * @return the number of vertices and edges read from the split
     * @throws IOException on read failure
     * @throws InterruptedException if the read is interrupted
     */
    protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit, GraphState<I, V, E, M> graphState) throws IOException, InterruptedException;

    /**
     * Reserves and loads input splits until the handler returns {@code null}, then flushes
     * the request processor.
     *
     * @return the total vertex/edge count over all splits loaded by this callable
     * @throws IllegalStateException wrapping any checked exception raised while reserving,
     *         deserializing, loading or flushing; on interruption the thread's interrupt
     *         flag is restored before throwing
     */
    @Override // java.util.concurrent.Callable
    public VertexEdgeCount call() {
        VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
        int splitsLoaded = 0;
        while (true) {
            try {
                String splitPath = this.splitsHandler.reserveInputSplit();
                if (splitPath == null) {
                    // No more splits available to reserve.
                    break;
                }
                vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(splitPath, this.graphState));
                this.context.progress();
                splitsLoaded++;
            } catch (KeeperException e) {
                throw new IllegalStateException("call: KeeperException", e);
            } catch (IOException e) {
                throw new IllegalStateException("call: IOException", e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("call: ClassNotFoundException", e);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("call: IllegalAccessException", e);
            } catch (InstantiationException e) {
                throw new IllegalStateException("call: InstantiationException", e);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this callable.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("call: InterruptedException", e);
            }
        }
        if (LOG.isInfoEnabled()) {
            float seconds = ((float) Times.getNanosSince(TIME, this.startNanos)) / 1.0E9f;
            LOG.info("call: Loaded " + splitsLoaded + " input splits in " + seconds + " secs, " + vertexEdgeCount + " " + (((float) vertexEdgeCount.getVertexCount()) / seconds) + " vertices/sec, " + (((float) vertexEdgeCount.getEdgeCount()) / seconds) + " edges/sec");
        }
        try {
            this.workerClientRequestProcessor.flush();
            return vertexEdgeCount;
        } catch (IOException e) {
            throw new IllegalStateException("call: Flushing failed.", e);
        }
    }

    /**
     * Fetches, reads and marks as finished the input split stored at the given path.
     *
     * @param str path of the reserved input split
     * @param graphState graph state passed through to {@link #readInputSplit}
     * @return the vertex/edge count read from this split
     */
    private VertexEdgeCount loadInputSplit(String str, GraphState<I, V, E, M> graphState) throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
        VertexEdgeCount splitCount = readInputSplit(getInputSplit(str), graphState);
        if (LOG.isInfoEnabled()) {
            LOG.info("loadFromInputSplit: Finished loading " + str + " " + splitCount);
        }
        this.splitsHandler.markInputSplitPathFinished(str);
        return splitCount;
    }

    /**
     * Reads the serialized input split stored at the given ZooKeeper path and
     * deserializes it.
     *
     * <p>Layout consumed from the node data: an optional leading string (only when
     * {@code useLocality} is set; it is read and discarded here — presumably locality
     * information, confirm against the writer), the split class name, then the split's
     * own {@link Writable} fields.
     *
     * @param str ZooKeeper path of the input split
     * @return the deserialized input split
     * @throws IOException if the node data cannot be decoded
     * @throws ClassNotFoundException if the split class named in the data cannot be resolved
     * @throws IllegalStateException if the ZooKeeper read fails or is interrupted; on
     *         interruption the thread's interrupt flag is restored before throwing
     */
    protected InputSplit getInputSplit(String str) throws IOException, ClassNotFoundException {
        try {
            byte[] data = this.zooKeeperExt.getData(str, false, (Stat) null);
            this.context.progress();
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));
            if (this.useLocality) {
                // Skip the leading string; its value is not needed here.
                Text.readString(dataInputStream);
            }
            String splitClassName = Text.readString(dataInputStream);
            InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance(this.configuration.getClassByName(splitClassName), this.configuration);
            // InputSplit itself is not Writable; the concrete split class is expected to be.
            ((Writable) inputSplit).readFields(dataInputStream);
            if (LOG.isInfoEnabled()) {
                LOG.info("getInputSplit: Reserved " + str + " from ZooKeeper and got input split '" + inputSplit.toString() + "'");
            }
            return inputSplit;
        } catch (KeeperException e) {
            throw new IllegalStateException("getInputSplit: KeeperException on " + str, e);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs this callable.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("getInputSplit: InterruptedException on " + str, e);
        }
    }
}
