package org.apache.giraph.worker;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;

/* loaded from: input_file:org/apache/giraph/worker/InputSplitsHandler.class */
public class InputSplitsHandler implements Watcher {
    private static final Logger LOG = Logger.getLogger(InputSplitsHandler.class);
    private final List<String> pathList;
    private final AtomicInteger currentIndex = new AtomicInteger(0);
    private final ZooKeeperExt zooKeeper;
    private final Mapper<?, ?, ?, ?>.Context context;
    private final String inputSplitReservedNode;
    private final String inputSplitFinishedNode;

    public InputSplitsHandler(InputSplitPathOrganizer inputSplitPathOrganizer, ZooKeeperExt zooKeeperExt, Mapper<?, ?, ?, ?>.Context context, String str, String str2) {
        this.pathList = Lists.newArrayList(inputSplitPathOrganizer.getPathList());
        this.zooKeeper = zooKeeperExt;
        this.context = context;
        this.inputSplitReservedNode = str;
        this.inputSplitFinishedNode = str2;
    }

    public String reserveInputSplit() throws KeeperException, InterruptedException {
        String str;
        while (true) {
            int andIncrement = this.currentIndex.getAndIncrement();
            if (andIncrement >= this.pathList.size()) {
                return null;
            }
            str = this.pathList.get(andIncrement);
            this.context.progress();
            String str2 = str + this.inputSplitReservedNode;
            if (this.zooKeeper.exists(str2, this) == null) {
                try {
                    this.zooKeeper.createExt(str2, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, false);
                    if (!LOG.isInfoEnabled()) {
                        break;
                    }
                    LOG.info("reserveInputSplit: Reserved input split path " + str + ", overall roughly " + ((andIncrement * 100.0f) / this.pathList.size()) + "% input splits reserved");
                    break;
                } catch (KeeperException.NodeExistsException e) {
                    LOG.info("reserveInputSplit: Couldn't reserve (already reserved) inputSplit at " + str2);
                } catch (KeeperException e2) {
                    throw new IllegalStateException("reserveInputSplit: KeeperException on reserve", e2);
                } catch (InterruptedException e3) {
                    throw new IllegalStateException("reserveInputSplit: InterruptedException on reserve", e3);
                }
            }
        }
        return str;
    }

    public void markInputSplitPathFinished(String str) {
        String str2 = str + this.inputSplitFinishedNode;
        try {
            this.zooKeeper.createExt(str2, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException e) {
            throw new IllegalStateException("markInputSplitPathFinished: InterruptedException on " + str2, e);
        } catch (KeeperException e2) {
            throw new IllegalStateException("markInputSplitPathFinished: KeeperException on " + str2, e2);
        } catch (KeeperException.NodeExistsException e3) {
            LOG.warn("markInputSplitPathFinished: " + str2 + " already exists!");
        }
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getPath() == null) {
            LOG.warn("process: Problem with zookeeper, got event with path null, state " + watchedEvent.getState() + ", event type " + watchedEvent.getType());
            return;
        }
        if (watchedEvent.getPath().endsWith(this.inputSplitReservedNode) && watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
            synchronized (this.pathList) {
                String path = watchedEvent.getPath();
                String substring = path.substring(0, path.indexOf(this.inputSplitReservedNode));
                this.pathList.add(substring);
                if (LOG.isInfoEnabled()) {
                    LOG.info("process: Input split " + substring + " lost reservation");
                }
            }
        }
    }
}
