package org.apache.cassandra.utils.flow;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.cassandra.concurrent.StagedScheduler;
import org.apache.cassandra.concurrent.TPCTaskType;
import org.apache.cassandra.concurrent.TPCTimeoutTask;
import org.apache.cassandra.utils.time.ApolloTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/utils/flow/DeferredFlowImpl.class */
class DeferredFlowImpl<T> extends DeferredFlow<T> implements FlowSubscriptionRecipient {
    private static final Logger logger;
    private final AtomicReference<Flow<T>> source;
    private final long deadlineNanos;
    private final Supplier<Flow<T>> timeoutSupplier;
    private final Supplier<StagedScheduler> schedulerSupplier;
    private final Supplier<Consumer<Flow<T>>> notification;
    private volatile FlowSubscriber<T> subscriber;
    private volatile FlowSubscriptionRecipient subscriptionRecipient;
    private volatile FlowSubscription subscription;
    private volatile TPCTimeoutTask<DeferredFlow<T>> timeoutTask;
    private final AtomicBoolean subscribed = new AtomicBoolean(false);
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/utils/flow/DeferredFlowImpl$TimeoutAction.class */
    public static class TimeoutAction<T> implements Consumer<DeferredFlow<T>> {
        private final Supplier<Flow<T>> timeoutSupplier;
        private final Supplier<StagedScheduler> schedulerSupplier;

        public TimeoutAction(Supplier<Flow<T>> supplier, Supplier<StagedScheduler> supplier2) {
            this.timeoutSupplier = supplier;
            this.schedulerSupplier = supplier2;
        }

        @Override // java.util.function.Consumer
        public void accept(DeferredFlow<T> deferredFlow) {
            if (deferredFlow.hasSource()) {
                return;
            }
            deferredFlow.onSource(this.timeoutSupplier.get().lift(Threads.requestOn(this.schedulerSupplier.get(), TPCTaskType.READ_TIMEOUT)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DeferredFlowImpl(long j, Supplier<StagedScheduler> supplier, Supplier<Flow<T>> supplier2, Supplier<Consumer<Flow<T>>> supplier3) {
        if (!$assertionsDisabled && supplier == null) {
            throw new AssertionError();
        }
        this.source = new AtomicReference<>(null);
        this.deadlineNanos = j;
        this.timeoutSupplier = supplier2;
        this.schedulerSupplier = supplier;
        this.notification = supplier3;
    }

    @Override // org.apache.cassandra.utils.flow.Flow
    public void requestFirst(FlowSubscriber<T> flowSubscriber, FlowSubscriptionRecipient flowSubscriptionRecipient) {
        if (!$assertionsDisabled && this.subscriber != null) {
            throw new AssertionError("Only one subscriber is supported");
        }
        this.subscriptionRecipient = flowSubscriptionRecipient;
        this.subscriber = flowSubscriber;
        if (this.source.get() == null) {
            startTimeoutTask();
        }
        maybeSubscribe();
    }

    @Override // org.apache.cassandra.utils.flow.FlowSubscriptionRecipient
    public void onSubscribe(FlowSubscription flowSubscription) {
        this.subscription = flowSubscription;
    }

    @Override // org.apache.cassandra.utils.flow.DeferredFlow
    public boolean onSource(Flow<T> flow) {
        if (!this.source.compareAndSet(null, flow)) {
            return false;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("{} - got source", Integer.valueOf(hashCode()));
        }
        Consumer<Flow<T>> consumer = this.notification != null ? this.notification.get() : null;
        if (consumer != null) {
            try {
                consumer.accept(flow);
            } catch (Throwable th) {
                logger.warn("onSource notification error: " + th.getMessage(), th);
            }
        }
        maybeSubscribe();
        return true;
    }

    private void startTimeoutTask() {
        if (!$assertionsDisabled && this.timeoutTask != null) {
            throw new AssertionError("timeout task already running!");
        }
        long approximateNanoTime = this.deadlineNanos - ApolloTime.approximateNanoTime();
        if (approximateNanoTime <= 0) {
            onSource(this.timeoutSupplier.get());
            return;
        }
        this.timeoutTask = new TPCTimeoutTask<>(this);
        this.timeoutTask.submit(new TimeoutAction(this.timeoutSupplier, this.schedulerSupplier), approximateNanoTime, TimeUnit.NANOSECONDS);
        if (hasSource()) {
            this.timeoutTask.dispose();
        }
    }

    private void disposeTimeoutTask() {
        TPCTimeoutTask<DeferredFlow<T>> tPCTimeoutTask = this.timeoutTask;
        if (tPCTimeoutTask != null) {
            tPCTimeoutTask.dispose();
        }
    }

    @Override // org.apache.cassandra.utils.flow.DeferredFlow
    public boolean hasSource() {
        return this.source.get() != null;
    }

    private void maybeSubscribe() {
        if (logger.isTraceEnabled()) {
            logger.trace("{} - maybeSubscribe {}/{}", Integer.valueOf(hashCode()), this.source, this.subscription);
        }
        if (this.subscriber == null || this.source.get() == null || !this.subscribed.compareAndSet(false, true)) {
            return;
        }
        disposeTimeoutTask();
        this.source.get().requestFirst(this.subscriber, this.subscriptionRecipient);
    }

    static {
        $assertionsDisabled = !DeferredFlowImpl.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) DeferredFlowImpl.class);
    }
}
