package io.vertx.core.eventbus;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusTestBase;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import io.vertx.test.tls.Cert;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.junit.Test;

/* loaded from: input_file:io/vertx/core/eventbus/ClusteredEventBusTest.class */
public class ClusteredEventBusTest extends ClusteredEventBusTestBase {
    @Test
    public void testLocalHandlerNotReceive() throws Exception {
        startNodes(2);
        this.vertices[1].eventBus().localConsumer("some-address1").handler(message -> {
            fail("Should not receive message");
        });
        this.vertices[0].eventBus().send("some-address1", "foo");
        this.vertices[0].setTimer(1000L, l -> {
            testComplete();
        });
        await();
    }

    @Test
    public void testDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 myPOJOEncoder1 = new EventBusTestBase.MyPOJOEncoder1();
        this.vertices[0].eventBus().registerCodec(myPOJOEncoder1);
        this.vertices[1].eventBus().registerCodec(myPOJOEncoder1);
        String randomAlphaString = TestUtils.randomAlphaString(100);
        testSend(new EventBusTestBase.MyPOJO(randomAlphaString), randomAlphaString, null, new DeliveryOptions().setCodecName(myPOJOEncoder1.name()));
    }

    @Test
    public void testDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 myPOJOEncoder1 = new EventBusTestBase.MyPOJOEncoder1();
        this.vertices[0].eventBus().registerCodec(myPOJOEncoder1);
        this.vertices[1].eventBus().registerCodec(myPOJOEncoder1);
        String randomAlphaString = TestUtils.randomAlphaString(100);
        testReply(new EventBusTestBase.MyPOJO(randomAlphaString), randomAlphaString, null, new DeliveryOptions().setCodecName(myPOJOEncoder1.name()));
    }

    @Test
    public void testDecoderSendSymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 myPOJOEncoder2 = new EventBusTestBase.MyPOJOEncoder2();
        this.vertices[0].eventBus().registerCodec(myPOJOEncoder2);
        this.vertices[1].eventBus().registerCodec(myPOJOEncoder2);
        EventBusTestBase.MyPOJO myPOJO = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testSend(myPOJO, myPOJO, null, new DeliveryOptions().setCodecName(myPOJOEncoder2.name()));
    }

    @Test
    public void testDecoderReplySymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 myPOJOEncoder2 = new EventBusTestBase.MyPOJOEncoder2();
        this.vertices[0].eventBus().registerCodec(myPOJOEncoder2);
        this.vertices[1].eventBus().registerCodec(myPOJOEncoder2);
        EventBusTestBase.MyPOJO myPOJO = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testReply(myPOJO, myPOJO, null, new DeliveryOptions().setCodecName(myPOJOEncoder2.name()));
    }

    @Test
    public void testDefaultDecoderSendAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 myPOJOEncoder1 = new EventBusTestBase.MyPOJOEncoder1();
        this.vertices[0].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder1);
        this.vertices[1].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder1);
        String randomAlphaString = TestUtils.randomAlphaString(100);
        testSend(new EventBusTestBase.MyPOJO(randomAlphaString), randomAlphaString, null, null);
    }

    @Test
    public void testDefaultDecoderReplyAsymmetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder1 myPOJOEncoder1 = new EventBusTestBase.MyPOJOEncoder1();
        this.vertices[0].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder1);
        this.vertices[1].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder1);
        String randomAlphaString = TestUtils.randomAlphaString(100);
        testReply(new EventBusTestBase.MyPOJO(randomAlphaString), randomAlphaString, null, null);
    }

    @Test
    public void testDefaultDecoderSendSymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 myPOJOEncoder2 = new EventBusTestBase.MyPOJOEncoder2();
        this.vertices[0].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder2);
        this.vertices[1].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder2);
        EventBusTestBase.MyPOJO myPOJO = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testSend(myPOJO, myPOJO, null, null);
    }

    @Test
    public void testDefaultDecoderReplySymetric() throws Exception {
        startNodes(2);
        EventBusTestBase.MyPOJOEncoder2 myPOJOEncoder2 = new EventBusTestBase.MyPOJOEncoder2();
        this.vertices[0].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder2);
        this.vertices[1].eventBus().registerDefaultCodec(EventBusTestBase.MyPOJO.class, myPOJOEncoder2);
        EventBusTestBase.MyPOJO myPOJO = new EventBusTestBase.MyPOJO(TestUtils.randomAlphaString(100));
        testReply(myPOJO, myPOJO, null, null);
    }

    @Test
    public void testDefaultCodecReplyExceptionSubclass() throws Exception {
        startNodes(2);
        EventBusTestBase.MyReplyException myReplyException = new EventBusTestBase.MyReplyException(23, "my exception");
        EventBusTestBase.MyReplyExceptionMessageCodec myReplyExceptionMessageCodec = new EventBusTestBase.MyReplyExceptionMessageCodec();
        this.vertices[0].eventBus().registerDefaultCodec(EventBusTestBase.MyReplyException.class, myReplyExceptionMessageCodec);
        this.vertices[1].eventBus().registerDefaultCodec(EventBusTestBase.MyReplyException.class, myReplyExceptionMessageCodec);
        this.vertices[0].eventBus().consumer("some-address1", message -> {
            assertTrue(message.body() instanceof EventBusTestBase.MyReplyException);
            testComplete();
        }).completionHandler(asyncResult -> {
            this.vertices[1].eventBus().send("some-address1", myReplyException);
        });
        await();
    }

    @Test
    public void testClusteredPong() throws Exception {
        VertxOptions vertxOptions = new VertxOptions();
        vertxOptions.getEventBusOptions().setClusterPingInterval(500L).setClusterPingReplyInterval(500L);
        startNodes(2, vertxOptions);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.vertices[0].eventBus().consumer("foobar").handler(message -> {
            if (atomicBoolean.get()) {
                testComplete();
            } else {
                atomicBoolean.set(true);
                this.vertx.setTimer(4000L, l -> {
                    this.vertices[1].eventBus().send("foobar", "whatever2");
                });
            }
        }).completionHandler(asyncResult -> {
            assertTrue(asyncResult.succeeded());
            this.vertices[1].eventBus().send("foobar", "whatever");
        });
        await();
    }

    @Test
    public void testConsumerHandlesCompletionAsynchronously1() {
        startNodes(2);
        MessageConsumer consumer = this.vertices[0].eventBus().consumer("some-address1");
        ThreadLocal threadLocal = new ThreadLocal();
        threadLocal.set(true);
        consumer.completionHandler(asyncResult -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(threadLocal.get());
            testComplete();
        });
        consumer.handler(message -> {
        });
        await();
    }

    @Test
    public void testConsumerHandlesCompletionAsynchronously2() {
        startNodes(2);
        MessageConsumer consumer = this.vertices[0].eventBus().consumer("some-address1");
        consumer.handler(message -> {
        });
        ThreadLocal threadLocal = new ThreadLocal();
        threadLocal.set(true);
        consumer.completionHandler(asyncResult -> {
            assertTrue(Vertx.currentContext().isEventLoopContext());
            assertNull(threadLocal.get());
            testComplete();
        });
        await();
    }

    @Test
    public void testSubsRemovedForClosedNode() throws Exception {
        testSubsRemoved(countDownLatch -> {
            this.vertices[1].close(onSuccess(r3 -> {
                countDownLatch.countDown();
            }));
        });
    }

    @Test
    public void testSubsRemovedForKilledNode() throws Exception {
        testSubsRemoved(countDownLatch -> {
            this.vertices[1].getClusterManager().leave(onSuccess(r3 -> {
                countDownLatch.countDown();
            }));
        });
    }

    private void testSubsRemoved(Consumer<CountDownLatch> consumer) throws Exception {
        startNodes(3);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        this.vertices[0].eventBus().consumer("some-address1", message -> {
            int andIncrement = atomicInteger.getAndIncrement();
            assertEquals(message.body(), "foo" + andIncrement);
            if (andIncrement == 9) {
                testComplete();
            }
            if (andIncrement > 9) {
                fail("too many messages");
            }
        }).completionHandler(onSuccess(r6 -> {
            this.vertices[1].eventBus().consumer("some-address1", message2 -> {
                fail("shouldn't get message");
            }).completionHandler(onSuccess(r3 -> {
                countDownLatch.countDown();
            }));
        }));
        awaitLatch(countDownLatch);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        consumer.accept(countDownLatch2);
        awaitLatch(countDownLatch2);
        Thread.sleep(2000L);
        this.vertices[2].runOnContext(r62 -> {
            EventBus eventBus = this.vertices[2].eventBus();
            for (int i = 0; i < 10; i++) {
                eventBus.send("some-address1", "foo" + i);
            }
        });
        await();
    }

    @Test
    public void sendNoContext() throws Exception {
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        ConcurrentLinkedDeque concurrentLinkedDeque2 = new ConcurrentLinkedDeque();
        startNodes(2);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.vertices[1].eventBus().consumer("some-address1", message -> {
            concurrentLinkedDeque2.add(message.body());
            if (concurrentLinkedDeque2.size() == concurrentLinkedDeque.size()) {
                assertEquals(new ArrayList(concurrentLinkedDeque), new ArrayList(concurrentLinkedDeque2));
                testComplete();
            }
        }).completionHandler(asyncResult -> {
            assertTrue(asyncResult.succeeded());
            countDownLatch.countDown();
        });
        countDownLatch.await();
        EventBus eventBus = this.vertices[0].eventBus();
        for (int i = 0; i < 1000; i++) {
            concurrentLinkedDeque.add(Integer.valueOf(i));
            eventBus.send("some-address1", Integer.valueOf(i));
        }
        await();
    }

    @Test
    public void testSendLocalOnly() {
        testDeliveryOptionsLocalOnly(true);
    }

    @Test
    public void testPublishLocalOnly() {
        testDeliveryOptionsLocalOnly(false);
    }

    private void testDeliveryOptionsLocalOnly(boolean z) {
        waitFor(30);
        startNodes(2);
        AtomicLong atomicLong = new AtomicLong();
        this.vertices[0].eventBus().localConsumer("some-address1").handler(message -> {
            atomicLong.incrementAndGet();
            complete();
        });
        AtomicLong atomicLong2 = new AtomicLong();
        this.vertices[1].eventBus().consumer("some-address1").handler(message2 -> {
            atomicLong2.incrementAndGet();
        }).completionHandler(onSuccess(r8 -> {
            for (int i = 0; i < 30; i++) {
                if (z) {
                    this.vertices[0].eventBus().send("some-address1", "msg", new DeliveryOptions().setLocalOnly(true));
                } else {
                    this.vertices[0].eventBus().publish("some-address1", "msg", new DeliveryOptions().setLocalOnly(true));
                }
            }
        }));
        await();
        assertEquals(30L, atomicLong.get());
        assertEquals(0L, atomicLong2.get());
    }

    @Test
    public void testLocalOnlyDoesNotApplyToReplies() {
        startNodes(2);
        this.vertices[1].eventBus().consumer("some-address1").handler(message -> {
            message.reply("pong", new DeliveryOptions().setLocalOnly(true));
        }).completionHandler(onSuccess(r8 -> {
            this.vertices[0].eventBus().send("some-address1", "ping", new DeliveryOptions().setSendTimeout(500L), onSuccess(message2 -> {
                testComplete();
            }));
        }));
        await();
    }

    @Test
    public void testImmediateUnregistration() {
        startNodes(1);
        MessageConsumer consumer = this.vertices[0].eventBus().consumer("some-address1");
        AtomicInteger atomicInteger = new AtomicInteger();
        consumer.completionHandler(asyncResult -> {
            assertEquals(0L, atomicInteger.getAndIncrement());
            assertTrue(asyncResult.failed());
            this.vertx.setTimer(10L, l -> {
                testComplete();
            });
        });
        consumer.handler(message -> {
        });
        consumer.unregister();
        await();
    }

    @Test
    public void testSendWriteHandler() {
        startNodes(2);
        waitFor(2);
        this.vertices[1].eventBus().consumer("some-address1", message -> {
            complete();
        }).completionHandler(onSuccess(r6 -> {
            this.vertices[0].eventBus().sender("some-address1").write("body", onSuccess(r3 -> {
                complete();
            }));
        }));
        await();
    }

    @Test
    public void testSendWriteHandlerNoConsumer() {
        startNodes(2);
        this.vertices[0].eventBus().sender("some-address1").write("body", onFailure(th -> {
            assertTrue(th instanceof ReplyException);
            assertEquals(-1L, ((ReplyException) th).failureCode());
            testComplete();
        }));
        await();
    }

    @Test
    public void testPublishWriteHandler() {
        startNodes(2);
        waitFor(2);
        this.vertices[1].eventBus().consumer("some-address1", message -> {
            complete();
        }).completionHandler(onSuccess(r6 -> {
            this.vertices[0].eventBus().publisher("some-address1").write("body", onSuccess(r3 -> {
                complete();
            }));
        }));
        await();
    }

    @Test
    public void testPublishWriteHandlerNoConsumer() {
        startNodes(2);
        this.vertices[0].eventBus().publisher("some-address1").write("body", onFailure(th -> {
            assertTrue(th instanceof ReplyException);
            assertEquals(-1L, ((ReplyException) th).failureCode());
            testComplete();
        }));
        await();
    }

    @Test
    public void testWriteHandlerConnectFailure() {
        VertxOptions options = getOptions();
        options.getEventBusOptions().setSsl(true).setTrustAll(false).setKeyCertOptions((KeyCertOptions) Cert.SERVER_JKS.get());
        startNodes(2, options);
        this.vertices[1].eventBus().consumer("some-address1", message -> {
        }).completionHandler(onSuccess(r6 -> {
            this.vertices[0].eventBus().sender("some-address1").write("body", onFailure(th -> {
                testComplete();
            }));
        }));
        await();
    }

    @Test
    public void testWriteHandlerLookupFailure() {
        final Throwable th = new Throwable();
        VertxOptions clusterManager = new VertxOptions().setClusterManager(new FakeClusterManager() { // from class: io.vertx.core.eventbus.ClusteredEventBusTest.1
            @Override // io.vertx.test.fakecluster.FakeClusterManager
            public <K, V> void getAsyncMultiMap(String str, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
                if (!"__vertx.subs".equals(str)) {
                    super.getAsyncMultiMap(str, handler);
                } else {
                    Throwable th2 = th;
                    super.getAsyncMultiMap(str, asyncResult -> {
                        handler.handle(asyncResult.map(asyncMultiMap -> {
                            return new AsyncMultiMap<K, V>() { // from class: io.vertx.core.eventbus.ClusteredEventBusTest.1.1
                                public void add(K k, V v, Handler<AsyncResult<Void>> handler2) {
                                    asyncMultiMap.add(k, v, handler2);
                                }

                                public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> handler2) {
                                    handler2.handle(Future.failedFuture(th2));
                                }

                                public void remove(K k, V v, Handler<AsyncResult<Boolean>> handler2) {
                                    asyncMultiMap.remove(k, v, handler2);
                                }

                                public void removeAllForValue(V v, Handler<AsyncResult<Void>> handler2) {
                                    asyncMultiMap.removeAllForValue(v, handler2);
                                }

                                public void removeAllMatching(Predicate<V> predicate, Handler<AsyncResult<Void>> handler2) {
                                    asyncMultiMap.removeAllMatching(predicate, handler2);
                                }
                            };
                        }));
                    });
                }
            }
        });
        clusterManager.getEventBusOptions().setHost("localhost").setPort(0).setClustered(true);
        this.vertices = new Vertx[1];
        clusteredVertx(clusterManager, onSuccess(vertx -> {
            this.vertices[0] = vertx;
        }));
        assertWaitUntil(() -> {
            return this.vertices[0] != null;
        });
        this.vertices[0].eventBus().sender("some-address1").write("the_string", onFailure(th2 -> {
            assertSame(th, th2);
            testComplete();
        }));
        await();
    }
}
