package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.companion.RecordsSubscriber;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber.class */
public class RecordsSubscriber<T, SELF extends RecordsSubscriber<T, SELF>> implements MultiSubscriber<T>, Iterable<T> {
    public static Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);
    private final CountDownLatch terminal;
    private final AtomicReference<Subscription> subscription;
    private final AtomicLong requested;
    private final List<T> records;
    private final AtomicLong received;
    private final AtomicReference<T> lastReceived;
    private final AtomicReference<T> firstReceived;
    private final AtomicReference<Throwable> failure;
    private final AtomicBoolean completed;
    private boolean cancelled;
    private final List<EventListener> eventListeners;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber$Event.class */
    public static class Event {
        private final Long recordCount;
        private final Throwable failure;
        private final boolean completion;
        private final boolean cancellation;

        private Event(Long l, Throwable th, boolean z, boolean z2) {
            this.recordCount = l;
            this.failure = th;
            this.completion = z;
            this.cancellation = z2;
        }

        public Long recordCount() {
            return this.recordCount;
        }

        public boolean isRecord() {
            return this.recordCount != null;
        }

        public boolean isCancellation() {
            return this.cancellation;
        }

        public boolean isFailure() {
            return this.failure != null;
        }

        public boolean isCompletion() {
            return this.completion;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber$EventListener.class */
    public interface EventListener extends Consumer<Event> {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/RecordsSubscriber$RecordTask.class */
    public class RecordTask {
        private final long targetCount;
        private final SELF subscriber;
        private final long startCount;

        public RecordTask(long j, SELF self) {
            this.targetCount = j;
            this.subscriber = self;
            this.startCount = self.count();
        }

        public CompletableFuture<Void> future() {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            if (this.startCount >= this.targetCount) {
                completableFuture.complete(null);
                return completableFuture;
            }
            EventListener eventListener = event -> {
                if (event.recordCount() != null && event.recordCount.longValue() >= this.targetCount) {
                    completableFuture.complete(null);
                } else if (event.isCancellation() || event.isFailure() || event.isCompletion()) {
                    completableFuture.completeExceptionally(new NoSuchElementException("Received a terminal event while waiting for records"));
                }
            };
            this.subscriber.registerListener(eventListener);
            return completableFuture.whenComplete((r5, th) -> {
                this.subscriber.unregisterListener(eventListener);
            });
        }
    }

    public RecordsSubscriber(long j) {
        this.terminal = new CountDownLatch(1);
        this.subscription = new AtomicReference<>();
        this.requested = new AtomicLong();
        this.records = new CopyOnWriteArrayList();
        this.received = new AtomicLong();
        this.lastReceived = new AtomicReference<>();
        this.firstReceived = new AtomicReference<>();
        this.failure = new AtomicReference<>();
        this.completed = new AtomicBoolean();
        this.eventListeners = new CopyOnWriteArrayList();
        this.requested.set(j);
    }

    public RecordsSubscriber() {
        this(0L);
    }

    protected SELF self() {
        return this;
    }

    public long count() {
        return this.received.get();
    }

    public T getFirstRecord() {
        return this.firstReceived.get();
    }

    public T getLastRecord() {
        return this.lastReceived.get();
    }

    public List<T> getRecords() {
        return this.records;
    }

    public SELF awaitNextRecord() {
        return awaitNextRecords(1);
    }

    public SELF awaitNextRecord(Duration duration) {
        return awaitNextRecords(1, 1, duration);
    }

    public SELF awaitNextRecords(int i) {
        return awaitNextRecords(i, DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecords(int i, int i2) {
        return awaitNextRecords(i, i2, DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecords(int i, Duration duration) {
        return awaitNextRecords(i, i, duration);
    }

    public SELF awaitNextRecords(int i, int i2, Duration duration) {
        if (!hasCompleted() && getFailure() == null) {
            awaitNextRecordEvents(i, i2, duration);
            return self();
        }
        if (hasCompleted()) {
            throw new AssertionError("Expecting a next records, but a completion event has already being received");
        }
        throw new AssertionError("Expecting a next records, but a failure event has already being received: " + getFailure());
    }

    public SELF awaitRecords(int i) {
        return awaitRecords(i, DEFAULT_TIMEOUT);
    }

    public SELF awaitRecords(int i, Duration duration) {
        long count = count();
        if (count > i) {
            throw new AssertionError("Expected the number of records to be " + i + ", but it's already " + count);
        }
        if (!isCancelled() && !hasCompleted() && getFailure() == null) {
            awaitRecordEvents(i, duration);
            return self();
        }
        if (count != i) {
            throw new AssertionError("Expected the number of records to be " + i + ", but received " + count + " and we received a terminal event already");
        }
        return self();
    }

    public SELF awaitCompletion() {
        return awaitCompletion(DEFAULT_TIMEOUT);
    }

    public SELF awaitCompletion(Duration duration) {
        return awaitCompletion((th, bool) -> {
        }, duration);
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> biConsumer) {
        return awaitCompletion(biConsumer, DEFAULT_TIMEOUT);
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> biConsumer, Duration duration) {
        try {
            awaitEvent(this.terminal, duration);
            try {
                biConsumer.accept(this.failure.get(), Boolean.valueOf(this.cancelled));
                return self();
            } catch (AssertionError e) {
                throw new AssertionError("Received a failure or cancellation event, but did not pass the validation: " + e, e);
            }
        } catch (TimeoutException e2) {
            throw new AssertionError("No completion event received in the last " + duration.toMillis() + " ms");
        }
    }

    private void awaitEvent(CountDownLatch countDownLatch, Duration duration) throws TimeoutException {
        if (countDownLatch.getCount() == 0) {
            return;
        }
        try {
            if (countDownLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            } else {
                throw new TimeoutException();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void awaitNextRecordEvents(int i, int i2, Duration duration) {
        long count = count();
        RecordTask recordTask = new RecordTask(count + i, self());
        if (i2 > 0) {
            request(i2);
        }
        try {
            recordTask.future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            long count2 = count() - count;
            if (isCancelled()) {
                throw new AssertionError("Expected " + i + " records, but received a cancellation event while waiting. Only " + count2 + " record(s) have been received.");
            }
            if (!hasCompleted()) {
                throw new AssertionError("Expected " + i + " records, but received a failure event while waiting: " + getFailure() + ". Only " + count2 + " record(s) have been received.");
            }
            throw new AssertionError("Expected " + i + " records, but received a completion event while waiting. Only " + count2 + " record(s) have been received.");
        } catch (TimeoutException e3) {
            throw new AssertionError("Expected " + i + " records in " + duration.toMillis() + " ms, but only received " + (count() - count) + " records.");
        }
    }

    private void awaitRecordEvents(int i, Duration duration) {
        try {
            new RecordTask(i, self()).future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            long count = count();
            if (i == count) {
                return;
            }
            if (isCancelled()) {
                throw new AssertionError("Expected " + i + " records, but received a cancellation event while waiting. Only " + count + " records have been received.");
            }
            if (hasCompleted()) {
                throw new AssertionError("Expected " + i + " records, but received a completion event while waiting. Only " + count + " records have been received.");
            }
            if (getFailure() == null) {
                throw new AssertionError("Expected " + i + " records.  Only " + count + " records have been received.");
            }
            throw new AssertionError("Expected " + i + " records, but received a failure event while waiting: " + getFailure() + ". Only " + count + " records have been received.");
        } catch (TimeoutException e3) {
            long count2 = count();
            if (count2 < i) {
                throw new AssertionError("Expected " + i + " records.  Only " + count2 + " records have been received.");
            }
        }
    }

    public SELF cancel() {
        if (this.subscription.get() != null) {
            this.subscription.get().cancel();
        }
        this.cancelled = true;
        this.terminal.countDown();
        Event event = new Event(null, null, false, true);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
        return self();
    }

    private SELF request(long j) {
        this.requested.addAndGet(j);
        if (this.subscription.get() != null) {
            this.subscription.get().request(j);
        }
        return self();
    }

    public void onSubscribe(Subscription subscription) {
        if (!this.subscription.compareAndSet(null, subscription) || this.requested.get() <= 0) {
            return;
        }
        subscription.request(this.requested.get());
    }

    public synchronized void onItem(T t) {
        long incrementAndGet = this.received.incrementAndGet();
        if (incrementAndGet == 1) {
            this.firstReceived.set(t);
        }
        received(t);
        this.lastReceived.set(t);
        Event event = new Event(Long.valueOf(incrementAndGet), null, false, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    public void received(T t) {
        this.records.add(t);
    }

    public void onFailure(Throwable th) {
        this.failure.set(th);
        this.terminal.countDown();
        Event event = new Event(null, th, false, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    public void onCompletion() {
        this.completed.set(true);
        this.terminal.countDown();
        Event event = new Event(null, null, true, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    @Override // java.lang.Iterable
    public Iterator<T> iterator() {
        return getRecords().iterator();
    }

    @Override // java.lang.Iterable
    public Spliterator<T> spliterator() {
        return getRecords().spliterator();
    }

    public Throwable getFailure() {
        return this.failure.get();
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public boolean hasCompleted() {
        return this.completed.get();
    }

    public boolean hasSubscribed() {
        return this.subscription.get() != null;
    }

    protected void registerListener(EventListener eventListener) {
        this.eventListeners.add(eventListener);
    }

    protected void unregisterListener(EventListener eventListener) {
        this.eventListeners.remove(eventListener);
    }
}
