lhotari opened a new issue, #25613: URL: https://github.com/apache/pulsar/issues/25613
PR #25592 causes a regression in pending acks cleanup which is essential for at least Key_Shared subscriptions. There's a need to cover the gap in pending acks cleanup for BacklogQuotaManager.

---

Assisted analysis of BacklogQuotaManager:

That said, while verifying this I noticed a related gap on the **`BacklogQuotaManager`** side that I think the PR does miss. The new `Dispatcher#markDeletePositionMoveForward` hook is wired in three places:

| Source | Location | Triggering API |
|---|---|---|
| Subscription clear-backlog | [`PersistentSubscription#clearBacklog`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L780) | `cursor.asyncClearBacklog(...)` |
| Subscription skip-messages | [`PersistentSubscription#skipMessages`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L823) | `cursor.asyncSkipEntries(...)` |
| Expiry monitor | [`PersistentMessageExpiryMonitor#findEntryComplete`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L235) | `cursor.asyncMarkDelete(...)` |

But [`BacklogQuotaManager`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java) advances durable subscription cursors directly through the `ManagedCursor` API, bypassing the `PersistentSubscription` wrappers:

| BQM path | Advancement call | Hook fires? |
|---|---|---|
| [`dropBacklogForSizeLimit`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L190) | `slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include)`, called directly on `ManagedCursor`, bypassing `PersistentSubscription#skipMessages` | ❌ No |
| [`dropBacklogForTimeLimit` precise=true](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L232) | `subscription.getExpiryMonitor().expireMessages(target)` | ✅ Yes (via expiry monitor) |
| [`dropBacklogForTimeLimit` precise=false](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L255-L265) | `slowestConsumer.markDelete(nextPosition)`, called directly on `ManagedCursor`, no subscription-level callback | ❌ No |

So two of the three quota-eviction paths advance the durable subscription mark-delete without firing the new cleanup hook. Pre-PR these were silently cleaned up by the per-dispatch loop in `readMoreEntries` that this PR removes, so it's a regression introduced here.

Triggers under `BacklogQuota.RetentionPolicy.consumer_backlog_eviction`.

Side-effects when it does trigger:

- `Consumer.pendingAcks` accumulates entries for already-mark-deleted positions until the consumer disconnects.
- `Consumer.unackedMessages` counter drifts (entries the consumer "still owes" but the broker has already discarded).
- `redeliveryMessages` may retain stale entries.

Two ways to close it:

1. Have `BacklogQuotaManager` route through `PersistentSubscription#skipMessages` (and an analogous subscription-level "markDelete forward" method) so the existing callbacks fire, or
2. Have `BacklogQuotaManager` invoke `dispatcher.markDeletePositionMoveForward()` directly after each eviction step.

Option 1 is cleaner architecturally; option 2 is a smaller diff.
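
To make the gap and option 2 concrete, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: `SketchCursor`, `SketchDispatcher`, and `evictUpTo` are hypothetical stand-ins, positions are modeled as plain `long` values, and the hook body is only an assumption about what "cleanup" means (dropping pending acks at or below the mark-delete position). It only illustrates why advancing the cursor directly leaves stale pending acks behind, and how calling the hook after the eviction step removes them.

```java
import java.util.TreeSet;

// Hypothetical, simplified model; these are NOT the real Pulsar classes.
public class PendingAckCleanupSketch {

    /** Stand-in for a ManagedCursor: only tracks the mark-delete position. */
    static class SketchCursor {
        long markDeletePosition = -1;

        // The mark-delete position only ever moves forward.
        void markDelete(long position) {
            markDeletePosition = Math.max(markDeletePosition, position);
        }
    }

    /** Stand-in for a dispatcher that tracks positions awaiting consumer acks. */
    static class SketchDispatcher {
        final SketchCursor cursor;
        // Positions dispatched to consumers but not yet acknowledged.
        final TreeSet<Long> pendingAcks = new TreeSet<>();

        SketchDispatcher(SketchCursor cursor) {
            this.cursor = cursor;
        }

        void dispatch(long position) {
            pendingAcks.add(position);
        }

        // Assumed hook behavior: drop pending acks at or below the mark-delete position.
        void markDeletePositionMoveForward() {
            pendingAcks.headSet(cursor.markDeletePosition, true).clear();
        }
    }

    // Quota eviction step. With fireHook=false it mimics advancing the cursor
    // directly; with fireHook=true it mimics option 2 (notify the dispatcher).
    static void evictUpTo(SketchCursor cursor, SketchDispatcher dispatcher,
                          long position, boolean fireHook) {
        cursor.markDelete(position);
        if (fireHook) {
            dispatcher.markDeletePositionMoveForward();
        }
    }

    public static void main(String[] args) {
        SketchCursor cursor = new SketchCursor();
        SketchDispatcher dispatcher = new SketchDispatcher(cursor);
        for (long p = 1; p <= 5; p++) {
            dispatcher.dispatch(p);
        }

        evictUpTo(cursor, dispatcher, 3, false);
        System.out.println("without hook: " + dispatcher.pendingAcks);

        evictUpTo(cursor, dispatcher, 3, true);
        System.out.println("with hook:    " + dispatcher.pendingAcks);
    }
}
```

In this model the first eviction leaves positions 1 to 3 in `pendingAcks` even though the cursor has moved past them; the second call clears them. Option 1 would move the same notification into a subscription-level wrapper instead of the eviction code.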
Either is fine with me; happy to take this in a follow-up if you'd prefer to keep this PR scoped to its current changes.

_Originally posted by @lhotari in https://github.com/apache/pulsar/pull/25592#discussion_r3159016462_
