package reactor.rx.action.error;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/rx/action/error/RetryWhenAction.class */
/**
 * Pass-through {@link Action} whose error handling is driven by a user-supplied "retry" stream.
 *
 * <p>Values received in {@link #doNext(Object)} are re-broadcast unchanged. When
 * {@link #onError(Throwable)} is invoked, the action calls {@code cancel()} and publishes the
 * {@link Throwable} on an internal {@link Broadcaster} ({@link #retryStream()}) instead of
 * broadcasting it as an error. The constructor hands that broadcaster to the supplied function and
 * subscribes a {@code RestartSubscriber} to the {@link Publisher} the function returns; every
 * element emitted by that publisher triggers {@link #doRetry()}.
 *
 * @param <T> type of the values passed through this action
 */
public class RetryWhenAction<T> extends Action<T, T> {
    /** Broadcaster on which every error received by {@link #onError(Throwable)} is published. */
    private final Broadcaster<Throwable> retryStream;

    /** Publisher that {@link #doRetry()} (re)subscribes this action to; retries are skipped when null. */
    private final Publisher<? extends T> rootPublisher;

    /**
     * Dispatcher used to schedule retries. Not final: {@link #doRetry()} replaces it with a fresh
     * tail-recurse dispatcher when the current one is a {@link TailRecurseDispatcher}.
     */
    private Dispatcher dispatcher;

    /**
     * Creates the action and wires the retry control stream.
     *
     * @param dispatcher dispatcher used to create the retry broadcaster and to schedule retries; if it
     *                   is {@link SynchronousDispatcher#INSTANCE}, retries are scheduled on
     *                   {@code Environment.tailRecurse()} instead
     * @param function   receives the stream of errors and returns the publisher whose elements
     *                   trigger a retry
     * @param publisher  the publisher to subscribe to when retrying
     */
    public RetryWhenAction(Dispatcher dispatcher, Function<? super Stream<? extends Throwable>, ? extends Publisher<?>> function, Publisher<? extends T> publisher) {
        // The broadcaster is always created with the dispatcher exactly as passed in.
        this.retryStream = Broadcaster.create(null, dispatcher);
        this.dispatcher = (SynchronousDispatcher.INSTANCE == dispatcher) ? Environment.tailRecurse() : dispatcher;
        this.rootPublisher = publisher;

        // Typed local instead of a raw (Publisher) cast: Subscriber<Object> is accepted by any Publisher<?>.
        Publisher<?> retryControl = function.apply(this.retryStream);
        retryControl.subscribe(new RestartSubscriber());
    }

    /**
     * Re-broadcasts the received value unchanged via {@code broadcastNext}.
     *
     * @param t the value received
     */
    @Override
    protected void doNext(T t) {
        broadcastNext(t);
    }

    /**
     * Completes the retry broadcaster first, then delegates to {@code super.doComplete()}.
     */
    @Override
    public void doComplete() {
        this.retryStream.onComplete();
        super.doComplete();
    }

    /**
     * Returns the dispatcher currently used to schedule retries.
     *
     * @return the current dispatcher; this may differ from the constructor argument (see the
     *         constructor and {@link #doRetry()})
     */
    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Schedules a retry on the current dispatcher.
     *
     * <p>The scheduled task does nothing when there is no root publisher. Otherwise:
     * <ul>
     *   <li>if there is no upstream subscription, it subscribes this action to the root publisher
     *       and requests {@code Long.MAX_VALUE} from the resulting subscription (if one was set);</li>
     *   <li>if an upstream subscription exists, it reads the pending request count, replaces a
     *       {@link TailRecurseDispatcher} with a fresh one after shutting the old one down, and
     *       requests the pending count plus one (or the pending count unchanged when it is already
     *       {@code Long.MAX_VALUE}, which avoids overflowing).</li>
     * </ul>
     */
    protected void doRetry() {
        this.dispatcher.dispatch(null, new Consumer<Void>() {
            public void accept(Void ignored) {
                if (RetryWhenAction.this.rootPublisher == null) {
                    return;
                }

                PushSubscription<T> upstream = RetryWhenAction.this.upstreamSubscription;
                long toRequest;
                if (upstream == null) {
                    RetryWhenAction.this.rootPublisher.subscribe(RetryWhenAction.this);
                    // Re-read the field: it is expected to be populated by the subscribe call above.
                    upstream = RetryWhenAction.this.upstreamSubscription;
                    toRequest = Long.MAX_VALUE;
                } else {
                    // Read the pending count before touching the dispatcher, as the original order did.
                    long pending = upstream.pendingRequestSignals();
                    if (TailRecurseDispatcher.class.isAssignableFrom(RetryWhenAction.this.dispatcher.getClass())) {
                        RetryWhenAction.this.dispatcher.shutdown();
                        RetryWhenAction.this.dispatcher = Environment.tailRecurse();
                    }
                    toRequest = (pending != Long.MAX_VALUE) ? pending + 1 : pending;
                }

                if (upstream != null) {
                    upstream.request(toRequest);
                }
            }
        }, null);
    }

    /**
     * Handles an error by calling {@code cancel()} and publishing the error on the retry
     * broadcaster; nothing else is done with the error here.
     *
     * @param th the error received
     */
    @Override
    public void onError(Throwable th) {
        cancel();
        this.retryStream.onNext(th);
    }

    /**
     * Exposes the broadcaster on which received errors are published.
     *
     * @return the retry broadcaster
     */
    public Broadcaster<Throwable> retryStream() {
        return this.retryStream;
    }

    /**
     * Subscriber attached to the publisher returned by the user function. It requests one element
     * at a time and triggers {@link RetryWhenAction#doRetry()} for each element received.
     */
    private class RestartSubscriber implements Subscriber<Object>, NonBlocking {
        /** Subscription to the retry control publisher; set in {@link #onSubscribe(Subscription)}. */
        Subscription s;

        /** Delegates to the enclosing action's {@code isReactivePull}. */
        public boolean isReactivePull(Dispatcher dispatcher, long j) {
            return RetryWhenAction.this.isReactivePull(dispatcher, j);
        }

        /** Returns the enclosing action's {@code capacity}. */
        public long getCapacity() {
            return RetryWhenAction.this.capacity;
        }

        /** Stores the subscription and requests the first element. */
        public void onSubscribe(Subscription subscription) {
            this.s = subscription;
            subscription.request(1L);
        }

        /** Triggers a retry, then requests the next element if a subscription is held. */
        public void onNext(Object obj) {
            RetryWhenAction.this.doRetry();
            if (this.s != null) {
                this.s.request(1L);
            }
        }

        /**
         * Cancels the held subscription (if any) and forwards the error to the enclosing action's
         * {@code onError}.
         */
        public void onError(Throwable th) {
            if (this.s != null) {
                this.s.cancel();
            }
            // NOTE(review): RetryWhenAction.onError calls cancel() and publishes th back onto
            // retryStream rather than broadcasting an error; confirm this is the intended handling
            // for a failure of the retry control stream itself.
            RetryWhenAction.this.onError(th);
        }

        /** Forwards completion of the retry control publisher to the enclosing action. */
        public void onComplete() {
            RetryWhenAction.this.onComplete();
        }
    }
}
