lhotari commented on code in PR #25592:
URL: https://github.com/apache/pulsar/pull/25592#discussion_r3159016462
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -362,17 +361,6 @@ public synchronized void readMoreEntries() {
// increment the counter for readMoreEntries calls, to track the number of times readMoreEntries is called
readMoreEntriesCallCount++;
- // remove possible expired messages from redelivery tracker and pending acks
- Position markDeletePosition = cursor.getMarkDeletedPosition();
- if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) {
Review Comment:
@nodece you're right that retention trimming isn't the path I should have
flagged. I read through the code and ran an analysis with Claude Code. There is,
however, a remaining gap in `BacklogQuotaManager` handling.
Assisted analysis of `internalTrimLedgers` by Claude Code:
- The slowest-cursor heap that drives `cursors.getSlowestCursorPosition()`
only contains **durable** cursors.
[`ManagedLedgerImpl#addCursor`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L744-L753)
passes `positionForOrdering = null` for non-durable cursors, and
[`ManagedCursorContainerImpl#add`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerImpl.java#L85-L103)
skips adding to the heap when position is null.
- So the slowest durable cursor's mark-delete position *defines* the trim
boundary in
[`internalTrimLedgers`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3008-L3044),
and `ledgersToDelete` contains only ledgers strictly before it (except for the
boundary case where the slowest cursor sits on the last entry of a ledger, which
allows that ledger to be deleted as well).
- In
[`advanceCursorsIfNecessary`](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3241-L3275),
the strict-greater check `highestPositionToDelete >
cursor.getMarkDeletedPosition()` is therefore always false for any durable
cursor (their mark-delete is by definition ≥ the slowest durable cursor's,
which already bounds `highestPositionToDelete`). Only **non-durable** cursors
(Reader/compaction-style) can fall behind and be advanced here. The misleading
rename of the method from `advanceNonDurableCursors` to `advanceCursorsIfNecessary`,
plus the missing javadoc from #10667, is what threw me off.
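To make the durable vs. non-durable argument concrete, here is a deliberately tiny model; it is not the `ManagedLedgerImpl` code. Positions are plain `long`s instead of ledger/entry pairs, `TrimModel` and its `Cursor` are hypothetical, and `highestPositionToDelete` is set to its upper bound (the slowest durable mark-delete, i.e. the last-entry boundary case). Because only durable cursors feed the minimum, the strict `>` check can only ever pick up non-durable cursors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the trim/advance interaction; NOT ManagedLedgerImpl.
// Positions are plain longs instead of (ledgerId, entryId) pairs.
class TrimModel {
    // Stand-in for a ManagedCursor, reduced to what the argument needs.
    static final class Cursor {
        final String name;
        final boolean durable;
        long markDeletePosition;

        Cursor(String name, boolean durable, long markDeletePosition) {
            this.name = name;
            this.durable = durable;
            this.markDeletePosition = markDeletePosition;
        }
    }

    // Like the slowest-cursor heap: non-durable cursors are never considered.
    static long slowestDurablePosition(List<Cursor> cursors) {
        long slowest = Long.MAX_VALUE;
        for (Cursor c : cursors) {
            if (c.durable) {
                slowest = Math.min(slowest, c.markDeletePosition);
            }
        }
        return slowest;
    }

    // Like the strict-greater check in advanceCursorsIfNecessary: returns the
    // names of the cursors that get moved forward.
    static List<String> advanceCursorsIfNecessary(List<Cursor> cursors, long highestPositionToDelete) {
        List<String> advanced = new ArrayList<>();
        for (Cursor c : cursors) {
            if (highestPositionToDelete > c.markDeletePosition) {
                c.markDeletePosition = highestPositionToDelete;
                advanced.add(c.name);
            }
        }
        return advanced;
    }

    public static void main(String[] args) {
        List<Cursor> cursors = List.of(
                new Cursor("sub-a", true, 10),
                new Cursor("sub-b", true, 25),
                new Cursor("reader", false, 3));
        // Trimming never goes past the slowest durable cursor, so this is an upper bound.
        long highestPositionToDelete = slowestDurablePosition(cursors);
        System.out.println(advanceCursorsIfNecessary(cursors, highestPositionToDelete)); // prints [reader]
    }
}
```

Lowering `highestPositionToDelete` below the slowest durable position (the usual "strictly before" case) only makes the durable cursors even less eligible, so the conclusion holds either way in this model.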
So retention trimming doesn't move durable subscription mark-delete forward,
and the PR doesn't need to handle it. My review comment was wrong on that point.
---
Assisted analysis of BacklogQuotaManager:
That said, while verifying this I noticed a related gap on the
**`BacklogQuotaManager`** side that I think the PR does miss. The new
`Dispatcher#markDeletePositionMoveForward` hook is wired in three places:
| Source | Location | Triggering API |
|---|---|---|
| Subscription clear-backlog | [`PersistentSubscription#clearBacklog`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L780) | `cursor.asyncClearBacklog(...)` |
| Subscription skip-messages | [`PersistentSubscription#skipMessages`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L823) | `cursor.asyncSkipEntries(...)` |
| Expiry monitor | [`PersistentMessageExpiryMonitor#findEntryComplete`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L235) | `cursor.asyncMarkDelete(...)` |
But
[`BacklogQuotaManager`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java)
advances durable subscription cursors directly through the `ManagedCursor`
API, bypassing the `PersistentSubscription` wrappers:
| BQM path | Advancement call | Hook fires? |
|---|---|---|
| [`dropBacklogForSizeLimit`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L190) | `slowestConsumer.skipEntries(messagesToSkip, IndividualDeletedEntries.Include)`, directly on `ManagedCursor`, bypassing `PersistentSubscription#skipMessages` | ❌ No |
| [`dropBacklogForTimeLimit` precise=true](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L232) | `subscription.getExpiryMonitor().expireMessages(target)` | ✅ Yes (via expiry monitor) |
| [`dropBacklogForTimeLimit` precise=false](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L255-L265) | `slowestConsumer.markDelete(nextPosition)`, directly on `ManagedCursor`, no subscription-level callback | ❌ No |
So two of the three quota-eviction paths advance the durable subscription's
mark-delete position without firing the new cleanup hook. Before this PR, these
were silently cleaned up by the per-dispatch cleanup in `readMoreEntries` that
this PR removes, so this is a regression introduced here. It triggers under
`BacklogQuota.RetentionPolicy.consumer_backlog_eviction`. Side effects when it
does trigger:
- `Consumer.pendingAcks` accumulates entries for already-mark-deleted
positions until the consumer disconnects.
- `Consumer.unackedMessages` counter drifts (entries the consumer "still
owes" but the broker has already discarded).
- `redeliveryMessages` may retain stale entries.
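A hypothetical model of why the direct `ManagedCursor` calls matter: the subscription wrapper advances the cursor and then fires the hook, while a caller holding the cursor itself only moves the mark-delete position. For brevity the model keeps pending acks on the dispatcher (in Pulsar they are tracked per `Consumer`), uses `long` positions, and none of its classes are the real Pulsar types.

```java
import java.util.TreeSet;

// Hypothetical, heavily simplified model of the hook bypass; NOT the Pulsar API.
class HookBypassModel {
    // Stand-in for ManagedCursor: only tracks the mark-delete position.
    static final class Cursor {
        long markDeletePosition = -1;

        void skipEntries(int n) {
            markDeletePosition += n;
        }
    }

    // Stand-in for the dispatcher; pendingAcks = delivered but not yet acked.
    static final class Dispatcher {
        final Cursor cursor;
        final TreeSet<Long> pendingAcks = new TreeSet<>();

        Dispatcher(Cursor cursor) {
            this.cursor = cursor;
        }

        // Models the new hook: drop pending acks at or below the mark-delete position.
        void markDeletePositionMoveForward() {
            pendingAcks.headSet(cursor.markDeletePosition, true).clear();
        }
    }

    // Stand-in for PersistentSubscription: the wrapper fires the hook.
    static final class Subscription {
        final Dispatcher dispatcher;

        Subscription(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        void skipMessages(int n) {
            dispatcher.cursor.skipEntries(n);
            dispatcher.markDeletePositionMoveForward();
        }
    }

    // Positions 0..4 delivered, none acked yet.
    static Dispatcher deliveredFive() {
        Dispatcher d = new Dispatcher(new Cursor());
        for (long p = 0; p < 5; p++) {
            d.pendingAcks.add(p);
        }
        return d;
    }

    public static void main(String[] args) {
        // Through the wrapper, like PersistentSubscription#skipMessages.
        Dispatcher viaWrapper = deliveredFive();
        new Subscription(viaWrapper).skipMessages(3);
        System.out.println(viaWrapper.pendingAcks); // prints [3, 4]

        // Directly on the cursor, like dropBacklogForSizeLimit: stale acks remain.
        Dispatcher direct = deliveredFive();
        direct.cursor.skipEntries(3);
        System.out.println(direct.pendingAcks); // prints [0, 1, 2, 3, 4]
    }
}
```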
Two ways to close it:
1. Have `BacklogQuotaManager` route through
`PersistentSubscription#skipMessages` (and an analogous subscription-level
"markDelete forward" method) so the existing callbacks fire, or
2. Have `BacklogQuotaManager` invoke
`dispatcher.markDeletePositionMoveForward()` directly after each eviction step.
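Both options can be sketched against the same kind of toy model. `markDeleteForward` is a hypothetical name for the subscription-level method suggested in option 1; option 2 keeps the direct cursor call and then fires `markDeletePositionMoveForward()`. All types here are stand-ins, not Pulsar classes, and positions are plain `long`s.

```java
import java.util.TreeSet;

// Hypothetical sketch of the two fix options; none of these are real Pulsar types.
class EvictionFixSketch {
    // Stand-in for ManagedCursor.
    static final class Cursor {
        long markDeletePosition = -1;

        void markDelete(long position) {
            markDeletePosition = position;
        }
    }

    // Stand-in for the dispatcher with the new cleanup hook.
    static final class Dispatcher {
        final Cursor cursor;
        final TreeSet<Long> pendingAcks = new TreeSet<>();

        Dispatcher(Cursor cursor) {
            this.cursor = cursor;
        }

        void markDeletePositionMoveForward() {
            pendingAcks.headSet(cursor.markDeletePosition, true).clear();
        }
    }

    // Stand-in for PersistentSubscription.
    static final class Subscription {
        final Dispatcher dispatcher;

        Subscription(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        // Option 1 (hypothetical method): the subscription owns the cursor call
        // and fires the hook, so every caller gets the cleanup.
        void markDeleteForward(long position) {
            dispatcher.cursor.markDelete(position);
            dispatcher.markDeletePositionMoveForward();
        }
    }

    // Option 1: the quota eviction step routes through the subscription.
    static void evictViaSubscription(Subscription sub, long nextPosition) {
        sub.markDeleteForward(nextPosition);
    }

    // Option 2: the eviction step still calls the cursor, then fires the hook itself.
    static void evictDirectThenHook(Subscription sub, long nextPosition) {
        sub.dispatcher.cursor.markDelete(nextPosition);
        sub.dispatcher.markDeletePositionMoveForward();
    }

    static Subscription withPending(long... positions) {
        Dispatcher d = new Dispatcher(new Cursor());
        for (long p : positions) {
            d.pendingAcks.add(p);
        }
        return new Subscription(d);
    }

    public static void main(String[] args) {
        Subscription a = withPending(0, 1, 2, 3);
        evictViaSubscription(a, 1);
        Subscription b = withPending(0, 1, 2, 3);
        evictDirectThenHook(b, 1);
        System.out.println(a.dispatcher.pendingAcks + " " + b.dispatcher.pendingAcks); // prints [2, 3] [2, 3]
    }
}
```

Both paths end in the same pending-ack state; the difference is where the invariant lives. With option 1, any future caller that goes through the subscription gets the cleanup for free, whereas option 2 relies on each direct cursor caller remembering the hook. If the asynchronous cursor variants are used, the hook call belongs in the completion callback rather than straight after the call, which this sketch does not model.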
Option 1 is cleaner architecturally; option 2 is a smaller diff. Either is
fine with me — happy to take this in a follow-up if you'd prefer to keep this
PR scoped to its current changes.
--
This is an automated message from the Apache Git Service.