This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de7e0ebb441 KAFKA-20267: Move LogCleanerParameterizedIntegrationTest
to storage module (#21657)
de7e0ebb441 is described below
commit de7e0ebb441bd5e048dd5e8c757d2cb22f2394a1
Author: Maros Orsak <[email protected]>
AuthorDate: Fri Mar 27 10:53:45 2026 +0100
KAFKA-20267: Move LogCleanerParameterizedIntegrationTest to storage module (#21657)
This PR is the final one in the series: it concludes the migration of
AbstractLogCleanerIntegrationTest.scala and its sub-test suites by merging
them into one compact test suite.
---------
Signed-off-by: see-quick <[email protected]>
Reviewers: Mickael Maison <[email protected]>, Federico Valeri
<[email protected]>
---
.../log/AbstractLogCleanerIntegrationTest.scala | 175 -------
.../LogCleanerParameterizedIntegrationTest.scala | 379 ----------------
...ionTest.java => LogCleanerIntegrationTest.java} | 503 +++++++++++++++++++--
3 files changed, 470 insertions(+), 587 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
deleted file mode 100644
index c2310da3d98..00000000000
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import kafka.utils.TestUtils
-import kafka.utils.Implicits._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.internal.{MemoryRecords, RecordBatch,
RecordVersion}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner,
LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, UnifiedLog}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.{AfterEach, Tag}
-
-import java.io.File
-import java.nio.file.Files
-import java.util
-import java.util.{Optional, Properties}
-import scala.collection.Seq
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
-
-@Tag("integration")
-abstract class AbstractLogCleanerIntegrationTest {
-
- var cleaner: LogCleaner = _
- val logDir = TestUtils.tempDir()
-
- private val logs = ListBuffer.empty[UnifiedLog]
- private val defaultMaxMessageSize = 128
- private val defaultMinCleanableDirtyRatio = 0.0F
- private val defaultMinCompactionLagMS = 0L
- private val defaultDeleteDelay = 1000
- private val defaultSegmentSize = 2048
- private val defaultMaxCompactionLagMs = Long.MaxValue
-
- def time: MockTime
-
- @AfterEach
- def teardown(): Unit = {
- if (cleaner != null)
- cleaner.shutdown()
- time.scheduler.shutdown()
- logs.foreach(_.close())
- Utils.delete(logDir)
- }
-
- def logConfigProperties(propertyOverrides: Properties = new Properties(),
- maxMessageSize: Int,
- minCleanableDirtyRatio: Float =
defaultMinCleanableDirtyRatio,
- minCompactionLagMs: Long = defaultMinCompactionLagMS,
- deleteDelay: Int = defaultDeleteDelay,
- segmentSize: Int = defaultSegmentSize,
- maxCompactionLagMs: Long =
defaultMaxCompactionLagMs): Properties = {
- val props = new Properties()
- props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize:
java.lang.Integer)
- props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize:
java.lang.Integer)
- props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100*1024:
java.lang.Integer)
- props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay:
java.lang.Integer)
- props.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
- props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,
minCleanableDirtyRatio: java.lang.Float)
- props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, minCompactionLagMs:
java.lang.Long)
- props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, maxCompactionLagMs:
java.lang.Long)
- props ++= propertyOverrides
- props
- }
-
- def makeCleaner(partitions: Iterable[TopicPartition],
- minCleanableDirtyRatio: Float =
defaultMinCleanableDirtyRatio,
- numThreads: Int = 1,
- backoffMs: Long = 15000L,
- maxMessageSize: Int = defaultMaxMessageSize,
- minCompactionLagMs: Long = defaultMinCompactionLagMS,
- deleteDelay: Int = defaultDeleteDelay,
- segmentSize: Int = defaultSegmentSize,
- maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
- cleanerIoBufferSize: Option[Int] = None,
- propertyOverrides: Properties = new Properties()):
LogCleaner = {
-
- val logMap = new util.concurrent.ConcurrentHashMap[TopicPartition,
UnifiedLog]()
- for (partition <- partitions) {
- val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
- Files.createDirectories(dir.toPath)
-
- val logConfig = new LogConfig(logConfigProperties(propertyOverrides,
- maxMessageSize = maxMessageSize,
- minCleanableDirtyRatio = minCleanableDirtyRatio,
- minCompactionLagMs = minCompactionLagMs,
- deleteDelay = deleteDelay,
- segmentSize = segmentSize,
- maxCompactionLagMs = maxCompactionLagMs))
- val log = UnifiedLog.create(
- dir,
- logConfig,
- 0L,
- 0L,
- time.scheduler,
- new BrokerTopicStats,
- time,
- 5 * 60 * 1000,
- new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false),
- TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
- new LogDirFailureChannel(10),
- true,
- Optional.empty)
- logMap.put(partition, log)
- this.logs += log
- }
-
- val cleanerConfig = new CleanerConfig(
- numThreads,
- 4 * 1024 * 1024L,
- 0.9,
- cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
- maxMessageSize,
- Double.MaxValue,
- backoffMs,
- true)
- new LogCleaner(cleanerConfig,
- util.List.of(logDir),
- logMap,
- new LogDirFailureChannel(1),
- time)
- }
-
- private var ctr = 0
- def counter: Int = ctr
- def incCounter(): Unit = ctr += 1
-
- def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec:
Compression,
- startKey: Int = 0, magicValue: Byte =
RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
- for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys))
yield {
- val value = counter.toString
- val appendInfo =
log.appendAsLeaderWithRecordVersion(TestUtils.singletonRecords(value =
value.getBytes, codec = codec,
- key = key.toString.getBytes, magicValue = magicValue), 0,
RecordVersion.lookup(magicValue))
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- incCounter()
- (key, value, appendInfo.firstOffset)
- }
- }
-
- def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte, codec:
Compression): (String, MemoryRecords) = {
- def messageValue(length: Int): String = {
- val random = new Random(0)
- new String(random.alphanumeric.take(length).toArray)
- }
- val value = messageValue(128)
- val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec
= codec, key = key.toString.getBytes,
- magicValue = messageFormatVersion)
- (value, messageSet)
- }
-
- def closeLog(log: UnifiedLog): Unit = {
- log.close()
- logs -= log
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
deleted file mode 100755
index 1a4436fd5c0..00000000000
---
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.File
-import java.util.{Optional, Properties}
-import kafka.server.KafkaConfig
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.{CleanerConfig,
LogCleanerManager, LogConfig, UnifiedLog}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.extension.ExtensionContext
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider,
ArgumentsSource}
-
-import scala.collection._
-import scala.jdk.CollectionConverters._
-
-/**
- * This is an integration test that tests the fully integrated log cleaner
- */
-class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrationTest {
-
- val time = new MockTime()
-
- val topicPartitions = Array(new TopicPartition("log", 0), new
TopicPartition("log", 1), new TopicPartition("log", 2))
-
- @ParameterizedTest
-
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
- def cleanerTest(compressionType: CompressionType): Unit = {
- val codec: Compression = Compression.of(compressionType).build()
- val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE,
codec)
- val maxMessageSize = largeMessageSet.sizeInBytes
-
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
maxMessageSize)
- val log = cleaner.logs.get(topicPartitions(0))
-
- val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec =
codec)
- val startSize = log.size
- cleaner.startup()
-
- val firstDirty = log.activeSegment.baseOffset
- checkLastCleaned("log", 0, firstDirty)
- val compactedSize = log.logSegments.asScala.map(_.size).sum
- assertTrue(startSize > compactedSize, s"log should have been compacted:
startSize=$startSize compactedSize=$compactedSize")
-
- checkLogAfterAppendingDups(log, startSize, appends)
-
- val appendInfo = log.appendAsLeader(largeMessageSet, 0)
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- val largeMessageOffset = appendInfo.firstOffset
-
- val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100,
numDups = 3, log = log, codec = codec)
- val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue,
largeMessageOffset)) ++ dups
- val firstDirty2 = log.activeSegment.baseOffset
- checkLastCleaned("log", 0, firstDirty2)
-
- checkLogAfterAppendingDups(log, startSize, appends2)
-
- // simulate deleting a partition, by removing it from logs
- // force a checkpoint
- // and make sure its gone from checkpoint file
- cleaner.logs.remove(topicPartitions(0))
- cleaner.updateCheckpoints(logDir, Optional.of(topicPartitions(0)))
- val checkpoints = new OffsetCheckpointFile(new File(logDir,
LogCleanerManager.OFFSET_CHECKPOINT_FILE), null).read()
- // we expect partition 0 to be gone
- assertFalse(checkpoints.containsKey(topicPartitions(0)))
- }
-
- @ParameterizedTest
-
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
- def testCleansCombinedCompactAndDeleteTopic(compressionType:
CompressionType): Unit = {
- val logProps = new Properties()
- val retentionMs: Integer = 100000
- logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: Integer)
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
-
- def runCleanerAndCheckCompacted(numKeys: Int): (UnifiedLog, Seq[(Int,
String, Long)]) = {
- cleaner = makeCleaner(partitions = topicPartitions.take(1),
propertyOverrides = logProps, backoffMs = 100L)
- val log = cleaner.logs.get(topicPartitions(0))
-
- val messages = writeDups(numKeys = numKeys, numDups = 3, log = log,
codec = Compression.of(compressionType).build())
- val startSize = log.size
-
- log.updateHighWatermark(log.logEndOffset)
-
- val firstDirty = log.activeSegment.baseOffset
- cleaner.startup()
-
- // should compact the log
- checkLastCleaned("log", 0, firstDirty)
- val compactedSize = log.logSegments.asScala.map(_.size).sum
- assertTrue(startSize > compactedSize, s"log should have been compacted:
startSize=$startSize compactedSize=$compactedSize")
- (log, messages)
- }
-
- val (log, _) = runCleanerAndCheckCompacted(100)
-
- // Set the last modified time to an old value to force deletion of old
segments
- val endOffset = log.logEndOffset
- log.logSegments.forEach(_.setLastModified(time.milliseconds - (2 *
retentionMs)))
- TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset &&
log.numberOfSegments == 1,
- "Timed out waiting for deletion of old segments")
-
- cleaner.shutdown()
- closeLog(log)
-
- // run the cleaner again to make sure if there are no issues post deletion
- val (log2, messages) = runCleanerAndCheckCompacted(20)
- val read = readFromLog(log2)
- assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't
change")
- }
-
- @ParameterizedTest
- @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
- def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType):
Unit = {
- val compression = Compression.of(compressionType).build()
- val largeMessageKey = 20
- val (largeMessageValue, largeMessageSet) =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0,
compression)
- val maxMessageSize = compression match {
- case Compression.NONE => largeMessageSet.sizeInBytes
- case _ =>
- // the broker assigns absolute offsets for message format 0 which
potentially causes the compressed size to
- // increase because the broker offsets are larger than the ones
assigned by the client
- // adding `6` to the message set size is good enough for this test: it
covers the increased message size while
- // still being less than the overhead introduced by the conversion
from message format version 0 to 1
- largeMessageSet.sizeInBytes + 6
- }
-
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
maxMessageSize)
-
- val log = cleaner.logs.get(topicPartitions(0))
- val props = logConfigProperties(maxMessageSize = maxMessageSize)
- props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
TimestampType.LOG_APPEND_TIME.name)
- val logConfig = new LogConfig(props)
- log.updateConfig(logConfig)
-
- val appends1 = writeDups(numKeys = 100, numDups = 3, log = log, codec =
compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
- val startSize = log.size
- cleaner.startup()
-
- val firstDirty = log.activeSegment.baseOffset
- checkLastCleaned("log", 0, firstDirty)
- val compactedSize = log.logSegments.asScala.map(_.size).sum
- assertTrue(startSize > compactedSize, s"log should have been compacted:
startSize=$startSize compactedSize=$compactedSize")
-
- checkLogAfterAppendingDups(log, startSize, appends1)
-
- val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec =
compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
- val appendInfo = log.appendAsLeaderWithRecordVersion(largeMessageSet, 0,
RecordVersion.V0)
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- val largeMessageOffset = appendInfo.firstOffset
-
- // also add some messages with version 1 and version 2 to check that we
handle mixed format versions correctly
- val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log =
log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
- val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log,
codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V2)
-
- val v0RecordKeysWithNoV1V2Updates = (appends1.map(_._1).toSet --
dupsV1.map(_._1) -- dupsV2.map(_._1)).map(_.toString)
- val appends2: Seq[(Int, String, Long)] =
- appends1 ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue,
largeMessageOffset)) ++ dupsV1 ++ dupsV2
-
- // roll the log so that all appended messages can be compacted
- log.roll()
- val firstDirty2 = log.activeSegment.baseOffset
- checkLastCleaned("log", 0, firstDirty2)
-
- checkLogAfterAppendingDups(log, startSize, appends2)
- checkLogAfterConvertingToV2(compressionType, log,
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
- }
-
- @ParameterizedTest
- @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
- def testCleaningNestedMessagesWithV0V1(compressionType: CompressionType):
Unit = {
- val compression = Compression.of(compressionType).build()
- val maxMessageSize = 192
- cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
maxMessageSize, segmentSize = 256)
-
- val log = cleaner.logs.get(topicPartitions(0))
- val logConfig = new LogConfig(logConfigProperties(maxMessageSize =
maxMessageSize, segmentSize = 256))
- log.updateConfig(logConfig)
-
- // with compression enabled, these messages will be written as a single
message containing all the individual messages
- var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log =
log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
- appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
-
- var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2,
numDups = 2, log = log, codec = compression, magicValue =
RecordBatch.MAGIC_VALUE_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
- appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups
= 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
-
- val appends = appendsV0 ++ appendsV1
-
- val v0RecordKeysWithNoV1V2Updates = (appendsV0.map(_._1).toSet --
appendsV1.map(_._1)).map(_.toString)
-
- // roll the log so that all appended messages can be compacted
- log.roll()
- val startSize = log.size
- cleaner.startup()
-
- val firstDirty = log.activeSegment.baseOffset
- assertTrue(firstDirty >= appends.size) // ensure we clean data from V0 and
V1
-
- checkLastCleaned("log", 0, firstDirty)
- val compactedSize = log.logSegments.asScala.map(_.size).sum
- assertTrue(startSize > compactedSize, s"log should have been compacted:
startSize=$startSize compactedSize=$compactedSize")
-
- checkLogAfterAppendingDups(log, startSize, appends)
- checkLogAfterConvertingToV2(compressionType, log,
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates)
- }
-
- private def checkLogAfterConvertingToV2(compressionType: CompressionType,
log: UnifiedLog, timestampType: TimestampType,
- keysForV0RecordsWithNoV1V2Updates:
Set[String]): Unit = {
- for (segment <- log.logSegments.asScala; recordBatch <-
segment.log.batches.asScala) {
- // Uncompressed v0/v1 records are always converted into single record v2
batches via compaction if they are retained
- // Compressed v0/v1 record batches are converted into record batches v2
with one or more records (depending on the
- // number of retained records after compaction)
- assertEquals(RecordVersion.V2.value, recordBatch.magic)
- if (compressionType == CompressionType.NONE)
- assertEquals(1, recordBatch.iterator().asScala.size)
- else
- assertTrue(recordBatch.iterator().asScala.nonEmpty)
-
- val firstRecordKey =
TestUtils.readString(recordBatch.iterator().next().key())
- if (keysForV0RecordsWithNoV1V2Updates.contains(firstRecordKey))
- assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType)
- else
- assertEquals(timestampType, recordBatch.timestampType)
-
- recordBatch.iterator.asScala.foreach { record =>
- val recordKey = TestUtils.readString(record.key)
- if (keysForV0RecordsWithNoV1V2Updates.contains(recordKey))
- assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp, "Record " +
recordKey + " with unexpected timestamp ")
- else
- assertNotEquals(RecordBatch.NO_TIMESTAMP, record.timestamp, "Record
" + recordKey + " with unexpected timestamp " + RecordBatch.NO_TIMESTAMP)
- }
- }
- }
-
- @ParameterizedTest
-
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
- def cleanerConfigUpdateTest(compressionType: CompressionType): Unit = {
- val codec: Compression = Compression.of(compressionType).build()
- val largeMessageKey = 20
- val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey,
RecordBatch.CURRENT_MAGIC_VALUE, codec)
- val maxMessageSize = largeMessageSet.sizeInBytes
-
- cleaner = makeCleaner(partitions = topicPartitions, backoffMs = 1,
maxMessageSize = maxMessageSize,
- cleanerIoBufferSize = Some(1))
- val log = cleaner.logs.get(topicPartitions(0))
-
- writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
- val startSize = log.size
- cleaner.startup()
- assertEquals(1, cleaner.cleanerCount)
-
- // Verify no cleaning with LogCleanerIoBufferSizeProp=1
- val firstDirty = log.activeSegment.baseOffset
- val topicPartition = new TopicPartition("log", 0)
- cleaner.awaitCleaned(topicPartition, firstDirty, 10)
- assertTrue(cleaner.cleanerManager.allCleanerCheckpoints.isEmpty, "Should
not have cleaned")
-
- def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig):
KafkaConfig = {
- val props = TestUtils.createBrokerConfig(0)
- props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP,
cleanerConfig.numThreads.toString)
- props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
cleanerConfig.dedupeBufferSize.toString)
- props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
cleanerConfig.dedupeBufferLoadFactor.toString)
- props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
cleanerConfig.ioBufferSize.toString)
- props.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG,
cleanerConfig.maxMessageSize.toString)
- props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP,
cleanerConfig.backoffMs.toString)
- props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
cleanerConfig.maxIoBytesPerSecond.toString)
- KafkaConfig.fromProps(props)
- }
-
- // Verify cleaning done with larger LogCleanerIoBufferSizeProp
- val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
- val newConfig = kafkaConfigWithCleanerConfig(new CleanerConfig(2,
- cleaner.currentConfig.dedupeBufferSize,
- cleaner.currentConfig.dedupeBufferLoadFactor,
- 100000,
- cleaner.currentConfig.maxMessageSize,
- cleaner.currentConfig.maxIoBytesPerSecond,
- cleaner.currentConfig.backoffMs,
- true))
- cleaner.reconfigure(oldConfig, newConfig)
-
- assertEquals(2, cleaner.cleanerCount)
- checkLastCleaned("log", 0, firstDirty)
- val compactedSize = log.logSegments.asScala.map(_.size).sum
- assertTrue(startSize > compactedSize, s"log should have been compacted:
startSize=$startSize compactedSize=$compactedSize")
- }
-
- private def checkLastCleaned(topic: String, partitionId: Int, firstDirty:
Long): Unit = {
- // wait until cleaning up to base_offset, note that cleaning happens only
when "log dirty ratio" is higher than
- // TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
- val topicPartition = new TopicPartition(topic, partitionId)
- cleaner.awaitCleaned(topicPartition, firstDirty, 60000L)
- val lastCleaned =
cleaner.cleanerManager.allCleanerCheckpoints.get(topicPartition)
- assertTrue(lastCleaned >= firstDirty, s"log cleaner should have processed
up to offset $firstDirty, but lastCleaned=$lastCleaned")
- }
-
- private def checkLogAfterAppendingDups(log: UnifiedLog, startSize: Long,
appends: Seq[(Int, String, Long)]): Unit = {
- val read = readFromLog(log)
- assertEquals(toMap(appends), toMap(read), "Contents of the map shouldn't
change")
- assertTrue(startSize > log.size)
- }
-
- private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int,
(String, Long)] = {
- messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
- }
-
- private def readFromLog(log: UnifiedLog): Iterable[(Int, String, Long)] = {
- for (segment <- log.logSegments.asScala; deepLogEntry <-
segment.log.records.asScala) yield {
- val key = TestUtils.readString(deepLogEntry.key).toInt
- val value = TestUtils.readString(deepLogEntry.value)
- (key, value, deepLogEntry.offset)
- }
- }
-
- private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log:
UnifiedLog, codec: Compression,
- startKey: Int = 0, magicValue: Byte):
Seq[(Int, String, Long)] = {
- val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey +
numKeys)) yield {
- val payload = counter.toString
- incCounter()
- (key, payload)
- }
-
- val records = kvs.map { case (key, payload) =>
- new SimpleRecord(Time.SYSTEM.milliseconds(), key.toString.getBytes,
payload.getBytes)
- }
-
- val appendInfo =
log.appendAsLeaderWithRecordVersion(MemoryRecords.withRecords(magicValue,
codec, records: _*),
- 0, RecordVersion.lookup(magicValue))
- // move LSO forward to increase compaction bound
- log.updateHighWatermark(log.logEndOffset)
- val offsets = appendInfo.firstOffset to appendInfo.lastOffset
-
- kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
- }
-
-}
-
-object LogCleanerParameterizedIntegrationTest {
-
- class AllCompressions extends ArgumentsProvider {
- override def provideArguments(context: ExtensionContext):
java.util.stream.Stream[_ <: Arguments] =
- java.util.Arrays.stream(CompressionType.values.map(codec =>
Arguments.of(codec)))
- }
-
- // zstd compression is not supported with older message formats (i.e
supported by V0 and V1)
- class ExcludeZstd extends ArgumentsProvider {
- override def provideArguments(context: ExtensionContext):
java.util.stream.Stream[_ <: Arguments] =
- java.util.Arrays.stream(CompressionType.values.filter(_ !=
CompressionType.ZSTD).map(codec => Arguments.of(codec)))
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
similarity index 53%
rename from
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
rename to
storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
index 0195017bb2d..7abf4b63265 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerLagIntegrationTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
@@ -18,17 +18,23 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.CompressionType;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.RecordVersion;
+import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
@@ -37,6 +43,7 @@ import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -50,6 +57,7 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -58,6 +66,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.LongStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -67,11 +76,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This is an integration test that tests the fully integrated log cleaner
*/
-public class LogCleanerLagIntegrationTest {
- private static final Logger log =
LoggerFactory.getLogger(LogCleanerLagIntegrationTest.class);
+public class LogCleanerIntegrationTest {
+ private static final Logger log =
LoggerFactory.getLogger(LogCleanerIntegrationTest.class);
- protected LogCleaner cleaner;
- protected final File logDir = TestUtils.tempDirectory();
+ private LogCleaner cleaner;
+ private final File logDir = TestUtils.tempDirectory();
private final List<UnifiedLog> logs = new ArrayList<>();
private static final int DEFAULT_MAX_MESSAGE_SIZE = 128;
@@ -88,7 +97,7 @@ public class LogCleanerLagIntegrationTest {
private final Compression codec = Compression.lz4().build();
- private int counter = 0;
+ private int counter;
private final MockTime time = new MockTime(1400000000000L, 1000L); // Tue
May 13 16:53:20 UTC 2014
private static final List<TopicPartition> TOPIC_PARTITIONS = List.of(
@@ -97,8 +106,27 @@ public class LogCleanerLagIntegrationTest {
new TopicPartition("log", 2)
);
- public record KeyValueOffset(int key, String value, long firstOffset) { }
- public record ValueAndRecords(String value, MemoryRecords records) { }
+ private record KeyValueOffset(int key, String value, long firstOffset) { }
+ private record ValueAndRecords(String value, MemoryRecords records) { }
+ private record LogAndMessages(UnifiedLog log, List<KeyValueOffset>
messages) { }
+
+ @BeforeEach
+ public void setup() { // resets per-test state before each test method
+ counter = 0; // NOTE(review): likely redundant — the field defaults to 0 and JUnit Jupiter creates a fresh test instance per method by default; only needed if a PER_CLASS lifecycle is configured — confirm
+ }
+
+ @AfterEach
+ public void teardown() throws IOException, InterruptedException { // NOTE(review): no try/finally — if cleaner.shutdown() throws, the logs are not closed and logDir is not deleted
+ kafka.utils.TestUtils.clearYammerMetrics(); // presumably resets the process-wide Yammer metrics registry so metrics do not leak into later tests — confirm
+ if (cleaner != null) { // cleaner stays null for tests that never build one
+ cleaner.shutdown();
+ }
+ time.scheduler.shutdown();
+ for (UnifiedLog log : logs) { // note: this local `log` shadows the static Logger field `log`
+ log.close();
+ }
+ Utils.delete(logDir); // remove the temp directory created for this test instance
+ }
@ParameterizedTest
@EnumSource(CompressionType.class)
@@ -290,6 +318,386 @@ public class LogCleanerLagIntegrationTest {
assertEquals(cleaner.cleaners().size(), cleaner.deadThreadCount());
}
+ /**
+ * Cleans a log of duplicated keys, then appends a large single message (whose size is used as the
+ * cleaner's max message size) followed by more duplicates, and checks the log is cleaned again with
+ * the latest entry for every key preserved. Finally removes the partition from the cleaner and
+ * checks that a checkpoint update drops it from the cleaner offset checkpoint file.
+ */
+ @ParameterizedTest
+ @EnumSource(CompressionType.class)
+ public void testCleanerCompaction(CompressionType compressionType) throws
Exception {
+ Compression codec = Compression.of(compressionType).build();
+ int largeMessageKey = 20;
+ ValueAndRecords largeMessage =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE,
codec);
+ int maxMessageSize = largeMessage.records().sizeInBytes();
+
+ cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+ List<KeyValueOffset> appends = writeDups(100, 3, theLog, codec);
+ long startSize = theLog.size();
+ cleaner.startup();
+
+ // wait for cleaning up to the base offset of the current active segment
+ long firstDirty = theLog.activeSegment().baseOffset();
+ checkLastCleaned("log", 0, firstDirty);
+ long compactedSize =
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+ assertTrue(startSize > compactedSize,
+ "log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
+
+ checkLogAfterAppendingDups(theLog, startSize, appends);
+
+ LogAppendInfo appendInfo =
theLog.appendAsLeader(largeMessage.records(), 0);
+ // move LSO forward to increase compaction bound
+ theLog.updateHighWatermark(theLog.logEndOffset());
+ long largeMessageOffset = appendInfo.firstOffset();
+
+ List<KeyValueOffset> dups = writeDups(100, 3, theLog, codec,
largeMessageKey + 1, RecordBatch.CURRENT_MAGIC_VALUE);
+ // expected entries in write order, so toMap keeps the latest entry per key
+ List<KeyValueOffset> appends2 = new ArrayList<>(appends);
+ appends2.add(new KeyValueOffset(largeMessageKey, largeMessage.value(),
largeMessageOffset));
+ appends2.addAll(dups);
+ long firstDirty2 = theLog.activeSegment().baseOffset();
+ checkLastCleaned("log", 0, firstDirty2);
+
+ checkLogAfterAppendingDups(theLog, startSize, appends2);
+
+ // simulate deleting a partition by removing it from the cleaner's logs,
+ // force a checkpoint update
+ // and make sure it's gone from the checkpoint file
+ cleaner.logs().remove(TOPIC_PARTITIONS.get(0));
+ cleaner.updateCheckpoints(logDir,
Optional.of(TOPIC_PARTITIONS.get(0)));
+ Map<TopicPartition, Long> checkpoints = new OffsetCheckpointFile(
+ new File(logDir, LogCleanerManager.OFFSET_CHECKPOINT_FILE),
null).read();
+ // we expect partition 0 to be gone
+ assertFalse(checkpoints.containsKey(TOPIC_PARTITIONS.get(0)));
+ }
+
+ /**
+ * With cleanup.policy=compact,delete: compacts a log, then makes every segment older than
+ * retention.ms and waits until retention has removed all data (log start offset equals log end
+ * offset and a single segment remains). It then shuts the cleaner down, closes the log and runs
+ * another compaction round through a newly built cleaner to check cleaning still works afterwards.
+ */
+ @ParameterizedTest
+ @EnumSource(CompressionType.class)
+ public void testCleansCombinedCompactAndDeleteTopic(CompressionType
compressionType) throws Exception {
+ Properties logProps = new Properties();
+ int retentionMs = 100000;
+ logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs);
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
+
+ LogAndMessages result1 = runCleanerAndCheckCompacted(100,
compressionType, logProps);
+ UnifiedLog theLog = result1.log();
+
+ // age every segment to twice retention.ms so time-based retention deletes it
+ for (LogSegment segment : theLog.logSegments()) {
+ segment.setLastModified(time.milliseconds() - (2L * retentionMs));
+ }
+ TestUtils.waitForCondition(
+ () -> theLog.logStartOffset() == theLog.logEndOffset() &&
theLog.numberOfSegments() == 1,
+ "Timed out waiting for deletion of old segments");
+
+ cleaner.shutdown();
+ closeLog(theLog);
+
+ // run the cleaner again to make sure there are no issues after the deletion
+ LogAndMessages result2 = runCleanerAndCheckCompacted(20,
compressionType, logProps);
+
+ List<KeyValueOffset> read = readFromLogFull(result2.log());
+ assertEquals(toMap(result2.messages()), toMap(read), "Contents of the
map shouldn't change");
+ }
+
+ /**
+ * Builds a cleaner for the first test partition with the given topic config overrides and a
+ * 100 ms backoff, writes {@code numKeys} keys with 3 duplicates each (via writeDups), advances the
+ * high watermark to the log end, starts the cleaner and asserts the log gets compacted up to the
+ * active segment's base offset.
+ *
+ * @return the compacted log and the messages that were written to it
+ */
+ private LogAndMessages runCleanerAndCheckCompacted(int numKeys,
CompressionType compressionType,
+ Properties logProps)
throws Exception {
+ cleaner = makeCleaner(TOPIC_PARTITIONS.subList(0, 1), logProps, 100L);
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+ List<KeyValueOffset> messages = writeDups(numKeys, 3, theLog,
Compression.of(compressionType).build());
+ long startSize = theLog.size();
+
+ theLog.updateHighWatermark(theLog.logEndOffset());
+
+ long firstDirty = theLog.activeSegment().baseOffset();
+ cleaner.startup();
+
+ // should compact the log
+ checkLastCleaned("log", 0, firstDirty);
+ long compactedSize =
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+ assertTrue(startSize > compactedSize,
+ "log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
+ return new LogAndMessages(theLog, messages);
+ }
+
+ /**
+ * Writes duplicated keys in message format v0 to a log configured with LogAppendTime and cleans it.
+ * Then appends more v0 duplicates, a large v0 message and v1/v2 duplicates, cleans again, and checks
+ * that the latest entry of every key survives and every retained batch was converted to format v2.
+ * ZSTD is excluded because it cannot be used with the v0/v1 message formats written here.
+ */
+ @ParameterizedTest
+ @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+ public void testCleanerWithMessageFormatV0V1V2(CompressionType
compressionType) throws Exception {
+ Compression compression = Compression.of(compressionType).build();
+ int largeMessageKey = 20;
+ ValueAndRecords largeMessage =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0,
compression);
+ int maxMessageSize;
+ if (compressionType == CompressionType.NONE) {
+ maxMessageSize = largeMessage.records().sizeInBytes();
+ } else {
+ // the broker assigns absolute offsets for message format 0 which
potentially causes the compressed size to
+ // increase because the broker offsets are larger than the ones
assigned by the client
+ // adding `6` to the message set size is good enough for this
test: it covers the increased message size while
+ // still being less than the overhead introduced by the conversion
from message format version 0 to 1
+ maxMessageSize = largeMessage.records().sizeInBytes() + 6;
+ }
+
+ cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L);
+
+ // use LogAppendTime for this log; checkLogAfterConvertingToV2 expects this timestamp type
+ // on batches that contain v1/v2 data
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+ Properties props = logConfigProperties(maxMessageSize);
+ props.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
TimestampType.LOG_APPEND_TIME.name);
+ LogConfig logConfig = new LogConfig(props);
+ theLog.updateConfig(logConfig);
+
+ List<KeyValueOffset> appends1 = writeDups(100, 3, theLog, compression,
0, RecordBatch.MAGIC_VALUE_V0);
+ long startSize = theLog.size();
+ cleaner.startup();
+
+ long firstDirty = theLog.activeSegment().baseOffset();
+ checkLastCleaned("log", 0, firstDirty);
+ long compactedSize =
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+ assertTrue(startSize > compactedSize,
+ "log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
+
+ checkLogAfterAppendingDups(theLog, startSize, appends1);
+
+ List<KeyValueOffset> dupsV0 = writeDups(40, 3, theLog, compression, 0,
RecordBatch.MAGIC_VALUE_V0);
+ LogAppendInfo appendInfo =
theLog.appendAsLeaderWithRecordVersion(largeMessage.records(), 0,
RecordVersion.V0);
+ // move LSO forward to increase compaction bound
+ theLog.updateHighWatermark(theLog.logEndOffset());
+ long largeMessageOffset = appendInfo.firstOffset();
+
+ // also add some messages with version 1 and version 2 to check that
we handle mixed format versions correctly
+ List<KeyValueOffset> dupsV1 = writeDups(40, 3, theLog, compression,
30, RecordBatch.MAGIC_VALUE_V1);
+ List<KeyValueOffset> dupsV2 = writeDups(5, 3, theLog, compression, 15,
RecordBatch.MAGIC_VALUE_V2);
+
+ // keys from the first v0 round that were never rewritten in v1/v2: after conversion their
+ // records must still have no timestamp (see checkLogAfterConvertingToV2)
+ Set<String> v0RecordKeysWithNoV1V2Updates = new HashSet<>();
+ Set<Integer> dupsV1Keys = new HashSet<>();
+ for (KeyValueOffset kvo : dupsV1) dupsV1Keys.add(kvo.key());
+ Set<Integer> dupsV2Keys = new HashSet<>();
+ for (KeyValueOffset kvo : dupsV2) dupsV2Keys.add(kvo.key());
+ for (KeyValueOffset kvo : appends1) {
+ if (!dupsV1Keys.contains(kvo.key()) &&
!dupsV2Keys.contains(kvo.key())) {
+ v0RecordKeysWithNoV1V2Updates.add(String.valueOf(kvo.key()));
+ }
+ }
+
+ // expected entries in write order, so toMap keeps the latest entry per key
+ List<KeyValueOffset> appends2 = new ArrayList<>(appends1);
+ appends2.addAll(dupsV0);
+ appends2.add(new KeyValueOffset(largeMessageKey, largeMessage.value(),
largeMessageOffset));
+ appends2.addAll(dupsV1);
+ appends2.addAll(dupsV2);
+
+ // roll the log so that all appended messages can be compacted
+ theLog.roll();
+ long firstDirty2 = theLog.activeSegment().baseOffset();
+ checkLastCleaned("log", 0, firstDirty2);
+
+ checkLogAfterAppendingDups(theLog, startSize, appends2);
+ checkLogAfterConvertingToV2(compressionType, theLog,
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates);
+ }
+
+ /**
+ * Writes several v0 and v1 record sets, each in a single append (with compression each set becomes
+ * a single wrapper message), to a log with small segments. It then cleans the log and checks that
+ * the latest entry of every key is kept and every retained batch was converted to format v2.
+ * ZSTD is excluded because it cannot be used with the v0/v1 message formats written here.
+ */
+ @ParameterizedTest
+ @EnumSource(mode = EnumSource.Mode.EXCLUDE, names = "ZSTD")
+ public void testCleaningNestedMessagesWithV0V1(CompressionType
compressionType) throws Exception {
+ Compression compression = Compression.of(compressionType).build();
+ int maxMessageSize = 192;
+ cleaner = makeCleaner(TOPIC_PARTITIONS, maxMessageSize, 15000L, 256);
+
+ // apply the same max message size and 256 value (the cleaner's segment size) to the log itself
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+ LogConfig logConfig = new LogConfig(logConfigProperties(new
Properties(), maxMessageSize,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY, 256, DEFAULT_MAX_COMPACTION_LAG_MS));
+ theLog.updateConfig(logConfig);
+
+ // with compression enabled, these messages will be written as a
single message containing all the individual messages
+ List<KeyValueOffset> appendsV0 = new
ArrayList<>(writeDupsSingleMessageSet(2, 3, theLog, compression, 0,
RecordBatch.MAGIC_VALUE_V0));
+ appendsV0.addAll(writeDupsSingleMessageSet(2, 2, theLog, compression,
3, RecordBatch.MAGIC_VALUE_V0));
+
+ List<KeyValueOffset> appendsV1 = new
ArrayList<>(writeDupsSingleMessageSet(2, 2, theLog, compression, 4,
RecordBatch.MAGIC_VALUE_V1));
+ appendsV1.addAll(writeDupsSingleMessageSet(2, 2, theLog, compression,
4, RecordBatch.MAGIC_VALUE_V1));
+ appendsV1.addAll(writeDupsSingleMessageSet(2, 2, theLog, compression,
6, RecordBatch.MAGIC_VALUE_V1));
+
+ List<KeyValueOffset> appends = new ArrayList<>(appendsV0);
+ appends.addAll(appendsV1);
+
+ // keys written only in v0 (never rewritten in v1): their records must keep no timestamp
+ Set<Integer> appendsV1Keys = new HashSet<>();
+ for (KeyValueOffset kvo : appendsV1) appendsV1Keys.add(kvo.key());
+ Set<String> v0RecordKeysWithNoV1V2Updates = new HashSet<>();
+ for (KeyValueOffset kvo : appendsV0) {
+ if (!appendsV1Keys.contains(kvo.key())) {
+ v0RecordKeysWithNoV1V2Updates.add(String.valueOf(kvo.key()));
+ }
+ }
+
+ // roll the log so that all appended messages can be compacted
+ theLog.roll();
+ long startSize = theLog.size();
+ cleaner.startup();
+
+ long firstDirty = theLog.activeSegment().baseOffset();
+ assertTrue(firstDirty >= appends.size()); // ensure we clean data from
V0 and V1
+
+ checkLastCleaned("log", 0, firstDirty);
+ long compactedSize =
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+ assertTrue(startSize > compactedSize,
+ "log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
+
+ checkLogAfterAppendingDups(theLog, startSize, appends);
+ checkLogAfterConvertingToV2(compressionType, theLog,
logConfig.messageTimestampType, v0RecordKeysWithNoV1V2Updates);
+ }
+
+ /**
+ * Starts a single cleaner thread with a 1-byte cleaner I/O buffer and checks that nothing gets
+ * cleaned. It then reconfigures the cleaner to 2 threads and a 100000-byte I/O buffer, and checks
+ * that the thread count changes and the log gets compacted.
+ */
+ @ParameterizedTest
+ @EnumSource(CompressionType.class)
+ public void cleanerConfigUpdateTest(CompressionType compressionType)
throws Exception {
+ Compression codec = Compression.of(compressionType).build();
+ int largeMessageKey = 20;
+ ValueAndRecords largeMessage =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE,
codec);
+ int maxMessageSize = largeMessage.records().sizeInBytes();
+
+ // backoff 1 ms, cleaner I/O buffer size 1
+ cleaner = makeCleaner(TOPIC_PARTITIONS, 1L, maxMessageSize, 1);
+ UnifiedLog theLog = cleaner.logs().get(TOPIC_PARTITIONS.get(0));
+
+ writeDups(100, 3, theLog, codec);
+ long startSize = theLog.size();
+ cleaner.startup();
+ assertEquals(1, cleaner.cleanerCount());
+
+ // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+ // (awaitCleaned is only given 10 here and its result is intentionally not checked)
+ long firstDirty = theLog.activeSegment().baseOffset();
+ TopicPartition topicPartition = new TopicPartition("log", 0);
+ cleaner.awaitCleaned(topicPartition, firstDirty, 10);
+ assertTrue(cleaner.cleanerManager().allCleanerCheckpoints().isEmpty(),
"Should not have cleaned");
+
+ ConfigDef configDef = Utils.mergeConfigs(List.of(
+ CleanerConfig.CONFIG_DEF,
+ ServerConfigs.CONFIG_DEF
+ ));
+
+ // the "old" config mirrors the cleaner's current settings; the "new" one changes only the
+ // thread count and the I/O buffer size
+ CleanerConfig currentConfig = cleaner.currentConfig();
+ Map<String, Object> oldConfigMap = new HashMap<>();
+ oldConfigMap.put(CleanerConfig.LOG_CLEANER_THREADS_PROP,
currentConfig.numThreads);
+ oldConfigMap.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
currentConfig.dedupeBufferSize);
+
oldConfigMap.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
currentConfig.dedupeBufferLoadFactor);
+ oldConfigMap.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
currentConfig.ioBufferSize);
+ oldConfigMap.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG,
currentConfig.maxMessageSize);
+
oldConfigMap.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
currentConfig.maxIoBytesPerSecond);
+ oldConfigMap.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP,
currentConfig.backoffMs);
+ oldConfigMap.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP,
currentConfig.enableCleaner);
+ AbstractConfig oldAbstractConfig = new AbstractConfig(configDef,
oldConfigMap);
+
+ Map<String, Object> newConfigMap = new HashMap<>(oldConfigMap);
+ newConfigMap.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, 2);
+ newConfigMap.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
100000);
+ AbstractConfig newAbstractConfig = new AbstractConfig(configDef,
newConfigMap);
+
+ cleaner.reconfigure(oldAbstractConfig, newAbstractConfig);
+
+ assertEquals(2, cleaner.cleanerCount());
+ checkLastCleaned("log", 0, firstDirty);
+ long compactedSize =
theLog.logSegments().stream().mapToLong(LogSegment::size).sum();
+ assertTrue(startSize > compactedSize,
+ "log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
+ }
+
+    /**
+     * Waits (passing a 60000 ms limit to awaitCleaned) for the cleaner to clean the given
+     * partition up to {@code firstDirty}, then asserts that the cleaner checkpoint for the
+     * partition has reached that offset. Cleaning only happens once the log's dirty ratio is
+     * higher than {@link TopicConfig#MIN_CLEANABLE_DIRTY_RATIO_CONFIG}.
+     *
+     * @param topic       topic of the partition to check
+     * @param partitionId partition number
+     * @param firstDirty  offset the cleaner is expected to have processed up to
+     * @throws InterruptedException if interrupted while waiting for the cleaner
+     */
+    private void checkLastCleaned(String topic, int partitionId, long firstDirty) throws InterruptedException {
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+        cleaner.awaitCleaned(topicPartition, firstDirty, 60000L);
+        Long lastCleaned = cleaner.cleanerManager().allCleanerCheckpoints().get(topicPartition);
+        // The checkpoint map has no entry when nothing was cleaned for this partition (e.g. the
+        // wait above ended first). Guard against null so the test fails with the message below
+        // instead of a NullPointerException from unboxing lastCleaned.
+        assertTrue(lastCleaned != null && lastCleaned >= firstDirty,
+            "log cleaner should have processed up to offset " + firstDirty + ", but lastCleaned=" + lastCleaned);
+    }
+
+    /**
+     * Asserts that, per key, the latest entry (value and offset) read back from {@code log} equals
+     * the latest entry in {@code appends}, and that the log is now smaller than {@code startSize}.
+     *
+     * @param log       log to read back
+     * @param startSize log size before cleaning
+     * @param appends   expected entries, in write order
+     */
+    private void checkLogAfterAppendingDups(UnifiedLog log, long startSize, List<KeyValueOffset> appends) {
+        List<KeyValueOffset> read = readFromLogFull(log);
+        assertEquals(toMap(appends), toMap(read), "Contents of the map shouldn't change");
+        long currentSize = log.size();
+        // Report both sizes on failure, like the other compaction size checks in this class.
+        assertTrue(startSize > currentSize,
+            "log should be smaller than before cleaning: startSize=" + startSize + " currentSize=" + currentSize);
+    }
+
+    /** Indexes {@code messages} by key; for a repeated key the entry appearing last in the list wins. */
+    private Map<Integer, KeyValueOffset> toMap(List<KeyValueOffset> messages) {
+        Map<Integer, KeyValueOffset> byKey = new HashMap<>();
+        messages.forEach(entry -> byKey.put(entry.key(), entry));
+        return byKey;
+    }
+
+    /**
+     * Reads every record of every segment of {@code log}, in segment order, as key/value/offset
+     * entries. Keys are parsed as ints and duplicate keys are kept.
+     */
+    private List<KeyValueOffset> readFromLogFull(UnifiedLog log) {
+        List<KeyValueOffset> entries = new ArrayList<>();
+        log.logSegments().forEach(segment -> segment.log().records().forEach(record -> {
+            int parsedKey = Integer.parseInt(LogTestUtils.readString(record.key()));
+            String decodedValue = LogTestUtils.readString(record.value());
+            entries.add(new KeyValueOffset(parsedKey, decodedValue, record.offset()));
+        }));
+        return entries;
+    }
+
+ /**
+ * Appends {@code numDups} rounds of keys {@code startKey .. startKey + numKeys - 1} to {@code log}
+ * in a single append using the given magic value and compression. Each record gets a unique value
+ * taken from the counter. The high watermark is then advanced to the log end.
+ *
+ * @return the written key/values paired with the offsets assigned by the append, in write order
+ */
+ private List<KeyValueOffset> writeDupsSingleMessageSet(int numKeys, int
numDups, UnifiedLog log,
+ Compression codec,
int startKey,
+ byte magicValue)
throws IOException {
+ List<KeyValueOffset> kvs = new ArrayList<>();
+ List<SimpleRecord> records = new ArrayList<>();
+ for (int i = 0; i < numDups; i++) {
+ for (int key = startKey; key < startKey + numKeys; key++) {
+ String value = String.valueOf(counter());
+ incCounter();
+ // offset 0 is a placeholder; real offsets are filled in after the append below
+ kvs.add(new KeyValueOffset(key, value, 0));
+ records.add(new SimpleRecord(
+ time.milliseconds(),
+ String.valueOf(key).getBytes(),
+ value.getBytes()));
+ }
+ }
+
+ LogAppendInfo appendInfo = log.appendAsLeaderWithRecordVersion(
+ MemoryRecords.withRecords(magicValue, codec, records.toArray(new
SimpleRecord[0])),
+ 0, RecordVersion.lookup(magicValue));
+ // move LSO forward to increase compaction bound
+ log.updateHighWatermark(log.logEndOffset());
+
+ // assumes the append assigned consecutive offsets, one per record, in the order the records were added
+ long[] offsets = LongStream.rangeClosed(appendInfo.firstOffset(),
appendInfo.lastOffset()).toArray();
+
+ List<KeyValueOffset> result = new ArrayList<>();
+ for (int i = 0; i < kvs.size(); i++) {
+ KeyValueOffset kvo = kvs.get(i);
+ result.add(new KeyValueOffset(kvo.key(), kvo.value(), offsets[i]));
+ }
+ return result;
+ }
+
+    /**
+     * Checks that cleaning has rewritten every batch of {@code log} in message format v2.
+     * Uncompressed batches must hold exactly one record and compressed batches at least one.
+     * Records whose key is in {@code keysForV0RecordsWithNoV1V2Updates} must have no timestamp,
+     * and a batch whose first record has such a key must use CREATE_TIME. All other records must
+     * carry a timestamp, and their batches must use {@code timestampType}.
+     *
+     * @param compressionType                   compression the records were written with
+     * @param log                               cleaned log to inspect
+     * @param timestampType                     timestamp type expected on batches with v1/v2 data
+     * @param keysForV0RecordsWithNoV1V2Updates keys that were only ever written in format v0
+     */
+    private void checkLogAfterConvertingToV2(CompressionType compressionType, UnifiedLog log,
+                                             TimestampType timestampType, Set<String> keysForV0RecordsWithNoV1V2Updates) {
+        for (LogSegment segment : log.logSegments()) {
+            for (RecordBatch recordBatch : segment.log().batches()) {
+                // Uncompressed v0/v1 records are always converted into single record v2 batches via compaction if they are retained
+                // Compressed v0/v1 record batches are converted into record batches v2 with one or more records (depending on the
+                // number of retained records after compaction)
+                assertEquals(RecordVersion.V2.value, recordBatch.magic());
+                List<Record> recordList = new ArrayList<>();
+                recordBatch.forEach(recordList::add);
+                if (compressionType == CompressionType.NONE) {
+                    assertEquals(1, recordList.size());
+                } else {
+                    assertFalse(recordList.isEmpty());
+                }
+
+                Record firstRecord = recordBatch.iterator().next();
+                String firstRecordKey = LogTestUtils.readString(firstRecord.key());
+                if (keysForV0RecordsWithNoV1V2Updates.contains(firstRecordKey)) {
+                    assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType());
+                } else {
+                    assertEquals(timestampType, recordBatch.timestampType());
+                }
+
+                for (Record record : recordBatch) {
+                    String recordKey = LogTestUtils.readString(record.key());
+                    // Report the timestamp actually found, so a failure shows the offending value.
+                    if (keysForV0RecordsWithNoV1V2Updates.contains(recordKey)) {
+                        assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp(),
+                            "Record " + recordKey + " with unexpected timestamp " + record.timestamp());
+                    } else {
+                        assertNotEquals(RecordBatch.NO_TIMESTAMP, record.timestamp(),
+                            "Record " + recordKey + " with unexpected timestamp " + record.timestamp());
+                    }
+                }
+            }
+        }
+    }
+
private void breakPartitionLog(TopicPartition tp) throws IOException {
UnifiedLog theLog = cleaner.logs().get(tp);
writeDups(20, 3, theLog, codec);
@@ -336,24 +744,16 @@ public class LogCleanerLagIntegrationTest {
+ // Latest value per key in log, with values parsed as ints; later records overwrite earlier ones.
private Map<Integer, Integer> readFromLog(UnifiedLog log) {
Map<Integer, Integer> result = new HashMap<>();
- for (LogSegment segment : log.logSegments()) {
- for (Record record : segment.log().records()) {
- int key =
Integer.parseInt(LogTestUtils.readString(record.key()));
- int value =
Integer.parseInt(LogTestUtils.readString(record.value()));
- result.put(key, value);
- }
+ for (KeyValueOffset kvo : readFromLogFull(log)) {
+ result.put(kvo.key(), Integer.parseInt(kvo.value()));
}
return result;
}
+ // Every record of log as a {key, value} int pair in log order, duplicate keys included.
private List<int[]> readKeyValuePairsFromLog(UnifiedLog log) {
List<int[]> result = new ArrayList<>();
- for (LogSegment segment : log.logSegments()) {
- for (Record record : segment.log().records()) {
- int key =
Integer.parseInt(LogTestUtils.readString(record.key()));
- int value =
Integer.parseInt(LogTestUtils.readString(record.value()));
- result.add(new int[]{key, value});
- }
+ for (KeyValueOffset kvo : readFromLogFull(log)) {
+ result.add(new int[]{kvo.key(), Integer.parseInt(kvo.value())});
}
return result;
}
@@ -524,6 +924,56 @@ public class LogCleanerLagIntegrationTest {
new Properties());
}
+ /**
+ * Cleaner using default settings except for max message size, backoff and segment size.
+ * NOTE(review): this overload (int, long, int) and the (long backoffMs, int maxMessageSize,
+ * int cleanerIoBufferSize) overload below take the same kinds of numbers in a different order.
+ * A call passing two int values in the 2nd and 3rd positions matches both and does not compile,
+ * so write the backoff as a long literal (e.g. 15000L) to select one.
+ */
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ int maxMessageSize,
+ long backoffMs,
+ int segmentSize) throws IOException {
+ return makeCleaner(partitions,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+ 1,
+ backoffMs,
+ maxMessageSize,
+ DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY,
+ segmentSize,
+ DEFAULT_MAX_COMPACTION_LAG_MS,
+ null,
+ new Properties());
+ }
+
+ /**
+ * Cleaner using default settings except for backoff, with {@code propertyOverrides} passed through
+ * as extra topic config overrides.
+ */
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ Properties propertyOverrides,
+ long backoffMs) throws IOException {
+ return makeCleaner(partitions,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+ 1,
+ backoffMs,
+ DEFAULT_MAX_MESSAGE_SIZE,
+ DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY,
+ DEFAULT_SEGMENT_SIZE,
+ DEFAULT_MAX_COMPACTION_LAG_MS,
+ null,
+ propertyOverrides);
+ }
+
+ /**
+ * Cleaner using default settings except for backoff, max message size and cleaner I/O buffer size.
+ * The literal 1 passed below is presumably the cleaner thread count (cleanerConfigUpdateTest
+ * asserts cleanerCount() == 1 for a cleaner built this way); the other overloads pass null where
+ * this one passes cleanerIoBufferSize, presumably meaning "default buffer size" — TODO confirm.
+ */
+ private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
+ long backoffMs,
+ int maxMessageSize,
+ int cleanerIoBufferSize) throws IOException
{
+ return makeCleaner(partitions,
+ DEFAULT_MIN_CLEANABLE_DIRTY_RATIO,
+ 1,
+ backoffMs,
+ maxMessageSize,
+ DEFAULT_MIN_COMPACTION_LAG_MS,
+ DEFAULT_DELETE_DELAY,
+ DEFAULT_SEGMENT_SIZE,
+ DEFAULT_MAX_COMPACTION_LAG_MS,
+ cleanerIoBufferSize,
+ new Properties());
+ }
+
private LogCleaner makeCleaner(Iterable<TopicPartition> partitions,
long backoffMs,
long minCompactionLagMs,
@@ -594,21 +1044,8 @@ public class LogCleanerLagIntegrationTest {
return new ValueAndRecords(value, records);
}
- private void closeLog(UnifiedLog log) throws IOException {
+ // Closes the log and removes it from logs so that teardown() does not close it a second time.
+ private void closeLog(UnifiedLog log) {
log.close();
logs.remove(log);
}
-
- @AfterEach
- public void teardown() throws IOException, InterruptedException {
- kafka.utils.TestUtils.clearYammerMetrics();
- if (cleaner != null) {
- cleaner.shutdown();
- }
- time.scheduler.shutdown();
- for (UnifiedLog log : logs) {
- log.close();
- }
- Utils.delete(logDir);
- }
}