This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 9636094bb3c [fix][test] Fix flaky
PersistentStickyKeyDispatcherMultipleConsumersClassicTest.testSkipRedeliverTemporally
(#25385)
9636094bb3c is described below
commit 9636094bb3c2b2dd8eda71522134a965a98c0496
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Mar 23 05:32:33 2026 +0200
[fix][test] Fix flaky
PersistentStickyKeyDispatcherMultipleConsumersClassicTest.testSkipRedeliverTemporally
(#25385)
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
(cherry picked from commit 953d092b10ab7ae5215677aeb46df2a5df85c2eb)
---
...StickyKeyDispatcherMultipleConsumersClassicTest.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
index 1ef74915418..edece4342e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
@@ -499,13 +499,23 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
return Collections.emptySet();
}).when(cursorMock).asyncReplayEntries(anySet(), any(), any(),
anyBoolean());
+ // Simulate real cursor behavior: track read position so entries are
only returned once
+ // by normal reads (subsequent access must go through
asyncReplayEntries).
+ // When no new entries are available, don't call the callback
(simulating "OrWait" behavior).
+ Set<Position> normalReadReturned = new ConcurrentSkipListSet<>();
doAnswer(invocationOnMock -> {
int maxEntries = invocationOnMock.getArgument(0);
AsyncCallbacks.ReadEntriesCallback callback =
invocationOnMock.getArgument(2);
List<Entry> entries = allEntries.stream()
- .filter(entry -> entry.getLedgerId() != -1 &&
!alreadySent.contains(entry.getPosition()))
+ .filter(entry -> entry.getLedgerId() != -1
+ &&
!normalReadReturned.contains(entry.getPosition()))
.limit(maxEntries)
.toList();
+ if (entries.isEmpty()) {
+ // No new entries available - simulate "wait" by not calling
callback
+ return null;
+ }
+ entries.forEach(e -> normalReadReturned.add(e.getPosition()));
Object ctx = invocationOnMock.getArgument(3);
callback.readEntriesComplete(copyEntries(entries), ctx);
return null;
@@ -551,6 +561,11 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
// set permits to 2
slowConsumerAvailablePermits.set(2);
+ // Trigger a new read cycle so the dispatcher can do a replay read to
deliver
+ // messages to the slow consumer. In production, this would be
triggered by
+ // consumerFlow when the consumer sends more permits.
+ persistentDispatcher.readMoreEntriesAsync();
+
// now wait for slow consumer messages since there are permits
assertTrue(slowConsumerMessagesSent.await(5, TimeUnit.SECONDS));