dao-jun commented on code in PR #25984:
URL: https://github.com/apache/pulsar/pull/25984#discussion_r3398448012
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -816,4 +840,58 @@ public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
return stats.genTopicMetricMap();
}
+
+ /**
+ * Delete orphaned bucket snapshots whose ledger range lies entirely
before the earliest
+ * surviving ledger. Buckets are deleted sequentially; the chain stops on
first failure
+ * to avoid wasted work when storage is unavailable.
+ */
+ private synchronized CompletableFuture<Void> asyncTrimImmutableBuckets() {
+ ManagedLedger ledger = context.getCursor().getManagedLedger();
+ if (ledger == null || ledger.getLedgersInfo().isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Long firstLedgerId = ledger.getLedgersInfo().firstKey();
+ if (null == firstLedgerId) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ Map<Range<Long>, ImmutableBucket> toBeDeletedBuckets =
immutableBuckets.asMapOfRanges().entrySet().stream()
+ .filter(e -> e.getKey().upperEndpoint() < firstLedgerId)
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ if (toBeDeletedBuckets.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ String ledgerName = ledger.getName();
+ CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
+ for (Map.Entry<Range<Long>, ImmutableBucket> entry :
toBeDeletedBuckets.entrySet()) {
+ chain = chain.thenCompose(__ ->
+ deleteBucketSnapshot(ledgerName, entry.getKey(),
entry.getValue()));
+ }
+ return chain;
+ }
+
+ private CompletableFuture<Void> deleteBucketSnapshot(String ledgerName,
+ Range<Long> range,
ImmutableBucket bucket) {
+ synchronized (this) {
+ immutableBuckets.remove(range);
+
numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
+ }
Review Comment:
Not clearing the data of `sharedBucketPriorityQueue` is intentionally
designed, as `ManagedCursorImpl.asyncReplayEntries` will filter out invalid
Positions.
I did miss cleaning the `snapshotSegmentLastIndexMap`, this is a problem.
Double decrease `numberDelayedMessages` is indeed a problem, good catch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]