package com.datastax.dse.driver.internal.core.cql.reactive;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:java-driver-core-4.15.0.jar:com/datastax/dse/driver/internal/core/cql/reactive/SimpleUnicastProcessor.class */
/**
 * A {@link Processor} that accepts at most one downstream {@link Subscriber}.
 *
 * <p>Signals received through {@link #onNext}, {@link #onError} and {@link #onComplete} are
 * buffered in an internal queue and replayed to the subscriber by {@link #drain()}, never emitting
 * more items than the subscriber has requested. This object also acts as the {@link Subscription}
 * handed to the subscriber.
 *
 * <p>A second call to {@link #subscribe} is rejected: that subscriber receives an {@code
 * IllegalStateException} through {@code onError}.
 *
 * @param <ElementT> the type of elements flowing through this processor.
 */
public class SimpleUnicastProcessor<ElementT> implements Processor<ElementT, ElementT>, Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleUnicastProcessor.class);

    /** Sentinel enqueued by {@link #onComplete()} to mark the end of the stream. */
    private static final Object ON_COMPLETE = new Object();

    /** Pending signals: items of type ElementT, a Throwable (error), or {@link #ON_COMPLETE}. */
    private final Queue<Object> queue = new ConcurrentLinkedDeque<>();

    /** Flipped to true by the first call to {@link #subscribe}; guards against re-subscription. */
    private final AtomicBoolean once = new AtomicBoolean(false);

    /**
     * Work-in-progress counter: the caller that moves it from 0 to 1 runs the drain loop; other
     * callers only increment it to signal that another drain round is needed.
     */
    private final AtomicInteger draining = new AtomicInteger(0);

    /** Outstanding demand from the subscriber, i.e. requested but not yet emitted. */
    private final AtomicLong requested = new AtomicLong(0);

    private volatile Subscriber<? super ElementT> subscriber;
    private volatile boolean cancelled;

    /**
     * Registers the single downstream subscriber and hands it this object as its subscription.
     *
     * @param subscriber the downstream subscriber.
     * @throws NullPointerException if {@code subscriber} is null.
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super ElementT> subscriber) {
        Objects.requireNonNull(subscriber, "Subscriber cannot be null");
        if (!this.once.compareAndSet(false, true)) {
            // onSubscribe must be signalled before onError, hence the empty subscription.
            subscriber.onSubscribe(EmptySubscription.INSTANCE);
            subscriber.onError(new IllegalStateException("This publisher does not support multiple subscriptions"));
            return;
        }
        this.subscriber = subscriber;
        try {
            subscriber.onSubscribe(this);
        } catch (Throwable th) {
            doOnError(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", th));
        }
    }

    /** Does nothing: the upstream subscription is neither stored nor used by this class. */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
    }

    /**
     * Buffers an item and attempts to deliver it. Ignored once cancelled.
     *
     * @param elementt the item to enqueue.
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(ElementT elementt) {
        if (this.cancelled) {
            return;
        }
        this.queue.offer(elementt);
        drain();
    }

    /**
     * Buffers an error signal behind any already-queued items and attempts to deliver it. Ignored
     * once cancelled.
     *
     * @param th the error to enqueue.
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (this.cancelled) {
            return;
        }
        this.queue.offer(th);
        drain();
    }

    /**
     * Buffers the completion signal behind any already-queued items and attempts to deliver it.
     * Ignored once cancelled.
     */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.cancelled) {
            return;
        }
        this.queue.offer(ON_COMPLETE);
        drain();
    }

    /**
     * Adds {@code j} to the outstanding demand and attempts to deliver queued signals. A
     * non-positive {@code j} terminates the subscriber with an {@code IllegalArgumentException}.
     * Ignored once cancelled.
     *
     * @param j the number of additional items requested.
     */
    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (this.cancelled) {
            return;
        }
        if (j < 1) {
            doOnError(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
        } else {
            // NOTE(review): addCap presumably saturates instead of overflowing - confirm in ReactiveOperators.
            ReactiveOperators.addCap(this.requested, j);
            drain();
        }
    }

    /**
     * Marks this processor as cancelled. If no drain loop is running, the queue and the subscriber
     * reference are released immediately; otherwise the running loop sees the {@code cancelled}
     * flag and performs the cleanup itself.
     */
    @Override // org.reactivestreams.Subscription
    public void cancel() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        // The increment is never undone here, so later drain() calls return immediately.
        if (this.draining.getAndIncrement() == 0) {
            clear();
        }
    }

    /**
     * Delivers queued signals to the subscriber, up to the outstanding demand.
     *
     * <p>Only the caller that moves {@link #draining} from 0 to 1 runs the loop; concurrent callers
     * just bump the counter and return, which makes the running loop perform another round.
     * Whenever termination (cancellation, error or completion) is detected inside the loop,
     * {@link #clear()} is called explicitly, because {@link #cancel()} skips it while a drain is in
     * progress.
     */
    private void drain() {
        if (this.draining.getAndIncrement() != 0) {
            // Another thread (or a re-entrant call) is already draining; it will pick up our work.
            return;
        }
        int missed = 1;
        while (true) {
            long demand = this.requested.get();
            long emitted = 0L;
            while (demand != emitted) {
                if (this.cancelled) {
                    clear();
                    return;
                }
                Object next = this.queue.poll();
                if (next == null) {
                    // Nothing buffered right now.
                    break;
                }
                if (next instanceof Throwable) {
                    doOnError((Throwable) next);
                    clear();
                    return;
                }
                if (next == ON_COMPLETE) {
                    doOnComplete();
                    clear();
                    return;
                }
                // Anything that is neither an error nor the sentinel was enqueued by onNext.
                @SuppressWarnings("unchecked")
                ElementT item = (ElementT) next;
                doOnNext(item);
                emitted++;
            }
            if (this.cancelled) {
                clear();
                return;
            }
            if (emitted != 0) {
                // Account for the items just delivered.
                ReactiveOperators.subCap(this.requested, emitted);
            }
            // If drain() was called while this round was running, the counter is still non-zero
            // after subtracting the rounds already accounted for: go for another round.
            missed = this.draining.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    /** Emits one item; a subscriber that throws is logged and the subscription is cancelled. */
    private void doOnNext(@NonNull ElementT elementt) {
        try {
            this.subscriber.onNext(elementt);
        } catch (Throwable th) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext.", th);
            cancel();
        }
    }

    /** Emits the completion signal, logging any exception thrown by the subscriber, then cancels. */
    private void doOnComplete() {
        try {
            this.subscriber.onComplete();
        } catch (Throwable th) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", th);
        }
        cancel();
    }

    /**
     * Emits the error signal, then cancels. If the subscriber itself throws, the original error is
     * attached to that exception as suppressed and the result is logged.
     */
    private void doOnError(@NonNull Throwable th) {
        try {
            this.subscriber.onError(th);
        } catch (Throwable th2) {
            th2.addSuppressed(th);
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", th2);
        }
        cancel();
    }

    /** Discards all buffered signals and drops the reference to the subscriber. */
    private void clear() {
        this.queue.clear();
        this.subscriber = null;
    }
}
