This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 027f4e94b4a [fix][broker] Fix delayed messages stalling with isDelayedDeliveryDeliverAtTimeStrict=true (#26012)
027f4e94b4a is described below

commit 027f4e94b4a2e975c4aae2d6111bf31d201dc8b1
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Jun 13 07:01:22 2026 +0800

    [fix][broker] Fix delayed messages stalling with isDelayedDeliveryDeliverAtTimeStrict=true (#26012)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../delayed/AbstractDelayedDeliveryTracker.java    |  99 ++++++++---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  53 ++++--
 .../bucket/BucketDelayedDeliveryTracker.java       |   7 +-
 .../delayed/InMemoryDeliveryTrackerTest.java       | 186 ++++++++++++++++++++-
 4 files changed, 302 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
index 2caf71a6eda..5140e9866bc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java
@@ -36,21 +36,26 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
     // Reference to the shared (per-broker) timer for delayed delivery
     protected final Timer timer;
 
-    // Current timeout or null if not set
-    protected Timeout timeout;
+    // Current timeout or null if not set. Guarded by timeoutLock.
+    private Timeout timeout;
 
-    // Timestamp at which the timeout is currently set
+    // Timestamp at which the timeout is currently set. Guarded by timeoutLock.
     private long currentTimeoutTarget;
 
-    // Last time the TimerTask was triggered for this class
+    // Last time the TimerTask was triggered for this class. Guarded by 
timeoutLock.
     private long lastTickRun;
 
-    protected long tickTimeMillis;
+    // Updated through resetTickTime() from dispatcher threads and read on the 
timer thread.
+    protected volatile long tickTimeMillis;
 
     protected final Clock clock;
 
     private final boolean isDelayedDeliveryDeliverAtTimeStrict;
     private final Object triggerLock;
+    // Guards the timer state (timeout, currentTimeoutTarget, lastTickRun) 
against concurrent access from
+    // dispatcher threads (updateTimer/rescheduleTimer/close) and the timer 
thread (run). It is a leaf lock:
+    // no subclass method is invoked while holding it.
+    private final Object timeoutLock = new Object();
 
     public 
AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                           long tickTimeMillis,
@@ -99,14 +104,34 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
         return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : 
clock.millis() + tickTimeMillis;
     }
 
+    protected boolean isDeliverAtTimeStrict() {
+        return isDelayedDeliveryDeliverAtTimeStrict;
+    }
+
     public void resetTickTime(long tickTime) {
         if (this.tickTimeMillis != tickTime) {
             this.tickTimeMillis = tickTime;
         }
     }
 
-    protected void updateTimer() {
-        if (getNumberOfDelayedMessages() == 0) {
+    /**
+     * Update the delivery timer to fire when the next message in the tracker 
becomes due.
+     *
+     * Callers are expected to serialize all tracker state mutations (at the 
dispatcher or tracker level), so the
+     * snapshot of {@link #getNumberOfDelayedMessages()} and {@link 
#nextDeliveryTime()} is taken before acquiring
+     * timeoutLock. This keeps timeoutLock a leaf lock that never calls into 
subclass methods, ruling out lock
+     * ordering deadlocks with subclasses that synchronize those methods on 
the tracker instance.
+     */
+    protected final void updateTimer() {
+        long numberOfDelayedMessages = getNumberOfDelayedMessages();
+        long nextDeliveryTimestamp = numberOfDelayedMessages > 0 ? 
nextDeliveryTime() : -1;
+        synchronized (timeoutLock) {
+            doUpdateTimer(numberOfDelayedMessages, nextDeliveryTimestamp);
+        }
+    }
+
+    private void doUpdateTimer(long numberOfDelayedMessages, long timestamp) {
+        if (numberOfDelayedMessages == 0) {
             if (timeout != null) {
                 currentTimeoutTarget = -1;
                 timeout.cancel();
@@ -114,7 +139,6 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
             }
             return;
         }
-        long timestamp = nextDeliveryTime();
         if (timestamp == currentTimeoutTarget) {
             // The timer is already set to the correct target time
             return;
@@ -122,52 +146,83 @@ public abstract class AbstractDelayedDeliveryTracker 
implements DelayedDeliveryT
 
         if (timeout != null) {
             timeout.cancel();
+            timeout = null;
         }
+        // Reset the tracked state so a subsequent updateTimer() call cannot 
short-circuit on a stale
+        // currentTimeoutTarget while no live timer remains. See #25996.
+        currentTimeoutTarget = -1;
 
         long now = clock.millis();
         long delayMillis = timestamp - now;
 
-        if (delayMillis < 0) {
+        if (delayMillis <= 0) {
             // There are messages that are already ready to be delivered. If
             // the dispatcher is not getting them is because the consumer is
             // either not connected or slow.
             // We don't need to keep retriggering the timer. When the consumer
             // catches up, the dispatcher will do the readMoreEntries() and
-            // get these messages
+            // get these messages.
             return;
         }
 
         // Compute the earliest time that we schedule the timer to run.
         long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
         long calculatedDelayMillis = Math.max(delayMillis, 
remainingTickDelayMillis);
-            log.debug().attr("delayMillis", calculatedDelayMillis)
-                    .log("Start timer");
-                // Even though we may delay longer than this timestamp because 
of the tick delay, we still track the
+        log.debug().attr("delayMillis", calculatedDelayMillis)
+                .log("Start timer");
+        // Even though we may delay longer than this timestamp because of the 
tick delay, we still track the
         // current timeout with reference to the next message's timestamp.
         currentTimeoutTarget = timestamp;
         timeout = timer.newTimeout(this, calculatedDelayMillis, 
TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public void run(Timeout timeout) throws Exception {
-            log.debug("Timer triggered");
-                if (timeout == null || timeout.isCancelled()) {
+    public void run(Timeout triggeredTimeout) throws Exception {
+        log.debug("Timer triggered");
+
+        if (triggeredTimeout == null || triggeredTimeout.isCancelled()) {
             return;
         }
 
-        synchronized (triggerLock) {
+        synchronized (timeoutLock) {
             lastTickRun = clock.millis();
-            currentTimeoutTarget = -1;
-            this.timeout = null;
+            // Only reset the timer state if the triggered timeout is the 
currently armed one. A timeout that
+            // was already superseded by updateTimer()/rescheduleTimer() may 
still fire if it passed its
+            // isCancelled() check before being cancelled; it must not clear 
the state of the newer timer.
+            if (triggeredTimeout == this.timeout) {
+                currentTimeoutTarget = -1;
+                this.timeout = null;
+            }
+        }
+
+        synchronized (triggerLock) {
             context.triggerReadMoreEntries();
         }
     }
 
+    /**
+     * Cancel the current timer (if any) and schedule the timer task to run 
after the given delay. Used by
+     * subclasses to trigger a dispatch round from asynchronous completions 
instead of mutating the timer
+     * state directly.
+     */
+    protected final void rescheduleTimer(long delayMillis) {
+        synchronized (timeoutLock) {
+            if (timeout != null) {
+                timeout.cancel();
+            }
+            currentTimeoutTarget = -1;
+            timeout = timer.newTimeout(this, delayMillis, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
     @Override
     public void close() {
-        if (timeout != null) {
-            timeout.cancel();
-            timeout = null;
+        synchronized (timeoutLock) {
+            if (timeout != null) {
+                timeout.cancel();
+                timeout = null;
+            }
+            currentTimeoutTarget = -1;
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index b1d4e8cecbd..8e9b8343704 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -62,6 +62,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
 
     // The bit count to trim to reduce memory occupation.
     private final int timestampPrecisionBitCnt;
+    private final long precisionMillis;
 
     // Count of delayed messages in the tracker.
     private final AtomicLong delayedMessagesCount = new AtomicLong(0);
@@ -91,6 +92,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         this.log = LOG.with().ctx(super.log).build();
         this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
         this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+        this.precisionMillis = 1L << timestampPrecisionBitCnt;
     }
 
     /**
@@ -133,7 +135,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                 .attr("entryId", entryId)
                 .attr("deliveryInMs", () -> deliverAt - clock.millis())
                 .log("Add message");
-        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+        long timestamp = roundTimestamp(deliverAt);
 
         Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, 
k -> new TreeMap<>())
             .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
@@ -142,7 +144,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         boolean isNew = !bitmap.contains(entryId);
 
         if (isNew) {
-            bitmap.add(entryId);
+            bitmap.addLong(entryId);
             delayedMessagesCount.incrementAndGet();
         }
 
@@ -153,6 +155,27 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         return true;
     }
 
+    /**
+     * Round the deliverAt timestamp to the bucket boundary used as the key in 
{@link #delayedMessageMap}, so that
+     * all messages within the same bucket share a single map entry to reduce 
memory usage.
+     *
+     * In strict delivery mode the timestamp is rounded up: a bucket then 
becomes due only after every deliverAt
+     * time inside it has passed, so messages are delivered up to one bucket 
(less than tickTimeMillis) late, but
+     * never before their deliverAt time. Rounding down instead would let 
{@link #getScheduledMessages(int)} hand a
+     * message to the dispatcher before its deliverAt time; the dispatcher 
would put it back and re-trigger reads
+     * in a loop until the deliverAt time is reached (see issue #25996).
+     *
+     * In non-strict mode the timestamp is rounded down, since delivering up 
to tickTimeMillis early is allowed.
+     */
+    private long roundTimestamp(long deliverAt) {
+        if (isDeliverAtTimeStrict()) {
+            // round up, saturating at Long.MAX_VALUE instead of overflowing 
for deliverAt close to Long.MAX_VALUE
+            long roundedUp = deliverAt + precisionMillis - 1;
+            return trimLowerBit(roundedUp < deliverAt ? Long.MAX_VALUE : 
roundedUp, timestampPrecisionBitCnt);
+        }
+        return trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+    }
+
     /**
      * Check that new delivery time comes after the current highest, or at
      * least within a single tick time interval of 1 second.
@@ -198,20 +221,22 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
             for (var ledgerEntry : ledgerMap.entrySet()) {
                 long ledgerId = ledgerEntry.getKey();
                 Roaring64Bitmap entryIds = ledgerEntry.getValue();
-                int cardinality = (int) entryIds.getLongCardinality();
+                long cardinality = entryIds.getLongCardinality();
                 if (cardinality <= n) {
+                    int cardinalityInt = (int) cardinality;
                     entryIds.forEach(entryId -> {
                         positions.add(PositionFactory.create(ledgerId, 
entryId));
                     });
-                    n -= cardinality;
-                    delayedMessagesCount.addAndGet(-cardinality);
+                    n -= cardinalityInt;
+                    delayedMessagesCount.addAndGet(-cardinalityInt);
                     ledgerIdToDelete.add(ledgerId);
                 } else {
-                    long[] entryIdsArray = entryIds.toArray();
-                    for (int i = 0; i < n; i++) {
-                        positions.add(PositionFactory.create(ledgerId, 
entryIdsArray[i]));
-                        entryIds.removeLong(entryIdsArray[i]);
-                    }
+                    Roaring64Bitmap entryIdsToRemove = new Roaring64Bitmap();
+                    entryIds.stream().limit(n).forEach(entryId -> {
+                        positions.add(PositionFactory.create(ledgerId, 
entryId));
+                        entryIdsToRemove.addLong(entryId);
+                    });
+                    entryIds.andNot(entryIdsToRemove);
                     delayedMessagesCount.addAndGet(-n);
                     n = 0;
                 }
@@ -226,10 +251,10 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                 delayedMessageMap.remove(timestamp);
             }
         }
-            log.debug()
-                    .attr("messagesCount", positions.size())
-                    .log("Get scheduled messages");
-                if (delayedMessageMap.isEmpty()) {
+        log.debug()
+                .attr("messagesCount", positions.size())
+                .log("Get scheduled messages");
+        if (delayedMessageMap.isEmpty()) {
             // Reset to initial state
             highestDeliveryTimeTracked = 0;
             messagesHaveFixedDelay = true;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 19c774852c9..0b0b48b8de3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -712,12 +712,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
                                 System.currentTimeMillis() - loadStartTime);
                     }
-                    synchronized (this) {
-                        if (timeout != null) {
-                            timeout.cancel();
-                        }
-                        timeout = timer.newTimeout(this, 0, 
TimeUnit.MILLISECONDS);
-                    }
+                    rescheduleTimer(0);
                 });
 
                 if (!checkPendingLoadDone() || 
loadFuture.isCompletedExceptionally()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index 322992d7b1c..f8a9dd83042 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.delayed;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -33,13 +35,16 @@ import io.netty.util.TimerTask;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
 import java.time.Clock;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -114,6 +119,39 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                     new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, 
clock,
                             true, 100)
             }};
+            case "testStrictModeNeverDeliversEarlyAndKeepsTimerArmed",
+                 "testStaleTimerTriggerDoesNotClearNewerTimer" -> {
+                // Mock timer that records the currently-armed timeouts so the 
test can observe whether a
+                // delivery timer is live and fire it like the wheel would 
(passing the armed Timeout instance,
+                // which the tracker's run() compares against its current 
timeout). Cancelling a timeout removes
+                // it from the map; firing (polling) it does not mark it 
cancelled, mirroring the wheel.
+                Timer mockTimer = mock(Timer.class);
+                NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks = new 
TreeMap<>();
+                when(mockTimer.newTimeout(any(), anyLong(), 
any())).then(invocation -> {
+                    TimerTask task = invocation.getArgument(0, 
TimerTask.class);
+                    long delay = invocation.getArgument(1, Long.class);
+                    TimeUnit unit = invocation.getArgument(2, TimeUnit.class);
+                    long scheduleAt = clockTime.get() + unit.toMillis(delay);
+                    Timeout t = mock(Timeout.class);
+                    Map.Entry<TimerTask, Timeout> entry = Map.entry(task, t);
+                    AtomicBoolean cancelled = new AtomicBoolean();
+                    when(t.cancel()).then(i -> {
+                        cancelled.set(true);
+                        return tasks.remove(scheduleAt, entry);
+                    });
+                    when(t.isCancelled()).then(i -> cancelled.get());
+                    tasks.put(scheduleAt, entry);
+                    return t;
+                });
+                // tickTimeMillis=1000 -> delivery timestamps are bucketed at 
512ms granularity (lower 9 bits),
+                // rounded up in strict mode so that messages are never 
visible before their deliverAt time.
+                yield new Object[][]{{
+                        new InMemoryDelayedDeliveryTracker(dispatcher, 
mockTimer, 1000, clock,
+                                true, 0),
+                        tasks,
+                        mockTimer
+                }};
+            }
             default -> new Object[][]{{
                     new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
                             true, 0)
@@ -229,7 +267,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                 true, 0) {
             @Override
             public void run(Timeout timeout) throws Exception {
-                super.timeout = timer.newTimeout(this, 1, 
TimeUnit.MILLISECONDS);
+                rescheduleTimer(1);
                 if (timeout == null || timeout.isCancelled()) {
                     return;
                 }
@@ -274,6 +312,106 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
         tracker.close();
     }
 
+    /**
+     * Regression test for https://github.com/apache/pulsar/issues/25996 and 
for the strict-mode guarantee that
+     * messages are never delivered before their deliverAt time.
+     *
+     * With isDelayedDeliveryDeliverAtTimeStrict=true and tickTimeMillis=1000, 
delivery timestamps are bucketed
+     * at 512ms granularity and rounded UP, so a bucket only becomes due once 
every deliverAt time inside it has
+     * passed. Previously timestamps were rounded down, so a message could be 
popped up to ~511ms early; the
+     * dispatcher would re-add the not-yet-due message and the re-add left a 
stale {@code currentTimeoutTarget}
+     * behind that suppressed re-arming the delivery timer, stalling all 
remaining delayed messages until an
+     * unrelated dispatch event occurred.
+     *
+     * The rounded-up buckets (multiples of 512ms) used below:
+     *   M1 deliverAt=60400 -> bucket 60416
+     *   M2 deliverAt=61000 -> bucket 61440
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testStrictModeNeverDeliversEarlyAndKeepsTimerArmed(InMemoryDelayedDeliveryTracker
 tracker,
+            NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks, Timer 
mockTimer) throws Exception {
+        clockTime.set(0);
+
+        // Two delayed messages in different buckets. A delivery timer is 
armed for the earliest.
+        assertTrue(tracker.addMessage(1, 1, 60400));
+        assertTrue(tracker.addMessage(2, 2, 61000));
+        assertEquals(tasks.size(), 1, "a delivery timer should be armed for 
the earliest message");
+        assertEquals(tasks.firstKey().longValue(), 60416, "the timer should 
target M1's rounded-up bucket");
+
+        // Before M1's bucket time, nothing may be visible to the dispatcher 
(no early delivery), and a
+        // dispatch round that finds nothing must leave the delivery timer 
armed (issue #25996).
+        clockTime.set(60000);
+        assertFalse(tracker.hasMessageAvailable());
+        assertTrue(tracker.getScheduledMessages(100).isEmpty(),
+                "strict mode must not deliver a message before its deliverAt 
time");
+        assertEquals(tasks.size(), 1, "the delivery timer must remain armed");
+
+        // The timer fires at M1's bucket time; M1 is delivered at 60416 >= 
deliverAt 60400, so the
+        // dispatcher never needs to re-add it.
+        clockTime.set(60416);
+        Map.Entry<TimerTask, Timeout> firedTimeout = 
tasks.pollFirstEntry().getValue();
+        firedTimeout.getKey().run(firedTimeout.getValue());
+        Set<Position> scheduled = tracker.getScheduledMessages(100);
+        assertEquals(scheduled, Set.of(PositionFactory.create(1, 1)));
+
+        // M2 is still pending and not yet due, so a delivery timer must have 
been re-armed for it. With the
+        // issue #25996 bug, the timer state went stale at this point and M2 
stalled indefinitely.
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tasks.size(), 1, "a delivery timer must remain armed for 
the pending message M2");
+        assertEquals(tasks.firstKey().longValue(), 61440, "the timer should 
target M2's rounded-up bucket");
+
+        // The timer fires again and M2 is delivered, also never early.
+        clockTime.set(61440);
+        firedTimeout = tasks.pollFirstEntry().getValue();
+        firedTimeout.getKey().run(firedTimeout.getValue());
+        scheduled = tracker.getScheduledMessages(100);
+        assertEquals(scheduled, Set.of(PositionFactory.create(2, 2)));
+        assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+
+        tracker.close();
+    }
+
+    /**
+     * A timeout that was superseded by a newer one may still fire: 
HashedWheelTimer can run a task that passed
+     * its isCancelled() check just before updateTimer() cancelled it. Such a 
stale trigger must not clear the
+     * state of the newer armed timer, otherwise the next updateTimer() call 
would arm a duplicate timer.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testStaleTimerTriggerDoesNotClearNewerTimer(InMemoryDelayedDeliveryTracker 
tracker,
+            NavigableMap<Long, Map.Entry<TimerTask, Timeout>> tasks, Timer 
mockTimer) throws Exception {
+        clockTime.set(0);
+
+        // Arm a timer for M2, then supersede it with an earlier message M1.
+        assertTrue(tracker.addMessage(2, 2, 61000));
+        assertEquals(tasks.firstKey().longValue(), 61440);
+        assertTrue(tracker.addMessage(1, 1, 60400));
+        assertEquals(tasks.size(), 1);
+        assertEquals(tasks.firstKey().longValue(), 60416, "M1's timer should 
have replaced M2's");
+
+        // The superseded (cancelled) timeout for M2 fires anyway, racing with 
the cancellation. The tracker
+        // must keep the state of the currently armed timer for M1.
+        Timeout staleTimeout = mock(Timeout.class);
+        tracker.run(staleTimeout);
+
+        // A subsequent updateTimer() (here through hasMessageAvailable()) 
must recognize the armed timer
+        // instead of arming a duplicate one: still exactly the two 
newTimeout() calls from the adds above.
+        assertFalse(tracker.hasMessageAvailable());
+        assertEquals(tasks.size(), 1);
+        assertEquals(tasks.firstKey().longValue(), 60416);
+        verify(mockTimer, times(2)).newTimeout(any(), anyLong(), any());
+
+        // The armed timer fires and delivery proceeds normally.
+        clockTime.set(60416);
+        Map.Entry<TimerTask, Timeout> firedTimeout = 
tasks.pollFirstEntry().getValue();
+        firedTimeout.getKey().run(firedTimeout.getValue());
+        Set<Position> scheduled = tracker.getScheduledMessages(100);
+        assertEquals(scheduled, Set.of(PositionFactory.create(1, 1)));
+        assertEquals(tasks.size(), 1, "a delivery timer must be re-armed for 
the still pending M2");
+
+        tracker.close();
+    }
+
     @Test(dataProvider = "delayedTracker")
     public void 
testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
         tracker.addMessage(1, 1, 50);
@@ -284,4 +422,50 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
 
         tracker.getScheduledMessages(10);
     }
+
+    /**
+     * Covers the partial drain of a per-ledger entry id bitmap in 
getScheduledMessages, where the bitmap holds
+     * more entries than the remaining maxMessages budget (the cardinality > n 
branch): only the lowest n entry
+     * ids may be returned and the rest must stay tracked, without duplicates 
across calls.
+     */
+    @Test(dataProvider = "delayedTracker")
+    public void 
testGetScheduledMessagesWithMaxMessagesSmallerThanBucket(InMemoryDelayedDeliveryTracker
 tracker)
+            throws Exception {
+        clockTime.set(0);
+
+        // Two ledgers within the same delivery bucket: ledger 1 with 2 
entries, ledger 2 with 5 entries.
+        assertTrue(tracker.addMessage(1, 0, 10));
+        assertTrue(tracker.addMessage(1, 1, 10));
+        for (int entryId = 0; entryId < 5; entryId++) {
+            assertTrue(tracker.addMessage(2, entryId, 10));
+        }
+        assertEquals(tracker.getNumberOfDelayedMessages(), 7);
+
+        clockTime.set(10);
+
+        // maxMessages drains ledger 1 fully and ledger 2 partially 
(cardinality > n on ledger 2's bitmap).
+        Set<Position> scheduled = tracker.getScheduledMessages(4);
+        assertEquals(scheduled, Set.of(
+                PositionFactory.create(1, 0),
+                PositionFactory.create(1, 1),
+                PositionFactory.create(2, 0),
+                PositionFactory.create(2, 1)));
+        assertEquals(tracker.getNumberOfDelayedMessages(), 3);
+
+        // Another partial drain of ledger 2's remaining entries (cardinality 
> n again): continues with the
+        // next lowest entry ids, no duplicates from the previous call.
+        scheduled = tracker.getScheduledMessages(2);
+        assertEquals(scheduled, Set.of(
+                PositionFactory.create(2, 2),
+                PositionFactory.create(2, 3)));
+        assertEquals(tracker.getNumberOfDelayedMessages(), 1);
+
+        // The last remaining entry is returned and the tracker is emptied.
+        scheduled = tracker.getScheduledMessages(10);
+        assertEquals(scheduled, Set.of(PositionFactory.create(2, 4)));
+        assertEquals(tracker.getNumberOfDelayedMessages(), 0);
+        assertFalse(tracker.hasMessageAvailable());
+
+        tracker.close();
+    }
 }

Reply via email to