This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 644703d94e9 [fix][broker] Fix IllegalArgumentException in BucketDelayedDeliveryTracker.addMessage (#25371)
644703d94e9 is described below

commit 644703d94e9d697ba11b8e702464492f904f7946
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 20 14:25:23 2026 -0700

    [fix][broker] Fix IllegalArgumentException in BucketDelayedDeliveryTracker.addMessage (#25371)
---
 .../bucket/BucketDelayedDeliveryTracker.java        |  9 ++++-----
 ...ucketDelayedDeliveryTrackerThreadSafetyTest.java | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index ff077e57340..9ed304c7331 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -392,14 +392,13 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
             }
         }
 
-        if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
-            // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
+        if (ledgerId >= lastMutableBucket.endLedgerId && !existBucket) {
+            lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
+        } else {
+            // Message index belongs to previous bucket range or the current mutable bucket range,
             // enter sharedBucketPriorityQueue directly
             sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
             lastMutableBucket.putIndexBit(ledgerId, entryId);
-        } else {
-            checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
-            lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
         }
 
         numberDelayedMessages.incrementAndGet();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
index 3bc96499bfd..dca8f2d6fb6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
@@ -176,6 +176,27 @@ public class BucketDelayedDeliveryTrackerThreadSafetyTest {
         assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
     }
 
+    /**
+     * Regression test: addMessage should not throw IllegalArgumentException when a ledgerId
+     * falls within the current mutable bucket range [startLedgerId, endLedgerId).
+     * Before the fix, such ledger IDs hit a checkArgument(ledgerId >= endLedgerId)
+     * that assumed monotonically increasing ledger IDs.
+     */
+    @Test
+    public void testAddMessageWithNonMonotonicLedgerIds() {
+        long deliverAt = System.currentTimeMillis() + 10000;
+
+        // Add messages with increasing ledger IDs to establish the mutable bucket range.
+        tracker.addMessage(100, 0, deliverAt);
+        tracker.addMessage(100, 1, deliverAt);
+        tracker.addMessage(101, 0, deliverAt);
+        tracker.addMessage(101, 1, deliverAt);
+
+        // Now add a message with a ledgerId that falls within the established range.
+        // This should NOT throw IllegalArgumentException.
+        tracker.addMessage(100, 2, deliverAt);
+    }
+
     /**
      * Test concurrent nextDeliveryTime() calls.
      * This verifies the StampedLock implementation for read-heavy operations.

Reply via email to