This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new de7e0ebb441 KAFKA-20267: Move LogCleanerParameterizedIntegrationTest 
to storage module (#21657)
de7e0ebb441 is described below

commit de7e0ebb441bd5e048dd5e8c757d2cb22f2394a1
Author: Maros Orsak <[email protected]>
AuthorDate: Fri Mar 27 10:53:45 2026 +0100

    KAFKA-20267: Move LogCleanerParameterizedIntegrationTest to storage module 
(#21657)
    
    This PR is the final one in the migration of
    AbstractLogCleanerIntegrationTest.scala and its sub test suites, merging
    them into one compact test suite.
    
    ---------
    
    Signed-off-by: see-quick <[email protected]>
    Reviewers: Mickael Maison <[email protected]>, Federico Valeri 
<[email protected]>
---
 .../log/AbstractLogCleanerIntegrationTest.scala    | 175 -------
 .../LogCleanerParameterizedIntegrationTest.scala   | 379 ----------------
 ...ionTest.java => LogCleanerIntegrationTest.java} | 503 +++++++++++++++++++--
 3 files changed, 470 insertions(+), 587 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
deleted file mode 100644
index c2310da3d98..00000000000
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import kafka.utils.TestUtils
-import kafka.utils.Implicits._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch, 
RecordVersion}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, 
LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.{AfterEach, Tag}
-
-import java.io.File
-import java.nio.file.Files
-import java.util
-import java.util.{Optional, Properties}
-import scala.collection.Seq
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
-
-@Tag("integration")
-abstract class AbstractLogCleanerIntegrationTest {
-
-  var cleaner: LogCleaner = _
-  val logDir = TestUtils.tempDir()
-
-  private val logs = ListBuffer.empty[UnifiedLog]
-  private val defaultMaxMessageSize = 128
-  private val defaultMinCleanableDirtyRatio = 0.0F
-  private val defaultMinCompactionLagMS = 0L
-  private val defaultDeleteDelay = 1000
-  private val defaultSegmentSize = 2048
-  private val defaultMaxCompactionLagMs = Long.MaxValue
-
-  def time: MockTime
-
-  @AfterEach
-  def teardown(): Unit = {
-    if (cleaner != null)
-      cleaner.shutdown()
-    time.scheduler.shutdown()
-    logs.foreach(_.close())
-    Utils.delete(logDir)
-  }
-
-  def logConfigProperties(propertyOverrides: Properties = new Properties(),
-                          maxMessageSize: Int,
-                          minCleanableDirtyRatio: Float = 
defaultMinCleanableDirtyRatio,
-                          minCompactionLagMs: Long = defaultMinCompactionLagMS,
-                          deleteDelay: Int = defaultDeleteDelay,
-                          segmentSize: Int = defaultSegmentSize,
-                          maxCompactionLagMs: Long = 
defaultMaxCompactionLagMs): Properties = {
-    val props = new Properties()
-    props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize: 
java.lang.Integer)
-    props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: 
java.lang.Integer)
-    props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100*1024: 
java.lang.Integer)
-    props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: 
java.lang.Integer)
-    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
-    props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 
minCleanableDirtyRatio: java.lang.Float)
-    props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, minCompactionLagMs: 
java.lang.Long)
-    props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, maxCompactionLagMs: 
java.lang.Long)
-    props ++= propertyOverrides
-    props
-  }
-
-  def makeCleaner(partitions: Iterable[TopicPartition],
-                  minCleanableDirtyRatio: Float = 
defaultMinCleanableDirtyRatio,
-                  numThreads: Int = 1,
-                  backoffMs: Long = 15000L,
-                  maxMessageSize: Int = defaultMaxMessageSize,
-                  minCompactionLagMs: Long = defaultMinCompactionLagMS,
-                  deleteDelay: Int = defaultDeleteDelay,
-                  segmentSize: Int = defaultSegmentSize,
-                  maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
-                  cleanerIoBufferSize: Option[Int] = None,
-                  propertyOverrides: Properties = new Properties()): 
LogCleaner = {
-
-    val logMap = new util.concurrent.ConcurrentHashMap[TopicPartition, 
UnifiedLog]()
-    for (partition <- partitions) {
-      val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
-      Files.createDirectories(dir.toPath)
-
-      val logConfig = new LogConfig(logConfigProperties(propertyOverrides,
-        maxMessageSize = maxMessageSize,
-        minCleanableDirtyRatio = minCleanableDirtyRatio,
-        minCompactionLagMs = minCompactionLagMs,
-        deleteDelay = deleteDelay,
-        segmentSize = segmentSize,
-        maxCompactionLagMs = maxCompactionLagMs))
-      val log = UnifiedLog.create(
-        dir,
-        logConfig,
-        0L,
-        0L,
-        time.scheduler,
-        new BrokerTopicStats,
-        time,
-        5 * 60 * 1000,
-        new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
-        TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-        new LogDirFailureChannel(10),
-        true,
-        Optional.empty)
-      logMap.put(partition, log)
-      this.logs += log
-    }
-
-    val cleanerConfig = new CleanerConfig(
-      numThreads,
-      4 * 1024 * 1024L,
-      0.9,
-      cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
-      maxMessageSize,
-      Double.MaxValue,
-      backoffMs,
-      true)
-    new LogCleaner(cleanerConfig,
-      util.List.of(logDir),
-      logMap,
-      new LogDirFailureChannel(1),
-      time)
-  }
-
-  private var ctr = 0
-  def counter: Int = ctr
-  def incCounter(): Unit = ctr += 1
-
-  def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: 
Compression,
-                startKey: Int = 0, magicValue: Byte = 
RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
-    for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) 
yield {
-      val value = counter.toString
-      val appendInfo = 
log.appendAsLeaderWithRecordVersion(TestUtils.singletonRecords(value = 
value.getBytes, codec = codec,
-        key = key.toString.getBytes, magicValue = magicValue), 0, 
RecordVersion.lookup(magicValue))
-      // move LSO forward to increase compaction bound
-      log.updateHighWatermark(log.logEndOffset)
-      incCounter()
-      (key, value, appendInfo.firstOffset)
-    }
-  }
-
-  def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte, codec: 
Compression): (String, MemoryRecords) = {
-    def messageValue(length: Int): String = {
-      val random = new Random(0)
-      new String(random.alphanumeric.take(length).toArray)
-    }
-    val value = messageValue(128)
-    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec 
= codec, key = key.toString.getBytes,
-      magicValue = messageFormatVersion)
-    (value, messageSet)
-  }
-
-  def closeLog(log: UnifiedLog): Unit = {
-    log.close()
-    logs -= log
-  }
-}
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
deleted file mode 100755
index 1a4436fd5c0..00000000000
--- 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.File
-import java.util.{Optional, Properties}
-import kafka.server.KafkaConfig
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogCleanerManager, LogConfig, UnifiedLog}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.extension.ExtensionContext
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, 
ArgumentsSource}
-
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-/**
- * This is an integration test that tests the fully integrated log cleaner
- */
-class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrationTest {
-
-  val time = new MockTime()
-
-  val topicPartitions = Array(new TopicPartition("log", 0), new 
TopicPartition("log", 1), new TopicPartition("log", 2))
-
-  @ParameterizedTest
-  
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
-  def cleanerTest(compressionType: CompressionType): Unit = {
-    val codec: Compression = Compression.of(compressionType).build()
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, 
codec)
-    val maxMessageSize = largeMessageSet.sizeInBytes
-
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
maxMessageSize)
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = 
codec)
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.asScala.map(_.size).sum
-    assertTrue(startSize > compactedSize, s"log should have been compacted: 
startSize=$startSize compactedSize=$compactedSize")
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-
-    val appendInfo = log.appendAsLeader(largeMessageSet, 0)
-    // move LSO forward to increase compaction bound
-    log.updateHighWatermark(log.logEndOffset)
-    val largeMessageOffset = appendInfo.firstOffset
-
-    val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, 
numDups = 3, log = log, codec = codec)
-    val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, 
largeMessageOffset)) ++ dups
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-
-    // simulate deleting a partition, by removing it from logs
-    // force a checkpoint
-    // and make sure its gone from checkpoint file
-    cleaner.logs.remove(topicPartitions(0))
-    cleaner.updateCheckpoints(logDir, Optional.of(topicPartitions(0)))
-    val checkpoints = new OffsetCheckpointFile(new File(logDir, 
LogCleanerManager.OFFSET_CHECKPOINT_FILE), null).read()
-    // we expect partition 0 to be gone
-    assertFalse(checkpoints.containsKey(topicPartitions(0)))
-  }
-
-  @ParameterizedTest
-  
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
-  def testCleansCombinedCompactAndDeleteTopic(compressionType: 
CompressionType): Unit = {
-    val logProps  = new Properties()
-    val retentionMs: Integer = 100000
-    logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: Integer)
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
-
-    def runCleanerAndCheckCompacted(numKeys: Int): (UnifiedLog, Seq[(Int, 
String, Long)]) = {
-      cleaner = makeCleaner(partitions = topicPartitions.take(1), 
propertyOverrides = logProps, backoffMs = 100L)
-      val log = cleaner.logs.get(topicPartitions(0))
-
-      val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, 
codec = Compression.of(compressionType).build())
-      val startSize = log.size
-
-      log.updateHighWatermark(log.logEndOffset)
-
-      val firstDirty = log.activeSegment.baseOffset
-      cleaner.startup()
-
-      // should compact the log
-      checkLastCleaned("log", 0, firstDirty)
-      val compactedSize = log.logSegments.asScala.map(_.size).sum
-      assertTrue(startSize > compactedSize, s"log should have been compacted: 
startSize=$startSize compactedSize=$compactedSize")
-      (log, messages)
-    }
-
-    val (log, _) = runCleanerAndCheckCompacted(100)
-
-    // Set the last modified time to an old value to force deletion of old 
segments
-    val endOffset = log.logEndOffset
-    log.logSegments.forEach(_.setLastModified(time.milliseconds - (2 * 
retentionMs)))
-    TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset && 
log.numberOfSegments == 1,
-      "Timed out waiting for deletion of old segments")
-
-    cleaner.shutdown()
-    closeLog(log)
-
-    // run the cleaner again to make sure if there are no issues post deletion
-    val (log2, messages) = runCleanerAndCheckCompacted(20)
-    val read = readFromLog(log2)
-    assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't 
change")
-  }
-
-  @ParameterizedTest
-  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
-  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): 
Unit = {
-    val compression = Compression.of(compressionType).build()
-    val largeMessageKey = 20
-    val (largeMessageValue, largeMessageSet) = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, 
compression)
-    val maxMessageSize = compression match {
-      case Compression.NONE => largeMessageSet.sizeInBytes
-      case _ =>
-        // the broker assigns absolute offsets for message format 0 which 
potentially causes the compressed size to
-        // increase because the broker offsets are larger than the ones 
assigned by the client
-        // adding `6` to the message set size is good enough for this test: it 
covers the increased message size while
-        // still being less than the overhead introduced by the conversion 
from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 6
-    }
-
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
maxMessageSize)
-
-    val log = cleaner.logs.get(topicPartitions(0))
-    val props = logConfigProperties(maxMessageSize = maxMessageSize)
-    props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
TimestampType.LOG_APPEND_TIME.name)
-    val logConfig = new LogConfig(props)
-    log.updateConfig(logConfig)
-
-    val appends1 = writeDups(numKeys = 100, numDups = 3, log = log, codec = 
compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.asScala.map(_.size).sum
-    assertTrue(startSize > compactedSize, s"log should have been compacted: 
startSize=$startSize compactedSize=$compactedSize")
-
-    checkLogAfterAppendingDups(log, startSize, appends1)
-
-    val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = 
compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    val appendInfo = log.appendAsLeaderWithRecordVersion(largeMessageSet, 0, 
RecordVersion.V0)
-    // move LSO forward to increase compaction bound
-    log.updateHighWatermark(log.logEndOffset)
-    val largeMessageOffset = appendInfo.firstOffset
-
-    // also add some messages with version 1 and version 2 to check that we 
handle mixed format versions correctly
-    val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = 
log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, 
codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V2)
-
-    val v0RecordKeysWithNoV1V2Updates = (appends1.map(_._1).toSet -- 
dupsV1.map(_._1) -- dupsV2.map(_._1)).map(_.toString)
-    val appends2: Seq[(Int, String, Long)] =
-      appends1 ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, 
largeMessageOffset)) ++ dupsV1 ++ dupsV2
-
-    // roll the log so that all appended messages can be compacted
-    log.roll()
-    val firstDirty2 = log.activeSegment.baseOffset
-    checkLastCleaned("log", 0, firstDirty2)
-
-    checkLogAfterAppendingDups(log, startSize, appends2)
-    checkLogAfterConvertingToV2(compressionType, log, 
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
-  }
-
-  @ParameterizedTest
-  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
-  def testCleaningNestedMessagesWithV0V1(compressionType: CompressionType): 
Unit = {
-    val compression = Compression.of(compressionType).build()
-    val maxMessageSize = 192
-    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 
maxMessageSize, segmentSize = 256)
-
-    val log = cleaner.logs.get(topicPartitions(0))
-    val logConfig = new LogConfig(logConfigProperties(maxMessageSize = 
maxMessageSize, segmentSize = 256))
-    log.updateConfig(logConfig)
-
-    // with compression enabled, these messages will be written as a single 
message containing all the individual messages
-    var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = 
log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
-    appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups 
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
-
-    var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, 
numDups = 2, log = log, codec = compression, magicValue = 
RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups 
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
-    appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups 
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
-
-    val appends = appendsV0 ++ appendsV1
-
-    val v0RecordKeysWithNoV1V2Updates = (appendsV0.map(_._1).toSet -- 
appendsV1.map(_._1)).map(_.toString)
-
-    // roll the log so that all appended messages can be compacted
-    log.roll()
-    val startSize = log.size
-    cleaner.startup()
-
-    val firstDirty = log.activeSegment.baseOffset
-    assertTrue(firstDirty >= appends.size) // ensure we clean data from V0 and 
V1
-
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.asScala.map(_.size).sum
-    assertTrue(startSize > compactedSize, s"log should have been compacted: 
startSize=$startSize compactedSize=$compactedSize")
-
-    checkLogAfterAppendingDups(log, startSize, appends)
-    checkLogAfterConvertingToV2(compressionType, log, 
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
-  }
-
-  private def checkLogAfterConvertingToV2(compressionType: CompressionType, 
log: UnifiedLog, timestampType: TimestampType,
-                                          keysForV0RecordsWithNoV1V2Updates: 
Set[String]): Unit = {
-    for (segment <- log.logSegments.asScala; recordBatch <- 
segment.log.batches.asScala) {
-      // Uncompressed v0/v1 records are always converted into single record v2 
batches via compaction if they are retained
-      // Compressed v0/v1 record batches are converted into record batches v2 
with one or more records (depending on the
-      // number of retained records after compaction)
-      assertEquals(RecordVersion.V2.value, recordBatch.magic)
-      if (compressionType == CompressionType.NONE)
-        assertEquals(1, recordBatch.iterator().asScala.size)
-      else
-        assertTrue(recordBatch.iterator().asScala.nonEmpty)
-
-      val firstRecordKey = 
TestUtils.readString(recordBatch.iterator().next().key())
-      if (keysForV0RecordsWithNoV1V2Updates.contains(firstRecordKey))
-        assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType)
-      else
-        assertEquals(timestampType, recordBatch.timestampType)
-
-      recordBatch.iterator.asScala.foreach { record =>
-        val recordKey = TestUtils.readString(record.key)
-        if (keysForV0RecordsWithNoV1V2Updates.contains(recordKey))
-          assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp, "Record " + 
recordKey + " with unexpected timestamp ")
-        else
-          assertNotEquals(RecordBatch.NO_TIMESTAMP, record.timestamp, "Record 
" + recordKey + " with unexpected timestamp " + RecordBatch.NO_TIMESTAMP)
-      }
-    }
-  }
-
-  @ParameterizedTest
-  
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
-  def cleanerConfigUpdateTest(compressionType: CompressionType): Unit = {
-    val codec: Compression = Compression.of(compressionType).build()
-    val largeMessageKey = 20
-    val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, 
RecordBatch.CURRENT_MAGIC_VALUE, codec)
-    val maxMessageSize = largeMessageSet.sizeInBytes
-
-    cleaner = makeCleaner(partitions = topicPartitions, backoffMs = 1, 
maxMessageSize = maxMessageSize,
-      cleanerIoBufferSize = Some(1))
-    val log = cleaner.logs.get(topicPartitions(0))
-
-    writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
-    val startSize = log.size
-    cleaner.startup()
-    assertEquals(1, cleaner.cleanerCount)
-
-    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
-    val firstDirty = log.activeSegment.baseOffset
-    val topicPartition = new TopicPartition("log", 0)
-    cleaner.awaitCleaned(topicPartition, firstDirty, 10)
-    assertTrue(cleaner.cleanerManager.allCleanerCheckpoints.isEmpty, "Should 
not have cleaned")
-
-    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): 
KafkaConfig = {
-      val props = TestUtils.createBrokerConfig(0)
-      props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, 
cleanerConfig.numThreads.toString)
-      props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
cleanerConfig.dedupeBufferSize.toString)
-      props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, 
cleanerConfig.dedupeBufferLoadFactor.toString)
-      props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, 
cleanerConfig.ioBufferSize.toString)
-      props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, 
cleanerConfig.maxMessageSize.toString)
-      props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, 
cleanerConfig.backoffMs.toString)
-      props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, 
cleanerConfig.maxIoBytesPerSecond.toString)
-      KafkaConfig.fromProps(props)
-    }
-
-    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
-    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
-    val newConfig = kafkaConfigWithCleanerConfig(new CleanerConfig(2,
-      cleaner.currentConfig.dedupeBufferSize,
-      cleaner.currentConfig.dedupeBufferLoadFactor,
-      100000,
-      cleaner.currentConfig.maxMessageSize,
-      cleaner.currentConfig.maxIoBytesPerSecond,
-      cleaner.currentConfig.backoffMs,
-      true))
-    cleaner.reconfigure(oldConfig, newConfig)
-
-    assertEquals(2, cleaner.cleanerCount)
-    checkLastCleaned("log", 0, firstDirty)
-    val compactedSize = log.logSegments.asScala.map(_.size).sum
-    assertTrue(startSize > compactedSize, s"log should have been compacted: 
startSize=$startSize compactedSize=$compactedSize")
-  }
-
-  private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: 
Long): Unit = {
-    // wait until cleaning up to base_offset, note that cleaning happens only 
when "log dirty ratio" is higher than
-    // TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
-    val topicPartition = new TopicPartition(topic, partitionId)
-    cleaner.awaitCleaned(topicPartition, firstDirty, 60000L)
-    val lastCleaned = 
cleaner.cleanerManager.allCleanerCheckpoints.get(topicPartition)
-    assertTrue(lastCleaned >= firstDirty, s"log cleaner should have processed 
up to offset $firstDirty, but lastCleaned=$lastCleaned")
-  }
-
-  private def checkLogAfterAppendingDups(log: UnifiedLog, startSize: Long, 
appends: Seq[(Int, String, Long)]): Unit = {
-    val read = readFromLog(log)
-    assertEquals(toMap(appends), toMap(read), "Contents of the map shouldn't 
change")
-    assertTrue(startSize > log.size)
-  }
-
-  private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, 
(String, Long)] = {
-    messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
-  }
-
-  private def readFromLog(log: UnifiedLog): Iterable[(Int, String, Long)] = {
-    for (segment <- log.logSegments.asScala; deepLogEntry <- 
segment.log.records.asScala) yield {
-      val key = TestUtils.readString(deepLogEntry.key).toInt
-      val value = TestUtils.readString(deepLogEntry.value)
-      (key, value, deepLogEntry.offset)
-    }
-  }
-
-  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: 
UnifiedLog, codec: Compression,
-                                        startKey: Int = 0, magicValue: Byte): 
Seq[(Int, String, Long)] = {
-    val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + 
numKeys)) yield {
-      val payload = counter.toString
-      incCounter()
-      (key, payload)
-    }
-
-    val records = kvs.map { case (key, payload) =>
-      new SimpleRecord(Time.SYSTEM.milliseconds(), key.toString.getBytes, 
payload.getBytes)
-    }
-
-    val appendInfo = 
log.appendAsLeaderWithRecordVersion(MemoryRecords.withRecords(magicValue, 
codec, records: _*),
-      0, RecordVersion.lookup(magicValue))
-    // move LSO forward to increase compaction bound
-    log.updateHighWatermark(log.logEndOffset)
-    val offsets = appendInfo.firstOffset to appendInfo.lastOffset
-
-    kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
-  }
-
-}
-
-object LogCleanerParameterizedIntegrationTest {
-
-  class AllCompressions extends ArgumentsProvider {
-    override def provideArguments(context: ExtensionContext): 
java.util.stream.Stream[_ <: Arguments] =
-      java.util.Arrays.stream(CompressionType.values.map(codec => 
Arguments.of(codec)))
-  }
-
-  // zstd compression is not supported with older message formats (i.e 
supported by V0 and V1)
-  class ExcludeZstd extends ArgumentsProvider {
-    override def provideArguments(context: ExtensionContext): 
java.util.stream.Stream[_ <: Arguments] =
-      java.util.Arrays.stream(CompressionType.values.filter(_ != 
CompressionType.ZSTD).map(codec => Arguments.of(codec)))
-  }
-}
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
similarity index 53%
rename from 
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
rename to 
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
index 0195017bb2d..7abf4b63265 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
@@ -18,17 +18,23 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.CompressionType;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.RecordVersion;
+import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
@@ -37,6 +43,7 @@ import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -50,6 +57,7 @@ import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -58,6 +66,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.LongStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -67,11 +76,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * This is an integration test that tests the fully integrated log cleaner
  */
-public class LogCleanerLagIntegrationTest {
-    private static final Logger log = 
LoggerFactory.getLogger(LogCleanerLagIntegrationTest.class);
+public class LogCleanerIntegrationTest {
+    private static final Logger log = 
LoggerFactory.getLogger(LogCleanerIntegrationTest.class);
 
-    protected LogCleaner cleaner;
-    protected final File logDir = TestUtils.tempDirectory();
+    private LogCleaner cleaner;
+    private final File logDir = TestUtils.tempDirectory();
 
     private final List<UnifiedLog> logs = new ArrayList<>();
     private static final int DEFAULT_MAX_MESSAGE_SIZE = 128;
@@ -88,7 +97,7 @@ public class LogCleanerLagIntegrationTest {
 
     private final Compression codec = Compression.lz4().build();
 
-    private int counter = 0;
+    private int counter;
 
     private final MockTime time = new MockTime(1400000000000L, 1000L);  // Tue 
May 13 16:53:20 UTC 2014
     private static final List<TopicPartition> TOPIC_PARTITIONS = List.of(
@@ -97,8 +106,27 @@ public class LogCleanerLagIntegrationTest {
         new TopicPartition("log", 2)
     );
 
-    public record KeyValueOffset(int key, String value, long firstOffset) { }
-    public record ValueAndRecords(String value, MemoryRecords records) { }
+    // Key, value and offset of a single message as tracked by the tests.
+    private record KeyValueOffset(
+        int key,
+        String value,
+        long firstOffset
+    ) { }
+
+    // A message value together with the record set that carries it.
+    private record ValueAndRecords(
+        String value,
+        MemoryRecords records
+    ) { }
+
+    // A log paired with the list of messages produced while writing to it.
+    private record LogAndMessages(
+        UnifiedLog log,
+        List<KeyValueOffset> messages
+    ) { }
+
+    /** Resets the counter used to generate message values before each test. */
+    @BeforeEach
+    public void setup() {
+        this.counter = 0;
+    }
+
+    /**
+     * Releases everything a test may have created: Yammer metrics, the cleaner, the mock
+     * scheduler, every log still tracked in {@code logs}, and finally the log directory.
+     * <p>
+     * The steps are chained with {@code try/finally} so that a failure in an early step
+     * (for example while shutting down the cleaner) does not skip the remaining cleanup.
+     *
+     * @throws IOException propagated from the close/delete calls
+     * @throws InterruptedException propagated from the shutdown calls
+     */
+    @AfterEach
+    public void teardown() throws IOException, InterruptedException {
+        try {
+            kafka.utils.TestUtils.clearYammerMetrics();
+            if (cleaner != null) {
+                cleaner.shutdown();
+            }
+        } finally {
+            try {
+                time.scheduler.shutdown();
+                for (UnifiedLog log : logs) {
+                    log.close();
+                }
+            } finally {
+                // always remove the directory, even if a shutdown/close above failed
+                Utils.delete(logDir);
+            }
+        }
+    }
 
     @ParameterizedTest
     @EnumSource(CompressionType.class)
@@ -290,6 +318,386 @@ public class LogCleanerLagIntegrationTest {
         assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
     }
 
+    /**
+     * Runs the cleaner on partition 0 of the "log" topic for each compression type and checks
+     * that the log shrinks while the latest entry of every key is retained, both before and
+     * after a maximum-size message plus further duplicates are appended. Finally removes the
+     * partition from the cleaner and verifies it no longer appears in the checkpoint file.
+     */
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleanerCompaction(CompressionType compressionType) throws Exception {
+        TopicPartition partition = TOPIC_PARTITIONS.get(0);
+        Compression compression = Compression.of(compressionType).build();
+        int bigKey = 20;
+        ValueAndRecords bigMessage =
+            createLargeSingleMessageSet(bigKey, RecordBatch.CURRENT_MAGIC_VALUE, compression);
+        // the big message defines the largest message size the log accepts
+        int maxMessageSize = bigMessage.records().sizeInBytes();
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+        UnifiedLog partitionLog = cleaner.logs().get(partition);
+
+        List<KeyValueOffset> firstRound = writeDups(100, 3, partitionLog, compression);
+        long sizeBeforeCleaning = partitionLog.size();
+        cleaner.startup();
+
+        long dirtyOffset = partitionLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, dirtyOffset);
+        long sizeAfterCleaning = partitionLog.logSegments().stream()
+            .mapToLong(LogSegment::size)
+            .sum();
+        assertTrue(sizeBeforeCleaning > sizeAfterCleaning,
+            "log should have been compacted: startSize=" + sizeBeforeCleaning
+                + " compactedSize=" + sizeAfterCleaning);
+
+        checkLogAfterAppendingDups(partitionLog, sizeBeforeCleaning, firstRound);
+
+        LogAppendInfo bigAppend = partitionLog.appendAsLeader(bigMessage.records(), 0);
+        // move LSO forward to increase compaction bound
+        partitionLog.updateHighWatermark(partitionLog.logEndOffset());
+        long bigMessageOffset = bigAppend.firstOffset();
+
+        List<KeyValueOffset> secondRound = writeDups(100, 3, partitionLog, compression,
+            bigKey + 1, RecordBatch.CURRENT_MAGIC_VALUE);
+        List<KeyValueOffset> expected = new ArrayList<>(firstRound);
+        expected.add(new KeyValueOffset(bigKey, bigMessage.value(), bigMessageOffset));
+        expected.addAll(secondRound);
+        long dirtyOffsetAfterAppend = partitionLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, dirtyOffsetAfterAppend);
+
+        checkLogAfterAppendingDups(partitionLog, sizeBeforeCleaning, expected);
+
+        // simulate deleting the partition by dropping it from the cleaner's logs, then force
+        // a checkpoint and make sure the partition is gone from the checkpoint file
+        cleaner.logs().remove(partition);
+        cleaner.updateCheckpoints(logDir, Optional.of(partition));
+        File checkpointFile = new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE);
+        Map<TopicPartition, Long> checkpoints =
+            new OffsetCheckpointFile(checkpointFile, null).read();
+        // we expect partition 0 to be gone
+        assertFalse(checkpoints.containsKey(partition));
+    }
+
+    /**
+     * Exercises a topic configured with {@code compact,delete}: compacts the log, ages all
+     * segments beyond the retention time and waits for them to be deleted, then runs the
+     * cleaner a second time and checks the contents of the log.
+     */
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testCleansCombinedCompactAndDeleteTopic(CompressionType compressionType) throws Exception {
+        int retentionMs = 100000;
+        Properties topicProps = new Properties();
+        topicProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+        topicProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
+
+        UnifiedLog firstLog = runCleanerAndCheckCompacted(100, compressionType, topicProps).log();
+
+        // push every segment's modification time to twice the retention time in the past;
+        // the clock is read once per segment, exactly as many times as there are segments
+        for (LogSegment segment : firstLog.logSegments()) {
+            long agedTimestamp = time.milliseconds() - (2L * retentionMs);
+            segment.setLastModified(agedTimestamp);
+        }
+        TestUtils.waitForCondition(
+            () -> firstLog.logStartOffset() == firstLog.logEndOffset()
+                && firstLog.numberOfSegments() == 1,
+            "Timed out waiting for deletion of old segments");
+
+        cleaner.shutdown();
+        closeLog(firstLog);
+
+        // run the cleaner a second time to make sure nothing is broken after the deletion
+        LogAndMessages secondRun = runCleanerAndCheckCompacted(20, compressionType, topicProps);
+
+        List<KeyValueOffset> survivors = readFromLogFull(secondRun.log());
+        assertEquals(toMap(secondRun.messages()), toMap(survivors),
+            "Contents of the map shouldn't change");
+    }
+
+    /**
+     * Creates a cleaner for partition 0 only, writes {@code numKeys} keys with duplicates,
+     * starts the cleaner and asserts that the log became smaller.
+     *
+     * @param numKeys         number of distinct keys to write
+     * @param compressionType compression used for the written records
+     * @param logProps        property overrides handed to the cleaner factory
+     * @return the cleaned log together with the messages returned by the write helper
+     */
+    private LogAndMessages runCleanerAndCheckCompacted(int numKeys, CompressionType compressionType,
+                                                       Properties logProps) throws Exception {
+        TopicPartition partition = TOPIC_PARTITIONS.get(0);
+        cleaner = makeCleaner(TOPIC_PARTITIONS.subList(0, 1), logProps, 100L);
+        UnifiedLog partitionLog = cleaner.logs().get(partition);
+
+        Compression compression = Compression.of(compressionType).build();
+        List<KeyValueOffset> written = writeDups(numKeys, 3, partitionLog, compression);
+        long sizeBeforeCleaning = partitionLog.size();
+
+        partitionLog.updateHighWatermark(partitionLog.logEndOffset());
+
+        // capture the dirty offset before the cleaner is started
+        long dirtyOffset = partitionLog.activeSegment().baseOffset();
+        cleaner.startup();
+
+        // should compact the log
+        checkLastCleaned("log", 0, dirtyOffset);
+        long sizeAfterCleaning = partitionLog.logSegments().stream()
+            .mapToLong(LogSegment::size)
+            .sum();
+        assertTrue(sizeBeforeCleaning > sizeAfterCleaning,
+            "log should have been compacted: startSize=" + sizeBeforeCleaning
+                + " compactedSize=" + sizeAfterCleaning);
+        return new LogAndMessages(partitionLog, written);
+    }
+
+    /**
+     * Writes records in message formats V0, V1 and V2 into the same log, cleans it, and checks
+     * both the retained contents and that every remaining batch has been converted to V2.
+     */
+    @ParameterizedTest
+    @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+    public void testCleanerWithMessageFormatV0V1V2(CompressionType compressionType) throws Exception {
+        TopicPartition partition = TOPIC_PARTITIONS.get(0);
+        Compression compression = Compression.of(compressionType).build();
+        int bigKey = 20;
+        ValueAndRecords bigMessage =
+            createLargeSingleMessageSet(bigKey, RecordBatch.MAGIC_VALUE_V0, compression);
+        // the broker assigns absolute offsets for message format 0 which potentially causes the
+        // compressed size to increase because the broker offsets are larger than the ones
+        // assigned by the client; adding `6` to the message set size is good enough for this
+        // test: it covers the increased message size while still being less than the overhead
+        // introduced by the conversion from message format version 0 to 1.
+        // Uncompressed message sets need no padding.
+        int sizePadding = compressionType == CompressionType.NONE ? 0 : 6;
+        int maxMessageSize = bigMessage.records().sizeInBytes() + sizePadding;
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+
+        UnifiedLog partitionLog = cleaner.logs().get(partition);
+        Properties overrides = logConfigProperties(maxMessageSize);
+        overrides.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.name);
+        LogConfig logConfig = new LogConfig(overrides);
+        partitionLog.updateConfig(logConfig);
+
+        List<KeyValueOffset> initialV0 =
+            writeDups(100, 3, partitionLog, compression, 0, RecordBatch.MAGIC_VALUE_V0);
+        long sizeBeforeCleaning = partitionLog.size();
+        cleaner.startup();
+
+        long dirtyOffset = partitionLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, dirtyOffset);
+        long sizeAfterCleaning = partitionLog.logSegments().stream()
+            .mapToLong(LogSegment::size)
+            .sum();
+        assertTrue(sizeBeforeCleaning > sizeAfterCleaning,
+            "log should have been compacted: startSize=" + sizeBeforeCleaning
+                + " compactedSize=" + sizeAfterCleaning);
+
+        checkLogAfterAppendingDups(partitionLog, sizeBeforeCleaning, initialV0);
+
+        List<KeyValueOffset> moreV0 =
+            writeDups(40, 3, partitionLog, compression, 0, RecordBatch.MAGIC_VALUE_V0);
+        LogAppendInfo bigAppend =
+            partitionLog.appendAsLeaderWithRecordVersion(bigMessage.records(), 0, RecordVersion.V0);
+        // move LSO forward to increase compaction bound
+        partitionLog.updateHighWatermark(partitionLog.logEndOffset());
+        long bigMessageOffset = bigAppend.firstOffset();
+
+        // also add some messages with version 1 and version 2 to check that we handle mixed
+        // format versions correctly
+        List<KeyValueOffset> dupsV1 =
+            writeDups(40, 3, partitionLog, compression, 30, RecordBatch.MAGIC_VALUE_V1);
+        List<KeyValueOffset> dupsV2 =
+            writeDups(5, 3, partitionLog, compression, 15, RecordBatch.MAGIC_VALUE_V2);
+
+        // keys of the initial V0 writes that were never rewritten in V1 or V2
+        Set<Integer> rewrittenKeys = new HashSet<>();
+        dupsV1.forEach(kvo -> rewrittenKeys.add(kvo.key()));
+        dupsV2.forEach(kvo -> rewrittenKeys.add(kvo.key()));
+        Set<String> v0OnlyKeys = new HashSet<>();
+        for (KeyValueOffset kvo : initialV0) {
+            if (!rewrittenKeys.contains(kvo.key())) {
+                v0OnlyKeys.add(String.valueOf(kvo.key()));
+            }
+        }
+
+        List<KeyValueOffset> expected = new ArrayList<>(initialV0);
+        expected.addAll(moreV0);
+        expected.add(new KeyValueOffset(bigKey, bigMessage.value(), bigMessageOffset));
+        expected.addAll(dupsV1);
+        expected.addAll(dupsV2);
+
+        // roll the log so that all appended messages can be compacted
+        partitionLog.roll();
+        long dirtyOffsetAfterRoll = partitionLog.activeSegment().baseOffset();
+        checkLastCleaned("log", 0, dirtyOffsetAfterRoll);
+
+        checkLogAfterAppendingDups(partitionLog, sizeBeforeCleaning, expected);
+        checkLogAfterConvertingToV2(compressionType, partitionLog,
+            logConfig.messageTimestampType, v0OnlyKeys);
+    }
+
+    /**
+     * Appends V0 and V1 message sets (each written as one message set per call), rolls the
+     * log, runs the cleaner and checks the retained contents and the conversion to V2.
+     */
+    @ParameterizedTest
+    @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+    public void testCleaningNestedMessagesWithV0V1(CompressionType compressionType) throws Exception {
+        Compression compression = Compression.of(compressionType).build();
+        int maxMessageSize = 192;
+        int segmentSize = 256;
+        cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L, segmentSize);
+
+        UnifiedLog partitionLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+        LogConfig logConfig = new LogConfig(logConfigProperties(
+            new Properties(),
+            maxMessageSize,
+            DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+            DEFAULT_MIN_COMPACTION_LAG_MS,
+            DEFAULT_DELETE_DELAY,
+            segmentSize,
+            DEFAULT_MAX_COMPACTION_LAG_MS));
+        partitionLog.updateConfig(logConfig);
+
+        // with compression enabled, these messages will be written as a single message
+        // containing all the individual messages
+        List<KeyValueOffset> appendsV0 = new ArrayList<>();
+        appendsV0.addAll(writeDupsSingleMessageSet(2, 3, partitionLog, compression, 0,
+            RecordBatch.MAGIC_VALUE_V0));
+        appendsV0.addAll(writeDupsSingleMessageSet(2, 2, partitionLog, compression, 3,
+            RecordBatch.MAGIC_VALUE_V0));
+
+        List<KeyValueOffset> appendsV1 = new ArrayList<>();
+        appendsV1.addAll(writeDupsSingleMessageSet(2, 2, partitionLog, compression, 4,
+            RecordBatch.MAGIC_VALUE_V1));
+        appendsV1.addAll(writeDupsSingleMessageSet(2, 2, partitionLog, compression, 4,
+            RecordBatch.MAGIC_VALUE_V1));
+        appendsV1.addAll(writeDupsSingleMessageSet(2, 2, partitionLog, compression, 6,
+            RecordBatch.MAGIC_VALUE_V1));
+
+        List<KeyValueOffset> allAppends = new ArrayList<>(appendsV0);
+        allAppends.addAll(appendsV1);
+
+        // keys written in V0 that were not written again in V1
+        Set<Integer> keysWrittenAsV1 = new HashSet<>();
+        appendsV1.forEach(kvo -> keysWrittenAsV1.add(kvo.key()));
+        Set<String> v0OnlyKeys = new HashSet<>();
+        for (KeyValueOffset kvo : appendsV0) {
+            if (!keysWrittenAsV1.contains(kvo.key())) {
+                v0OnlyKeys.add(String.valueOf(kvo.key()));
+            }
+        }
+
+        // roll the log so that all appended messages can be compacted
+        partitionLog.roll();
+        long sizeBeforeCleaning = partitionLog.size();
+        cleaner.startup();
+
+        long dirtyOffset = partitionLog.activeSegment().baseOffset();
+        // ensure we clean data from V0 and V1
+        assertTrue(dirtyOffset >= allAppends.size());
+
+        checkLastCleaned("log", 0, dirtyOffset);
+        long sizeAfterCleaning = partitionLog.logSegments().stream()
+            .mapToLong(LogSegment::size)
+            .sum();
+        assertTrue(sizeBeforeCleaning > sizeAfterCleaning,
+            "log should have been compacted: startSize=" + sizeBeforeCleaning
+                + " compactedSize=" + sizeAfterCleaning);
+
+        checkLogAfterAppendingDups(partitionLog, sizeBeforeCleaning, allAppends);
+        checkLogAfterConvertingToV2(compressionType, partitionLog,
+            logConfig.messageTimestampType, v0OnlyKeys);
+    }
+
+    /**
+     * Starts a cleaner with an I/O buffer size of 1 and checks that nothing is cleaned, then
+     * reconfigures it to two threads and a 100000-byte I/O buffer and checks that the log is
+     * compacted.
+     */
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void cleanerConfigUpdateTest(CompressionType compressionType) throws Exception {
+        Compression compression = Compression.of(compressionType).build();
+        int bigKey = 20;
+        ValueAndRecords bigMessage =
+            createLargeSingleMessageSet(bigKey, RecordBatch.CURRENT_MAGIC_VALUE, compression);
+        int maxMessageSize = bigMessage.records().sizeInBytes();
+
+        cleaner = makeCleaner(TOPIC_PARTITIONS, 1L, maxMessageSize, 1);
+        UnifiedLog partitionLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+        writeDups(100, 3, partitionLog, compression);
+        long sizeBeforeCleaning = partitionLog.size();
+        cleaner.startup();
+        assertEquals(1, cleaner.cleanerCount());
+
+        // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+        long dirtyOffset = partitionLog.activeSegment().baseOffset();
+        TopicPartition partition = new TopicPartition("log", 0);
+        cleaner.awaitCleaned(partition, dirtyOffset, 10);
+        assertTrue(cleaner.cleanerManager().allCleanerCheckpoints().isEmpty(),
+            "Should not have cleaned");
+
+        ConfigDef mergedDef = Utils.mergeConfigs(List.of(
+            CleanerConfig.CONFIG_DEF,
+            ServerConfigs.CONFIG_DEF
+        ));
+
+        // snapshot of the configuration the cleaner is currently running with
+        CleanerConfig current = cleaner.currentConfig();
+        Map<String, Object> before = new HashMap<>();
+        before.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, current.numThreads);
+        before.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, current.dedupeBufferSize);
+        before.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
+            current.dedupeBufferLoadFactor);
+        before.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, current.ioBufferSize);
+        before.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, current.maxMessageSize);
+        before.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
+            current.maxIoBytesPerSecond);
+        before.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, current.backoffMs);
+        before.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, current.enableCleaner);
+        AbstractConfig beforeConfig = new AbstractConfig(mergedDef, before);
+
+        // same settings, but with two threads and a usable I/O buffer
+        Map<String, Object> after = new HashMap<>(before);
+        after.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, 2);
+        after.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, 100000);
+        AbstractConfig afterConfig = new AbstractConfig(mergedDef, after);
+
+        cleaner.reconfigure(beforeConfig, afterConfig);
+
+        assertEquals(2, cleaner.cleanerCount());
+        checkLastCleaned("log", 0, dirtyOffset);
+        long sizeAfterCleaning = partitionLog.logSegments().stream()
+            .mapToLong(LogSegment::size)
+            .sum();
+        assertTrue(sizeBeforeCleaning > sizeAfterCleaning,
+            "log should have been compacted: startSize=" + sizeBeforeCleaning
+                + " compactedSize=" + sizeAfterCleaning);
+    }
+
+    /**
+     * Waits up to 60 seconds for the cleaner to reach {@code firstDirty} on the given partition
+     * and asserts that the recorded cleaner checkpoint is at or beyond that offset.
+     * <p>
+     * Note that cleaning happens only when the "log dirty ratio" is higher than
+     * TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG.
+     *
+     * @param topic       topic name of the partition to check
+     * @param partitionId partition number within the topic
+     * @param firstDirty  offset the cleaner is expected to have processed up to
+     * @throws InterruptedException propagated from {@code awaitCleaned}
+     */
+    private void checkLastCleaned(String topic, int partitionId, long firstDirty) throws InterruptedException {
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+        cleaner.awaitCleaned(topicPartition, firstDirty, 60000L);
+        Long lastCleaned = cleaner.cleanerManager().allCleanerCheckpoints().get(topicPartition);
+        // A missing checkpoint yields null; check it explicitly so the test fails with the
+        // descriptive message below instead of a NullPointerException from auto-unboxing.
+        assertTrue(lastCleaned != null && lastCleaned >= firstDirty,
+            "log cleaner should have processed up to offset " + firstDirty + ", but lastCleaned=" + lastCleaned);
+    }
+
+    /**
+     * Asserts that the log holds, for every key, the same entry as the last one for that key
+     * in {@code appends}, and that the log is smaller than {@code startSize}.
+     */
+    private void checkLogAfterAppendingDups(UnifiedLog log, long startSize, List<KeyValueOffset> appends) {
+        List<KeyValueOffset> survivors = readFromLogFull(log);
+        Map<Integer, KeyValueOffset> expectedByKey = toMap(appends);
+        Map<Integer, KeyValueOffset> actualByKey = toMap(survivors);
+        assertEquals(expectedByKey, actualByKey, "Contents of the map shouldn't change");
+        assertTrue(log.size() < startSize);
+    }
+
+    /**
+     * Indexes the messages by key; when a key occurs more than once the last occurrence in
+     * the list wins.
+     */
+    private Map<Integer, KeyValueOffset> toMap(List<KeyValueOffset> messages) {
+        Map<Integer, KeyValueOffset> latestByKey = new HashMap<>();
+        messages.forEach(message -> latestByKey.put(message.key(), message));
+        return latestByKey;
+    }
+
+    /**
+     * Reads every record of every segment of the log, in iteration order, and returns each as
+     * a key (parsed as an integer), its value string and its offset.
+     */
+    private List<KeyValueOffset> readFromLogFull(UnifiedLog log) {
+        List<KeyValueOffset> entries = new ArrayList<>();
+        log.logSegments().forEach(segment ->
+            segment.log().records().forEach(rec -> {
+                int parsedKey = Integer.parseInt(LogTestUtils.readString(rec.key()));
+                String text = LogTestUtils.readString(rec.value());
+                entries.add(new KeyValueOffset(parsedKey, text, rec.offset()));
+            }));
+        return entries;
+    }
+
+    /**
+     * Appends {@code numDups} rounds of the keys {@code startKey .. startKey + numKeys - 1}
+     * to the log as one record set and returns the written key/value pairs together with
+     * their offsets.
+     *
+     * @param numKeys    number of consecutive keys per round
+     * @param numDups    number of rounds
+     * @param log        log to append to
+     * @param codec      compression for the record set
+     * @param startKey   first key of each round
+     * @param magicValue message format version of the record set
+     * @return one entry per written record, in write order
+     */
+    private List<KeyValueOffset> writeDupsSingleMessageSet(int numKeys, int numDups, UnifiedLog log,
+                                                           Compression codec, int startKey,
+                                                           byte magicValue) throws IOException {
+        List<Integer> keys = new ArrayList<>();
+        List<String> values = new ArrayList<>();
+        List<SimpleRecord> batch = new ArrayList<>();
+        for (int round = 0; round < numDups; round++) {
+            for (int key = startKey; key < startKey + numKeys; key++) {
+                String value = String.valueOf(counter());
+                incCounter();
+                keys.add(key);
+                values.add(value);
+                // the clock is read once per record
+                batch.add(new SimpleRecord(
+                    time.milliseconds(),
+                    String.valueOf(key).getBytes(),
+                    value.getBytes()));
+            }
+        }
+
+        MemoryRecords recordSet =
+            MemoryRecords.withRecords(magicValue, codec, batch.toArray(new SimpleRecord[0]));
+        LogAppendInfo appendInfo =
+            log.appendAsLeaderWithRecordVersion(recordSet, 0, RecordVersion.lookup(magicValue));
+        // move LSO forward to increase compaction bound
+        log.updateHighWatermark(log.logEndOffset());
+
+        // pair each written record with the offset reported for the append, in order
+        long[] offsets = LongStream
+            .rangeClosed(appendInfo.firstOffset(), appendInfo.lastOffset())
+            .toArray();
+        List<KeyValueOffset> written = new ArrayList<>();
+        for (int i = 0; i < keys.size(); i++) {
+            written.add(new KeyValueOffset(keys.get(i), values.get(i), offsets[i]));
+        }
+        return written;
+    }
+
+    /**
+     * Verifies every batch remaining in the log after cleaning: it must use magic V2, contain
+     * exactly one record when uncompressed (at least one otherwise), and carry the expected
+     * timestamp type and record timestamps.
+     *
+     * @param compressionType                   compression the records were written with
+     * @param log                               log to inspect
+     * @param timestampType                     timestamp type expected for batches whose first
+     *                                          key is not in the set below
+     * @param keysForV0RecordsWithNoV1V2Updates keys whose batches are expected to have
+     *                                          {@code CREATE_TIME} and whose records are
+     *                                          expected to have no timestamp
+     */
+    private void checkLogAfterConvertingToV2(CompressionType compressionType, UnifiedLog log,
+                                             TimestampType timestampType, Set<String> keysForV0RecordsWithNoV1V2Updates) {
+        for (LogSegment segment : log.logSegments()) {
+            for (RecordBatch recordBatch : segment.log().batches()) {
+                // Uncompressed v0/v1 records are always converted into single record v2 batches
+                // via compaction if they are retained. Compressed v0/v1 record batches are
+                // converted into record batches v2 with one or more records (depending on the
+                // number of retained records after compaction)
+                assertEquals(RecordVersion.V2.value, recordBatch.magic());
+
+                // Read the batch once and reuse the list below, instead of iterating (and, for
+                // compressed batches, decompressing) the same batch three times.
+                List<Record> recordList = new ArrayList<>();
+                recordBatch.forEach(recordList::add);
+                if (compressionType == CompressionType.NONE) {
+                    assertEquals(1, recordList.size());
+                } else {
+                    assertFalse(recordList.isEmpty());
+                }
+
+                // safe: both branches above guarantee at least one record
+                String firstRecordKey = LogTestUtils.readString(recordList.get(0).key());
+                if (keysForV0RecordsWithNoV1V2Updates.contains(firstRecordKey)) {
+                    assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType());
+                } else {
+                    assertEquals(timestampType, recordBatch.timestampType());
+                }
+
+                for (Record record : recordList) {
+                    String recordKey = LogTestUtils.readString(record.key());
+                    if (keysForV0RecordsWithNoV1V2Updates.contains(recordKey)) {
+                        assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp(),
+                            "Record " + recordKey + " with unexpected timestamp ");
+                    } else {
+                        assertNotEquals(RecordBatch.NO_TIMESTAMP, record.timestamp(),
+                            "Record " + recordKey + " with unexpected timestamp " + RecordBatch.NO_TIMESTAMP);
+                    }
+                }
+            }
+        }
+    }
+
     private void breakPartitionLog(TopicPartition tp) throws IOException {
         UnifiedLog theLog = cleaner.logs().get(tp);
         writeDups(20, 3, theLog, codec);
@@ -336,24 +744,16 @@ public class LogCleanerLagIntegrationTest {
 
     private Map<Integer, Integer> readFromLog(UnifiedLog log) {
         Map<Integer, Integer> result = new HashMap<>();
-        for (LogSegment segment : log.logSegments()) {
-            for (Record record : segment.log().records()) {
-                int key = 
Integer.parseInt(LogTestUtils.readString(record.key()));
-                int value = 
Integer.parseInt(LogTestUtils.readString(record.value()));
-                result.put(key, value);
-            }
+        for (KeyValueOffset kvo : readFromLogFull(log)) {
+            result.put(kvo.key(), Integer.parseInt(kvo.value()));
         }
         return result;
     }
 
     private List<int[]> readKeyValuePairsFromLog(UnifiedLog log) {
         List<int[]> result = new ArrayList<>();
-        for (LogSegment segment : log.logSegments()) {
-            for (Record record : segment.log().records()) {
-                int key = 
Integer.parseInt(LogTestUtils.readString(record.key()));
-                int value = 
Integer.parseInt(LogTestUtils.readString(record.value()));
-                result.add(new int[]{key, value});
-            }
+        for (KeyValueOffset kvo : readFromLogFull(log)) {
+            result.add(new int[]{kvo.key(), Integer.parseInt(kvo.value())});
         }
         return result;
     }
@@ -524,6 +924,56 @@ public class LogCleanerLagIntegrationTest {
             new Properties());
     }
 
+    /**
+     * Convenience overload: builds a cleaner with the given maximum message size, backoff and
+     * segment size, passing the test defaults, {@code 1}, a {@code null} I/O buffer size and
+     * empty property overrides for the remaining arguments of the full factory method.
+     */
+    private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+                                   int maxMessageSize,
+                                   long backoffMs,
+                                   int segmentSize) throws IOException {
+        return makeCleaner(
+            partitions, DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, 1, backoffMs, maxMessageSize,
+            DEFAULT_MIN_COMPACTION_LAG_MS, DEFAULT_DELETE_DELAY, segmentSize,
+            DEFAULT_MAX_COMPACTION_LAG_MS, null, new Properties());
+    }
+
+    /**
+     * Convenience overload: builds a cleaner with the given property overrides and backoff,
+     * passing the test defaults, {@code 1} and a {@code null} I/O buffer size for the
+     * remaining arguments of the full factory method.
+     */
+    private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+                                   Properties propertyOverrides,
+                                   long backoffMs) throws IOException {
+        return makeCleaner(
+            partitions, DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, 1, backoffMs, DEFAULT_MAX_MESSAGE_SIZE,
+            DEFAULT_MIN_COMPACTION_LAG_MS, DEFAULT_DELETE_DELAY, DEFAULT_SEGMENT_SIZE,
+            DEFAULT_MAX_COMPACTION_LAG_MS, null, propertyOverrides);
+    }
+
+    /**
+     * Convenience overload: builds a cleaner with the given backoff, maximum message size and
+     * cleaner I/O buffer size, passing the test defaults, {@code 1} and empty property
+     * overrides for the remaining arguments of the full factory method.
+     */
+    private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+                                   long backoffMs,
+                                   int maxMessageSize,
+                                   int cleanerIoBufferSize) throws IOException {
+        return makeCleaner(
+            partitions, DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, 1, backoffMs, maxMessageSize,
+            DEFAULT_MIN_COMPACTION_LAG_MS, DEFAULT_DELETE_DELAY, DEFAULT_SEGMENT_SIZE,
+            DEFAULT_MAX_COMPACTION_LAG_MS, cleanerIoBufferSize, new Properties());
+    }
+
     private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
                                    long backoffMs,
                                    long minCompactionLagMs,
@@ -594,21 +1044,8 @@ public class LogCleanerLagIntegrationTest {
         return new ValueAndRecords(value, records);
     }
 
-    private void closeLog(UnifiedLog log) throws IOException {
+    private void closeLog(UnifiedLog log) {
         log.close();
         logs.remove(log);
     }
-
-    @AfterEach
-    public void teardown() throws IOException, InterruptedException {
-        kafka.utils.TestUtils.clearYammerMetrics();
-        if (cleaner != null) {
-            cleaner.shutdown();
-        }
-        time.scheduler.shutdown();
-        for (UnifiedLog log : logs) {
-            log.close();
-        }
-        Utils.delete(logDir);
-    }
 }

Reply via email to