This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67d1e22070b KAFKA-20036 Handle LogCleaner segment overflow caused by compression level changes (#21379)
67d1e22070b is described below

commit 67d1e22070b88cf3beb16bd7e89a8c35d2521ce4
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 31 13:24:53 2026 +0800

    KAFKA-20036 Handle LogCleaner segment overflow caused by compression level changes (#21379)
    
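    When a group of segments is cleaned, recompression (for example after a
    compression level change) or merging several source segments can make
    the cleaned segment exceed the maximum segment size or offset range.
    Instead of splitting the source segment and aborting the cleaning round,
    the cleaner now detects the overflow while copying retained batches,
    finalizes the current cleaned segment, and continues into a new cleaned
    segment whose base offset is that of the next batch to be cleaned. All
    resulting cleaned segments are then swapped in together, and
    replaceSegments() keeps the index entry for every base offset that a new
    segment replaces in place.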
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  98 ++++++++-----
 .../kafka/storage/internals/log/Cleaner.java       | 155 +++++++++++++++------
 .../kafka/storage/internals/log/LocalLog.java      |   8 +-
 3 files changed, 179 insertions(+), 82 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 926aa14ddef..c51cba6a66f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1634,47 +1634,70 @@ class LogCleanerTest extends Logging {
   }
 
   @Test
-  def testSegmentWithOffsetOverflow(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
+  def testCleanedSegmentSizeOverflow(): Unit = {
+    // Put one record per source segment so each filterTo() call reads exactly 
one batch.
+    // After cleaning source segment 0 into currentCleaned, cleaning source 
segment 1 would push
+    // currentCleaned over maxCleanedSegmentSize, triggering overflow and
+    // rolling to a second cleaned segment.
     val logProps = new Properties()
-    logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
-    logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 1000: 
java.lang.Integer)
-    val config = LogConfig.fromProps(logConfig.originals, logProps)
+    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 4096: 
java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    log.appendAsLeader(record(0, 0), 0)
+    log.roll()
+    log.appendAsLeader(record(1, 1), 0)
+    log.roll()
 
-    LogTestUtils.initializeLogDirWithOverflowedSegment(dir)
+    val sourceSegments = log.logSegments.asScala.take(2).toSeq
+    val singleBatchSize = 
sourceSegments.head.log.batches.asScala.map(_.sizeInBytes).max
+    // maxCleanedSize allows exactly 1 batch; adding a 2nd batch overflows.
+    val maxCleanedSize = singleBatchSize.toLong + 1L
 
-    val log = makeLog(config = config, recoveryPoint = Long.MaxValue)
-    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
-      throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
-    }
+    // No deletions; both records are retained.
+    val cleaner = makeCleaner(Int.MaxValue, maxCleanedSegmentSize = 
maxCleanedSize)
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+
+    // Before: sourceSegment0, sourceSegment1, activeSegment = 3 total
+    val segmentCountBefore = log.logSegments.size
 
-    val numSegmentsInitial = log.logSegments.size
-    val allKeys = LogTestUtils.keysInLog(log).toList
-    val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
+    cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
+      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, 
sourceSegments.last.readNextOffset)
+
+    // With overflow, 2 source segments → 2 cleaned segments; net segment 
count unchanged.
+    // Without overflow, 2 source → 1 cleaned; net count would be 
segmentCountBefore - 1.
+    assertEquals(segmentCountBefore, log.logSegments.size)
+    assertEquals(List(0L, 1L), LogTestUtils.keysInLog(log).toList)
+    log.close()
+  }
 
-    // pretend we want to clean every alternate key
+  @Test
+  def testCleanedSegmentOffsetOverflow(): Unit = {
+    // Put one record per source segment so each filterTo() call reads exactly 
one batch.
+    // After cleaning source segment 0 into currentCleaned (offset 0), 
cleaning source segment 1
+    // would push the offset range of currentCleaned over 
maxCleanedOffsetRange, triggering
+    // overflow and rolling to a second cleaned segment.
+    val logProps = new Properties()
+    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 4096: 
java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    log.appendAsLeader(record(0, 0), 0)
+    log.roll()
+    log.appendAsLeader(record(1, 1), 0)
+    log.roll()
+
+    val sourceSegments = log.logSegments.asScala.take(2).toSeq
+    // Allow an offset range of 0: after offset 0 is written, any record at 
offset > 0 overflows.
+    val cleaner = makeCleaner(Int.MaxValue, maxCleanedOffsetRange = 0L)
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
-    for (k <- 1 until allKeys.size by 2) {
-      expectedKeysAfterCleaning += allKeys(k - 1)
-      offsetMap.put(key(allKeys(k)), Long.MaxValue)
-    }
 
-    // Try to clean segment with offset overflow. This will trigger log split 
and the cleaning itself must abort.
-    assertThrows(classOf[LogCleaningAbortedException], () =>
-      cleaner.cleanSegments(log, util.List.of(segmentWithOverflow), offsetMap, 
0L, new CleanerStats(Time.SYSTEM),
-        new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
-    )
-    assertEquals(numSegmentsInitial + 1, log.logSegments.size)
-    assertEquals(allKeys, LogTestUtils.keysInLog(log))
-    assertFalse(LogTestUtils.hasOffsetOverflow(log))
-
-    // Clean each segment now that split is complete.
-    val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
-    for (segmentToClean <- log.logSegments.asScala)
-      cleaner.cleanSegments(log, util.List.of(segmentToClean), offsetMap, 0L, 
new CleanerStats(Time.SYSTEM),
-        new CleanedTransactionMetadata, -1, upperBoundOffset)
-    assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
-    assertFalse(LogTestUtils.hasOffsetOverflow(log))
+    val segmentCountBefore = log.logSegments.size
+
+    cleaner.cleanSegments(log, sourceSegments.asJava, offsetMap, 0L,
+      new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, 
sourceSegments.last.readNextOffset)
+
+    assertEquals(segmentCountBefore, log.logSegments.size,
+      "offset overflow should produce 2 cleaned segments, keeping total 
segment count the same")
+    assertEquals(List(0L, 1L), LogTestUtils.keysInLog(log).toList)
     log.close()
   }
 
@@ -2270,7 +2293,8 @@ class LogCleanerTest extends Logging {
     )
   }
 
-  private def makeCleaner(capacity: Int, checkDone: Consumer[TopicPartition] = 
_ => (), maxMessageSize: Int = 64*1024) =
+  private def makeCleaner(capacity: Int, checkDone: Consumer[TopicPartition] = 
_ => (), maxMessageSize: Int = 64*1024,
+                          maxCleanedSegmentSize: Long = Int.MaxValue, 
maxCleanedOffsetRange: Long = Int.MaxValue) =
     new Cleaner(0,
                 new FakeOffsetMap(capacity),
                 maxMessageSize,
@@ -2278,7 +2302,9 @@ class LogCleanerTest extends Logging {
                 0.75,
                 throttler,
                 time,
-                checkDone)
+                checkDone,
+                maxCleanedSegmentSize,
+                maxCleanedOffsetRange)
 
   private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]): 
Iterable[Long] = {
     for ((key, value) <- seq)
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index de7c8893abd..c926b5a5409 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.message.AbortedTxn;
+import 
org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MutableRecordBatch;
@@ -44,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 /**
  * This class holds the actual logic for cleaning a log.
@@ -60,6 +62,8 @@ public class Cleaner {
     private final Throttler throttler;
     private final Time time;
     private final Consumer<TopicPartition> checkDone;
+    private final long maxCleanedSegmentSize;
+    private final long maxCleanedOffsetRange;
 
     /**
      * Buffer used for read i/o
@@ -90,6 +94,20 @@ public class Cleaner {
                    Throttler throttler,
                    Time time,
                    Consumer<TopicPartition> checkDone) {
+        this(id, offsetMap, ioBufferSize, maxIoBufferSize, 
dupBufferLoadFactor, throttler, time, checkDone, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+    }
+
+    // Only for testing
+    public Cleaner(int id,
+                   OffsetMap offsetMap,
+                   int ioBufferSize,
+                   int maxIoBufferSize,
+                   double dupBufferLoadFactor,
+                   Throttler throttler,
+                   Time time,
+                   Consumer<TopicPartition> checkDone,
+                   long maxCleanedSegmentSize,
+                   long maxCleanedOffsetRange) {
         this.id = id;
         this.offsetMap = offsetMap;
         this.ioBufferSize = ioBufferSize;
@@ -98,6 +116,8 @@ public class Cleaner {
         this.throttler = throttler;
         this.time = time;
         this.checkDone = checkDone;
+        this.maxCleanedSegmentSize = maxCleanedSegmentSize;
+        this.maxCleanedOffsetRange = maxCleanedOffsetRange;
         logger = new LogContext("Cleaner " + id + ": ").logger(Cleaner.class);
 
         readBuffer = ByteBuffer.allocate(ioBufferSize);
@@ -190,7 +210,12 @@ public class Cleaner {
     }
 
     /**
-     * Clean a group of segments into a single replacement segment.
+     * Clean a group of segments into one or more replacement segments.
+     *
+     * <p>
+     * If cleaning causes the destination segment's size or offset range to 
exceed the configured limit
+     * (e.g., due to recompression or combining multiple source segments), the 
current cleaned segment is
+     * finalized and a new one is started.
      *
      * @param log The log being cleaned
      * @param segments The group of segments being cleaned
@@ -202,21 +227,21 @@ public class Cleaner {
      * @param legacyDeleteHorizonMs The delete horizon used for tombstones 
whose version is less than 2
      * @param upperBoundOffsetOfCleaningRound The upper bound offset of this 
round of cleaning
      */
-    @SuppressWarnings("finally")
     public void cleanSegments(UnifiedLog log,
-                               List<LogSegment> segments,
-                               OffsetMap map,
-                               long currentTime,
-                               CleanerStats stats,
-                               CleanedTransactionMetadata transactionMetadata,
-                               long legacyDeleteHorizonMs,
-                               long upperBoundOffsetOfCleaningRound) throws 
IOException {
-        // create a new segment with a suffix appended to the name of the log 
and indexes
-        LogSegment cleaned = UnifiedLog.createNewCleanedSegment(log.dir(), 
log.config(), segments.get(0).baseOffset());
-        transactionMetadata.setCleanedIndex(Optional.of(cleaned.txnIndex()));
+                              List<LogSegment> segments,
+                              OffsetMap map,
+                              long currentTime,
+                              CleanerStats stats,
+                              CleanedTransactionMetadata transactionMetadata,
+                              long legacyDeleteHorizonMs,
+                              long upperBoundOffsetOfCleaningRound) throws 
IOException {
+        List<LogSegment> cleanedSegments = new ArrayList<>();
+
+        // Create initial cleaned segment with the base offset of the first 
source segment
+        LogSegment currentCleaned = 
UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), 
segments.get(0).baseOffset());
+        
transactionMetadata.setCleanedIndex(Optional.of(currentCleaned.txnIndex()));
 
         try {
-            // clean segments into the new destination segment
             Iterator<LogSegment> iter = segments.iterator();
             Optional<LogSegment> currentSegmentOpt = Optional.of(iter.next());
             Map<Long, LastRecord> lastOffsetOfActiveProducers = 
log.lastRecordsOfActiveProducers();
@@ -236,15 +261,19 @@ public class Cleaner {
                 logger.info(
                         "Cleaning {} in log {} into {} with an upper bound 
deletion horizon {} computed from " +
                         "the segment last modified time of {},{} deletes.",
-                        currentSegment, log.name(), cleaned.baseOffset(), 
legacyDeleteHorizonMs, currentSegment.lastModified(),
+                        currentSegment, log.name(), 
currentCleaned.baseOffset(), legacyDeleteHorizonMs, 
currentSegment.lastModified(),
                         retainLegacyDeletesAndTxnMarkers ? "retaining" : 
"discarding"
                 );
 
-                try {
-                    cleanInto(
+                // Start cleaning from position 0
+                int position = 0;
+
+                while (true) {
+                    Optional<Integer> overflowOpt = cleanInto(
                             log.topicPartition(),
                             currentSegment.log(),
-                            cleaned,
+                            currentCleaned,
+                            position,
                             map,
                             retainLegacyDeletesAndTxnMarkers,
                             log.config().deleteRetentionMs,
@@ -255,45 +284,71 @@ public class Cleaner {
                             stats,
                             currentTime
                     );
-                } catch (LogSegmentOffsetOverflowException e) {
-                    // Split the current segment. It's also safest to abort 
the current cleaning process, so that we retry from
-                    // scratch once the split is complete.
-                    logger.info("Caught segment overflow error during 
cleaning: {}", e.getMessage());
-                    log.splitOverflowedSegment(currentSegment);
-                    throw new LogCleaningAbortedException();
+
+                    if (overflowOpt.isPresent()) {
+                        // Overflow detected - complete current segment and 
create new one
+                        logger.info("Completing cleaned segment {} due to 
overflow, creating new segment", currentCleaned.baseOffset());
+
+                        currentCleaned.onBecomeInactiveSegment();
+                        currentCleaned.flush();
+                        
currentCleaned.setLastModified(currentSegment.lastModified());
+                        cleanedSegments.add(currentCleaned);
+
+                        // Use the base offset of the next batch to be cleaned 
as the new segment's base offset.
+                        // We cannot use currentCleaned.readNextOffset() 
because compaction may leave holes
+                        // in the offset sequence, so the next batch's base 
offset could be much larger.
+                        int overflowPosition = overflowOpt.get();
+                        Iterator<FileChannelRecordBatch> nextBatches = 
currentSegment.log().batchesFrom(overflowPosition).iterator();
+                        long nextBaseOffset = nextBatches.hasNext() ? 
nextBatches.next().baseOffset() : currentCleaned.readNextOffset();
+                        currentCleaned = 
UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), nextBaseOffset);
+                        
transactionMetadata.setCleanedIndex(Optional.of(currentCleaned.txnIndex()));
+
+                        logger.info("Created new cleaned segment with base 
offset {} for partition {}", nextBaseOffset, log.topicPartition());
+                        position = overflowPosition;
+                    } else {
+                        break;
+                    }
                 }
+
                 currentSegmentOpt = nextSegmentOpt;
             }
 
-            cleaned.onBecomeInactiveSegment();
-            // flush new segment to disk before swap
-            cleaned.flush();
+            // Process the final segment
+            currentCleaned.onBecomeInactiveSegment();
+            currentCleaned.flush();
 
             // update the modification date to retain the last modified date 
of the original files
             long modified = segments.get(segments.size() - 1).lastModified();
-            cleaned.setLastModified(modified);
+            currentCleaned.setLastModified(modified);
+
+            cleanedSegments.add(currentCleaned);
+
+            // swap in all cleaned segments (maybe multiple if overflow 
occurred)
+            logger.info("Swapping in cleaned segment(s) {} for segment(s) {} 
in log {}", cleanedSegments, segments, log);
+            log.replaceSegments(cleanedSegments, segments);
 
-            // swap in new segment
-            logger.info("Swapping in cleaned segment {} for segment(s) {} in 
log {}", cleaned, segments, log);
-            log.replaceSegments(List.of(cleaned), segments);
         } catch (LogCleaningAbortedException e) {
-            try {
-                cleaned.deleteIfExists();
-            } catch (Exception deleteException) {
-                e.addSuppressed(deleteException);
-            } finally {
-                throw e;
-            }
+            Stream.concat(cleanedSegments.stream(), Stream.of(currentCleaned))
+                .distinct()
+                .forEach(segment -> {
+                    try {
+                        segment.deleteIfExists();
+                    } catch (Exception deleteException) {
+                        e.addSuppressed(deleteException);
+                    }
+                });
+            throw e;
         }
     }
 
     /**
-     * Clean the given source log segment into the destination segment using 
the key=>offset mapping
-     * provided.
+     * Clean the given source log segment into destination segment using the 
key=>offset mapping
+     * provided, starting from the given position.
      *
      * @param topicPartition The topic and partition of the log segment to 
clean
      * @param sourceRecords The dirty log segment
      * @param dest The cleaned log segment
+     * @param startPosition Starting position in sourceRecords (in bytes)
      * @param map The key=>offset mapping
      * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
      * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
@@ -303,10 +358,14 @@ public class Cleaner {
      * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in 
the source segment
      * @param stats Collector for cleaning statistics
      * @param currentTime The time at which the clean was initiated
+     *
+     * @return {@code Optional.of(position)} if the destination segment would 
overflow (position is where overflow
+     *         was detected in the source), or {@code Optional.empty()} if 
cleaning completed normally
      */
-    private void cleanInto(TopicPartition topicPartition,
+    private Optional<Integer> cleanInto(TopicPartition topicPartition,
                            FileRecords sourceRecords,
                            LogSegment dest,
+                           int startPosition,
                            OffsetMap map,
                            boolean retainLegacyDeletesAndTxnMarkers,
                            long deleteRetentionMs,
@@ -379,7 +438,7 @@ public class Cleaner {
             }
         };
 
-        int position = 0;
+        int position = startPosition;
         while (position < sourceRecords.sizeInBytes()) {
             checkDone.accept(topicPartition);
             // read a chunk of messages and copy any that are to be retained 
to the write buffer to be written out
@@ -401,6 +460,19 @@ public class Cleaner {
             if (outputBuffer.position() > 0) {
                 outputBuffer.flip();
                 MemoryRecords retained = 
MemoryRecords.readableRecords(outputBuffer);
+
+                // While groupSegmentsBySize() ensures source segments don't 
exceed Integer.MAX_VALUE,
+                // recompression during cleaning can cause the cleaned segment 
to exceed that size.
+                // Similarly, combining multiple source segments into one 
cleaned segment can cause
+                // the offset range to exceed Integer.MAX_VALUE.
+                // Always allow the first write to an empty segment to avoid 
an infinite loop where
+                // a single oversized batch can never make progress.
+                boolean sizeOverflow = dest.size() > 0 && 
retained.sizeInBytes() > maxCleanedSegmentSize - dest.size();
+                boolean offsetOverflow = dest.size() > 0 && result.maxOffset() 
- dest.baseOffset() > maxCleanedOffsetRange;
+                if (sizeOverflow || offsetOverflow) {
+                    return Optional.of(position - result.bytesRead());
+                }
+
                 // it's OK not to hold the Log's lock in this case, because 
this segment is only accessed by other threads
                 // after `Log.replaceSegments` (which acquires the lock) is 
called
                 dest.append(result.maxOffset(), retained);
@@ -413,6 +485,7 @@ public class Cleaner {
                 growBuffersOrFail(sourceRecords, position, maxLogMessageSize, 
records);
         }
         restoreBuffers();
+        return Optional.empty();
     }
 
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index 0cc7f704d22..030a0d30594 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -1037,9 +1037,10 @@ public class LocalLog {
         // delete the old files
         List<LogSegment> deletedNotReplaced = new ArrayList<>();
         for (LogSegment segment : sortedOldSegments) {
-            // remove the index entry
-            if (segment.baseOffset() != sortedNewSegments.get(0).baseOffset()) 
{
+            // remove the index entry; skip removal for base offsets that a 
new segment is replacing in-place
+            if (!newSegmentBaseOffsets.contains(segment.baseOffset())) {
                 existingSegments.remove(segment.baseOffset());
+                deletedNotReplaced.add(segment);
             }
             deleteSegmentFiles(
                     List.of(segment),
@@ -1050,9 +1051,6 @@ public class LocalLog {
                     scheduler,
                     logDirFailureChannel,
                     logPrefix);
-            if (!newSegmentBaseOffsets.contains(segment.baseOffset())) {
-                deletedNotReplaced.add(segment);
-            }
         }
 
         // okay we are safe now, remove the swap suffix

Reply via email to