package alluxio.master.journal;

import alluxio.annotation.SuppressFBWarnings;
import alluxio.collections.ConcurrentHashSet;
import alluxio.concurrent.ForkJoinPoolHelper;
import alluxio.concurrent.jsr.ForkJoinPool;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.JournalClosedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.master.journal.sink.JournalSink;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.util.logging.SamplingLogger;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
@SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED"})
/* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter.class */
public final class AsyncJournalWriter {
    // Rate-limited logger used to report flush failures from the flush thread.
    private static final Logger SAMPLING_LOG = new SamplingLogger(LoggerFactory.getLogger(AsyncJournalWriter.class), AbstractJournalProgressLogger.MAX_LOG_INTERVAL_MS);
    // Underlying writer that queued entries are written to and flushed through.
    private final JournalWriter mJournalWriter;
    // Entries appended by callers that the flush thread has not yet written; an entry is removed
    // only after it has been written successfully.
    private final ConcurrentLinkedQueue<Journal.JournalEntry> mQueue;
    // Incremented once per appendEntry() call; its value is what appendEntry() returns.
    private final AtomicLong mCounter;
    // Value of mWriteCounter at the last successful flush; flush(long) returns immediately for
    // targets at or below this value.
    private final AtomicLong mFlushCounter;
    // Number of entries written so far; only updated from doFlush().
    private long mWriteCounter;
    // MASTER_JOURNAL_FLUSH_BATCH_TIME_MS converted to nanoseconds. Used both as the maximum wait
    // for a wake-up permit and as the time budget of one write batch.
    private final long mFlushBatchTimeNs;
    // Tickets of callers currently blocked in flush(long), waiting for the flush thread.
    private final Set<FlushTicket> mTicketSet;
    // Suffix used when naming the flush thread; defaults to "Raft".
    private String mJournalName;
    // Thread running doFlush(); set to null by stop().
    private Thread mFlushThread;
    // Permits are released by flush(long) and stop() to wake the flush thread when it is idle.
    private final Semaphore mFlushSemaphore;
    // Set by stop() to make the doFlush() loop terminate.
    private volatile boolean mStopFlushing;
    // Supplies the journal sinks that are notified of appended entries and of flushes.
    private final Supplier<Set<JournalSink>> mJournalSinks;

    /* loaded from: input_file:alluxio/master/journal/AsyncJournalWriter$FlushTicket.class */
    /**
     * A pending flush request: a caller of {@code flush(long)} waits on the ticket until the
     * flush thread either completes it or fails it.
     *
     * <p>The ticket is a {@link ForkJoinPool.ManagedBlocker} so that waiting from inside a
     * fork-join pool goes through {@link ForkJoinPoolHelper#safeManagedBlock}.
     */
    private static class FlushTicket implements ForkJoinPool.ManagedBlocker {
        /** Counter value that must be flushed before this ticket can be completed. */
        private final long mTargetCounter;
        /** Completed (normally or exceptionally) by the flush thread. */
        private final SettableFuture<Void> mIsCompleted = SettableFuture.create();
        /**
         * Failure to report to the waiter, or {@code null} on success. Written by the flush
         * thread and read by the waiting thread, hence volatile.
         */
        private volatile Throwable mError = null;

        /**
         * @param j the counter value this ticket waits for
         */
        public FlushTicket(long j) {
            this.mTargetCounter = j;
        }

        /**
         * @return the counter value this ticket waits for
         */
        public long getTargetCounter() {
            return this.mTargetCounter;
        }

        /**
         * Marks the ticket as successfully served and releases the waiter.
         */
        public void setCompleted() {
            this.mIsCompleted.set(null);
        }

        /**
         * Marks the ticket as failed and releases the waiter.
         *
         * @param th the failure to rethrow from {@link #waitCompleted()}
         */
        public void setError(Throwable th) {
            // Publish the error BEFORE completing the future. A waiter that finds the future
            // already done skips block() entirely and only consults mError; with the opposite
            // order it could observe "done" with mError still null and report success.
            this.mError = th;
            this.mIsCompleted.setException(th);
        }

        /**
         * Blocks until the ticket has been completed or failed.
         *
         * @throws Throwable the failure recorded through {@link #setError(Throwable)}, or an
         *         {@link InterruptedException} if the wait itself was interrupted
         */
        public void waitCompleted() throws Throwable {
            ForkJoinPoolHelper.safeManagedBlock(this);
            Throwable error = this.mError;
            if (error != null) {
                throw error;
            }
        }

        /**
         * Waits for the future to finish. A failure is captured into {@code mError} rather than
         * thrown, so that {@link #waitCompleted()} is the single place that rethrows it.
         *
         * @return always {@code true}: once this returns no further blocking is needed
         * @throws InterruptedException if interrupted while waiting
         */
        public boolean block() throws InterruptedException {
            try {
                this.mIsCompleted.get();
            } catch (ExecutionException e) {
                this.mError = e.getCause();
            }
            return true;
        }

        /**
         * @return {@code true} once the future has finished; a cancelled future also counts as
         *         done, so no separate cancellation check is needed
         */
        public boolean isReleasable() {
            return this.mIsCompleted.isDone();
        }
    }

    /**
     * Holder for the metrics reported by the enclosing writer. Never instantiated.
     */
    @ThreadSafe
    private static final class Metrics {
        /** Counter incremented by the flush thread each time a flush pass fails. */
        private static final Counter JOURNAL_FLUSH_FAILURE =
            MetricsSystem.counter(MetricKey.MASTER_JOURNAL_FLUSH_FAILURE.getName());

        /** Static holder only; instantiation is disallowed. */
        private Metrics() {}
    }

    public AsyncJournalWriter(JournalWriter journalWriter, Supplier<Set<JournalSink>> supplier) {
        this.mTicketSet = new ConcurrentHashSet();
        this.mJournalName = "Raft";
        this.mFlushThread = new Thread(this::doFlush, "AsyncJournalWriterThread-" + this.mJournalName);
        this.mFlushSemaphore = new Semaphore(0, true);
        this.mStopFlushing = false;
        this.mJournalWriter = (JournalWriter) Preconditions.checkNotNull(journalWriter, "journalWriter");
        this.mQueue = new ConcurrentLinkedQueue<>();
        this.mCounter = new AtomicLong(0L);
        this.mFlushCounter = new AtomicLong(0L);
        this.mWriteCounter = 0L;
        this.mFlushBatchTimeNs = TimeUnit.NANOSECONDS.convert(Configuration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_BATCH_TIME_MS), TimeUnit.MILLISECONDS);
        this.mJournalSinks = supplier;
        this.mFlushThread.start();
    }

    /**
     * Creates a writer like the two-argument constructor, but with a caller-chosen journal name
     * that is used to name the flush thread.
     *
     * @param journalWriter the underlying writer; must not be null
     * @param supplier supplies the journal sinks notified on append and on flush
     * @param str the journal name appended to the flush thread's name
     */
    public AsyncJournalWriter(JournalWriter journalWriter, Supplier<Set<JournalSink>> supplier, String str) {
        this(journalWriter, supplier);
        this.mJournalName = str;
        // The delegated constructor has already created and started the flush thread under the
        // default journal name, so rename the running thread to reflect the requested name.
        this.mFlushThread.setName("AsyncJournalWriterThread-" + str);
    }

    /**
     * Queues a journal entry for asynchronous writing by the flush thread. The entry is only
     * enqueued here; nothing is written or flushed by this call.
     *
     * @param journalEntry the entry to enqueue
     * @return the append counter as read after the entry was enqueued; presumably intended to be
     *         passed to {@link #flush(long)} to wait for this entry to be flushed
     */
    public long appendEntry(Journal.JournalEntry journalEntry) {
        // The statement order is significant: the counter is bumped BEFORE the entry becomes
        // visible in the queue, and read again AFTER it was enqueued. With concurrent appenders
        // the value read may be higher than this call's own increment, but it is never lower
        // than the number of entries enqueued up to and including this one.
        this.mCounter.incrementAndGet();
        this.mQueue.offer(journalEntry);
        return this.mCounter.get();
    }

    /**
     * Closes the writer by stopping the background flush thread; simply delegates to
     * {@link #stop()}.
     */
    public void close() {
        this.stop();
    }

    /**
     * Signals the flush thread to terminate and waits for it to exit.
     *
     * <p>Calling this when the flush thread has already been stopped is a no-op apart from
     * (re)setting the stop flag. If the calling thread is interrupted while waiting, its
     * interrupt status is restored and the method returns without waiting further.
     */
    @VisibleForTesting
    void stop() {
        this.mStopFlushing = true;
        Thread flushThread = this.mFlushThread;
        if (flushThread == null) {
            // Already stopped (e.g. stop() followed by close()); nothing to wake up or join.
            return;
        }
        // Hand out a permit so a flush thread that is idle on the semaphore wakes up promptly.
        this.mFlushSemaphore.release();
        try {
            flushThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.mFlushThread = null;
            // Take the permit back if the flush thread did not consume it.
            this.mFlushSemaphore.tryAcquire();
        }
    }

    /**
     * (Re)starts the background flush thread. If a flush thread is still registered it is
     * closed first, then a fresh thread named after the current journal name is launched.
     */
    @VisibleForTesting
    void start() {
        boolean flushThreadPresent = this.mFlushThread != null;
        if (flushThreadPresent) {
            close();
        }
        String threadName = "AsyncJournalWriterThread-" + this.mJournalName;
        Thread flusher = new Thread(this::doFlush, threadName);
        this.mFlushThread = flusher;
        // Clear the stop flag before the new thread begins running its loop.
        this.mStopFlushing = false;
        flusher.start();
    }

    /**
     * Body of the flush thread. Loops until the stop flag is set; each pass:
     * <ol>
     *   <li>idles while the queue is empty, until a permit is released or the stop flag is set
     *       (re-checking at least every batch interval);</li>
     *   <li>writes queued entries to the journal writer and the sinks, for roughly one batch
     *       interval;</li>
     *   <li>flushes the writer and the sinks if anything written is not yet flushed;</li>
     *   <li>completes every ticket whose target counter has been flushed.</li>
     * </ol>
     * If writing or flushing fails, the failure is logged and counted, and every outstanding
     * ticket is removed: tickets already covered by an earlier flush complete normally, the rest
     * fail with the exception. The entry that failed stays at the head of the queue and is
     * attempted again on the next pass.
     */
    private void doFlush() {
        while (!this.mStopFlushing) {
            // Idle while there is nothing to write. The timed acquire bounds the wait so that
            // newly queued entries are noticed even if nobody releases a permit.
            while (this.mQueue.isEmpty() && !this.mStopFlushing) {
                try {
                    if (this.mFlushSemaphore.tryAcquire(this.mFlushBatchTimeNs, TimeUnit.NANOSECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: shutdown is requested through mStopFlushing, which
                    // the loop condition re-checks right away.
                }
            }
            try {
                long batchStartNs = System.nanoTime();
                Journal.JournalEntry entry;
                // Peek first and poll only after a successful write, so that an entry whose
                // write fails remains queued.
                while ((entry = this.mQueue.peek()) != null) {
                    this.mJournalWriter.write(entry);
                    JournalUtils.sinkAppend(this.mJournalSinks, entry);
                    this.mQueue.poll();
                    this.mWriteCounter++;
                    // Cap a write batch at the batch interval, except while stopping, when the
                    // queue is drained completely.
                    if (System.nanoTime() - batchStartNs >= this.mFlushBatchTimeNs && !this.mStopFlushing) {
                        break;
                    }
                }
                // True when new entries were written, or when an earlier flush failed.
                if (this.mFlushCounter.get() < this.mWriteCounter) {
                    // try-with-resources stops the timer even when flush() throws.
                    try (Timer.Context ignoredTimer = MetricsSystem.timer(MetricKey.MASTER_JOURNAL_FLUSH_TIMER.getName()).time()) {
                        this.mJournalWriter.flush();
                    }
                    JournalUtils.sinkFlush(this.mJournalSinks);
                    this.mFlushCounter.set(this.mWriteCounter);
                }
                // Wake up every waiter whose target has now been flushed.
                Iterator<FlushTicket> served = this.mTicketSet.iterator();
                while (served.hasNext()) {
                    FlushTicket ticket = served.next();
                    if (ticket.getTargetCounter() <= this.mFlushCounter.get()) {
                        ticket.setCompleted();
                        served.remove();
                    }
                }
            } catch (IOException | JournalClosedException e) {
                SAMPLING_LOG.warn("Failed to flush journal entry: " + e.getMessage(), e);
                Metrics.JOURNAL_FLUSH_FAILURE.inc();
                // Release all waiters: those already covered by a previous flush succeed, the
                // others are failed with the cause.
                Iterator<FlushTicket> pending = this.mTicketSet.iterator();
                while (pending.hasNext()) {
                    FlushTicket ticket = pending.next();
                    pending.remove();
                    if (ticket.getTargetCounter() <= this.mFlushCounter.get()) {
                        ticket.setCompleted();
                    } else {
                        ticket.setError(e);
                    }
                }
            }
        }
    }

    /**
     * Blocks until every entry up to the given counter value has been flushed.
     *
     * <p>Returns immediately if that counter has already been flushed. Otherwise a ticket is
     * registered, the flush thread is woken with a permit, and the caller waits on the ticket.
     *
     * @param j the counter value to wait for
     * @throws IOException if the flush thread failed with an I/O error; also thrown as an
     *         {@link AlluxioStatusException} with status CANCELLED when the wait is interrupted,
     *         or INTERNAL for any other unexpected failure
     * @throws JournalClosedException if the flush thread failed because the journal is closed
     */
    public void flush(long j) throws IOException, JournalClosedException {
        if (j <= this.mFlushCounter.get()) {
            return;
        }
        FlushTicket flushTicket = new FlushTicket(j);
        this.mTicketSet.add(flushTicket);
        try {
            // Give the flush thread a permit in case it is idle, then wait to be served.
            this.mFlushSemaphore.release();
            flushTicket.waitCompleted();
        } catch (InterruptedException e) {
            // Handled ahead of the generic case so that interruption is reported as a
            // cancellation rather than as an internal error.
            throw new AlluxioStatusException(Status.CANCELLED.withCause(e));
        } catch (IOException | JournalClosedException e) {
            // Journal failures are passed through unchanged.
            throw e;
        } catch (Throwable th) {
            // Anything else is unexpected.
            throw new AlluxioStatusException(Status.INTERNAL.withCause(th));
        } finally {
            // Try to take back the permit handed out above; the flush thread may or may not
            // have consumed it.
            this.mFlushSemaphore.tryAcquire();
        }
    }
}
