package io.vertx.core.eventbus;

import io.vertx.core.Vertx;
import io.vertx.test.core.VertxTestBase;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

/* loaded from: input_file:io/vertx/core/eventbus/EventBusFlowControlTest.class */
public class EventBusFlowControlTest extends VertxTestBase {
    protected EventBus eb;

    @Test
    public void testFlowControl() {
        MessageProducer sender = this.eb.sender("some-address");
        int i = 1000;
        int i2 = 2000;
        sender.setWriteQueueMaxSize(2000);
        MessageConsumer consumer = this.eb.consumer("some-address");
        AtomicInteger atomicInteger = new AtomicInteger();
        consumer.handler(message -> {
            if (atomicInteger.incrementAndGet() == i * i2) {
                testComplete();
            }
        });
        this.vertx.runOnContext(r10 -> {
            sendBatch(sender, i2, i, 0);
        });
        await();
    }

    @Test
    public void testFlowControlWithOptions() {
        MessageProducer sender = this.eb.sender("some-address");
        sender.deliveryOptions(new DeliveryOptions().addHeader("foo", "bar"));
        int i = 1000;
        int i2 = 2000;
        sender.setWriteQueueMaxSize(2000);
        MessageConsumer consumer = this.eb.consumer("some-address");
        AtomicInteger atomicInteger = new AtomicInteger();
        consumer.handler(message -> {
            if (atomicInteger.incrementAndGet() == i * i2) {
                testComplete();
            }
        });
        this.vertx.runOnContext(r10 -> {
            sendBatch(sender, i2, i, 0);
        });
        await();
    }

    private void sendBatch(MessageProducer<String> messageProducer, int i, int i2, int i3) {
        while (i3 < i2) {
            for (int i4 = 0; i4 < i; i4++) {
                messageProducer.send("message-" + i4);
            }
            if (messageProducer.writeQueueFull()) {
                int i5 = i3 + 1;
                messageProducer.drainHandler(r11 -> {
                    sendBatch(messageProducer, i, i2, i5);
                });
                return;
            }
            i3++;
        }
    }

    @Test
    public void testDrainHandlerCalledWhenQueueAlreadyDrained() throws Exception {
        this.eb.consumer("some-address").handler(message -> {
        });
        MessageProducer sender = this.eb.sender("some-address");
        sender.setWriteQueueMaxSize(1);
        sender.write("msg");
        assertTrue(sender.writeQueueFull());
        waitUntil(() -> {
            return !sender.writeQueueFull();
        });
        sender.drainHandler(r3 -> {
            testComplete();
        });
        await();
    }

    @Test
    public void testFlowControlPauseConsumer() {
        MessageProducer<String> sender = this.eb.sender("some-address");
        int i = 10;
        int i2 = 100;
        sender.setWriteQueueMaxSize(100);
        MessageConsumer consumer = this.eb.consumer("some-address");
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        consumer.handler(message -> {
            assertFalse(atomicBoolean.get());
            int incrementAndGet = atomicInteger.incrementAndGet();
            if (incrementAndGet == i * i2) {
                testComplete();
            }
            if (incrementAndGet % 100 == 0) {
                consumer.pause();
                atomicBoolean.set(true);
                this.vertx.setTimer(100L, l -> {
                    atomicBoolean.set(false);
                    consumer.resume();
                });
            }
        });
        sendBatch(sender, 100, 10, 0);
        await();
    }

    @Test
    public void testFlowControlNoConsumer() {
        MessageProducer sender = this.eb.sender("some-address");
        sender.setWriteQueueMaxSize(2000);
        boolean z = false;
        for (int i = 0; i < 2000 * 2; i++) {
            sender.send("message-" + i);
            if (sender.writeQueueFull() && !z) {
                sender.drainHandler(r4 -> {
                    fail("Should not be called");
                });
                z = true;
            }
        }
        assertTrue(z);
        this.vertx.setTimer(500L, l -> {
            testComplete();
        });
        await();
    }

    @Test
    public void testResumePausedProducer() {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        AtomicReference atomicReference = new AtomicReference();
        MessageConsumer consumer = this.eb.consumer("some-address", message -> {
            if (linkedBlockingQueue.isEmpty()) {
                atomicReference.set(Vertx.currentContext());
            } else {
                assertEquals(Vertx.currentContext(), atomicReference.get());
            }
            linkedBlockingQueue.add(message.body());
        });
        consumer.pause();
        MessageProducer sender = this.eb.sender("some-address");
        LinkedList linkedList = new LinkedList();
        int i = 0;
        while (!sender.writeQueueFull()) {
            int i2 = i;
            i++;
            linkedList.add(Integer.valueOf(i2));
            sender.send(Integer.valueOf(i2));
        }
        consumer.resume();
        assertWaitUntil(() -> {
            return !sender.writeQueueFull();
        });
        int i3 = i;
        assertWaitUntil(() -> {
            return linkedBlockingQueue.size() == i3;
        });
        while (linkedList.size() > 0) {
            assertEquals(linkedList.removeFirst(), linkedBlockingQueue.poll());
        }
        assertNotNull(atomicReference.get());
    }

    @Override // io.vertx.test.core.VertxTestBase, io.vertx.test.core.AsyncTestBase
    public void setUp() throws Exception {
        super.setUp();
        this.eb = this.vertx.eventBus();
    }
}
