dao-jun opened a new pull request, #25984:
URL: https://github.com/apache/pulsar/pull/25984
### Motivation
When a topic's managed ledger trims old ledgers (via retention or
compaction), the immutable bucket snapshots in BucketDelayedDeliveryTracker
that cover those deleted ledger ranges become orphaned — their underlying
message data no longer exists, yet they:
1. Continue to occupy storage as bucket snapshots.
2. Count against the maxNumBuckets limit, potentially triggering
unnecessary and expensive merge operations.
Prior to this change, the tracker only merged buckets when the count
exceeded the limit; it never cleaned up buckets whose ledger range had been
fully trimmed. This change introduces a trim step that runs before merge,
deleting orphaned buckets and decrementing the delayed-message counter
accordingly.
### Modifications
**Source (`BucketDelayedDeliveryTracker.java`)**
- Added `trimFuture`: a `volatile CompletableFuture<Void>` that represents the currently in-flight trim/merge/clear operation. It acts both as a gate (preventing concurrent trim chains) and as a synchronization point for `clear()`.
- Added `asyncTrimImmutableBuckets()`: scans the managed ledger's surviving ledger range. Buckets whose ledger range ends (its `upperEndpoint`) before the earliest surviving ledger are selected for deletion. Every ledger they cover has been trimmed, so they are orphaned and safe to remove.
- Added `deleteBucketSnapshot()`: removes a single bucket from `immutableBuckets`, asynchronously deletes its snapshot, and re-adds the bucket on failure (rolling back the message count). Failures propagate as a `CompletionException` to stop the sequential deletion chain, avoiding wasted attempts when storage is unavailable.
- Changed the merge trigger in `addMessage()`: before merging, the tracker now runs `asyncTrimImmutableBuckets()` first. The `trimFuture` gate ensures that only one trim/merge chain is in flight at a time.
- Added an early return to `asyncMergeBucketSnapshot()`: if the bucket count is already within the limit, merge returns immediately without scanning for merge candidates.
- Rewrote `clear()`: instead of clearing state immediately (which could race with an in-flight trim's failure callback), `clear()` now chains itself after any pending `trimFuture`. It also sets `trimFuture` to the clear chain, blocking new trim triggers until the clear completes.
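The selection rule and the stop-on-first-failure deletion chain described above can be illustrated with a small, single-threaded model. This is a sketch under stated assumptions, not the tracker's code: `Bucket`, `TrimSketch`, `findOrphaned` and the injected `deleteSnapshot` function are hypothetical names, a plain `List` stands in for the tracker's bucket map, and `firstLedgerId` is passed in rather than read from the managed ledger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical stand-in for an immutable bucket: a closed range of ledger IDs plus its message count. */
final class Bucket {
    final long startLedgerId;
    final long endLedgerId; // plays the role of the range's upperEndpoint
    final long numMessages;

    Bucket(long startLedgerId, long endLedgerId, long numMessages) {
        this.startLedgerId = startLedgerId;
        this.endLedgerId = endLedgerId;
        this.numMessages = numMessages;
    }
}

/** Simplified, single-threaded model of the trim step; not the tracker's actual code. */
final class TrimSketch {
    final List<Bucket> immutableBuckets = new ArrayList<>();
    final AtomicLong numberDelayedMessages = new AtomicLong();
    // Hypothetical async snapshot delete; a failed future simulates unavailable storage.
    private final Function<Bucket, CompletableFuture<Void>> deleteSnapshot;

    TrimSketch(Function<Bucket, CompletableFuture<Void>> deleteSnapshot) {
        this.deleteSnapshot = deleteSnapshot;
    }

    void addBucket(Bucket bucket) {
        immutableBuckets.add(bucket);
        numberDelayedMessages.addAndGet(bucket.numMessages);
    }

    /** Orphaned buckets: every ledger they cover lies before the first surviving ledger. */
    List<Bucket> findOrphaned(long firstLedgerId) {
        List<Bucket> orphaned = new ArrayList<>();
        for (Bucket bucket : immutableBuckets) {
            if (bucket.endLedgerId < firstLedgerId) {
                orphaned.add(bucket);
            }
        }
        return orphaned;
    }

    /** Removes one bucket, deletes its snapshot, and rolls back both changes if the delete fails. */
    CompletableFuture<Void> deleteBucketSnapshot(Bucket bucket) {
        immutableBuckets.remove(bucket);
        numberDelayedMessages.addAndGet(-bucket.numMessages);
        // whenComplete keeps the failure, so the returned future also fails
        // and the caller's thenCompose chain stops here.
        return deleteSnapshot.apply(bucket).whenComplete((ignored, ex) -> {
            if (ex != null) {
                immutableBuckets.add(bucket);
                numberDelayedMessages.addAndGet(bucket.numMessages);
            }
        });
    }

    /** Deletes orphaned buckets one at a time; the first failure skips all remaining deletes. */
    CompletableFuture<Void> asyncTrimImmutableBuckets(long firstLedgerId) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Bucket bucket : findOrphaned(firstLedgerId)) {
            chain = chain.thenCompose(ignored -> deleteBucketSnapshot(bucket));
        }
        return chain;
    }
}
```

Chaining with `thenCompose` makes the deletes strictly sequential: once one stage fails, later stages never run, so no further delete requests are sent to storage that has just reported an error.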
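The `trimFuture` gate and the way `clear()` chains behind it can be modeled in a few lines. This is a hypothetical sketch, not the PR's implementation: `TrimGateSketch`, `maybeTrimAndMerge` and the injected `trim`, `merge` and `clearState` callbacks are illustrative names. Whether merge still runs after a failed trim is an assumption here, loosely based on the test table's note that delete exceptions are "consumed".

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical model of the trimFuture gate; trim, merge and clearState are injected stand-ins. */
final class TrimGateSketch {
    // In-flight trim/merge/clear chain; an already-completed future means "idle".
    private volatile CompletableFuture<Void> trimFuture = CompletableFuture.completedFuture(null);
    private final Supplier<CompletableFuture<Void>> trim;
    private final Supplier<CompletableFuture<Void>> merge;
    private final Runnable clearState;

    TrimGateSketch(Supplier<CompletableFuture<Void>> trim,
                   Supplier<CompletableFuture<Void>> merge,
                   Runnable clearState) {
        this.trim = trim;
        this.merge = merge;
        this.clearState = clearState;
    }

    /** Called when the bucket count exceeds the limit; returns false if a chain is already in flight. */
    synchronized boolean maybeTrimAndMerge() {
        if (!trimFuture.isDone()) {
            return false; // the gate: never start a second concurrent chain
        }
        trimFuture = trim.get()
                // Assumption: trim failures are swallowed so that merge still runs.
                .exceptionally(ex -> null)
                .thenCompose(ignored -> merge.get());
        return true;
    }

    /** Runs clearState only after any pending chain settles, and blocks new chains meanwhile. */
    synchronized CompletableFuture<Void> clear() {
        CompletableFuture<Void> clearChain = trimFuture
                .handle((ignored, ex) -> null) // wait for the pending chain, whatever its outcome
                .thenRun(clearState);
        trimFuture = clearChain; // new trim triggers see an unfinished future and back off
        return clearChain;
    }
}
```

Because `clear()` swaps its own chain into `trimFuture`, any `addMessage()` call that arrives mid-clear sees an unfinished future and skips starting a trim. A late failure callback from the earlier chain also runs before `clearState`, so it cannot re-add buckets after the state has been cleared.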
**Test (`BucketDelayedDeliveryTrackerTest.java`)**
Added four tests, using a helper that injects a mocked `ManagedLedger`:

| Test | Scenario |
| -- | -- |
| `testTrimRemovesOrphanedBuckets` | Buckets covering ledgers below `firstLedgerId` are removed; the remaining buckets have ranges ≥ `firstLedgerId`. |
| `testTrimHandlesDeleteFailure` | An injected delete exception is consumed and the tracker remains operational. |
| `testTrimWithNoOrphanedBuckets` | With `firstLedgerId = 0`, trim is a no-op and merge runs normally. |
| `testMergeEarlyReturnWhenWithinLimit` | With `maxNumBuckets` far above the actual bucket count, merge returns early without compaction. |
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added four unit tests to `BucketDelayedDeliveryTrackerTest` covering orphaned-bucket trimming, snapshot-delete failure handling, the no-orphan case, and the merge early return.
### Does this pull request potentially affect one of the following parts:
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment