lhotari commented on code in PR #25592:
URL: https://github.com/apache/pulsar/pull/25592#discussion_r3159016462


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -362,17 +361,6 @@ public synchronized void readMoreEntries() {
         // increment the counter for readMoreEntries calls, to track the number of times readMoreEntries is called
         readMoreEntriesCallCount++;
 
-        // remove possible expired messages from redelivery tracker and pending acks
-        Position markDeletePosition = cursor.getMarkDeletedPosition();
-        if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) {

Review Comment:
   @nodece you're right that retention trimming isn't the path I should have flagged. I read through the code and analyzed it with Claude Code. There is, however, a remaining gap in the `BacklogQuotaManager` handling.
   
   Assisted analysis of `internalTrimLedgers` by Claude Code:
   
   - The slowest-cursor heap that drives `cursors.getSlowestCursorPosition()` 
only contains **durable** cursors. 
[`ManagedLedgerImpl#addCursor`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L744-L753)
 passes `positionForOrdering = null` for non-durable cursors, and 
[`ManagedCursorContainerImpl#add`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerImpl.java#L85-L103)
 skips adding to the heap when position is null.
   - So the slowest durable cursor's mark-delete position *defines* the trim boundary in [`internalTrimLedgers`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3008-L3044), and `ledgersToDelete` only contains ledgers strictly before it. The one boundary case: when the slowest cursor is on the last entry of a ledger, that ledger can be deleted too.
   - In [`advanceCursorsIfNecessary`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3241-L3275), the strict-greater check `highestPositionToDelete > cursor.getMarkDeletedPosition()` is therefore always false for any durable cursor: their mark-delete position is by definition ≥ the slowest durable cursor's, which already bounds `highestPositionToDelete`. Only **non-durable** (Reader/compaction-style) cursors can fall behind and be advanced here. The misleading rename of the method from `advanceNonDurableCursors`, together with the missing javadoc from #10667, is what threw me off.
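To make the argument above concrete, here is a minimal Java sketch under simplifying assumptions: positions are `(ledgerId, entryId)` pairs, only the slowest durable cursor bounds trimming (retention limits are ignored), and `highestPositionToDelete` is taken to be the last entry of the highest deletable ledger. `Pos`, `Ledger`, `SimCursor` and `TrimBoundarySketch` are hypothetical names, not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the trim-boundary argument.
// This is not Pulsar's ManagedLedgerImpl / ManagedCursorContainerImpl code.
record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
    @Override
    public int compareTo(Pos o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }
}

record Ledger(long ledgerId, long entries) {}

record SimCursor(String name, boolean durable, Pos markDelete) {}

final class TrimBoundarySketch {

    // Only durable cursors take part in slowest-position ordering; non-durable
    // cursors are skipped, as if their positionForOrdering were null.
    static Pos slowestDurable(List<SimCursor> cursors) {
        Pos slowest = null;
        for (SimCursor c : cursors) {
            if (c.durable() && (slowest == null || c.markDelete().compareTo(slowest) < 0)) {
                slowest = c.markDelete();
            }
        }
        return slowest;
    }

    // A ledger is deletable if it lies strictly before the slowest durable position,
    // or if the slowest position is on that ledger's last entry.
    // Returns the last entry of the highest deletable ledger, or null if none.
    static Pos highestPositionToDelete(List<Ledger> ledgers, Pos slowest) {
        if (slowest == null) {
            return null; // assumption: with no durable cursor, this sketch deletes nothing
        }
        Pos highest = null;
        for (Ledger l : ledgers) {
            boolean before = l.ledgerId() < slowest.ledgerId();
            boolean onLastEntry = l.ledgerId() == slowest.ledgerId()
                    && slowest.entryId() == l.entries() - 1;
            if (before || onLastEntry) {
                Pos last = new Pos(l.ledgerId(), l.entries() - 1);
                if (highest == null || last.compareTo(highest) > 0) {
                    highest = last;
                }
            }
        }
        return highest;
    }

    // The strict-greater check: only cursors whose mark-delete position is
    // strictly behind highestPositionToDelete get advanced.
    static List<String> cursorsToAdvance(List<SimCursor> cursors, Pos highestToDelete) {
        List<String> advanced = new ArrayList<>();
        for (SimCursor c : cursors) {
            if (highestToDelete != null && highestToDelete.compareTo(c.markDelete()) > 0) {
                advanced.add(c.name());
            }
        }
        return advanced;
    }

    public static void main(String[] args) {
        List<Ledger> ledgers = List.of(new Ledger(1, 10), new Ledger(2, 10), new Ledger(3, 10));
        List<SimCursor> cursors = List.of(
                new SimCursor("sub-a", true, new Pos(2, 9)),    // on the last entry of ledger 2
                new SimCursor("sub-b", true, new Pos(3, 4)),
                new SimCursor("reader", false, new Pos(1, 3))); // non-durable and lagging
        Pos slowest = slowestDurable(cursors);
        Pos highest = highestPositionToDelete(ledgers, slowest);
        System.out.println(slowest + " " + highest + " " + cursorsToAdvance(cursors, highest));
    }
}
```

With the sample data, the slowest durable cursor sits on the last entry of ledger 2, so ledger 2 becomes deletable and `highestPositionToDelete` equals that cursor's mark-delete position. The strict comparison therefore selects only `reader`.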
   
   So retention trimming doesn't move a durable subscription's mark-delete position forward, and the PR doesn't need to handle it. My review comment was wrong on that point.
   
   ---
   
   Assisted analysis of BacklogQuotaManager:
   
   That said, while verifying this I noticed a related gap on the 
**`BacklogQuotaManager`** side that I think the PR does miss. The new 
`Dispatcher#markDeletePositionMoveForward` hook is wired in three places:
   
   | Source | Location | Triggering API |
   |---|---|---|
   | Subscription clear-backlog | [`PersistentSubscription#clearBacklog`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L780) | `cursor.asyncClearBacklog(...)` |
   | Subscription skip-messages | [`PersistentSubscription#skipMessages`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L823) | `cursor.asyncSkipEntries(...)` |
   | Expiry monitor | [`PersistentMessageExpiryMonitor#findEntryComplete`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L235) | `cursor.asyncMarkDelete(...)` |
   
   But 
[`BacklogQuotaManager`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java)
 advances durable subscription cursors directly through the `ManagedCursor` 
API, bypassing the `PersistentSubscription` wrappers:
   
   | BQM path | Advancement call | Hook fires? |
   |---|---|---|
   | [`dropBacklogForSizeLimit`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L190) | `slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include)`, called directly on `ManagedCursor`, bypassing `PersistentSubscription#skipMessages` | ❌ No |
   | [`dropBacklogForTimeLimit` precise=true](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L232) | `subscription.getExpiryMonitor().expireMessages(target)` | ✅ Yes (via expiry monitor) |
   | [`dropBacklogForTimeLimit` precise=false](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L255-L265) | `slowestConsumer.markDelete(nextPosition)`, called directly on `ManagedCursor` with no subscription-level callback | ❌ No |
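A toy model of why the two ❌ rows are missed: the hook is fired by the subscription-level wrapper, so any caller that goes straight to the cursor skips it. This is a synchronous sketch with hypothetical classes (`SketchCursor`, `SketchSubscription`, `SketchDispatcher`); the real Pulsar APIs have different signatures, and several of them are asynchronous.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the dispatcher hook (not Pulsar's Dispatcher interface).
interface SketchDispatcher {
    void markDeletePositionMoveForward();
}

// Hypothetical stand-in for a cursor: it moves the mark-delete position
// and knows nothing about the dispatcher.
final class SketchCursor {
    private long markDeletePosition;

    long getMarkDeletedPosition() {
        return markDeletePosition;
    }

    void skipEntries(int numEntries) {
        markDeletePosition += numEntries;
    }

    void markDelete(long position) {
        markDeletePosition = Math.max(markDeletePosition, position);
    }
}

// Hypothetical stand-in for the subscription wrapper that owns the hook call.
final class SketchSubscription {
    private final SketchCursor cursor;
    private final SketchDispatcher dispatcher;

    SketchSubscription(SketchCursor cursor, SketchDispatcher dispatcher) {
        this.cursor = cursor;
        this.dispatcher = dispatcher;
    }

    // Wrapper path: advance the cursor, then fire the cleanup hook.
    void skipMessages(int numMessages) {
        cursor.skipEntries(numMessages);
        dispatcher.markDeletePositionMoveForward();
    }
}

final class HookBypassDemo {
    public static void main(String[] args) {
        AtomicInteger hookCalls = new AtomicInteger();
        SketchCursor cursor = new SketchCursor();
        SketchSubscription subscription = new SketchSubscription(cursor, hookCalls::incrementAndGet);

        subscription.skipMessages(5); // wrapper path: hook fires
        cursor.skipEntries(5);        // like dropBacklogForSizeLimit: hook never fires
        cursor.markDelete(20);        // like dropBacklogForTimeLimit (precise=false): hook never fires

        System.out.println("markDelete=" + cursor.getMarkDeletedPosition() + ", hookCalls=" + hookCalls.get());
    }
}
```

Running `HookBypassDemo` prints `markDelete=20, hookCalls=1`: the mark-delete position moved three times, but the dispatcher heard about it once.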
   
   So two of the three quota-eviction paths advance the durable subscription's mark-delete position without firing the new cleanup hook. Before this PR, these cases were silently cleaned up by the per-dispatch cleanup in `readMoreEntries` that this PR removes, so this is a regression introduced here. It triggers under `BacklogQuota.RetentionPolicy.consumer_backlog_eviction`. Side effects when it does trigger:
   
   - `Consumer.pendingAcks` accumulates entries for already-mark-deleted 
positions until the consumer disconnects.
   - `Consumer.unackedMessages` counter drifts (entries the consumer "still 
owes" but the broker has already discarded).
   - `redeliveryMessages` may retain stale entries.
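For illustration, this is roughly the kind of cleanup the removed `readMoreEntries` block ("remove possible expired messages from redelivery tracker and pending acks") performs, and therefore what is skipped when the hook does not fire: drop every tracked entry at or before the new mark-delete position and correct the unacked counter. The data structures are assumptions (a `TreeMap`/`TreeSet` keyed by a `long` position), and `StaleEntryCleanupSketch` is a hypothetical name; Pulsar's `Consumer` and dispatcher use different types.

```java
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model: positions are longs, pendingAcks maps position -> batch size,
// redeliveryMessages is a sorted set of positions. Not Pulsar's actual data structures.
final class StaleEntryCleanupSketch {
    final NavigableMap<Long, Integer> pendingAcks = new TreeMap<>();
    final NavigableSet<Long> redeliveryMessages = new TreeSet<>();
    int unackedMessages;

    void track(long position, int batchSize) {
        pendingAcks.put(position, batchSize);
        unackedMessages += batchSize;
    }

    // Drop every entry at or before the mark-delete position and fix the counter.
    void onMarkDeletePositionMoveForward(long markDeletePosition) {
        // headMap(..., true) is a live view, so clearing it removes from pendingAcks.
        NavigableMap<Long, Integer> stale = pendingAcks.headMap(markDeletePosition, true);
        for (int batchSize : stale.values()) {
            unackedMessages -= batchSize;
        }
        stale.clear();
        redeliveryMessages.headSet(markDeletePosition, true).clear();
    }

    public static void main(String[] args) {
        StaleEntryCleanupSketch state = new StaleEntryCleanupSketch();
        for (long p = 1; p <= 6; p++) {
            state.track(p, 1);
        }
        state.redeliveryMessages.add(2L);
        state.redeliveryMessages.add(5L);
        // Quota eviction moved mark-delete to 4; without this call, entries 1..4 would linger.
        state.onMarkDeletePositionMoveForward(4);
        System.out.println(state.pendingAcks.keySet() + " " + state.unackedMessages + " " + state.redeliveryMessages);
    }
}
```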
   
   Two ways to close it:
   
   1. Have `BacklogQuotaManager` route through 
`PersistentSubscription#skipMessages` (and an analogous subscription-level 
"markDelete forward" method) so the existing callbacks fire, or
   2. Have `BacklogQuotaManager` invoke 
`dispatcher.markDeletePositionMoveForward()` directly after each eviction step.
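A minimal sketch of option 2, assuming a synchronous model: wrap each cursor-level eviction step and fire the hook only if the mark-delete position actually advanced. `EvictionDispatcher`, `QuotaEvictionSketch` and `evictAndNotify` are hypothetical names, and positions are modelled as plain `long`s.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the dispatcher hook; not Pulsar's Dispatcher interface.
interface EvictionDispatcher {
    void markDeletePositionMoveForward();
}

final class QuotaEvictionSketch {

    // Option 2: run one cursor-level eviction step, then notify the dispatcher
    // only if the mark-delete position actually moved forward.
    static boolean evictAndNotify(LongSupplier markDeletePosition,
                                  Runnable evictionStep,
                                  EvictionDispatcher dispatcher) {
        long before = markDeletePosition.getAsLong();
        evictionStep.run();
        boolean moved = markDeletePosition.getAsLong() > before;
        // Assumption: a subscription may have no dispatcher; modelled here as null.
        if (moved && dispatcher != null) {
            dispatcher.markDeletePositionMoveForward();
        }
        return moved;
    }

    public static void main(String[] args) {
        long[] markDelete = {10};
        int[] hookCalls = {0};
        EvictionDispatcher dispatcher = () -> hookCalls[0]++;

        // Stand-in for a skipEntries(...) step in the size-limit path.
        evictAndNotify(() -> markDelete[0], () -> markDelete[0] += 5, dispatcher);
        // An eviction step that finds nothing to drop: no notification.
        evictAndNotify(() -> markDelete[0], () -> { }, dispatcher);

        System.out.println("markDelete=" + markDelete[0] + ", hookCalls=" + hookCalls[0]);
    }
}
```

Firing only on actual movement is a design choice of this sketch, to avoid redundant cleanup passes when an eviction step drops nothing. If an asynchronous cursor API were used instead, the notification would belong in its completion callback.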
   
   Option 1 is cleaner architecturally; option 2 is a smaller diff. Either is fine with me, and I'm happy to take this in a follow-up if you'd prefer to keep this PR scoped to its current changes.



-- 
This is an automated message from the Apache Git Service.