This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 35b4581fb0a [fix][test] Fix flaky testMsgDropStat in 
NonPersistentTopicTest (#25426)
35b4581fb0a is described below

commit 35b4581fb0a735eeb64b652c381742470251c90d
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 30 15:53:41 2026 -0700

    [fix][test] Fix flaky testMsgDropStat in NonPersistentTopicTest (#25426)
    
    (cherry picked from commit e9630c60e8bff865e1bfeb2ed26a6c1d65f040b4)
---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 54 ++++++++++------------
 1 file changed, 25 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 284e6a68928..10919bd37c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -59,7 +59,6 @@ import 
org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
@@ -886,30 +885,31 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
             ExecutorService executor = Executors.newFixedThreadPool(threads);
             byte[] msgData = "testData".getBytes();
 
+            NonPersistentTopic topic =
+                    (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
             /*
-             * Trigger at least one publisher drop through concurrent send() 
calls.
+             * Send concurrent bursts until publisher AND subscription drop 
rates are all > 0.
+             *
+             * Each burst uses a CyclicBarrier so all threads send 
simultaneously. With
+             * maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx 
drops overlapping
+             * sends (publisher drops). Once subscriber queues (size 1) are 
full, the dispatcher
+             * also drops delivered messages (subscription drops).
              *
-             * Uses CyclicBarrier to ensure all threads send simultaneously, 
creating overlap.
-             * With maxConcurrentNonPersistentMessagePerConnection = 0, 
ServerCnx#handleSend
-             * drops any send while another is in-flight, returning MessageId 
with entryId = -1.
-             * Awaitility repeats whole bursts (bounded to 20s) until a drop 
is observed.
+             * IMPORTANT: updateRates() calls Rate.calculateRate() which 
resets counters via
+             * sumThenReset(). We must keep sending fresh bursts so each 
updateRates() call
+             * sees new drops, rather than retrying with stale (reset) 
counters.
              */
-            AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
-            Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            
Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(()
 -> {
                 CyclicBarrier barrier = new CyclicBarrier(threads);
                 CountDownLatch completionLatch = new CountDownLatch(threads);
                 AtomicReference<Throwable> error = new AtomicReference<>();
-                publisherDropSeen.set(false);
 
                 for (int i = 0; i < threads; i++) {
                     executor.submit(() -> {
                         try {
                             barrier.await();
-                            MessageId msgId = producer.send(msgData);
-                            // Publisher drop is signaled by 
MessageIdImpl.entryId == -1
-                            if (msgId instanceof MessageIdImpl && 
((MessageIdImpl) msgId).getEntryId() == -1) {
-                                publisherDropSeen.set(true);
-                            }
+                            producer.send(msgData);
                         } catch (Throwable t) {
                             if (t instanceof InterruptedException) {
                                 Thread.currentThread().interrupt();
@@ -921,27 +921,23 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
                     });
                 }
 
-                // Wait for all sends to complete.
-                assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
-
-                assertNull(error.get(), "Concurrent send encountered an 
exception");
-                return publisherDropSeen.get();
-            });
-
-            assertTrue(publisherDropSeen.get(), "Expected at least one 
publisher drop (entryId == -1)");
-
-            NonPersistentTopic topic =
-                    (NonPersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+                completionLatch.await(20, TimeUnit.SECONDS);
+                if (error.get() != null) {
+                    return false;
+                }
 
-            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                 pulsar.getBrokerService().updateRates();
                 NonPersistentTopicStats stats = topic.getStats(false, false, 
false);
+                if (stats.getPublishers().isEmpty()) {
+                    return false;
+                }
                 NonPersistentPublisherStats npStats = 
stats.getPublishers().get(0);
                 NonPersistentSubscriptionStats sub1Stats = 
stats.getSubscriptions().get("subscriber-1");
                 NonPersistentSubscriptionStats sub2Stats = 
stats.getSubscriptions().get("subscriber-2");
-                assertTrue(npStats.getMsgDropRate() > 0);
-                assertTrue(sub1Stats.getMsgDropRate() > 0);
-                assertTrue(sub2Stats.getMsgDropRate() > 0);
+                return sub1Stats != null && sub2Stats != null
+                        && npStats.getMsgDropRate() > 0
+                        && sub1Stats.getMsgDropRate() > 0
+                        && sub2Stats.getMsgDropRate() > 0;
             });
 
         } finally {

Reply via email to