lhotari opened a new issue, #25613:
URL: https://github.com/apache/pulsar/issues/25613

   PR #25592 introduces a regression in pending-acks cleanup, which is
essential for at least Key_Shared subscriptions. The gap in pending-acks
cleanup for `BacklogQuotaManager` needs to be covered.
   
   ---
   
   Assisted analysis of BacklogQuotaManager:
   
   While verifying this I noticed a related gap on the
**`BacklogQuotaManager`** side that I think the PR misses. The new
`Dispatcher#markDeletePositionMoveForward` hook is wired in three places:
   
   | Source | Location | Triggering API |
   |---|---|---|
   | Subscription clear-backlog | [`PersistentSubscription#clearBacklog`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L780) | `cursor.asyncClearBacklog(...)` |
   | Subscription skip-messages | [`PersistentSubscription#skipMessages`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L823) | `cursor.asyncSkipEntries(...)` |
   | Expiry monitor | [`PersistentMessageExpiryMonitor#findEntryComplete`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L235) | `cursor.asyncMarkDelete(...)` |
   
   But 
[`BacklogQuotaManager`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java)
 advances durable subscription cursors directly through the `ManagedCursor` 
API, bypassing the `PersistentSubscription` wrappers:
   
   | BQM path | Advancement call | Hook fires? |
   |---|---|---|
   | [`dropBacklogForSizeLimit`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L190) | `slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include)`, called directly on `ManagedCursor`, bypassing `PersistentSubscription#skipMessages` | ❌ No |
   | [`dropBacklogForTimeLimit` precise=true](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L232) | `subscription.getExpiryMonitor().expireMessages(target)` | ✅ Yes (via expiry monitor) |
   | [`dropBacklogForTimeLimit` precise=false](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L255-L265) | `slowestConsumer.markDelete(nextPosition)`, called directly on `ManagedCursor`, no subscription-level callback | ❌ No |
   
   So two of the three quota-eviction paths advance the durable subscription's
mark-delete position without firing the new cleanup hook. Before the PR, the
stale entries were silently cleaned up by the per-dispatch loop in
`readMoreEntries` that this PR removes, so this is a regression introduced
here. It triggers under
`BacklogQuota.RetentionPolicy.consumer_backlog_eviction`. Side effects when it
does trigger:
   
   - `Consumer.pendingAcks` accumulates entries for already-mark-deleted 
positions until the consumer disconnects.
   - `Consumer.unackedMessages` counter drifts (entries the consumer "still 
owes" but the broker has already discarded).
   - `redeliveryMessages` may retain stale entries.
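
The first two side effects can be illustrated with a toy model. The sketch below is not Pulsar code: `ToyConsumer`, `pruneUpTo`, `evict` and the plain `long` positions are hypothetical stand-ins, and it assumes the cleanup hook simply drops every pending ack at or below the new mark-delete position. It only shows why entries linger when nothing prunes them after an eviction.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model, not Pulsar code: positions are plain longs and all names are hypothetical.
final class PendingAckDriftSketch {

    // Stand-in for a consumer's pending-ack bookkeeping, keyed by position.
    static final class ToyConsumer {
        final TreeMap<Long, Integer> pendingAcks = new TreeMap<>();
        int unackedMessages;

        void dispatched(long position) {
            pendingAcks.put(position, 1);
            unackedMessages++;
        }

        // Assumed behaviour of a cleanup hook: drop entries at or below markDelete.
        int pruneUpTo(long markDelete) {
            NavigableMap<Long, Integer> stale = pendingAcks.headMap(markDelete, true);
            int removed = stale.size();
            stale.clear(); // clearing the view removes the entries from pendingAcks
            unackedMessages -= removed;
            return removed;
        }
    }

    // Dispatches positions 1..dispatchedUpTo, then simulates an eviction that moves
    // mark-delete to `markDelete`. Returns {stalePendingAcks, unackedMessages}.
    static int[] evict(long dispatchedUpTo, long markDelete, boolean fireHook) {
        ToyConsumer consumer = new ToyConsumer();
        for (long p = 1; p <= dispatchedUpTo; p++) {
            consumer.dispatched(p);
        }
        if (fireHook) {
            consumer.pruneUpTo(markDelete);
        }
        int stale = consumer.pendingAcks.headMap(markDelete, true).size();
        return new int[] {stale, consumer.unackedMessages};
    }

    public static void main(String[] args) {
        int[] noHook = evict(10, 6, false);
        int[] hooked = evict(10, 6, true);
        System.out.println("no hook: stale=" + noHook[0] + " unacked=" + noHook[1]);
        System.out.println("hook:    stale=" + hooked[0] + " unacked=" + hooked[1]);
    }
}
```

In this model the run without the hook keeps every evicted position in `pendingAcks` and leaves the counter untouched, which mirrors the drift described in the list above.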
   
   Two ways to close it:
   
   1. Have `BacklogQuotaManager` route through 
`PersistentSubscription#skipMessages` (and an analogous subscription-level 
"markDelete forward" method) so the existing callbacks fire, or
   2. Have `BacklogQuotaManager` invoke 
`dispatcher.markDeletePositionMoveForward()` directly after each eviction step.
   
   Option 1 is cleaner architecturally; option 2 is a smaller diff. Either is 
fine with me — happy to take this in a follow-up if you'd prefer to keep this 
PR scoped to its current changes.
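
A rough sketch of how the two options differ from the current behaviour, again as a toy model rather than Pulsar code. `ToyCursor`, `ToyDispatcher`, `ToySubscription` and `Strategy` are hypothetical, the calls are synchronous for brevity (the real cursor and subscription APIs are asynchronous), and the only thing modelled is whether the hook gets invoked.

```java
// Toy model, not Pulsar code: every type and method here is a hypothetical stand-in.
final class QuotaEvictionHookSketch {

    static final class ToyCursor {
        long markDelete;

        void skipEntries(int n) {
            markDelete += n;
        }
    }

    static final class ToyDispatcher {
        int hookCalls;

        // Stand-in for Dispatcher#markDeletePositionMoveForward.
        void markDeletePositionMoveForward() {
            hookCalls++;
        }
    }

    static final class ToySubscription {
        final ToyCursor cursor = new ToyCursor();
        final ToyDispatcher dispatcher = new ToyDispatcher();

        // Stand-in for PersistentSubscription#skipMessages: advance, then notify.
        void skipMessages(int n) {
            cursor.skipEntries(n);
            dispatcher.markDeletePositionMoveForward();
        }
    }

    enum Strategy { DIRECT_ON_CURSOR, VIA_SUBSCRIPTION, DIRECT_THEN_HOOK }

    static ToySubscription evict(Strategy strategy, int messagesToSkip) {
        ToySubscription sub = new ToySubscription();
        switch (strategy) {
            case DIRECT_ON_CURSOR: // current behaviour: the hook never fires
                sub.cursor.skipEntries(messagesToSkip);
                break;
            case VIA_SUBSCRIPTION: // option 1: route through the subscription wrapper
                sub.skipMessages(messagesToSkip);
                break;
            case DIRECT_THEN_HOOK: // option 2: keep the direct call, then invoke the hook
                sub.cursor.skipEntries(messagesToSkip);
                sub.dispatcher.markDeletePositionMoveForward();
                break;
        }
        return sub;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            ToySubscription sub = evict(s, 5);
            System.out.println(s + ": markDelete=" + sub.cursor.markDelete
                    + " hookCalls=" + sub.dispatcher.hookCalls);
        }
    }
}
```

All three strategies advance the cursor by the same amount; only the last two leave the dispatcher notified, which is the property either fix needs to restore.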
   
   _Originally posted by @lhotari in 
https://github.com/apache/pulsar/pull/25592#discussion_r3159016462_
               

