chia7712 commented on code in PR #21379:
URL: https://github.com/apache/kafka/pull/21379#discussion_r2797080701


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java:
##########
@@ -169,9 +176,17 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
                 log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
         CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
 
+        double sizeRatio = segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0);
+        if (sizeRatio != 1.0) {
+            logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
+                    log.topicPartition(), sizeRatio * 100);
+        }
+
+        int effectiveMaxSize = (int) (log.config().segmentSize() * sizeRatio);
+
         List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
                 log.logSegments(0, endOffset),
-                log.config().segmentSize(),
+                effectiveMaxSize,

Review Comment:
   That is a good point. We could follow the approach used for handling offset 
overflow: split the segment and then restart the cleanup. The trade-off is that 
the first half of the segment will be cleaned in isolation, so there might be 
little to nothing to clean up :)
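
A rough, self-contained sketch of the "split, then restart" idea, just to make the control flow concrete. Everything here is hypothetical and is not code from this PR or from `Cleaner.java`: `Segment`, `SegmentTooLargeException`, `cleanOnce`, `splitInHalf` and `cleanWithRestart` are made-up names, and a segment is modelled only by its size in bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a toy model of "split the segment and restart the cleanup".
final class SplitAndRestartSketch {

    // Hypothetical stand-in for a log segment; only the size matters here.
    static final class Segment {
        final String name;
        final int sizeBytes;

        Segment(String name, int sizeBytes) {
            this.name = name;
            this.sizeBytes = sizeBytes;
        }
    }

    // Hypothetical signal that the segment at `index` is too large to clean as-is.
    static final class SegmentTooLargeException extends RuntimeException {
        final int index;

        SegmentTooLargeException(int index) {
            super("segment at index " + index + " exceeds the size limit");
            this.index = index;
        }
    }

    // One cleaning pass: aborts at the first segment that exceeds maxSize.
    static void cleanOnce(List<Segment> segments, int maxSize) {
        for (int i = 0; i < segments.size(); i++) {
            if (segments.get(i).sizeBytes > maxSize) {
                throw new SegmentTooLargeException(i);
            }
            // A real cleaner would compact the segment here.
        }
    }

    // Replaces the segment at `index` with two halves, in place.
    static void splitInHalf(List<Segment> segments, int index) {
        Segment s = segments.remove(index);
        int firstHalf = s.sizeBytes / 2;
        segments.add(index, new Segment(s.name + "a", firstHalf));
        segments.add(index + 1, new Segment(s.name + "b", s.sizeBytes - firstHalf));
    }

    // Retries the whole pass from scratch after every split; returns the restart count.
    static int cleanWithRestart(List<Segment> segments, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        int restarts = 0;
        while (true) {
            try {
                cleanOnce(segments, maxSize);
                return restarts;
            } catch (SegmentTooLargeException e) {
                splitInHalf(segments, e.index);
                restarts++;
            }
        }
    }
}
```

In this toy model each restart re-scans from the first segment, and the first half produced by a split is handled on its own in the next pass, which mirrors the trade-off mentioned above.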



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
