This is an automated email from the ASF dual-hosted git repository.
payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fb3d6fb546f KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21763)
fb3d6fb546f is described below
commit fb3d6fb546ffd1eec4d21407e13056247c7d19b1
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 17 22:15:58 2026 +0800
KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21763)
testRetentionIdempotency ~ testLogRollAfterLogHandlerClosed
Reviewers: PoAn Yang <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 675 +------------------
.../kafka/storage/internals/log/LogTestUtils.java | 43 ++
.../storage/internals/log/UnifiedLogTest.java | 734 ++++++++++++++++++++-
4 files changed, 777 insertions(+), 677 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3008a79ce34..8d00d2dca5b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -305,7 +305,7 @@
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest|UnifiedLogTest).java"/>
<suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
- <suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>
+ <suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|UnifiedLogTest).java"/>
<!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 63cb90ab3cd..b01375e59ae 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -41,8 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
import org.apache.kafka.server.util.{MockTime, Scheduler}
import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.internals.utils.Throttler
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -58,7 +57,7 @@ import java.nio.ByteBuffer
import java.nio.file.Files
import java.util
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
-import java.util.{Optional, OptionalLong, Properties}
+import java.util.{Optional, Properties}
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@@ -92,676 +91,6 @@ class UnifiedLogTest {
}
}
- @Test
- def testRetentionIdempotency(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes))), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), 0)
-
- mockTime.sleep(901)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- assertEquals(2, log.deleteOldSegments(),
- "Expecting two segment deletions as log start offset retention should
unblock time based retention")
- assertEquals(0, log.deleteOldSegments())
- }
-
-
- @Test
- def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
retentionBytes = -1, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val epoch = 0.toShort
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 1), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 2), 0)
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
- // Increment the log start offset to exclude the first two segments.
- log.maybeIncrementLogStartOffset(log.logEndOffset - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- // Sleep to breach the file delete delay and run scheduled file deletion
tasks
- mockTime.sleep(1)
- assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
- "expect a single producer state snapshot remaining")
- }
-
- @Test
- def testCompactionDeletesProducerStateSnapshots(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
cleanupPolicy = TopicConfig.CLEANUP_POLICY_COMPACT, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val epoch = 0.toShort
- val cleaner = new Cleaner(0,
- new FakeOffsetMap(Int.MaxValue),
- 64 * 1024,
- 64 * 1024,
- 0.75,
- new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries",
mockTime),
- mockTime,
- tp => {})
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"a".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"b".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 1), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"c".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 2), 0)
- log.updateHighWatermark(log.logEndOffset)
-
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1),
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
- "expected a snapshot file per segment base offset, except the first
segment")
- assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
- // Clean segments, this should delete everything except the active segment
since there only
- // exists the key "a".
- cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
- // There is no other key so we don't delete anything
- assertEquals(0, log.deleteOldSegments())
- // Sleep to breach the file delete delay and run scheduled file deletion
tasks
- mockTime.sleep(1)
-
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1),
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
- "expected a snapshot file per segment base offset, excluding the first")
- }
-
- /**
- * After loading the log, producer state is truncated such that there are no
producer state snapshot files which
- * exceed the log end offset. This test verifies that these are removed.
- */
- @Test
- def testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset(): Unit = {
- val straySnapshotFile = LogFileUtils.producerSnapshotFile(logDir,
42).toPath
- Files.createFile(straySnapshotFile)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
retentionBytes = -1, fileDeleteDelayMs = 0)
- createLog(logDir, logConfig)
- assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size,
- "expected producer state snapshots greater than the log end offset to be
cleaned up")
- }
-
- @Test
- def testProducerIdMapTruncateFullyAndStartAt(): Unit = {
- val records = TestUtils.singletonRecords("foo".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(records, 0)
- log.takeProducerSnapshot()
-
- log.appendAsLeader(TestUtils.singletonRecords("bar".getBytes), 0)
- log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), 0)
- log.takeProducerSnapshot()
-
- assertEquals(3, log.logSegments.size)
- assertEquals(3, log.latestProducerStateEndOffset)
- assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset)
-
- log.truncateFullyAndStartAt(29, Optional.empty)
- assertEquals(1, log.logSegments.size)
- assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset)
- assertEquals(29, log.latestProducerStateEndOffset)
- }
-
- @Test
- def testProducerIdExpirationOnSegmentDeletion(): Unit = {
- val pid1 = 1L
- val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)),
producerId = pid1, producerEpoch = 0, sequence = 0)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(records, 0)
- log.takeProducerSnapshot()
-
- val pid2 = 2L
- log.appendAsLeader(TestUtils.records(Seq(new
SimpleRecord("bar".getBytes)), producerId = pid2, producerEpoch = 0, sequence =
0),
- 0)
- log.appendAsLeader(TestUtils.records(Seq(new
SimpleRecord("baz".getBytes)), producerId = pid2, producerEpoch = 0, sequence =
1),
- 0)
- log.takeProducerSnapshot()
-
- assertEquals(3, log.logSegments.size)
- assertEquals(util.Set.of(pid1, pid2),
log.activeProducersWithLastSequence.keySet)
-
- log.updateHighWatermark(log.logEndOffset)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
-
- // Producer state should not be removed when deleting log segment
- assertEquals(2, log.logSegments.size)
- assertEquals(util.Set.of(pid1, pid2),
log.activeProducersWithLastSequence.keySet)
- }
-
- @Test
- def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint(): Unit
= {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), 0)
- log.roll(Optional.of(1L))
- assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
- assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
- log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), 0)
- log.roll(Optional.of(2L))
- assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset)
- assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
- log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), 0)
- log.roll(Optional.of(3L))
- assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset)
-
- // roll triggers a flush at the starting offset of the new segment, we
should retain all snapshots
- assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
- // even if we flush within the active segment, the snapshot should remain
- log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), 0)
- log.flushUptoOffsetExclusive(4L)
- assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset)
- }
-
- @Test
- def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
- val producerId = 1L
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(TestUtils.records(Seq(new
SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
- producerId = producerId, producerEpoch = 0, sequence = 0),
- 0)
-
- // The next append should overflow the segment and cause it to roll
- log.appendAsLeader(TestUtils.records(Seq(new
SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
- producerId = producerId, producerEpoch = 0, sequence = 1),
- 0)
-
- assertEquals(2, log.logSegments.size)
- assertEquals(1L, log.activeSegment.baseOffset)
- assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
-
- // Force a reload from the snapshot to check its consistency
- log.truncateTo(1L)
-
- assertEquals(2, log.logSegments.size)
- assertEquals(1L, log.activeSegment.baseOffset)
- assertTrue(log.activeSegment.log.batches.asScala.isEmpty)
- assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
-
- val lastEntry = log.producerStateManager.lastEntry(producerId)
- assertTrue(lastEntry.isPresent)
- assertEquals(0L, lastEntry.get.firstDataOffset)
- assertEquals(0L, lastEntry.get.lastDataOffset)
- }
-
- @Test
- def testRebuildTransactionalState(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val pid = 137L
- val epoch = 5.toShort
- val seq = 0
-
- // add some transactional records
- val records = MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, seq,
- new SimpleRecord("foo".getBytes),
- new SimpleRecord("bar".getBytes),
- new SimpleRecord("baz".getBytes))
- log.appendAsLeader(records, 0)
- val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid,
epoch, ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
- log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-
- // now there should be no first unstable offset
- assertEquals(Optional.empty, log.firstUnstableOffset)
-
- log.close()
-
- val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
- reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset + 1)
- assertEquals(Optional.empty, reopenedLog.firstUnstableOffset)
- }
-
- @Test
- def testPeriodicProducerIdExpiration(): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(200, false)
- val producerIdExpirationCheckIntervalMs = 100
-
- val pid = 23L
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs =
producerIdExpirationCheckIntervalMs)
- val records = Seq(new SimpleRecord(mockTime.milliseconds(),
"foo".getBytes))
- log.appendAsLeader(TestUtils.records(records, producerId = pid,
producerEpoch = 0, sequence = 0), 0)
-
- assertEquals(util.Set.of(pid), log.activeProducersWithLastSequence.keySet)
-
- mockTime.sleep(producerIdExpirationCheckIntervalMs)
- assertEquals(util.Set.of(pid), log.activeProducersWithLastSequence.keySet)
-
- mockTime.sleep(producerIdExpirationCheckIntervalMs)
- assertEquals(util.Set.of(), log.activeProducersWithLastSequence.keySet)
- }
-
- @Test
- def testDuplicateAppends(): Unit = {
- // create a log
- val log = createLog(logDir, new LogConfig(new Properties))
- val pid = 1L
- val epoch: Short = 0
-
- var seq = 0
- // Pad the beginning of the log.
- for (_ <- 0 to 5) {
- val record = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = seq)
- log.appendAsLeader(record, 0)
- seq = seq + 1
- }
- // Append an entry with multiple log records.
- def createRecords = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes,
s"value-$seq".getBytes),
- new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes,
s"value-$seq".getBytes),
- new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes,
s"value-$seq".getBytes)
- ), producerId = pid, producerEpoch = epoch, sequence = seq)
- val multiEntryAppendInfo = log.appendAsLeader(createRecords, 0)
- assertEquals(
- multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1,
- 3,
- "should have appended 3 entries"
- )
-
- // Append a Duplicate of the tail, when the entry at the tail has multiple
records.
- val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, 0)
- assertEquals(
- multiEntryAppendInfo.firstOffset,
- dupMultiEntryAppendInfo.firstOffset,
- "Somehow appended a duplicate entry with multiple log records to the
tail"
- )
- assertEquals(multiEntryAppendInfo.lastOffset,
dupMultiEntryAppendInfo.lastOffset,
- "Somehow appended a duplicate entry with multiple log records to the
tail")
-
- seq = seq + 3
-
- // Append a partial duplicate of the tail. This is not allowed.
- var records = TestUtils.records(
- List(
- new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes,
s"value-$seq".getBytes),
- new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes,
s"value-$seq".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = seq - 2)
- assertThrows(classOf[OutOfOrderSequenceException], () =>
log.appendAsLeader(records, 0),
- () => "Should have received an OutOfOrderSequenceException since we
attempted to append a duplicate of a records in the middle of the log.")
-
- // Append a duplicate of the batch which is 4th from the tail. This should
succeed without error since we
- // retain the batch metadata of the last 5 batches.
- val duplicateOfFourth = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 2)
- log.appendAsLeader(duplicateOfFourth, 0)
-
- // Duplicates at older entries are reported as OutOfOrderSequence errors
- records = TestUtils.records(
- List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes,
s"value-1".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 1)
- assertThrows(classOf[OutOfOrderSequenceException], () =>
log.appendAsLeader(records, 0),
- () => "Should have received an OutOfOrderSequenceException since we
attempted to append a duplicate of a batch which is older than the last 5
appended batches.")
-
- // Append a duplicate entry with a single records at the tail of the log.
This should return the appendInfo of the original entry.
- def createRecordsWithDuplicate = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = seq)
- val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 0)
- val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 0)
- assertEquals(
- origAppendInfo.firstOffset,
- newAppendInfo.firstOffset,
- "Inserted a duplicate records into the log"
- )
- assertEquals(origAppendInfo.lastOffset, newAppendInfo.lastOffset,
- "Inserted a duplicate records into the log")
- }
-
- @Test
- def testMultipleProducerIdsPerMemoryRecord(): Unit = {
- // create a log
- val log = createLog(logDir, new LogConfig(new Properties))
-
- val producerEpoch: Short = 0
- val partitionLeaderEpoch = 0
- val buffer = ByteBuffer.allocate(512)
-
- var builder = MemoryRecords.builder(
- buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L,
producerEpoch, 0, false,
- partitionLeaderEpoch
- )
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L,
producerEpoch, 0, false,
- partitionLeaderEpoch)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- builder = MemoryRecords.builder(
- buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L,
producerEpoch, 0, false,
- partitionLeaderEpoch
- )
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- builder = MemoryRecords.builder(
- buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L,
producerEpoch, 0, false,
- partitionLeaderEpoch
- )
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- buffer.flip()
- val memoryRecords = MemoryRecords.readableRecords(buffer)
-
- log.appendAsFollower(memoryRecords, partitionLeaderEpoch)
- log.flush(false)
-
- val fetchedData = LogTestUtils.readLog(log, 0, Int.MaxValue)
-
- val origIterator = memoryRecords.batches.iterator()
- for (batch <- fetchedData.records.batches.asScala) {
- assertTrue(origIterator.hasNext)
- val origEntry = origIterator.next()
- assertEquals(origEntry.producerId, batch.producerId)
- assertEquals(origEntry.baseOffset, batch.baseOffset)
- assertEquals(origEntry.baseSequence, batch.baseSequence)
- }
- }
-
- @Test
- def testDuplicateAppendToFollower(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val producerEpoch: Short = 0
- val pid = 1L
- val baseSequence = 0
- val partitionLeaderEpoch = 0
- // The point of this test is to ensure that validation isn't performed on
the follower.
- // this is a bit contrived. to trigger the duplicate case for a follower
append, we have to append
- // a batch with matching sequence numbers, but valid increasing offsets
- assertEquals(0L, log.logEndOffset)
- log.appendAsFollower(
- MemoryRecords.withIdempotentRecords(
- 0L,
- Compression.NONE,
- pid,
- producerEpoch,
- baseSequence,
- partitionLeaderEpoch,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes)
- ),
- partitionLeaderEpoch
- )
- log.appendAsFollower(
- MemoryRecords.withIdempotentRecords(
- 2L,
- Compression.NONE,
- pid,
- producerEpoch,
- baseSequence,
- partitionLeaderEpoch,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes)
- ),
- partitionLeaderEpoch
- )
-
- // Ensure that even the duplicate sequences are accepted on the follower.
- assertEquals(4L, log.logEndOffset)
- }
-
- @Test
- def testMultipleProducersWithDuplicatesInSingleAppend(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val pid1 = 1L
- val pid2 = 2L
- val producerEpoch: Short = 0
-
- val buffer = ByteBuffer.allocate(512)
-
- // pid1 seq = 0
- var builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1,
producerEpoch, 0)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- // pid2 seq = 0
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2,
producerEpoch, 0)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- // pid1 seq = 1
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1,
producerEpoch, 1)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- // pid2 seq = 1
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2,
producerEpoch, 1)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- // // pid1 seq = 1 (duplicate)
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1,
producerEpoch, 1)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
-
- buffer.flip()
-
- val epoch = 0
- val records = MemoryRecords.readableRecords(buffer)
- records.batches.forEach(_.setPartitionLeaderEpoch(epoch))
-
- // Ensure that batches with duplicates are accepted on the follower.
- assertEquals(0L, log.logEndOffset)
- log.appendAsFollower(records, epoch)
- assertEquals(5L, log.logEndOffset)
- }
-
- @Test
- def testOldProducerEpoch(): Unit = {
- // create a log
- val log = createLog(logDir, new LogConfig(new Properties))
- val pid = 1L
- val newEpoch: Short = 1
- val oldEpoch: Short = 0
-
- val records = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
producerId = pid, producerEpoch = newEpoch, sequence = 0)
- log.appendAsLeader(records, 0)
-
- val nextRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
producerId = pid, producerEpoch = oldEpoch, sequence = 0)
- assertThrows(classOf[InvalidProducerEpochException], () =>
log.appendAsLeader(nextRecords, 0))
- }
-
- @Test
- def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val pid2 = 2L
- val epoch = 0.toShort
-
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
-
- assertEquals(2, log.activeProducersWithLastSequence.size)
- assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(2L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")// force retention to kick in so that the snapshot files are cleaned
up.
- mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so
file deletion takes place
-
- // Deleting records should not remove producer state but should delete
snapshots after the file deletion delay.
- assertEquals(2, log.activeProducersWithLastSequence.size)
- assertEquals(1, ProducerStateManager.listSnapshotFiles(log.dir).size)
- val retainedLastSeq = log.activeProducersWithLastSequence.get(pid2)
- assertEquals(0, retainedLastSeq)
- }
-
- /**
- * Test for jitter s for time based log roll. This test appends messages
then changes the time
- * using the mock clock to force the log to roll and checks the number of
segments.
- */
- @Test
- def testTimeBasedLogRollJitter(): Unit = {
- var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp =
mockTime.milliseconds)
- val maxJitter = 20 * 60L
- // create a log
- val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L,
segmentJitterMs = maxJitter)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "Log begins with a single empty
segment.")
- log.appendAsLeader(set, 0)
-
- mockTime.sleep(log.config.segmentMs - maxJitter)
- set = TestUtils.singletonRecords(value = "test".getBytes, timestamp =
mockTime.milliseconds)
- log.appendAsLeader(set, 0)
- assertEquals(1, log.numberOfSegments,
- "Log does not roll on this append because it occurs earlier than max
jitter")
- mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
- set = TestUtils.singletonRecords(value = "test".getBytes, timestamp =
mockTime.milliseconds)
- log.appendAsLeader(set, 0)
- assertEquals(2, log.numberOfSegments,
- "Log should roll after segmentMs adjusted by random jitter")
- }
-
- /**
- * Test that appending more than the maximum segment size rolls the log
- */
- @Test
- def testSizeBasedLogRoll(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds)
- val setSize = createRecords.sizeInBytes
- val msgPerSeg = 10
- val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10
messages
- // create a log
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
- // segments expire in size
- for (_ <- 1 to (msgPerSeg + 1))
- log.appendAsLeader(createRecords, 0)
- assertEquals(2, log.numberOfSegments,
- "There should be exactly 2 segments.")
- }
-
- /**
- * Test that we can open and append to an empty log
- */
- @Test
- def testLoadEmptyLog(): Unit = {
- createEmptyLogs(logDir, 0)
- val log = createLog(logDir, new LogConfig(new Properties))
- log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds), 0)
- }
-
- /**
- * This test case appends a bunch of messages and checks that we can read
them all back using sequential offsets.
- */
- @Test
- def testAppendAndReadWithSequentialOffsets(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 71)
- val log = createLog(logDir, logConfig)
- val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
-
- for (value <- values)
- log.appendAsLeader(TestUtils.singletonRecords(value = value), 0)
-
- for (i <- values.indices) {
- val read = LogTestUtils.readLog(log, i,
1).records.batches.iterator.next()
- assertEquals(i, read.lastOffset, "Offset read should match order
appended.")
- val actual = read.iterator.next()
- assertNull(actual.key, "Key should be null")
- assertEquals(ByteBuffer.wrap(values(i)), actual.value, "Values not
equal")
- }
- assertEquals(0, LogTestUtils.readLog(log, values.length,
100).records.batches.asScala.size,
- "Reading beyond the last message returns nothing.")
- }
-
- /**
- * This test appends a bunch of messages with non-sequential offsets and
checks that we can an the correct message
- * from any offset less than the logEndOffset including offsets not appended.
- */
- @Test
- def testAppendAndReadWithNonSequentialOffsets(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
- val log = createLog(logDir, logConfig)
- val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
- val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
-
- // now test the case that we give the offsets and use non-sequential
offsets
- for (i <- records.indices) {
- log.appendAsFollower(
- MemoryRecords.withRecords(messageIds(i), Compression.NONE, 0,
records(i)),
- Int.MaxValue
- )
- }
- for (i <- 50 until messageIds.max) {
- val idx = messageIds.indexWhere(_ >= i)
- val read = LogTestUtils.readLog(log, i,
100).records.records.iterator.next()
- assertEquals(messageIds(idx), read.offset, "Offset read should match
message id.")
- assertEquals(records(idx), new SimpleRecord(read), "Message should match
appended.")
- }
- }
-
- /**
- * This test covers an odd case where we have a gap in the offsets that
falls at the end of a log segment.
- * Specifically we create a log where the last message in the first segment
has offset 0. If we
- * then read offset 1, we should expect this read to come from the second
segment, even though the
- * first segment has the greatest lower bound on the offset.
- */
- @Test
- def testReadAtLogGap(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 300)
- val log = createLog(logDir, logConfig)
-
- // keep appending until we have two segments with only a single message in
the second segment
- while (log.numberOfSegments == 1)
- log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), 0)
-
- // now manually truncate off all but one message from the first segment to
create a gap in the messages
- log.logSegments.asScala.head.truncateTo(1)
-
- assertEquals(log.logEndOffset - 1, LogTestUtils.readLog(log, 1,
200).records.batches.iterator.next().lastOffset,
- "A read should now return the last message in the log")
- }
-
- @Test
- def testLogRollAfterLogHandlerClosed(): Unit = {
- val logConfig = LogTestUtils.createLogConfig()
- val log = createLog(logDir, logConfig)
- log.closeHandlers()
- assertThrows(classOf[KafkaStorageException], () =>
log.roll(Optional.of(1L)))
- }
-
private def createKafkaConfigWithRLM: KafkaConfig = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index ed30e25032e..92a1e8cf0d9 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.common.RequestLocal;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -327,4 +328,46 @@ public class LogTestUtils {
return new LogConfig(configs);
}
}
+
+ public static class FakeOffsetMap implements OffsetMap {
+
+ private final Map<String, Long> map = new HashMap<>();
+ private long latestOff = -1L;
+
+ @Override
+ public int slots() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void put(ByteBuffer key, long offset) {
+ latestOff = offset;
+ map.put(new String(Utils.readBytes(key.duplicate()),
StandardCharsets.UTF_8), offset);
+ }
+
+ @Override
+ public long get(ByteBuffer key) {
+ return map.getOrDefault(new
String(Utils.readBytes(key.duplicate()), StandardCharsets.UTF_8), -1L);
+ }
+
+ @Override
+ public void updateLatestOffset(long offset) {
+ latestOff = offset;
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public long latestOffset() {
+ return latestOff;
+ }
+ }
}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 1dc4134c490..a0a207c512a 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -59,6 +61,7 @@ import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.utils.Throttler;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
@@ -77,14 +80,17 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.security.DigestException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Random;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@@ -93,6 +99,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -142,7 +149,7 @@ public class UnifiedLogTest {
@Test
public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException
{
- SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+ SimpleRecord[] records = IntStream.range(0, 50)
.mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
.toArray(SimpleRecord[]::new);
@@ -167,7 +174,7 @@ public class UnifiedLogTest {
@Test
public void
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache()
throws IOException {
- int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+ int[] messageIds = IntStream.range(0, 50).toArray();
SimpleRecord[] records = Arrays.stream(messageIds)
.mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
.toArray(SimpleRecord[]::new);
@@ -1632,6 +1639,709 @@ public class UnifiedLogTest {
assertEquals(3, log.logStartOffset());
}
+ @Test
+ public void testRetentionIdempotency() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .retentionMs(900)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+ mockTime.sleep(901);
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertEquals(
+ 2,
+ log.deleteOldSegments(),
+ "Expecting two segment deletions as log start offset retention
should unblock time based retention"
+ );
+ assertEquals(0, log.deleteOldSegments());
+ }
+
+ @Test
+ public void testLogStartOffsetMovementDeletesSnapshots() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("b".getBytes())), pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("c".getBytes())), pid1, epoch, 2, 0L), 0);
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+ // Increment the log start offset to exclude the first two segments.
+ log.maybeIncrementLogStartOffset(log.logEndOffset() - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+ assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expect a single producer state snapshot remaining");
+ }
+
+ @Test
+ public void testCompactionDeletesProducerStateSnapshots() throws
IOException, DigestException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ OffsetMap fakeOffsetMap = new LogTestUtils.FakeOffsetMap();
+
+ Cleaner cleaner = new Cleaner(0, fakeOffsetMap, 64 * 1024, 64 * 1024,
0.75,
+ new Throttler(Double.MAX_VALUE, Long.MAX_VALUE, "throttler",
"entries", mockTime),
+ mockTime, tp -> { });
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "b".getBytes())), pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "c".getBytes())), pid1, epoch, 2, 0L), 0);
+ log.updateHighWatermark(log.logEndOffset());
+
+ List<Long> expectedSnapshotOffsets = log.logSegments()
+ .stream()
+ .map(LogSegment::baseOffset)
+ .sorted()
+ .skip(1)
+ .collect(Collectors.toList());
+ List<Long> snapshotOffsets =
ProducerStateManager.listSnapshotFiles(logDir)
+ .stream()
+ .map(f -> f.offset)
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(
+ expectedSnapshotOffsets,
+ snapshotOffsets,
+ "expected a snapshot file per segment base offset, except the
first segment"
+ );
+ assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+ // Clean the segments. This should delete everything except the active
segment, since only
+ // the key "a" exists.
+ cleaner.clean(new LogToClean(log, 0, log.logEndOffset(), false));
+ // There is no other key so we don't delete anything
+ assertEquals(0, log.deleteOldSegments());
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+
+ List<Long> expectedSnapshotOffsets2 = log.logSegments().stream()
+
.map(LogSegment::baseOffset).sorted().skip(1).collect(Collectors.toList());
+ List<Long> snapshotOffsets2 =
ProducerStateManager.listSnapshotFiles(logDir).stream()
+ .map(f -> f.offset).sorted().collect(Collectors.toList());
+ assertEquals(expectedSnapshotOffsets2, snapshotOffsets2,
+ "expected a snapshot file per segment base offset, excluding
the first");
+ }
+
+ /**
+ * After loading the log, producer state is truncated such that there are
no producer state snapshot files which
+ * exceed the log end offset. This test verifies that these are removed.
+ */
+ @Test
+ public void testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset()
throws IOException {
+ Files.createFile(LogFileUtils.producerSnapshotFile(logDir,
42).toPath());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .fileDeleteDelayMs(0)
+ .build();
+ createLog(logDir, logConfig);
+ assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expected producer state snapshots greater than the log end
offset to be cleaned up");
+ }
+
+ @Test
+ public void testProducerIdMapTruncateFullyAndStartAt() throws IOException {
+ MemoryRecords records = singletonRecords("foo".getBytes());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.sizeInBytes())
+ .retentionBytes((long) records.sizeInBytes() * 2)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(records, 0);
+ log.takeProducerSnapshot();
+
+ log.appendAsLeader(singletonRecords("bar".getBytes()), 0);
+ log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+ log.takeProducerSnapshot();
+
+ assertEquals(3, log.logSegments().size());
+ assertEquals(3, log.latestProducerStateEndOffset());
+ assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset());
+
+ log.truncateFullyAndStartAt(29, Optional.empty());
+ assertEquals(1, log.logSegments().size());
+ assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+ assertEquals(29, log.latestProducerStateEndOffset());
+ }
+
+ @Test
+ public void testProducerIdExpirationOnSegmentDeletion() throws IOException
{
+ long pid1 = 1L;
+ short epoch = 0;
+ MemoryRecords records = LogTestUtils.records(List.of(new
SimpleRecord("foo".getBytes())), pid1, epoch, 0, 0L);
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.sizeInBytes())
+ .retentionBytes((long) records.sizeInBytes() * 2)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(records, 0);
+ log.takeProducerSnapshot();
+
+ long pid2 = 2L;
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("bar".getBytes())), pid2, epoch, 0, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("baz".getBytes())), pid2, epoch, 1, 0L), 0);
+ log.takeProducerSnapshot();
+
+ assertEquals(3, log.logSegments().size());
+ assertEquals(Set.of(pid1, pid2),
log.activeProducersWithLastSequence().keySet());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+
+ // Producer state should not be removed when deleting log segment
+ assertEquals(2, log.logSegments().size());
+ assertEquals(Set.of(pid1, pid2),
log.activeProducersWithLastSequence().keySet());
+ }
+
+ @Test
+ public void
testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(singletonRecords("a".getBytes()), 0);
+ log.roll(Optional.of(1L));
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ log.appendAsLeader(singletonRecords("b".getBytes()), 0);
+ log.roll(Optional.of(2L));
+ assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset());
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ log.appendAsLeader(singletonRecords("c".getBytes()), 0);
+ log.roll(Optional.of(3L));
+ assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+
+ // roll triggers a flush at the starting offset of the new segment, we
should retain all snapshots
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ // even if we flush within the active segment, the snapshot should
remain
+ log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+ log.flushUptoOffsetExclusive(4L);
+ assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+ }
+
+ @Test
+ public void testProducerSnapshotAfterSegmentRollOnAppend() throws
IOException {
+ long producerId = 1L;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(), new
byte[512])),
+ producerId, (short) 0, 0, 0L), 0);
+
+ // The next append should overflow the segment and cause it to roll
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(), new
byte[512])),
+ producerId, (short) 0, 1, 0L), 0);
+
+ assertEquals(2, log.logSegments().size());
+ assertEquals(1L, log.activeSegment().baseOffset());
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+ // Force a reload from the snapshot to check its consistency
+ log.truncateTo(1L);
+
+ assertEquals(2, log.logSegments().size());
+ assertEquals(1L, log.activeSegment().baseOffset());
+ assertFalse(log.activeSegment().log().batches().iterator().hasNext());
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+ ProducerStateEntry lastEntry =
log.producerStateManager().lastEntry(producerId).orElse(null);
+ assertNotNull(lastEntry);
+ assertEquals(0L, lastEntry.firstDataOffset());
+ assertEquals(0L, lastEntry.lastDataOffset());
+ }
+
+ @Test
+ public void testRebuildTransactionalState() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ long pid = 137L;
+ short epoch = 5;
+ int seq = 0;
+
+ // add some transactional records
+ MemoryRecords txnRecords =
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+ new SimpleRecord("foo".getBytes()),
+ new SimpleRecord("bar".getBytes()),
+ new SimpleRecord("baz".getBytes()));
+ log.appendAsLeader(txnRecords, 0);
+ LogAppendInfo abortAppendInfo =
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+ ControlRecordType.ABORT, mockTime.milliseconds(), 0, 0,
+ TransactionVersion.TV_0.featureLevel());
+ log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+ // now there should be no first unstable offset
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+ log.close();
+
+ UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L,
brokerTopicStats,
+ mockTime.scheduler, mockTime, producerStateManagerConfig,
false,
+ Optional.empty(), false);
+ reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+ assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+ }
+
+ @Test
+ public void testPeriodicProducerIdExpiration() throws IOException {
+ ProducerStateManagerConfig customConfig = new
ProducerStateManagerConfig(200, false);
+ int producerIdExpirationCheckIntervalMs = 100;
+
+ long pid = 23L;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats,
+ mockTime.scheduler, mockTime, customConfig, true,
Optional.empty(), false,
+ producerIdExpirationCheckIntervalMs);
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"foo".getBytes())),
+ pid, (short) 0, 0, 0L), 0);
+
+ assertEquals(Set.of(pid),
log.activeProducersWithLastSequence().keySet());
+
+ mockTime.sleep(producerIdExpirationCheckIntervalMs);
+ assertEquals(Set.of(pid),
log.activeProducersWithLastSequence().keySet());
+
+ mockTime.sleep(producerIdExpirationCheckIntervalMs);
+ assertEquals(Set.of(), log.activeProducersWithLastSequence().keySet());
+ }
+
+ @Test
+ public void testDuplicateAppends() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+ long pid = 1L;
+ short epoch = 0;
+
+ int[] seq = {0};
+ // Pad the beginning of the log.
+ for (int i = 0; i <= 5; i++) {
+ MemoryRecords record = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, seq[0], 0L);
+ log.appendAsLeader(record, 0);
+ seq[0]++;
+ }
+ // Append an entry with multiple log records.
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+ ), pid, epoch, seq[0], 0L);
+ LogAppendInfo multiEntryAppendInfo =
log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(3, multiEntryAppendInfo.lastOffset() -
multiEntryAppendInfo.firstOffset() + 1,
+ "should have appended 3 entries");
+
+ // Append a duplicate of the tail, when the entry at the tail has
multiple records.
+ LogAppendInfo dupMultiEntryAppendInfo =
log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(multiEntryAppendInfo.firstOffset(),
dupMultiEntryAppendInfo.firstOffset(),
+ "Somehow appended a duplicate entry with multiple log records
to the tail");
+ assertEquals(multiEntryAppendInfo.lastOffset(),
dupMultiEntryAppendInfo.lastOffset(),
+ "Somehow appended a duplicate entry with multiple log records
to the tail");
+
+ seq[0] += 3;
+
+ // Append a partial duplicate of the tail. This is not allowed.
+ MemoryRecords partialDup = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+ ), pid, epoch, seq[0] - 2, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(partialDup, 0),
+ () -> "Should have received an OutOfOrderSequenceException
since we attempted to append a duplicate of a records in the middle of the
log.");
+
+ // Append a duplicate of the batch which is 4th from the tail. This
should succeed without error since we
+ // retain the batch metadata of the last 5 batches.
+ MemoryRecords duplicateOfFourth = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 2, 0L);
+ log.appendAsLeader(duplicateOfFourth, 0);
+
+ // Duplicates at older entries are reported as OutOfOrderSequence
errors
+ MemoryRecords oldDup = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key-1".getBytes(), "value-1".getBytes())),
+ pid, epoch, 1, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(oldDup, 0),
+ () -> "Should have received an OutOfOrderSequenceException
since we attempted to append a duplicate of a batch which is older than the
last 5 appended batches.");
+
+ // Append a duplicate entry with a single record at the tail of the
log. This should return the appendInfo of the original entry.
+ Supplier<MemoryRecords> createRecordsWithDuplicate = () ->
LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, seq[0], 0L);
+ LogAppendInfo origAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+ LogAppendInfo newAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+ assertEquals(origAppendInfo.firstOffset(),
newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
+ assertEquals(origAppendInfo.lastOffset(), newAppendInfo.lastOffset(),
"Inserted a duplicate records into the log");
+ }
+
+ @Test
+ public void testMultipleProducerIdsPerMemoryRecord() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(),
1L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(),
2L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(),
3L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(),
4L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ buffer.flip();
+ MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+ log.appendAsFollower(memoryRecords, partitionLeaderEpoch);
+ log.flush(false);
+
+ FetchDataInfo fetchedData = log.read(0, Integer.MAX_VALUE,
FetchIsolation.LOG_END, true);
+
+ Iterator<? extends RecordBatch> origIterator =
memoryRecords.batches().iterator();
+ for (RecordBatch batch : fetchedData.records.batches()) {
+ assertTrue(origIterator.hasNext());
+ RecordBatch origEntry = origIterator.next();
+ assertEquals(origEntry.producerId(), batch.producerId());
+ assertEquals(origEntry.baseOffset(), batch.baseOffset());
+ assertEquals(origEntry.baseSequence(), batch.baseSequence());
+ }
+ }
+
+ @Test
+ public void testDuplicateAppendToFollower() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ short producerEpoch = 0;
+ long pid = 1L;
+ int baseSequence = 0;
+ int partitionLeaderEpoch = 0;
+ // The point of this test is to ensure that validation isn't performed
on the follower.
+ // This is a bit contrived: to trigger the duplicate case for a
follower append, we have to append
+ // a batch with matching sequence numbers, but valid increasing offsets.
+ assertEquals(0L, log.logEndOffset());
+ log.appendAsFollower(
+ MemoryRecords.withIdempotentRecords(0L, Compression.NONE, pid,
producerEpoch, baseSequence,
+ partitionLeaderEpoch, new
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+ partitionLeaderEpoch);
+ log.appendAsFollower(
+ MemoryRecords.withIdempotentRecords(2L, Compression.NONE, pid,
producerEpoch, baseSequence,
+ partitionLeaderEpoch, new
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+ partitionLeaderEpoch);
+
+ // Ensure that even the duplicate sequences are accepted on the
follower.
+ assertEquals(4L, log.logEndOffset());
+ }
+
+ @Test
+ public void testMultipleProducersWithDuplicatesInSingleAppend() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ long pid1 = 1L;
+ long pid2 = 2L;
+ short producerEpoch = 0;
+
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+
+ // pid1 seq = 0
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(),
pid1, producerEpoch, 0);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ // pid2 seq = 0
+ builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(),
pid2, producerEpoch, 0);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ // pid1 seq = 1
+ builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(),
pid1, producerEpoch, 1);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ // pid2 seq = 1
+ builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(),
pid2, producerEpoch, 1);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ // pid1 seq = 1 (duplicate)
+ builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(),
pid1, producerEpoch, 1);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ buffer.flip();
+
+ int epoch = 0;
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ records.batches().forEach(b -> b.setPartitionLeaderEpoch(epoch));
+
+ // Ensure that batches with duplicates are accepted on the follower.
+ assertEquals(0L, log.logEndOffset());
+ log.appendAsFollower(records, epoch);
+ assertEquals(5L, log.logEndOffset());
+ }
+
+ @Test
+ public void testOldProducerEpoch() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+ long pid = 1L;
+ short newEpoch = 1;
+ short oldEpoch = 0;
+
+ MemoryRecords records = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, newEpoch, 0, 0L);
+ log.appendAsLeader(records, 0);
+
+ MemoryRecords nextRecords = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, oldEpoch, 0, 0L);
+ assertThrows(InvalidProducerEpochException.class, () ->
log.appendAsLeader(nextRecords, 0));
+ }
+
+ @Test
+ public void testDeleteSnapshotsOnIncrementLogStartOffset() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ long pid2 = 2L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes())), pid2, epoch, 0, 0L), 0);
+ log.roll();
+
+ assertEquals(2, log.activeProducersWithLastSequence().size());
+ assertEquals(2,
ProducerStateManager.listSnapshotFiles(log.dir()).size());
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(2L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ // force retention to kick in so that the snapshot files are cleaned
up.
+ mockTime.sleep(logConfig.fileDeleteDelayMs + 1000); // advance the
clock so file deletion takes place
+
+ // Deleting records should not remove producer state but should delete
snapshots after the file deletion delay.
+ assertEquals(2, log.activeProducersWithLastSequence().size());
+ assertEquals(1,
ProducerStateManager.listSnapshotFiles(log.dir()).size());
+ int retainedLastSeq = log.activeProducersWithLastSequence().get(pid2);
+ assertEquals(0, retainedLastSeq);
+ }
+
+ /**
+ * Test for jitter for time based log roll. This test appends messages
then changes the time
+ * using the mock clock to force the log to roll and checks the number of
segments.
+ */
+ @Test
+ public void testTimeBasedLogRollJitter() throws IOException {
+ MemoryRecords set = singletonRecords("test".getBytes(),
mockTime.milliseconds());
+ long maxJitter = 20 * 60L;
+ // create a log
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentMs(ONE_HOUR)
+ .segmentJitterMs(maxJitter)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "Log begins with a single
empty segment.");
+ log.appendAsLeader(set, 0);
+
+ mockTime.sleep(log.config().segmentMs - maxJitter);
+ set = singletonRecords("test".getBytes(), mockTime.milliseconds());
+ log.appendAsLeader(set, 0);
+ assertEquals(1, log.numberOfSegments(),
+ "Log does not roll on this append because it occurs earlier
than max jitter");
+ mockTime.sleep(maxJitter - log.activeSegment().rollJitterMs() + 1);
+ set = singletonRecords("test".getBytes(), mockTime.milliseconds());
+ log.appendAsLeader(set, 0);
+ assertEquals(2, log.numberOfSegments(),
+ "Log should roll after segmentMs adjusted by random jitter");
+ }
+
+ /**
+ * Test that appending more than the maximum segment size rolls the log
+ */
+ @Test
+ public void testSizeBasedLogRoll() throws IOException {
+ MemoryRecords createRecords = singletonRecords("test".getBytes(),
mockTime.milliseconds());
+ int setSize = createRecords.sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * (setSize - 1); // each segment will be
10 messages
+ // create a log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ // append more messages than fit in one segment so the log rolls on size
+ for (int i = 0; i <= msgPerSeg; i++) {
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+ }
+ assertEquals(2, log.numberOfSegments(), "There should be exactly 2
segments.");
+ }
+
+ /**
+ * Test that we can open and append to an empty log
+ */
+ @Test
+ public void testLoadEmptyLog() throws IOException {
+ Files.createFile(LogFileUtils.logFile(logDir, 0).toPath());
+ Files.createFile(LogFileUtils.offsetIndexFile(logDir, 0).toPath());
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+ }
+
+    /**
+     * Appends a series of single-record batches as leader and verifies that each of them can
+     * be read back at consecutive offsets, and that a read starting past the last appended
+     * record yields no batches.
+     */
+    @Test
+    public void testAppendAndReadWithSequentialOffsets() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(71).build());
+
+        // Payloads are the even numbers 0, 2, ..., 98 rendered as strings.
+        byte[][] values = new byte[50][];
+        for (int n = 0; n < values.length; n++) {
+            values[n] = String.valueOf(2 * n).getBytes();
+        }
+
+        for (byte[] payload : values) {
+            log.appendAsLeader(singletonRecords(payload), 0);
+        }
+
+        // Each appended record must be readable at the offset equal to its append position.
+        for (int offset = 0; offset < values.length; offset++) {
+            RecordBatch batch = log.read(offset, 1, FetchIsolation.LOG_END, true)
+                .records.batches().iterator().next();
+            assertEquals(offset, batch.lastOffset(), "Offset read should match order appended.");
+            Record first = batch.iterator().next();
+            assertNull(first.key(), "Key should be null");
+            assertEquals(ByteBuffer.wrap(values[offset]), first.value(), "Values not equal");
+        }
+
+        // A read starting right after the last appended record must not return any batch.
+        long batchesPastEnd = 0;
+        for (RecordBatch batch : log.read(values.length, 100, FetchIsolation.LOG_END, true).records.batches()) {
+            assertNotNull(batch);
+            batchesPastEnd++;
+        }
+        assertEquals(0, batchesPastEnd, "Reading beyond the last message returns nothing.");
+    }
+
+    /**
+     * Appends records as follower with explicitly assigned, partly non-consecutive offsets and
+     * verifies that a read at any offset from 50 up to (but excluding) the largest appended
+     * offset returns the first record whose offset is not below the requested one, including
+     * for offsets that were never appended.
+     */
+    @Test
+    public void testAppendAndReadWithNonSequentialOffsets() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(72).build());
+
+        // Offsets 0..49 are consecutive, followed by 50, 57, ..., 197 (22 values in steps of 7).
+        int[] messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, n -> n + 7).limit(22))
+            .toArray();
+
+        // One record per id; the payload is the id rendered as a string.
+        SimpleRecord[] records = new SimpleRecord[messageIds.length];
+        for (int k = 0; k < messageIds.length; k++) {
+            records[k] = new SimpleRecord(String.valueOf(messageIds[k]).getBytes());
+        }
+
+        // Append as follower so that the offsets given here are the ones stored in the log.
+        for (int k = 0; k < records.length; k++) {
+            MemoryRecords batch = MemoryRecords.withRecords(messageIds[k], Compression.NONE, 0, records[k]);
+            log.appendAsFollower(batch, Integer.MAX_VALUE);
+        }
+
+        int maxId = Arrays.stream(messageIds).max().getAsInt();
+        // messageIds is ascending and the requested offset only grows, so the cursor never
+        // has to move backwards: it always rests on the first id that is >= the offset.
+        int idx = 0;
+        for (int offset = 50; offset < maxId; offset++) {
+            while (idx < messageIds.length && messageIds[idx] < offset) {
+                idx++;
+            }
+            Record read = log.read(offset, 100, FetchIsolation.LOG_END, true)
+                .records.records().iterator().next();
+            assertEquals(messageIds[idx], read.offset(), "Offset read should match message id.");
+            assertEquals(records[idx], new SimpleRecord(read), "Message should match appended.");
+        }
+    }
+
+    /**
+     * Covers an offset gap located at the end of a log segment. The first segment is truncated
+     * so that its last message has offset 0; a read at offset 1 is then expected to be served
+     * from the second segment, even though the first segment has the greatest lower bound on
+     * the requested offset.
+     */
+    @Test
+    public void testReadAtLogGap() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(300).build());
+
+        // Stop appending as soon as the log has rolled, so that the second segment holds
+        // exactly one message.
+        while (log.numberOfSegments() == 1) {
+            log.appendAsLeader(singletonRecords("42".getBytes()), 0);
+        }
+
+        // Cut the first segment back to a single message, leaving a gap in the offsets
+        // between the two segments.
+        LogSegment firstSegment = log.logSegments().get(0);
+        firstSegment.truncateTo(1);
+
+        long lastOffsetInLog = log.logEndOffset() - 1;
+        RecordBatch firstBatchRead = log.read(1, 200, FetchIsolation.LOG_END, true)
+            .records.batches().iterator().next();
+        assertEquals(lastOffsetInLog, firstBatchRead.lastOffset(),
+            "A read should now return the last message in the log");
+    }
+
+    /**
+     * Verifies that rolling the log after its handlers have been closed fails with a
+     * {@link KafkaStorageException}.
+     */
+    @Test
+    public void testLogRollAfterLogHandlerClosed() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+        log.closeHandlers();
+
+        Optional<Long> requestedOffset = Optional.of(1L);
+        assertThrows(KafkaStorageException.class, () -> log.roll(requestedOffset));
+    }
+
private void assertValidLogOffsetMetadata(UnifiedLog log,
LogOffsetMetadata offsetMetadata) throws IOException {
assertFalse(offsetMetadata.messageOffsetOnly());
@@ -1693,6 +2403,24 @@ public class UnifiedLogTest {
boolean lastShutdownClean,
Optional<Uuid> topicId,
boolean remoteStorageSystemEnable) throws IOException {
+ return createLog(dir, config, logStartOffset, recoveryPoint,
brokerTopicStats, scheduler, time,
+ producerStateManagerConfig, lastShutdownClean, topicId,
remoteStorageSystemEnable,
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT);
+ }
+
+ private UnifiedLog createLog(
+ File dir,
+ LogConfig config,
+ long logStartOffset,
+ long recoveryPoint,
+ BrokerTopicStats brokerTopicStats,
+ Scheduler scheduler,
+ MockTime time,
+ ProducerStateManagerConfig producerStateManagerConfig,
+ boolean lastShutdownClean,
+ Optional<Uuid> topicId,
+ boolean remoteStorageSystemEnable,
+ int producerIdExpirationCheckIntervalMs) throws IOException {
UnifiedLog log = UnifiedLog.create(
dir,
@@ -1704,7 +2432,7 @@ public class UnifiedLogTest {
time,
3600000,
producerStateManagerConfig,
-
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ producerIdExpirationCheckIntervalMs,
new LogDirFailureChannel(10),
lastShutdownClean,
topicId,