package org.apache.pulsar.broker.service.plugin;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
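/*
 * Broker tests for entry filters: topic-level vs broker-level filter selection, accept/reject
 * filtering with backlog analysis, filtered-entry counting, and per-consumer rescheduling on a
 * shared subscription. Filters are injected into broker internals through reflection.
 * (Decompiled from org/apache/pulsar/broker/service/plugin/FilterEntryTest.class.)
 */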
public class FilterEntryTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(FilterEntryTest.class);

    /**
     * Runs before every test method and delegates to {@code baseSetup()}, which is
     * inherited (it is not defined in this class).
     *
     * @throws Exception if the inherited setup fails
     */
    @BeforeMethod
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
        this.baseSetup();
    }

    /**
     * Runs after every test method and delegates to the inherited {@code internalCleanup()}.
     * {@code testFilter} also calls this method directly, so it can run more than once
     * for a single test.
     *
     * @throws Exception if the inherited cleanup fails
     */
    @AfterMethod
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void cleanup() throws Exception {
        this.internalCleanup();
    }

    /**
     * Exercises the {@code allowOverrideEntryFilters} broker setting.
     *
     * <p>A mocked filter that REJECTs every entry is injected into the topic's
     * {@code entryFilters} field, and a mocked filter that ACCEPTs every entry is injected
     * into the broker service's {@code entryFilters} field. The test then expects:
     * <ul>
     *   <li>with the setting enabled, a new subscription reading from the earliest position
     *       receives none of the 10 published messages;</li>
     *   <li>with the setting disabled, another new subscription receives all 10.</li>
     * </ul>
     *
     * <p>The producer and both consumers are closed even when an assertion fails, and the
     * setting is always returned to {@code false}, the value the test leaves it at on success.
     *
     * @throws Exception on any client, reflection or broker failure
     */
    @Test
    public void testOverride() throws Exception {
        final int numMessages = 10;
        this.conf.setAllowOverrideEntryFilters(true);
        final String topicName = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topicName)
                .create()) {
            for (int sent = 0; sent < numMessages; sent++) {
                producer.send("test");
            }

            // Topic-level filters: the field lives on the superclass of the topic's runtime class.
            PersistentTopic persistentTopic =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
            EntryFilterWithClassLoader rejectingFilter = Mockito.mock(EntryFilterWithClassLoader.class);
            Mockito.when(rejectingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.REJECT);
            ImmutableMap<String, EntryFilterWithClassLoader> topicFilters = ImmutableMap.of("key", rejectingFilter);
            Field topicFiltersField = persistentTopic.getClass().getSuperclass().getDeclaredField("entryFilters");
            topicFiltersField.setAccessible(true);
            topicFiltersField.set(persistentTopic, topicFilters);

            // Broker-level filters: injected directly into the broker service.
            EntryFilterWithClassLoader acceptingFilter = Mockito.mock(EntryFilterWithClassLoader.class);
            Mockito.when(acceptingFilter.filterEntry(
                            ArgumentMatchers.any(Entry.class), ArgumentMatchers.any(FilterContext.class)))
                    .thenReturn(EntryFilter.FilterResult.ACCEPT);
            ImmutableMap<String, EntryFilterWithClassLoader> brokerFilters = ImmutableMap.of("key2", acceptingFilter);
            Field brokerFiltersField = this.pulsar.getBrokerService().getClass().getDeclaredField("entryFilters");
            brokerFiltersField.setAccessible(true);
            brokerFiltersField.set(this.pulsar.getBrokerService(), brokerFilters);

            // Phase 1: override enabled -> nothing is expected to reach the consumer.
            try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("sub")
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(0, received);
                // Flip the setting before the consumer is closed, as the original sequence did.
                this.conf.setAllowOverrideEntryFilters(false);
            }

            // Phase 2: override disabled -> every published message is expected to be delivered.
            try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                    .topic(topicName)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("sub1")
                    .subscribe()) {
                int received = 0;
                Message<String> message;
                while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
                    received++;
                    consumer.acknowledge(message);
                }
                AssertJUnit.assertEquals(numMessages, received);
            }
        } finally {
            // Never leave the override flag enabled if the test fails part-way through.
            this.conf.setAllowOverrideEntryFilters(false);
        }
    }

    /**
     * End-to-end check of entry filtering on one subscription, in three phases.
     *
     * <p>Two spied filters ({@code EntryFilterTest} and {@code EntryFilter2Test}, both defined
     * outside this file) are injected into the subscription's dispatcher through reflection.
     * <ol>
     *   <li>10 plain messages: backlog analysis reports 10 accepted, all 10 are received.</li>
     *   <li>10 messages carrying a {@code REJECT} property: backlog analysis reports 10
     *       rejected, none are received, and the cursor's mark-delete position is expected to
     *       reach the last rejected message.</li>
     *   <li>10 messages whose single property key/value is the loop index 0..9: exactly 2 are
     *       expected to be received. Presumably these are the ones matching the subscription
     *       properties {@code "1"} and {@code "2"} - confirm against {@code EntryFilter2Test}.</li>
     * </ol>
     *
     * <p>Finally the same filters are installed on the topic and {@link #cleanup()} is called
     * explicitly so the test can verify each filter was closed exactly once. The
     * {@code @AfterMethod} hook will call {@code cleanup()} again afterwards.
     *
     * @throws Exception on any client, admin, reflection or broker failure
     */
    @Test
    public void testFilter() throws Exception {
        final String subscriptionName = "sub";
        HashMap<String, String> subscriptionProperties = new HashMap<>();
        subscriptionProperties.put("1", "1");
        subscriptionProperties.put("2", "2");
        final String topicName = "persistent://prop/ns-abc/topic" + UUID.randomUUID();

        Consumer<String> firstConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionProperties(subscriptionProperties)
                .isAckReceiptEnabled(true)
                .subscriptionName(subscriptionName)
                .subscribe();

        // Topic#getSubscription is declared with the general subscription type, so narrow it.
        final PersistentSubscription subscription = (PersistentSubscription) this.pulsar.getBrokerService()
                .getTopicReference(topicName).get().getSubscription(subscriptionName);
        Dispatcher dispatcher = subscription.getDispatcher();
        Field dispatcherFiltersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
        dispatcherFiltersField.setAccessible(true);
        NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
        EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(
                EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader);
        EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(
                EntryFilterWithClassLoader.class, new EntryFilter2Test(), narClassLoader);
        dispatcherFiltersField.set(dispatcher, ImmutableList.of(filter1, filter2));

        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .enableBatching(false)
                .topic(topicName)
                .create();

        // Phase 1: plain messages are accepted and delivered.
        for (int sent = 0; sent < 10; sent++) {
            producer.send("test");
        }
        verifyBacklog(topicName, subscriptionName, 10, 10, 10, 10, 0, 0, 0, 0);
        int acceptedCount = 0;
        Message<String> message;
        while ((message = firstConsumer.receive(1, TimeUnit.SECONDS)) != null) {
            acceptedCount++;
            firstConsumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(10, acceptedCount);
        verifyBacklog(topicName, subscriptionName, 0, 0, 0, 0, 0, 0, 0, 0);
        firstConsumer.close();

        // Phase 2: messages flagged with the REJECT property are filtered out.
        MessageIdImpl lastRejectedId = null;
        for (int sent = 0; sent < 10; sent++) {
            lastRejectedId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
        }
        verifyBacklog(topicName, subscriptionName, 10, 10, 0, 0, 10, 10, 0, 0);
        Consumer<String> secondConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .isAckReceiptEnabled(true)
                .subscriptionProperties(subscriptionProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        int rejectedPhaseCount = 0;
        while ((message = secondConsumer.receive(1, TimeUnit.SECONDS)) != null) {
            rejectedPhaseCount++;
            secondConsumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(0, rejectedPhaseCount);
        verifyBacklog(topicName, subscriptionName, 0, 0, 0, 0, 0, 0, 0, 0);
        AssertJUnit.assertNotNull(lastRejectedId);
        // Copy into a final local so the lambda below can capture it.
        final MessageIdImpl expectedMarkDelete = lastRejectedId;
        Awaitility.await().untilAsserted(() -> {
            PositionImpl markDeleted = (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
            // AssertJUnit takes (expected, actual).
            AssertJUnit.assertEquals(expectedMarkDelete.getLedgerId(), markDeleted.getLedgerId());
            AssertJUnit.assertEquals(expectedMarkDelete.getEntryId(), markDeleted.getEntryId());
        });
        secondConsumer.close();

        // Phase 3: only some of the property-tagged messages are expected to pass the filters.
        Consumer<String> thirdConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionProperties(subscriptionProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        for (int index = 0; index < 10; index++) {
            producer.newMessage().property(String.valueOf(index), String.valueOf(index)).value("1").send();
        }
        int matchedCount = 0;
        while ((message = thirdConsumer.receive(1, TimeUnit.SECONDS)) != null) {
            matchedCount++;
            thirdConsumer.acknowledge(message);
        }
        AssertJUnit.assertEquals(2, matchedCount);
        producer.close();
        thirdConsumer.close();

        // Install the filters on the topic, shut everything down, and check each filter was closed once.
        PersistentTopic persistentTopic =
                (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topicName).get();
        Field topicFiltersField = persistentTopic.getClass().getSuperclass().getDeclaredField("entryFilters");
        topicFiltersField.setAccessible(true);
        topicFiltersField.set(persistentTopic, ImmutableMap.of("1", filter1, "2", filter2));
        cleanup();
        Mockito.verify(filter1, Mockito.times(1)).close();
        Mockito.verify(filter2, Mockito.times(1)).close();
    }

    /**
     * Verifies the topic's filtered-entries counter.
     *
     * <p>Two spied filters ({@code EntryFilterTest} and {@code EntryFilter2Test}, defined outside
     * this file) are injected into the dispatcher of subscription {@code "sub"}. The test then
     * publishes 10 plain {@code "test"} messages followed by 10 messages carrying a
     * {@code REJECT} property, and expects that only the 10 plain messages are delivered and
     * that {@code getFilteredEntriesCount()} on the subscription's topic reports 10.
     *
     * <p>Producer and consumer are managed with try-with-resources, so both are closed
     * (consumer first, then producer) on success and on failure alike.
     *
     * @throws Throwable on any client, reflection or assertion failure
     */
    @Test
    public void testFilteredMsgCount() throws Throwable {
        final String topicName = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topicName)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionName("sub")
                     .subscribe()) {
            // Topic#getSubscription is declared with the general subscription type, so narrow it.
            PersistentSubscription subscription = (PersistentSubscription) this.pulsar.getBrokerService()
                    .getTopicReference(topicName).get().getSubscription("sub");
            Dispatcher dispatcher = subscription.getDispatcher();
            Field filtersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
            filtersField.setAccessible(true);
            NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
            EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader);
            EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilter2Test(), narClassLoader);
            filtersField.set(dispatcher, ImmutableList.of(filter1, filter2));

            for (int sent = 0; sent < 10; sent++) {
                producer.send("test");
            }
            for (int sent = 0; sent < 10; sent++) {
                AssertJUnit.assertNotNull(producer.newMessage().property("REJECT", "").value("1").send());
            }

            int received = 0;
            Message<String> message;
            while ((message = consumer.receive(10, TimeUnit.SECONDS)) != null) {
                received++;
                // AssertJUnit takes (expected, actual).
                AssertJUnit.assertEquals("test", message.getValue());
                consumer.acknowledge(message);
            }
            AssertJUnit.assertEquals(10, received);
            AssertJUnit.assertEquals(10L, subscription.getTopic().getFilteredEntriesCount());
        }
    }

    /**
     * Verifies that, on a Shared subscription, filters can steer each message to one
     * particular consumer.
     *
     * <p>Two consumers join subscription {@code "sub"} with mirrored consumer properties
     * ({@code matchValueAccept} / {@code matchValueReschedule} set to {@code FOR-1} and
     * {@code FOR-2}). Two spied {@code EntryFilterTest} instances are injected into the
     * dispatcher; how that filter interprets the properties is defined outside this file.
     * 200 messages are published, alternating between a {@code FOR-1} property with value
     * {@code "consumer-1"} and a {@code FOR-2} property with value {@code "consumer-2"}.
     * Each consumer is expected to receive exactly its own 100 messages within one minute.
     *
     * <p>Producer and consumers are managed with try-with-resources, so all three are closed
     * (consumer2, consumer1, producer) on success and on failure alike.
     *
     * @throws Throwable on any client, reflection, timeout or assertion failure
     */
    @Test
    public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscription() throws Throwable {
        final String topicName = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final int numMessages = 200;

        HashMap<String, String> consumer1Properties = new HashMap<>();
        consumer1Properties.put("matchValueAccept", "FOR-1");
        consumer1Properties.put("matchValueReschedule", "FOR-2");
        HashMap<String, String> consumer2Properties = new HashMap<>();
        consumer2Properties.put("matchValueAccept", "FOR-2");
        consumer2Properties.put("matchValueReschedule", "FOR-1");

        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .enableBatching(false)
                     .topic(topicName)
                     .create();
             Consumer<String> consumer1 = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .properties(consumer1Properties)
                     .consumerName("consumer1")
                     .receiverQueueSize(5)
                     .subscriptionName("sub")
                     .subscribe();
             Consumer<String> consumer2 = this.pulsarClient.newConsumer(Schema.STRING)
                     .subscriptionType(SubscriptionType.Shared)
                     .properties(consumer2Properties)
                     .consumerName("consumer2")
                     .topic(topicName)
                     .receiverQueueSize(5)
                     .subscriptionName("sub")
                     .subscribe()) {
            Dispatcher dispatcher = this.pulsar.getBrokerService()
                    .getTopicReference(topicName).get().getSubscription("sub").getDispatcher();
            Field filtersField = EntryFilterSupport.class.getDeclaredField("entryFilters");
            filtersField.setAccessible(true);
            NarClassLoader narClassLoader = Mockito.mock(NarClassLoader.class);
            EntryFilterWithClassLoader filter1 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader);
            EntryFilterWithClassLoader filter2 = BrokerTestUtil.spyWithClassAndConstructorArgs(
                    EntryFilterWithClassLoader.class, new EntryFilterTest(), narClassLoader);
            filtersField.set(dispatcher, ImmutableList.of(filter1, filter2));

            // Even indices are addressed to consumer 1, odd indices to consumer 2.
            for (int index = 0; index < numMessages; index++) {
                if (index % 2 == 0) {
                    producer.newMessage().property("FOR-1", "").value("consumer-1").send();
                } else {
                    producer.newMessage().property("FOR-2", "").value("consumer-2").send();
                }
            }

            CompletableFuture<Void> consumer1Outcome = new CompletableFuture<>();
            this.pulsar.getExecutor().submit(
                    receiveExpectedMessages(consumer1, 1, numMessages / 2, consumer1Outcome));
            CompletableFuture<Void> consumer2Outcome = new CompletableFuture<>();
            this.pulsar.getExecutor().submit(
                    receiveExpectedMessages(consumer2, 2, numMessages / 2, consumer2Outcome));

            consumer1Outcome.get(1L, TimeUnit.MINUTES);
            consumer2Outcome.get(1L, TimeUnit.MINUTES);
        }
    }

    /**
     * Builds the task that drains one consumer and reports the result on {@code outcome}.
     *
     * <p>The task receives until {@code expectedCount} messages have arrived, asserting that
     * each value is {@code "consumer-<consumerIndex>"} and acknowledging it asynchronously.
     * It completes {@code outcome} normally on success. On a receive timeout, an assertion
     * failure or any other error it completes this consumer's own {@code outcome}
     * exceptionally and stops, so the waiting test sees the real cause instead of timing out.
     *
     * @param consumer      consumer to read from
     * @param consumerIndex 1-based index used in the expected value, log line and error message
     * @param expectedCount number of messages this consumer must receive
     * @param outcome       future completed with the result of the task
     * @return the runnable to submit to an executor
     */
    private static Runnable receiveExpectedMessages(Consumer<String> consumer, int consumerIndex,
                                                    int expectedCount, CompletableFuture<Void> outcome) {
        final String expectedValue = "consumer-" + consumerIndex;
        return () -> {
            try {
                int received = 0;
                while (received < expectedCount) {
                    Message<String> message = consumer.receive(1, TimeUnit.MINUTES);
                    if (message == null) {
                        outcome.completeExceptionally(
                                new Exception("consumer" + consumerIndex + " did not receive all the messages"));
                        // Stop instead of blocking on further one-minute receives.
                        return;
                    }
                    log.info("received" + consumerIndex + " {} - {}", message.getValue(), message.getProperties());
                    received++;
                    AssertJUnit.assertEquals(expectedValue, message.getValue());
                    consumer.acknowledgeAsync(message);
                }
                outcome.complete(null);
            } catch (Throwable failure) {
                outcome.completeExceptionally(failure);
            }
        };
    }

    /**
     * Runs a subscription backlog analysis through the admin API and asserts every counter
     * of the result.
     *
     * <p>TestNG's {@code Assert.assertEquals} takes {@code (actual, expected)}, so the value
     * reported by the broker is passed first and failure messages read the right way round.
     *
     * @param topic                expected topic name passed to the analysis
     * @param subscription         subscription name passed to the analysis
     * @param entries              expected {@code getEntries()}
     * @param messages             expected {@code getMessages()}
     * @param acceptedEntries      expected {@code getFilterAcceptedEntries()}
     * @param acceptedMessages     expected {@code getFilterAcceptedMessages()}
     * @param rejectedEntries      expected {@code getFilterRejectedEntries()}
     * @param rejectedMessages     expected {@code getFilterRejectedMessages()}
     * @param rescheduledEntries   expected {@code getFilterRescheduledEntries()}
     * @param rescheduledMessages  expected {@code getFilterRescheduledMessages()}
     * @throws Exception if the admin call fails
     */
    private void verifyBacklog(String topic, String subscription,
                               int entries, int messages,
                               int acceptedEntries, int acceptedMessages,
                               int rejectedEntries, int rejectedMessages,
                               int rescheduledEntries, int rescheduledMessages) throws Exception {
        AnalyzeSubscriptionBacklogResult backlog =
                this.admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());

        Assert.assertEquals(backlog.getEntries(), entries, "entries");
        Assert.assertEquals(backlog.getFilterAcceptedEntries(), acceptedEntries, "filterAcceptedEntries");
        Assert.assertEquals(backlog.getFilterRejectedEntries(), rejectedEntries, "filterRejectedEntries");
        Assert.assertEquals(backlog.getFilterRescheduledEntries(), rescheduledEntries, "filterRescheduledEntries");

        Assert.assertEquals(backlog.getMessages(), messages, "messages");
        Assert.assertEquals(backlog.getFilterAcceptedMessages(), acceptedMessages, "filterAcceptedMessages");
        Assert.assertEquals(backlog.getFilterRejectedMessages(), rejectedMessages, "filterRejectedMessages");
        Assert.assertEquals(backlog.getFilterRescheduledMessages(), rescheduledMessages, "filterRescheduledMessages");
    }
}
