This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 29b65a4bd2f [fix][broker] Fix IllegalArgumentException in BucketDelayedDeliveryTracker.addMessage (#25371)
29b65a4bd2f is described below
commit 29b65a4bd2f62101bee92074f8060fc0544ca42f
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 20 14:25:23 2026 -0700
[fix][broker] Fix IllegalArgumentException in BucketDelayedDeliveryTracker.addMessage (#25371)
---
.../bucket/BucketDelayedDeliveryTracker.java | 9 ++++-----
...ucketDelayedDeliveryTrackerThreadSafetyTest.java | 21 +++++++++++++++++++++
2 files changed, 25 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index ff077e57340..9ed304c7331 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -392,14 +392,13 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
}
}
- if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
- // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
+ if (ledgerId >= lastMutableBucket.endLedgerId && !existBucket) {
+ lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
+ } else {
+ // Message index belongs to previous bucket range or the current mutable bucket range,
// enter sharedBucketPriorityQueue directly
sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
lastMutableBucket.putIndexBit(ledgerId, entryId);
- } else {
- checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
- lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
}
numberDelayedMessages.incrementAndGet();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
index 3bc96499bfd..dca8f2d6fb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
@@ -176,6 +176,27 @@ public class BucketDelayedDeliveryTrackerThreadSafetyTest {
assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
}
+ /**
+ * Regression test: addMessage should not throw IllegalArgumentException when a ledgerId
+ * falls within the current mutable bucket range [startLedgerId, endLedgerId).
+ * Before the fix, such ledger IDs hit a checkArgument(ledgerId >= endLedgerId)
+ * that assumed monotonically increasing ledger IDs.
+ */
+ @Test
+ public void testAddMessageWithNonMonotonicLedgerIds() {
+ long deliverAt = System.currentTimeMillis() + 10000;
+
+ // Add messages with increasing ledger IDs to establish the mutable bucket range.
+ tracker.addMessage(100, 0, deliverAt);
+ tracker.addMessage(100, 1, deliverAt);
+ tracker.addMessage(101, 0, deliverAt);
+ tracker.addMessage(101, 1, deliverAt);
+
+ // Now add a message with a ledgerId that falls within the established range.
+ // This should NOT throw IllegalArgumentException.
+ tracker.addMessage(100, 2, deliverAt);
+ }
+
/**
* Test concurrent nextDeliveryTime() calls.
* This verifies the StampedLock implementation for read-heavy operations.