package org.apache.cassandra.db.commitlog;

import io.reactivex.Completable;
import io.reactivex.subjects.BehaviorSubject;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StagedScheduler;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/commitlog/AbstractCommitLogService.class */
public abstract class AbstractCommitLogService {
    private Thread thread;
    protected volatile long lastSyncedAt;
    final CommitLog commitLog;
    private final String name;
    private final long pollIntervalNanos;
    final TimeSource timeSource;
    private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
    private volatile boolean shutdown = false;
    private final AtomicLong written = new AtomicLong(0);
    protected final AtomicLong pending = new AtomicLong(0);
    private BehaviorSubject<Long> syncTimePublisher = BehaviorSubject.createDefault(0L);

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractCommitLogService(CommitLog commitLog, String str, long j, TimeSource timeSource) {
        this.commitLog = commitLog;
        this.name = str;
        this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(j, TimeUnit.MILLISECONDS);
        this.timeSource = timeSource;
        this.lastSyncedAt = timeSource.nanoTime();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        if (this.pollIntervalNanos < 1) {
            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms", Double.valueOf(this.pollIntervalNanos * 1.0E-6d)));
        }
        Runnable runnable = new Runnable() { // from class: org.apache.cassandra.db.commitlog.AbstractCommitLogService.1
            @Override // java.lang.Runnable
            public void run() {
                long nanoTime;
                long j;
                long j2 = 0;
                long j3 = 0;
                long j4 = 0;
                int i = 0;
                int i2 = 0;
                while (true) {
                    boolean z = AbstractCommitLogService.this.shutdown;
                    try {
                        long nanoTime2 = AbstractCommitLogService.this.timeSource.nanoTime();
                        AbstractCommitLogService.this.commitLog.sync();
                        AbstractCommitLogService.this.lastSyncedAt = nanoTime2;
                        AbstractCommitLogService.this.syncTimePublisher.onNext(Long.valueOf(nanoTime2));
                        nanoTime = AbstractCommitLogService.this.timeSource.nanoTime();
                        j = nanoTime2 + AbstractCommitLogService.this.pollIntervalNanos;
                        if (j - nanoTime < 0) {
                            if (i == 0) {
                                j2 = nanoTime;
                                i = 0;
                                i2 = 0;
                                long j5 = 0;
                                j4 = j5;
                                j3 = j5;
                            }
                            j4 += Math.abs(nanoTime - j);
                            i++;
                        }
                        i2++;
                        j3 += Math.abs(nanoTime - nanoTime2);
                        if (i != 0 && NoSpamLogger.log(AbstractCommitLogService.logger, NoSpamLogger.Level.WARN, 5L, TimeUnit.MINUTES, "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms", Integer.valueOf(i2), String.format("%.2f", Double.valueOf(Math.abs(nanoTime - j2) * 1.0E-9d)), String.format("%.2f", Double.valueOf((j3 * 1.0E-6d) / i2)), Integer.valueOf(i), String.format("%.2f", Double.valueOf((j4 * 1.0E-6d) / i)))) {
                            i = 0;
                        }
                    } catch (Throwable th) {
                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", th)) {
                            AbstractCommitLogService.this.syncTimePublisher.onError(th);
                            return;
                        }
                        LockSupport.parkNanos(AbstractCommitLogService.this.pollIntervalNanos);
                    }
                    if (z) {
                        return;
                    }
                    if (j - nanoTime > 0) {
                        LockSupport.parkNanos(Math.abs(j - nanoTime));
                    }
                }
            }
        };
        this.shutdown = false;
        this.thread = NamedThreadFactory.createThread(runnable, this.name);
        this.thread.setDaemon(true);
        this.thread.start();
    }

    public Completable finishWriteFor(CommitLogSegment.Allocation allocation, StagedScheduler stagedScheduler) {
        return maybeWaitForSync(allocation, stagedScheduler).doOnComplete(() -> {
            this.written.incrementAndGet();
        });
    }

    protected abstract Completable maybeWaitForSync(CommitLogSegment.Allocation allocation, StagedScheduler stagedScheduler);

    public void requestExtraSync() {
        LockSupport.unpark(this.thread);
    }

    public void shutdown() {
        this.shutdown = true;
        requestExtraSync();
    }

    public void syncBlocking() {
        long nanoTime = this.timeSource.nanoTime();
        requestExtraSync();
        awaitSyncAt(nanoTime).blockingGet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Completable awaitSyncAt(long j) {
        return this.syncTimePublisher.filter(l -> {
            return l.longValue() - j >= 0;
        }).first(0L).toCompletable();
    }

    public void awaitTermination() throws InterruptedException {
        this.thread.join();
    }

    public long getCompletedTasks() {
        return this.written.get();
    }

    public long getPendingTasks() {
        return this.pending.get();
    }
}
