package org.apache.cassandra.concurrent;

import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.EmptyDisposable;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.cassandra.concurrent.IOThread;
import org.apache.cassandra.config.PropertyConfiguration;
import org.apache.cassandra.utils.time.ApolloTime;

/* loaded from: input_file:org/apache/cassandra/concurrent/IOScheduler.class */
public class IOScheduler extends StagedScheduler {

    /**
     * Lower bound used when trimming idle workers: the pool's kill timer only removes cached
     * workers while the tracked idle count is above this value. Read from the
     * {@code dse.io.sched.min_pool_size} property (8 is presumably the fallback when the property
     * is unset; confirm against PropertyConfiguration).
     */
    @VisibleForTesting
    static final int MIN_POOL_SIZE = PropertyConfiguration.getInteger("dse.io.sched.min_pool_size", 8);

    /**
     * Upper bound on cached idle workers: a released worker is disposed instead of being cached
     * once the tracked idle count has reached this value. If this is not greater than
     * {@link #MIN_POOL_SIZE}, pools do not start a kill timer at all. Read from the
     * {@code dse.io.sched.max_pool_size} property (256 is presumably the fallback; confirm).
     */
    @VisibleForTesting
    public static final int MAX_POOL_SIZE = PropertyConfiguration.getInteger("dse.io.sched.max_pool_size", 256);

    /**
     * Idle keep-alive in seconds; the no-arg constructor multiplies it by 1000 to obtain the
     * keep-alive in milliseconds. Read from the {@code dse.io.sched.keep_alive_secs} property
     * (5 is presumably the fallback; confirm).
     */
    @VisibleForTesting
    static final int KEEP_ALIVE_TIME_SECS = PropertyConfiguration.getInteger("dse.io.sched.keep_alive_secs", 5);
    // Factory that builds a new worker from a thread factory; handed to every pool this scheduler creates.
    private final Function<ThreadFactory, ExecutorBasedWorker> workerSupplier;
    // Currently installed worker pool; holds null after shutdown() until start() installs a new pool.
    private final AtomicReference<WorkersPool> pool;
    // Set to true by shutdown() and never reset by this class; exposed through isShutdown().
    private final AtomicBoolean shutdown;
    // Keep-alive (milliseconds) passed to every pool created by this scheduler.
    private final int keepAliveMillis;

    /**
     * One-shot task wrapper. It borrows a worker from the pool when constructed, runs a single
     * task on that worker's executor, and returns the worker to the pool once the task has
     * finished, whether it completed normally or threw.
     */
    static final class PooledTaskWorker implements Runnable {
        private final WorkersPool pool;
        private final ExecutorBasedWorker worker;
        private final Runnable task;

        /**
         * Borrows a worker from {@code owner} for the given task.
         *
         * @param owner   pool to borrow the worker from and to release it to
         * @param payload task to run on the borrowed worker
         * @throws RejectedExecutionException if {@code owner} is {@code null}, i.e. the scheduler
         *                                    has no pool installed
         */
        PooledTaskWorker(WorkersPool owner, Runnable payload) {
            if (null == owner) {
                throw new RejectedExecutionException("Task sent to a shut down scheduler.");
            }
            task = payload;
            pool = owner;
            worker = owner.get();
        }

        /** Submits this wrapper to the borrowed worker's executor. */
        public void execute() {
            worker.getExecutor().execute(this);
        }

        /** Runs the wrapped task and always gives the worker back to the pool afterwards. */
        @Override
        public void run() {
            try {
                task.run();
            } finally {
                // Release even when the task throws, so the worker is not lost to the pool.
                pool.release(worker);
            }
        }
    }

    /**
     * RxJava worker backed by a pooled {@link ExecutorBasedWorker}. The underlying worker is
     * borrowed at construction time and released to the pool the first time this worker is
     * disposed.
     */
    static class PooledWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final WorkersPool pool;
        private final ExecutorBasedWorker worker;
        private final AtomicBoolean disposed;

        /**
         * Borrows a worker from {@code owner}.
         *
         * @param owner pool to borrow the worker from and to release it to
         * @throws RejectedExecutionException if {@code owner} is {@code null}, i.e. the scheduler
         *                                    has no pool installed
         */
        PooledWorker(WorkersPool owner) {
            if (null == owner) {
                throw new RejectedExecutionException("Task sent to a shut down scheduler.");
            }
            pool = owner;
            tasks = new CompositeDisposable();
            worker = owner.get();
            // A pool that is already shut down hands out the DISPOSED sentinel; start out disposed then.
            disposed = new AtomicBoolean(ExecutorBasedWorker.DISPOSED == worker);
        }

        /** Disposes all tasks scheduled through this worker and releases the borrowed worker, once. */
        @Override
        public void dispose() {
            if (!disposed.compareAndSet(false, true)) {
                // Already disposed (or created disposed): nothing to release.
                return;
            }
            tasks.dispose();
            pool.release(worker);
        }

        /** @return {@code true} once this worker has been disposed or if it was created disposed */
        @Override
        public boolean isDisposed() {
            return disposed.get();
        }

        /**
         * Schedules {@code runnable} on the borrowed worker, tracking it in this worker's task set.
         *
         * @return {@link EmptyDisposable#INSTANCE} when this worker is disposed, otherwise the
         *         result of the underlying worker's {@code scheduleActual}
         */
        @Override
        public Disposable schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            if (disposed.get()) {
                return EmptyDisposable.INSTANCE;
            }
            return worker.scheduleActual(runnable, j, timeUnit, tasks);
        }
    }

    /**
     * Cache of reusable {@link ExecutorBasedWorker}s.
     *
     * <p>Idle workers are kept in a deque: {@link #release} pushes at the head, {@link #get} polls
     * from the head, and the periodic kill timer trims from the tail. The idle count is tracked in
     * a separate counter rather than by querying the deque. Every worker created by the pool is
     * also registered in {@code allWorkers} so that {@link #shutdown} can dispose all of them.
     */
    public static final class WorkersPool {
        private final Function<ThreadFactory, ExecutorBasedWorker> workerSupplier;
        // Null when no kill timer was started (MAX_POOL_SIZE <= MIN_POOL_SIZE).
        private final ScheduledFuture<?> killTimer;
        private final ThreadFactory threadFactory = new IOThread.Factory();
        private final Deque<ExecutorBasedWorker> workersQueue = new ConcurrentLinkedDeque<>();
        private final AtomicInteger workersQueueSize = new AtomicInteger(0);
        private final CompositeDisposable allWorkers = new CompositeDisposable();
        private final AtomicBoolean shutdown = new AtomicBoolean(false);

        /**
         * @param workerSupplier  factory used to create a worker when no idle one is cached
         * @param keepAliveMillis period of the kill timer and minimum unused time, in milliseconds,
         *                        before an idle worker may be trimmed
         */
        WorkersPool(Function<ThreadFactory, ExecutorBasedWorker> workerSupplier, int keepAliveMillis) {
            this.workerSupplier = workerSupplier;
            this.killTimer = startKillTimer(keepAliveMillis);
        }

        /** @return the tracked number of idle workers currently cached */
        int cachedSize() {
            return this.workersQueueSize.get();
        }

        /**
         * Hands out a worker: the most recently released idle worker if one is cached, otherwise a
         * newly created one.
         *
         * @return a worker, or {@link ExecutorBasedWorker#DISPOSED} when the pool is shut down
         */
        ExecutorBasedWorker get() {
            if (this.shutdown.get() || this.allWorkers.isDisposed()) {
                return ExecutorBasedWorker.DISPOSED;
            }
            ExecutorBasedWorker cached = this.workersQueue.pollFirst();
            if (cached != null) {
                this.workersQueueSize.decrementAndGet();
                return cached;
            }
            ExecutorBasedWorker created = this.workerSupplier.apply(this.threadFactory);
            if (!this.allWorkers.add(created)) {
                // The pool was shut down between the check above and the registration:
                // CompositeDisposable.add has already disposed the new worker, so report the
                // sentinel instead of handing out a dead worker as if it were live.
                return ExecutorBasedWorker.DISPOSED;
            }
            return created;
        }

        /**
         * Starts the periodic task that trims idle workers down towards
         * {@link IOScheduler#MIN_POOL_SIZE}.
         *
         * @param keepAliveMillis timer period and minimum unused time before a worker is trimmed
         * @return the timer handle, or {@code null} when the pool bounds leave nothing to trim
         */
        private ScheduledFuture<?> startKillTimer(int keepAliveMillis) {
            if (IOScheduler.MAX_POOL_SIZE <= IOScheduler.MIN_POOL_SIZE) {
                return null;
            }
            return ScheduledExecutors.scheduledFastTasks.scheduleAtFixedRate(() -> {
                if (this.workersQueueSize.get() <= IOScheduler.MIN_POOL_SIZE) {
                    return;
                }
                long now = ApolloTime.millisSinceStartup();
                do {
                    // The tail holds the least recently released worker; stop at the first one
                    // that is still within its keep-alive, or if another thread took it first.
                    ExecutorBasedWorker oldest = this.workersQueue.peekLast();
                    if (oldest == null
                        || oldest.unusedTime(now) < keepAliveMillis
                        || !this.workersQueue.removeLastOccurrence(oldest)) {
                        return;
                    }
                    this.allWorkers.delete(oldest);
                    oldest.dispose();
                } while (this.workersQueueSize.decrementAndGet() > IOScheduler.MIN_POOL_SIZE);
            }, keepAliveMillis, keepAliveMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Takes a worker back. It is cached at the head of the idle deque unless the pool is shut
         * down or the idle count has reached {@link IOScheduler#MAX_POOL_SIZE}, in which case it is
         * unregistered and disposed.
         *
         * @param executorBasedWorker worker previously obtained from {@link #get}
         */
        void release(ExecutorBasedWorker executorBasedWorker) {
            if (this.shutdown.get() || this.workersQueueSize.get() >= IOScheduler.MAX_POOL_SIZE) {
                this.allWorkers.delete(executorBasedWorker);
                executorBasedWorker.dispose();
            } else {
                this.workersQueueSize.incrementAndGet();
                // Stamp the release time so the kill timer can measure how long it has been idle.
                executorBasedWorker.markUse(ApolloTime.millisSinceStartup());
                this.workersQueue.addFirst(executorBasedWorker);
            }
        }

        /** Cancels the kill timer (without interrupting it) and disposes every registered worker, once. */
        void shutdown() {
            if (this.shutdown.compareAndSet(false, true)) {
                if (this.killTimer != null) {
                    this.killTimer.cancel(false);
                }
                this.allWorkers.dispose();
            }
        }
    }

    /**
     * Creates a scheduler whose workers come from {@code ExecutorBasedWorker.singleThreaded} and
     * whose keep-alive is {@link #KEEP_ALIVE_TIME_SECS} converted to milliseconds.
     */
    public IOScheduler() {
        this(ExecutorBasedWorker::singleThreaded, 1000 * KEEP_ALIVE_TIME_SECS);
    }

    @VisibleForTesting
    IOScheduler(Function<ThreadFactory, ExecutorBasedWorker> function, int i) {
        this.shutdown = new AtomicBoolean(false);
        this.workerSupplier = function;
        this.pool = new AtomicReference<>(new WorkersPool(function, i));
        this.keepAliveMillis = i;
    }

    /**
     * @return the value of {@code TPCUtils.getNumCores()}, used as this scheduler's metrics core
     *         id (presumably the slot just past the regular cores; confirm against TPCUtils)
     */
    @Override
    public int metricsCoreId() {
        final int coreId = TPCUtils.getNumCores();
        return coreId;
    }

    /**
     * Runs the task on a worker borrowed from the current pool; the worker is returned to the
     * pool when the task finishes.
     *
     * @param tPCRunnable task to execute
     * @throws RejectedExecutionException if no pool is installed (the scheduler was shut down)
     */
    @Override
    public void enqueue(TPCRunnable tPCRunnable) {
        final WorkersPool current = this.pool.get();
        final PooledTaskWorker job = new PooledTaskWorker(current, tPCRunnable);
        job.execute();
    }

    /**
     * Wraps {@code runnable} in a {@link TPCRunnable} and schedules it through the superclass.
     * The task is tagged {@code TIMED_UNKNOWN} when the delay is non-zero and {@code UNKNOWN}
     * otherwise, and is associated with the core returned by {@code TPC.getNextCore()}.
     *
     * @param runnable task to run
     * @param j        delay before execution, in {@code timeUnit}
     * @param timeUnit unit of the delay
     * @return the disposable returned by the superclass
     */
    @Override
    public Disposable scheduleDirect(Runnable runnable, long j, TimeUnit timeUnit) {
        final TPCTaskType taskType;
        if (j == 0) {
            taskType = TPCTaskType.UNKNOWN;
        } else {
            taskType = TPCTaskType.TIMED_UNKNOWN;
        }
        final TPCRunnable wrapped = TPCRunnable.wrap(runnable, taskType, TPC.getNextCore());
        return super.scheduleDirect(wrapped, j, timeUnit);
    }

    /**
     * Wraps {@code runnable} in a {@link TPCRunnable} with the given task type and the core
     * returned by {@code TPC.getNextCore()}, then schedules it through the superclass.
     *
     * @param runnable    task to run
     * @param tPCTaskType task type to tag the wrapped runnable with
     * @param j           delay before execution, in {@code timeUnit}
     * @param timeUnit    unit of the delay
     * @return the disposable returned by the superclass
     */
    @Override
    public Disposable schedule(Runnable runnable, TPCTaskType tPCTaskType, long j, TimeUnit timeUnit) {
        final TPCRunnable wrapped = TPCRunnable.wrap(runnable, tPCTaskType, TPC.getNextCore());
        return super.scheduleDirect(wrapped, j, timeUnit);
    }

    /**
     * @param thread thread to test; may be {@code null}
     * @return {@code true} if {@code thread} is an {@link IOThread}, {@code false} otherwise
     *         (including for {@code null})
     */
    @Override
    public boolean isOnScheduler(Thread thread) {
        return IOThread.class.isInstance(thread);
    }

    /**
     * Creates an RxJava worker backed by a worker borrowed from the current pool.
     *
     * @return a new {@link PooledWorker}
     * @throws RejectedExecutionException if no pool is installed (the scheduler was shut down)
     */
    @Override
    public Scheduler.Worker createWorker() {
        final WorkersPool current = this.pool.get();
        return new PooledWorker(current);
    }

    /**
     * Installs a new worker pool if none is currently installed; does nothing otherwise.
     * If another thread installs a pool first, the pool created here is shut down and the check
     * is repeated. This method does not touch the shutdown flag reported by {@code isShutdown()}.
     */
    @Override
    public void start() {
        for (;;) {
            if (this.pool.get() != null) {
                // A pool is already in place.
                return;
            }
            final WorkersPool candidate = new WorkersPool(this.workerSupplier, this.keepAliveMillis);
            if (this.pool.compareAndSet(null, candidate)) {
                return;
            }
            // Lost the race to install: discard the pool that was just built.
            candidate.shutdown();
        }
    }

    /**
     * Shuts the scheduler down: marks it as shut down and tears down the installed worker pool,
     * if there is one.
     *
     * <p>The pool is always detached and shut down, regardless of whether the shutdown flag was
     * already set. Gating the teardown on the one-shot flag would make every call after the first
     * a no-op, so a pool installed by {@code start()} after an earlier shutdown could never be
     * released. Calling this method repeatedly or concurrently is safe: the pool is detached
     * atomically, so only one caller ever shuts a given pool down.
     */
    @Override // io.reactivex.Scheduler
    public void shutdown() {
        this.shutdown.set(true);
        // Atomically take ownership of whatever pool is installed and leave null behind.
        WorkersPool detached = this.pool.getAndSet(null);
        if (detached != null) {
            detached.shutdown();
        }
    }

    /** @return {@code true} once {@code shutdown()} has been called on this scheduler */
    public boolean isShutdown() {
        final boolean stopped = this.shutdown.get();
        return stopped;
    }

    /**
     * @return the number of idle workers cached by the current pool, or {@code 0} when no pool
     *         is installed
     */
    @VisibleForTesting
    public int numCachedWorkers() {
        final WorkersPool current = this.pool.get();
        return current != null ? current.cachedSize() : 0;
    }

    /** @return a short description including the current number of cached workers */
    @Override
    public String toString() {
        final int cached = numCachedWorkers();
        return String.format("IO scheduler: cached workers: %d", cached);
    }
}
