This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e9630c60e8b [fix][test] Fix flaky testMsgDropStat in NonPersistentTopicTest (#25426)
e9630c60e8b is described below
commit e9630c60e8bff865e1bfeb2ed26a6c1d65f040b4
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 30 15:53:41 2026 -0700
[fix][test] Fix flaky testMsgDropStat in NonPersistentTopicTest (#25426)
---
.../pulsar/client/api/NonPersistentTopicTest.java | 54 ++++++++++------------
1 file changed, 25 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 0252221301a..8feed728a46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
@@ -887,30 +886,31 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();
+ NonPersistentTopic topic =
+ (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
/*
- * Trigger at least one publisher drop through concurrent send() calls.
+ * Send concurrent bursts until publisher AND subscription drop rates are all > 0.
+ *
+ * Each burst uses a CyclicBarrier so all threads send simultaneously. With
+ * maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx drops overlapping
+ * sends (publisher drops). Once subscriber queues (size 1) are full, the dispatcher
+ * also drops delivered messages (subscription drops).
*
- * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap.
- * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend
- * drops any send while another is in-flight, returning MessageId with entryId = -1.
- * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed.
+ * IMPORTANT: updateRates() calls Rate.calculateRate() which resets counters via
+ * sumThenReset(). We must keep sending fresh bursts so each updateRates() call
+ * sees new drops, rather than retrying with stale (reset) counters.
*/
- AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
- Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+
Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(()
-> {
CyclicBarrier barrier = new CyclicBarrier(threads);
CountDownLatch completionLatch = new CountDownLatch(threads);
AtomicReference<Throwable> error = new AtomicReference<>();
- publisherDropSeen.set(false);
for (int i = 0; i < threads; i++) {
executor.submit(() -> {
try {
barrier.await();
- MessageId msgId = producer.send(msgData);
- // Publisher drop is signaled by
MessageIdImpl.entryId == -1
- if (msgId instanceof MessageIdImpl &&
((MessageIdImpl) msgId).getEntryId() == -1) {
- publisherDropSeen.set(true);
- }
+ producer.send(msgData);
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -922,27 +922,23 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
});
}
- // Wait for all sends to complete.
- assertTrue(completionLatch.await(20, TimeUnit.SECONDS));
-
- assertNull(error.get(), "Concurrent send encountered an
exception");
- return publisherDropSeen.get();
- });
-
- assertTrue(publisherDropSeen.get(), "Expected at least one
publisher drop (entryId == -1)");
-
- NonPersistentTopic topic =
- (NonPersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ completionLatch.await(20, TimeUnit.SECONDS);
+ if (error.get() != null) {
+ return false;
+ }
- Awaitility.await().ignoreExceptions().untilAsserted(() -> {
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats(false, false,
false);
+ if (stats.getPublishers().isEmpty()) {
+ return false;
+ }
NonPersistentPublisherStats npStats =
stats.getPublishers().get(0);
NonPersistentSubscriptionStats sub1Stats =
stats.getSubscriptions().get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats =
stats.getSubscriptions().get("subscriber-2");
- assertTrue(npStats.getMsgDropRate() > 0);
- assertTrue(sub1Stats.getMsgDropRate() > 0);
- assertTrue(sub2Stats.getMsgDropRate() > 0);
+ return sub1Stats != null && sub2Stats != null
+ && npStats.getMsgDropRate() > 0
+ && sub1Stats.getMsgDropRate() > 0
+ && sub2Stats.getMsgDropRate() > 0;
});
} finally {