package org.apache.hadoop.hdfs;

import com.google.common.collect.Iterators;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
/* loaded from: input_file:lib/hadoop-hdfs-2.6.4.jar:org/apache/hadoop/hdfs/DFSInotifyEventInputStream.class */
public class DFSInotifyEventInputStream {
    public static Logger LOG;
    private final ClientProtocol namenode;
    private Iterator<EventBatch> it;
    private long lastReadTxid;
    private long syncTxid;
    private Random rng;
    private static final int INITIAL_WAIT_MS = 10;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DFSInotifyEventInputStream(ClientProtocol clientProtocol) throws IOException {
        this(clientProtocol, clientProtocol.getCurrentEditLogTxid());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DFSInotifyEventInputStream(ClientProtocol clientProtocol, long j) throws IOException {
        this.rng = new Random();
        this.namenode = clientProtocol;
        this.it = Iterators.emptyIterator();
        this.lastReadTxid = j;
    }

    public EventBatch poll() throws IOException, MissingEventsException {
        if (this.lastReadTxid == -1) {
            LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
            this.lastReadTxid = this.namenode.getCurrentEditLogTxid();
            return null;
        }
        if (!this.it.hasNext()) {
            EventBatchList editsFromTxid = this.namenode.getEditsFromTxid(this.lastReadTxid + 1);
            if (editsFromTxid.getLastTxid() == -1) {
                LOG.debug("poll(): read no edits from the NN when requesting edits after txid {}", Long.valueOf(this.lastReadTxid));
                return null;
            }
            this.syncTxid = editsFromTxid.getSyncTxid();
            this.it = editsFromTxid.getBatches().iterator();
            long j = this.lastReadTxid;
            this.lastReadTxid = editsFromTxid.getLastTxid();
            if (editsFromTxid.getFirstTxid() != j + 1) {
                throw new MissingEventsException(j + 1, editsFromTxid.getFirstTxid());
            }
        }
        if (this.it.hasNext()) {
            return this.it.next();
        }
        return null;
    }

    public long getTxidsBehindEstimate() {
        if (this.syncTxid == 0) {
            return -1L;
        }
        if ($assertionsDisabled || this.syncTxid >= this.lastReadTxid) {
            return this.syncTxid - this.lastReadTxid;
        }
        throw new AssertionError();
    }

    public EventBatch poll(long j, TimeUnit timeUnit) throws IOException, InterruptedException, MissingEventsException {
        EventBatch poll;
        long monotonicNow = Time.monotonicNow();
        long convert = TimeUnit.MILLISECONDS.convert(j, timeUnit);
        long j2 = 10;
        while (true) {
            poll = poll();
            if (poll != null) {
                break;
            }
            long monotonicNow2 = convert - (Time.monotonicNow() - monotonicNow);
            if (monotonicNow2 <= 0) {
                LOG.debug("timed poll(): timed out");
                break;
            }
            j2 = monotonicNow2 < j2 * 2 ? monotonicNow2 : j2 * 2;
            LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", Long.valueOf(j2));
            Thread.sleep(j2);
        }
        return poll;
    }

    public EventBatch take() throws IOException, InterruptedException, MissingEventsException {
        int i = 10;
        while (true) {
            int i2 = i;
            EventBatch poll = poll();
            if (poll != null) {
                return poll;
            }
            int nextInt = i2 + this.rng.nextInt(i2);
            LOG.debug("take(): poll() returned null, sleeping for {} ms", Integer.valueOf(nextInt));
            Thread.sleep(nextInt);
            i = Math.min(60000, i2 * 2);
        }
    }

    static {
        $assertionsDisabled = !DFSInotifyEventInputStream.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream.class);
    }
}
