This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e3e6858d3f KAFKA-19752 Move parts of UnifiedLogTest to storage module
(#21912)
4e3e6858d3f is described below
commit 4e3e6858d3f689c3610e8f786c08fce8a1679747
Author: Ken Huang <[email protected]>
AuthorDate: Wed Apr 1 18:28:40 2026 +0800
KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21912)
testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure ~
testStaleProducerEpochReturnsRecoverableErrorForTV2Clients
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 619 ---------------------
.../storage/internals/log/UnifiedLogTest.java | 551 +++++++++++++++++-
2 files changed, 536 insertions(+), 634 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
deleted file mode 100755
index 69c3e3efb69..00000000000
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors._
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.log.remote.storage.RemoteLogManager
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{doThrow, spy}
-
-import java.io._
-import java.nio.file.Files
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Optional
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-
-class UnifiedLogTest {
- var config: KafkaConfig = _
- val brokerTopicStats = new BrokerTopicStats
- val tmpDir = TestUtils.tempDir()
- val logDir = TestUtils.randomPartitionLogDir(tmpDir)
- val mockTime = new MockTime()
- var logsToClose: Seq[UnifiedLog] = Seq()
- val producerStateManagerConfig = new
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
false)
- def metricsKeySet =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
-
- @BeforeEach
- def setUp(): Unit = {
- val props = TestUtils.createBrokerConfig(0, port = -1)
- config = KafkaConfig.fromProps(props)
- }
-
- @AfterEach
- def tearDown(): Unit = {
- brokerTopicStats.close()
- logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
- Utils.delete(tmpDir)
- }
-
- def createEmptyLogs(dir: File, offsets: Int*): Unit = {
- for (offset <- offsets) {
- Files.createFile(LogFileUtils.logFile(dir, offset).toPath)
- Files.createFile(LogFileUtils.offsetIndexFile(dir, offset).toPath)
- }
- }
-
- @Test
- def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure():
Unit = {
- val logConfig = LogTestUtils.createLogConfig()
- val log = spy(createLog(logDir, logConfig))
-
- doThrow(new KafkaStorageException("Injected
exception")).when(log).flushProducerStateSnapshot(any())
-
- log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), 0)
- try {
- log.roll(Optional.of(1L))
- } catch {
- case _: KafkaStorageException => // ignore
- }
-
- // check that the recovery point isn't incremented
- assertEquals(0L, log.recoveryPoint)
- }
-
- @Test
- def testDeletableSegmentsFilter(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- for (_ <- 0 to 8) {
- val records = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes),
- ))
- log.appendAsLeader(records, 0)
- log.roll()
- }
- log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-
- assertEquals(10, log.logSegments.size())
-
- {
- val deletable = log.deletableSegments(
- (segment: LogSegment, _: Optional[LogSegment]) => segment.baseOffset
<= 5)
- val expected = log.nonActiveLogSegmentsFrom(0L).stream().filter(segment
=> segment.baseOffset <= 5).toList
- assertEquals(6, expected.size)
- assertEquals(expected, deletable)
- }
-
- {
- val deletable = log.deletableSegments((_: LogSegment, _:
Optional[LogSegment]) => true)
- val expected = log.nonActiveLogSegmentsFrom(0L).stream().toList
- assertEquals(9, expected.size)
- assertEquals(expected, deletable)
- }
-
- {
- val records = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes),
- ))
- log.appendAsLeader(records, 0)
- log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
- val deletable = log.deletableSegments((_: LogSegment, _:
Optional[LogSegment]) => true)
- val expected = log.logSegments.stream().toList
- assertEquals(10, expected.size)
- assertEquals(expected, deletable)
- }
- }
-
- @Test
- def testDeletableSegmentsIteration(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- for (_ <- 0 to 8) {
- val records = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes),
- ))
- log.appendAsLeader(records, 0)
- log.roll()
- }
- log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-
- assertEquals(10, log.logSegments.size())
-
- var offset = 0
- val deletableSegments = log.deletableSegments(
- (segment: LogSegment, nextSegmentOpt: Optional[LogSegment]) => {
- assertEquals(offset, segment.baseOffset)
- val logSegments = new LogSegments(log.topicPartition)
- log.logSegments.forEach(segment => logSegments.add(segment))
- val floorSegmentOpt = logSegments.floorSegment(offset)
- assertTrue(floorSegmentOpt.isPresent)
- assertEquals(floorSegmentOpt.get, segment)
- if (offset == log.logEndOffset) {
- assertFalse(nextSegmentOpt.isPresent)
- } else {
- assertTrue(nextSegmentOpt.isPresent)
- val higherSegmentOpt = logSegments.higherSegment(segment.baseOffset)
- assertTrue(higherSegmentOpt.isPresent)
- assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
- assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
- }
- offset += 1
- true
- })
- assertEquals(10L, log.logSegments.size())
- assertEquals(log.nonActiveLogSegmentsFrom(0L).stream.toList,
deletableSegments)
- }
-
- @Test
- def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage():
Unit = {
- val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1,
segmentIndexBytes = 12,
- retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0,
remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- // Append 1 message to the active segment
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes))),
- 0)
- // Update the highWatermark so that these segments will be eligible for
deletion.
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(1, log.logSegments.size)
- assertEquals(0, log.activeSegment.baseOffset())
-
- mockTime.sleep(2)
- // It should have rolled the active segment as they are eligible for
deletion
- assertEquals(0, log.deleteOldSegments())
- assertEquals(2, log.logSegments.size)
- log.logSegments.asScala.zipWithIndex.foreach {
- case (segment, idx) => assertEquals(idx, segment.baseOffset)
- }
-
- // Once rolled, the segment should be uploaded to remote storage and
eligible for deletion
- log.updateHighestOffsetInRemoteStorage(1)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(1, log.logSegments.size)
- assertEquals(1, log.logSegments.asScala.head.baseOffset())
- assertEquals(1, log.localLogStartOffset())
- assertEquals(1, log.logEndOffset)
- assertEquals(0, log.logStartOffset)
- }
-
- @Test
- def
testSegmentDeletionEnabledBeforeUploadToRemoteTierWhenLogStartOffsetMovedAhead():
Unit = {
- val logConfig = LogTestUtils.createLogConfig(retentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- val pid = 1L
- val epoch = 0.toShort
-
- assertTrue(log.isEmpty)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 0), 0)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 1), 0)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 2), 0)
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("d".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 3), 1)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("e".getBytes)),
- producerId = pid, producerEpoch = epoch, sequence = 4), 2)
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(2, log.logSegments.size)
-
- // No segments are uploaded to remote storage, none of the local log
segments should be eligible for deletion
- log.updateHighestOffsetInRemoteStorage(-1L)
- assertEquals(0, log.deleteOldSegments())
- mockTime.sleep(1)
- assertEquals(2, log.logSegments.size)
- assertFalse(log.isEmpty)
-
- // Update the log-start-offset from 0 to 3, then the base segment should
not be eligible for deletion
- log.updateLogStartOffsetFromRemoteTier(3L)
- assertEquals(0, log.deleteOldSegments())
- mockTime.sleep(1)
- assertEquals(2, log.logSegments.size)
- assertFalse(log.isEmpty)
-
- // Update the log-start-offset from 3 to 4, then the base segment should
be eligible for deletion now even
- // if it is not uploaded to remote storage
- log.updateLogStartOffsetFromRemoteTier(4L)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- mockTime.sleep(1)
- assertEquals(1, log.logSegments.size)
- assertFalse(log.isEmpty)
-
- log.updateLogStartOffsetFromRemoteTier(5L)
- assertEquals(0, log.deleteOldSegments())
- mockTime.sleep(1)
- assertEquals(1, log.logSegments.size)
- assertTrue(log.isEmpty)
- }
-
- @Test
- def
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes():
Unit = {
- def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
- val segmentBytes = createRecords.sizeInBytes()
- val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, retentionBytes = 1,
- fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, retentionBytesConfig,
remoteStorageSystemEnable = true)
-
- // Given 6 segments of 1 message each
- for (_ <- 0 until 6) {
- log.appendAsLeader(createRecords, 0)
- }
- assertEquals(6, log.logSegments.size)
-
- log.updateHighWatermark(log.logEndOffset)
- // simulate calls to upload 2 segments to remote storage
- log.updateHighestOffsetInRemoteStorage(1)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(4, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(2, log.localLogStartOffset())
- }
-
- @Test
- def
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs():
Unit = {
- def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
- val segmentBytes = createRecords.sizeInBytes()
- val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, retentionMs = 1000,
- fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, retentionBytesConfig,
remoteStorageSystemEnable = true)
-
- // Given 6 segments of 1 message each
- for (_ <- 0 until 6) {
- log.appendAsLeader(createRecords, 0)
- }
- assertEquals(6, log.logSegments.size)
-
- log.updateHighWatermark(log.logEndOffset)
- // simulate calls to upload 2 segments to remote storage
- log.updateHighestOffsetInRemoteStorage(1)
-
- mockTime.sleep(1001)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(4, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(2, log.localLogStartOffset())
- }
-
- @Test
- def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
- def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
- val segmentBytes = createRecords.sizeInBytes()
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes,
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
- fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- // Given 6 segments of 1 message each
- for (_ <- 0 until 6) {
- log.appendAsLeader(createRecords, 0)
- }
- assertEquals(6, log.logSegments.size)
-
- log.updateHighWatermark(log.logEndOffset)
-
- // Should not delete local log because highest remote storage offset is -1
(default value)
- assertEquals(0, log.deleteOldSegments())
- assertEquals(6, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(0, log.localLogStartOffset())
-
- // simulate calls to upload 2 segments to remote storage
- log.updateHighestOffsetInRemoteStorage(1)
-
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(4, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(2, log.localLogStartOffset())
-
- // add remoteCopyDisabled = true
- val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
- fileDeleteDelayMs = 0, remoteLogStorageEnable = true,
remoteLogCopyDisable = true)
- log.updateConfig(copyDisabledLogConfig)
-
- // No local logs will be deleted even though local retention bytes is 1
because we'll adopt retention.ms/bytes
- // when remote.log.copy.disable = true
- assertEquals(0, log.deleteOldSegments())
- assertEquals(4, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(2, log.localLogStartOffset())
-
- // simulate the remote logs are all deleted due to retention policy
- log.updateLogStartOffsetFromRemoteTier(2)
- assertEquals(4, log.logSegments.size())
- assertEquals(2, log.logStartOffset)
- assertEquals(2, log.localLogStartOffset())
-
- // produce 3 more segments
- for (_ <- 0 until 3) {
- log.appendAsLeader(createRecords, 0)
- }
- assertEquals(7, log.logSegments.size)
- log.updateHighWatermark(log.logEndOffset)
-
- // try to delete local logs again, 2 segments will be deleted this time
because we'll adopt retention.ms/bytes (retention.bytes = 5)
- // when remote.log.copy.disable = true
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(5, log.logSegments.size())
- assertEquals(4, log.logStartOffset)
- assertEquals(4, log.localLogStartOffset())
-
- // add localRetentionMs = 1, retentionMs = 1000
- val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, localRetentionMs = 1, retentionMs = 1000,
- fileDeleteDelayMs = 0, remoteLogStorageEnable = true,
remoteLogCopyDisable = true)
- log.updateConfig(retentionMsConfig)
-
- // Should not delete any logs because no local logs expired using
retention.ms = 1000
- mockTime.sleep(10)
- assertEquals(0, log.deleteOldSegments())
- assertEquals(5, log.logSegments.size())
- assertEquals(4, log.logStartOffset)
- assertEquals(4, log.localLogStartOffset())
-
- // Should delete all logs because all of them are expired based on
retentionMs = 1000
- mockTime.sleep(1000)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(1, log.logSegments.size())
- assertEquals(9, log.logStartOffset)
- assertEquals(9, log.localLogStartOffset())
- }
-
- @Test
- def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- var offset = 0L
- for(_ <- 0 until 50) {
- val records = TestUtils.singletonRecords("test".getBytes())
- val info = log.appendAsLeader(records, 0)
- offset = info.lastOffset
- if (offset != 0 && offset % 10 == 0)
- log.roll()
- }
- assertEquals(5, log.logSegments.size)
- log.updateHighWatermark(log.logEndOffset)
- // simulate calls to upload 3 segments to remote storage
- log.updateHighestOffsetInRemoteStorage(30)
-
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(2, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(31, log.localLogStartOffset())
- }
-
- @Test
- def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- var offset = 0L
- for(_ <- 0 until 50) {
- val records = TestUtils.singletonRecords("test".getBytes())
- val info = log.appendAsLeader(records, 0)
- offset = info.lastOffset
- if (offset != 0 && offset % 10 == 0)
- log.roll()
- }
- assertEquals(5, log.logSegments.size)
- log.updateHighWatermark(log.logEndOffset)
- // simulate calls to upload 3 segments to remote storage
- log.updateHighestOffsetInRemoteStorage(30)
-
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(2, log.logSegments.size())
- assertEquals(0, log.logStartOffset)
- assertEquals(31, log.localLogStartOffset())
-
- log.updateLogStartOffsetFromRemoteTier(15)
- assertEquals(15, log.logStartOffset)
-
- // case-1: offset is higher than the local-log-start-offset.
- // log-start-offset < local-log-start-offset < offset-to-be-converted <
log-end-offset
- assertEquals(new LogOffsetMetadata(35, 31, 288),
log.maybeConvertToOffsetMetadata(35))
- // case-2: offset is less than the local-log-start-offset
- // log-start-offset < offset-to-be-converted < local-log-start-offset <
log-end-offset
- assertEquals(new LogOffsetMetadata(29, -1L, -1),
log.maybeConvertToOffsetMetadata(29))
- // case-3: offset is higher than the log-end-offset
- // log-start-offset < local-log-start-offset < log-end-offset <
offset-to-be-converted
- assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1),
log.maybeConvertToOffsetMetadata(log.logEndOffset + 1))
- // case-4: offset is less than the log-start-offset
- // offset-to-be-converted < log-start-offset < local-log-start-offset <
log-end-offset
- assertEquals(new LogOffsetMetadata(14, -1L, -1),
log.maybeConvertToOffsetMetadata(14))
- }
-
- @Test
- def testGetFirstBatchTimestampForSegments(): Unit = {
- val log = createLog(logDir, LogTestUtils.createLogConfig())
-
- val segments: util.List[LogSegment] = new util.ArrayList[LogSegment]()
- val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
- val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)
- segments.add(seg1)
- segments.add(seg2)
- assertEquals(Seq(Long.MaxValue, Long.MaxValue),
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
-
- seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new
SimpleRecord(1000L, "one".getBytes)))
- seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new
SimpleRecord(2000L, "two".getBytes)))
- assertEquals(Seq(1000L, 2000L),
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
-
- seg1.close()
- seg2.close()
- }
-
- @Test
- def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit =
{
- val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(),
Optional.empty)
- assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()),
result)
- }
-
-
- private def createLog(dir: File,
- config: LogConfig,
- brokerTopicStats: BrokerTopicStats = brokerTopicStats,
- logStartOffset: Long = 0L,
- recoveryPoint: Long = 0L,
- scheduler: Scheduler = mockTime.scheduler,
- time: Time = mockTime,
- maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
- producerStateManagerConfig: ProducerStateManagerConfig
= producerStateManagerConfig,
- producerIdExpirationCheckIntervalMs: Int =
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
- lastShutdownClean: Boolean = true,
- topicId: Option[Uuid] = None,
- remoteStorageSystemEnable: Boolean = false,
- remoteLogManager: Option[RemoteLogManager] = None,
- logOffsetsListener: LogOffsetsListener =
LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
- val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler,
time, logStartOffset, recoveryPoint,
- maxTransactionTimeoutMs, producerStateManagerConfig,
producerIdExpirationCheckIntervalMs,
- lastShutdownClean, topicId, new ConcurrentHashMap[String, Integer],
- remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
- logsToClose = logsToClose :+ log
- log
- }
-
- case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
-
- @Test
- def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
- // Producer epoch gets incremented (coordinator fail over, completed
transaction, etc.)
- // and client has stale cached epoch. Fix prevents fatal
InvalidTxnStateException.
-
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- val producerId = 123L
- val oldEpoch = 5.toShort
- val newEpoch = 6.toShort
-
- // Step 1: Simulate a scenario where producer epoch was incremented to
fence the producer
- val previousRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, newEpoch, 0,
- new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
- )
- val previousGuard = log.maybeStartTransactionVerification(producerId, 0,
newEpoch, false) // TV1 = supportsEpochBump = false
- log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, previousGuard,
- TransactionVersion.TV_1.featureLevel())
-
- // Complete the transaction normally (commits do update producer state
with current epoch)
- val commitMarker = MemoryRecords.withEndTransactionMarker(
- producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT,
0)
- )
- log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching, VerificationGuard.SENTINEL,
- TransactionVersion.TV_1.featureLevel())
-
- // Step 2: TV1 client tries to write with stale cached epoch (before
learning about epoch increment)
- val staleEpochRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, oldEpoch, 0,
- new SimpleRecord("stale-epoch-key".getBytes,
"stale-epoch-value".getBytes)
- )
-
- // Step 3: Verify our fix - should get InvalidProducerEpochException
(recoverable), not InvalidTxnStateException (fatal)
- val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
{
- val staleGuard = log.maybeStartTransactionVerification(producerId, 0,
oldEpoch, false)
- log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, staleGuard,
- TransactionVersion.TV_1.featureLevel())
- })
-
- // Verify the error message indicates epoch mismatch
- assertTrue(exception.getMessage.contains("smaller than the last seen
epoch"))
- assertTrue(exception.getMessage.contains(s"$oldEpoch"))
- assertTrue(exception.getMessage.contains(s"$newEpoch"))
- }
-
- @Test
- def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
- // Check producer epoch FIRST - if stale, return recoverable error before
verification checks.
-
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- val producerId = 456L
- val originalEpoch = 3.toShort
- val bumpedEpoch = 4.toShort
-
- // Step 1: Start transaction with epoch 3 (before timeout)
- val initialRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, originalEpoch, 0,
- new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
- )
- val initialGuard = log.maybeStartTransactionVerification(producerId, 0,
originalEpoch, true) // TV2 = supportsEpochBump = true
- log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, initialGuard,
- TransactionVersion.TV_2.featureLevel())
-
- // Step 2: Coordinator times out and aborts transaction
- // TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort
marker with epoch 4
- val abortMarker = MemoryRecords.withEndTransactionMarker(
- producerId, bumpedEpoch, new
EndTransactionMarker(ControlRecordType.ABORT, 0)
- )
- log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching, VerificationGuard.SENTINEL,
- TransactionVersion.TV_2.featureLevel())
-
- // Step 3: TV2 transactional producer tries to append with stale epoch
(timeout recovery scenario)
- val staleEpochRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, originalEpoch, 0,
- new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
- )
-
- // Step 4: Verify our fix works for TV2 - should get
InvalidProducerEpochException (recoverable), not InvalidTxnStateException
(fatal)
- val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
{
- val staleGuard = log.maybeStartTransactionVerification(producerId, 0,
originalEpoch, true) // TV2 = supportsEpochBump = true
- log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, staleGuard,
- TransactionVersion.TV_2.featureLevel())
- })
-
- // Verify the error message indicates epoch mismatch (3 < 4)
- assertTrue(exception.getMessage.contains("smaller than the last seen
epoch"))
- assertTrue(exception.getMessage.contains(s"$originalEpoch"))
- assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
- }
-}
-
-object UnifiedLogTest {
- def allRecords(log: UnifiedLog): List[Record] = {
- val recordsFound = ListBuffer[Record]()
- for (logSegment <- log.logSegments.asScala) {
- for (batch <- logSegment.log.batches.asScala) {
- recordsFound ++= batch.iterator().asScala
- }
- }
- recordsFound.toList
- }
-
- def verifyRecordsInLog(log: UnifiedLog, expectedRecords: List[Record]): Unit
= {
- assertEquals(expectedRecords, allRecords(log))
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index eb6957f13ac..8b0d8d9abe0 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -96,6 +96,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.security.DigestException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -115,6 +116,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -132,6 +134,7 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -1969,21 +1972,21 @@ public class UnifiedLogTest {
long pid = 1L;
short epoch = 0;
- int[] seq = {0};
+ AtomicInteger seq = new AtomicInteger(0);
// Pad the beginning of the log.
for (int i = 0; i <= 5; i++) {
MemoryRecords record = LogTestUtils.records(
List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
- pid, epoch, seq[0], 0L);
+ pid, epoch, seq.get(), 0L);
log.appendAsLeader(record, 0);
- seq[0]++;
+ seq.incrementAndGet();
}
// Append an entry with multiple log records.
Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(
- new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
- new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
- new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
- ), pid, epoch, seq[0], 0L);
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq.get()).getBytes(), ("value-" + seq.get()).getBytes())
+ ), pid, epoch, seq.get(), 0L);
LogAppendInfo multiEntryAppendInfo =
log.appendAsLeader(createRecords.get(), 0);
assertEquals(3, multiEntryAppendInfo.lastOffset() -
multiEntryAppendInfo.firstOffset() + 1,
"should have appended 3 entries");
@@ -1995,13 +1998,13 @@ public class UnifiedLogTest {
assertEquals(multiEntryAppendInfo.lastOffset(),
dupMultiEntryAppendInfo.lastOffset(),
"Somehow appended a duplicate entry with multiple log records
to the tail");
- seq[0] += 3;
+ seq.addAndGet(3);
// Append a partial duplicate of the tail. This is not allowed.
MemoryRecords partialDup = LogTestUtils.records(List.of(
- new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
- new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
- ), pid, epoch, seq[0] - 2, 0L);
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq.get()).getBytes(), ("value-" + seq.get()).getBytes())
+ ), pid, epoch, seq.get() - 2, 0L);
assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(partialDup, 0),
() -> "Should have received an OutOfOrderSequenceException
since we attempted to append a duplicate of a records in the middle of the
log.");
@@ -2022,7 +2025,7 @@ public class UnifiedLogTest {
// Append a duplicate entry with a single records at the tail of the
log. This should return the appendInfo of the original entry.
Supplier<MemoryRecords> createRecordsWithDuplicate = () ->
LogTestUtils.records(
List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
- pid, epoch, seq[0], 0L);
+ pid, epoch, seq.get(), 0L);
LogAppendInfo origAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
LogAppendInfo newAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
assertEquals(origAppendInfo.firstOffset(),
newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
@@ -4999,15 +5002,15 @@ public class UnifiedLogTest {
}
private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer
buffer, long producerId, short producerEpoch, int leaderEpoch) {
- int[] sequence = {0};
+ AtomicInteger sequence = new AtomicInteger(0);
return (offset, numRecords) -> {
- int baseSequence = sequence[0];
+ int baseSequence = sequence.get();
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
offset, mockTime.milliseconds(), producerId, producerEpoch,
baseSequence, true, leaderEpoch);
for (int seq = baseSequence; seq < baseSequence + numRecords;
seq++) {
builder.append(new
SimpleRecord(String.valueOf(seq).getBytes()));
}
- sequence[0] += numRecords;
+ sequence.addAndGet(numRecords);
builder.close();
};
}
@@ -5494,4 +5497,522 @@ public class UnifiedLogTest {
assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
}
}
+
+ @Test
+ public void
testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ UnifiedLog log = spy(createLog(logDir, logConfig));
+
+ doThrow(new KafkaStorageException("Injected
exception")).when(log).flushProducerStateSnapshot(any(Path.class));
+
+ log.appendAsLeader(singletonRecords("a".getBytes()), 0);
+ assertThrows(KafkaStorageException.class, () ->
log.roll(Optional.of(1L)));
+
+ // check that the recovery point isn't incremented
+ assertEquals(0L, log.recoveryPoint());
+ }
+
+ @Test
+ public void testDeletableSegmentsFilter() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ for (int i = 0; i <= 8; i++) {
+ MemoryRecords records = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ log.appendAsLeader(records, 0);
+ log.roll();
+ }
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertEquals(10, log.logSegments().size());
+
+ List<LogSegment> deletable = log.deletableSegments(
+ (segment, next) -> segment.baseOffset() <= 5);
+ List<LogSegment> expected = log.nonActiveLogSegmentsFrom(0L).stream()
+ .filter(segment -> segment.baseOffset() <= 5)
+ .toList();
+ assertEquals(6, expected.size());
+ assertEquals(expected, deletable);
+
+ List<LogSegment> deletable1 = log.deletableSegments((segment, next) ->
true);
+ List<LogSegment> expected1 = new
ArrayList<>(log.nonActiveLogSegmentsFrom(0L));
+ assertEquals(9, expected1.size());
+ assertEquals(expected1, deletable1);
+
+ MemoryRecords records = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ log.appendAsLeader(records, 0);
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ List<LogSegment> deletable2 = log.deletableSegments((segment, next) ->
true);
+ List<LogSegment> expected2 = new ArrayList<>(log.logSegments());
+ assertEquals(10, expected2.size());
+ assertEquals(expected2, deletable2);
+ }
+
+ @Test
+ public void testDeletableSegmentsIteration() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ for (int i = 0; i <= 8; i++) {
+ MemoryRecords records = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ log.appendAsLeader(records, 0);
+ log.roll();
+ }
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertEquals(10, log.logSegments().size());
+
+ AtomicInteger offset = new AtomicInteger(0);
+ List<LogSegment> deletableSegments = log.deletableSegments((segment,
nextSegmentOpt) -> {
+ assertEquals(offset.get(), segment.baseOffset());
+ LogSegments logSegments = new LogSegments(log.topicPartition());
+ log.logSegments().forEach(logSegments::add);
+ Optional<LogSegment> floorSegmentOpt =
logSegments.floorSegment(offset.get());
+ assertTrue(floorSegmentOpt.isPresent());
+ assertEquals(floorSegmentOpt.get(), segment);
+ if (offset.get() == log.logEndOffset()) {
+ assertFalse(nextSegmentOpt.isPresent());
+ } else {
+ assertTrue(nextSegmentOpt.isPresent());
+ Optional<LogSegment> higherSegmentOpt =
logSegments.higherSegment(segment.baseOffset());
+ assertTrue(higherSegmentOpt.isPresent());
+ assertEquals(segment.baseOffset() + 1,
higherSegmentOpt.get().baseOffset());
+ assertEquals(higherSegmentOpt.get(), nextSegmentOpt.get());
+ }
+ offset.addAndGet(1);
+ return true;
+ });
+ assertEquals(10L, log.logSegments().size());
+ assertEquals(new ArrayList<>(log.nonActiveLogSegmentsFrom(0L)),
deletableSegments);
+ }
+
+ @Test
+ public void
testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .indexIntervalBytes(1)
+ .segmentIndexBytes(12)
+ .retentionMs(3)
+ .localRetentionMs(1)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ // Append 1 message to the active segment
+ log.appendAsLeader(records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+ // Update the highWatermark so that these segments will be eligible
for deletion.
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(1, log.logSegments().size());
+ assertEquals(0, log.activeSegment().baseOffset());
+
+ mockTime.sleep(2);
+ // It should have rolled the active segment as they are eligible for
deletion
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(2, log.logSegments().size());
+ AtomicInteger idx = new AtomicInteger(0);
+ log.logSegments().forEach(segment -> assertEquals(idx.getAndAdd(1),
segment.baseOffset()));
+
+ // Once rolled, the segment should be uploaded to remote storage and
eligible for deletion
+ log.updateHighestOffsetInRemoteStorage(1);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.logSegments().size());
+ assertEquals(1, log.logSegments().iterator().next().baseOffset());
+ assertEquals(1, log.localLogStartOffset());
+ assertEquals(1, log.logEndOffset());
+ assertEquals(0, log.logStartOffset());
+ }
+
+ @Test
+ public void
testSegmentDeletionEnabledBeforeUploadToRemoteTierWhenLogStartOffsetMovedAhead()
throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .retentionBytes(1)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+ long pid = 1L;
+ short epoch = 0;
+
+ assertTrue(log.isEmpty());
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes())), pid, epoch, 0, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("b".getBytes())), pid, epoch, 1, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("c".getBytes())), pid, epoch, 2, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("d".getBytes())), pid, epoch, 3, 0L), 1);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("e".getBytes())), pid, epoch, 4, 0L), 2);
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(2, log.logSegments().size());
+
+ // No segments are uploaded to remote storage, none of the local log
segments should be eligible for deletion
+ log.updateHighestOffsetInRemoteStorage(-1L);
+ assertEquals(0, log.deleteOldSegments());
+ mockTime.sleep(1);
+ assertEquals(2, log.logSegments().size());
+ assertFalse(log.isEmpty());
+
+ // Update the log-start-offset from 0 to 3, then the base segment
should not be eligible for deletion
+ log.updateLogStartOffsetFromRemoteTier(3L);
+ assertEquals(0, log.deleteOldSegments());
+ mockTime.sleep(1);
+ assertEquals(2, log.logSegments().size());
+ assertFalse(log.isEmpty());
+
+ // Update the log-start-offset from 3 to 4, then the base segment
should be eligible for deletion now even
+ // if it is not uploaded to remote storage
+ log.updateLogStartOffsetFromRemoteTier(4L);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ mockTime.sleep(1);
+ assertEquals(1, log.logSegments().size());
+ assertFalse(log.isEmpty());
+
+ log.updateLogStartOffsetFromRemoteTier(5L);
+ assertEquals(0, log.deleteOldSegments());
+ mockTime.sleep(1);
+ assertEquals(1, log.logSegments().size());
+ assertTrue(log.isEmpty());
+ }
+
+ @Test
+ public void
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes()
throws IOException {
+ MemoryRecords createRecords = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ int segmentBytes = createRecords.sizeInBytes();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentBytes)
+ .retentionBytes(1)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ // Given 6 segments of 1 message each
+ for (int i = 0; i < 6; i++) {
+ log.appendAsLeader(records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+ }
+ assertEquals(6, log.logSegments().size());
+
+ log.updateHighWatermark(log.logEndOffset());
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(4, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(2, log.localLogStartOffset());
+ }
+
+ @Test
+ public void
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs()
throws IOException {
+ MemoryRecords createRecords = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ int segmentBytes = createRecords.sizeInBytes();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentBytes)
+ .retentionMs(1000)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ // Given 6 segments of 1 message each
+ for (int i = 0; i < 6; i++) {
+ log.appendAsLeader(records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+ }
+ assertEquals(6, log.logSegments().size());
+
+ log.updateHighWatermark(log.logEndOffset());
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1);
+
+ mockTime.sleep(1001);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(4, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(2, log.localLogStartOffset());
+ }
+
+ @Test
+ public void testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled()
throws IOException {
+ MemoryRecords createRecords = records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+ int segmentBytes = createRecords.sizeInBytes();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentBytes)
+ .localRetentionBytes(1)
+ .retentionBytes((long) segmentBytes * 5)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ // Given 6 segments of 1 message each
+ for (int i = 0; i < 6; i++) {
+ log.appendAsLeader(records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+ }
+ assertEquals(6, log.logSegments().size());
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ // Should not delete local log because highest remote storage offset
is -1 (default value)
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(6, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(0, log.localLogStartOffset());
+
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1);
+
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(4, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(2, log.localLogStartOffset());
+
+ // add remoteCopyDisabled = true
+ LogConfig copyDisabledLogConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentBytes)
+ .localRetentionBytes(1)
+ .retentionBytes((long) segmentBytes * 5)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .remoteLogCopyDisable(true)
+ .build();
+ log.updateConfig(copyDisabledLogConfig);
+
+ // No local logs will be deleted even though local retention bytes is
1 because we'll adopt retention.ms/bytes
+ // when remote.log.copy.disable = true
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(4, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(2, log.localLogStartOffset());
+
+ // simulate the remote logs are all deleted due to retention policy
+ log.updateLogStartOffsetFromRemoteTier(2);
+ assertEquals(4, log.logSegments().size());
+ assertEquals(2, log.logStartOffset());
+ assertEquals(2, log.localLogStartOffset());
+
+ // produce 3 more segments
+ for (int i = 0; i < 3; i++) {
+ log.appendAsLeader(records(List.of(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+ }
+ assertEquals(7, log.logSegments().size());
+ log.updateHighWatermark(log.logEndOffset());
+
+ // try to delete local logs again, 2 segments will be deleted this
time because we'll adopt retention.ms/bytes (retention.bytes = 5)
+ // when remote.log.copy.disable = true
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(5, log.logSegments().size());
+ assertEquals(4, log.logStartOffset());
+ assertEquals(4, log.localLogStartOffset());
+
+ // add localRetentionMs = 1, retentionMs = 1000
+ LogConfig retentionMsConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(segmentBytes)
+ .localRetentionMs(1)
+ .retentionMs(1000)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .remoteLogCopyDisable(true)
+ .build();
+ log.updateConfig(retentionMsConfig);
+
+ // Should not delete any logs because no local logs expired using
retention.ms = 1000
+ mockTime.sleep(10);
+ assertEquals(0, log.deleteOldSegments());
+ assertEquals(5, log.logSegments().size());
+ assertEquals(4, log.logStartOffset());
+ assertEquals(4, log.localLogStartOffset());
+
+ // Should delete all logs because all of them are expired based on
retentionMs = 1000
+ mockTime.sleep(1000);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.logSegments().size());
+ assertEquals(9, log.logStartOffset());
+ assertEquals(9, log.localLogStartOffset());
+ }
+
+ @Test
+ public void testIncrementLocalLogStartOffsetAfterLocalLogDeletion() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .localRetentionBytes(1)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ long offset;
+ for (int i = 0; i < 50; i++) {
+ MemoryRecords records = singletonRecords("test".getBytes());
+ LogAppendInfo info = log.appendAsLeader(records, 0);
+ offset = info.lastOffset();
+ if (offset != 0 && offset % 10 == 0)
+ log.roll();
+ }
+ assertEquals(5, log.logSegments().size());
+ log.updateHighWatermark(log.logEndOffset());
+ // simulate calls to upload 3 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(30);
+
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(2, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(31, log.localLogStartOffset());
+ }
+
+ @Test
+ public void testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError()
throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .localRetentionBytes(1)
+ .fileDeleteDelayMs(0)
+ .remoteLogStorageEnable(true)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+
+ long offset;
+ for (int i = 0; i < 50; i++) {
+ MemoryRecords records = singletonRecords("test".getBytes());
+ LogAppendInfo info = log.appendAsLeader(records, 0);
+ offset = info.lastOffset();
+ if (offset != 0 && offset % 10 == 0)
+ log.roll();
+ }
+ assertEquals(5, log.logSegments().size());
+ log.updateHighWatermark(log.logEndOffset());
+ // simulate calls to upload 3 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(30);
+
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(2, log.logSegments().size());
+ assertEquals(0, log.logStartOffset());
+ assertEquals(31, log.localLogStartOffset());
+
+ log.updateLogStartOffsetFromRemoteTier(15);
+ assertEquals(15, log.logStartOffset());
+
+ // case-1: offset is higher than the local-log-start-offset.
+ // log-start-offset < local-log-start-offset < offset-to-be-converted
< log-end-offset
+ assertEquals(new LogOffsetMetadata(35, 31, 288),
log.maybeConvertToOffsetMetadata(35));
+ // case-2: offset is less than the local-log-start-offset
+ // log-start-offset < offset-to-be-converted < local-log-start-offset
< log-end-offset
+ assertEquals(new LogOffsetMetadata(29, -1L, -1),
log.maybeConvertToOffsetMetadata(29));
+ // case-3: offset is higher than the log-end-offset
+ // log-start-offset < local-log-start-offset < log-end-offset <
offset-to-be-converted
+ assertEquals(new LogOffsetMetadata(log.logEndOffset() + 1, -1L, -1),
log.maybeConvertToOffsetMetadata(log.logEndOffset() + 1));
+ // case-4: offset is less than the log-start-offset
+ // offset-to-be-converted < log-start-offset < local-log-start-offset
< log-end-offset
+ assertEquals(new LogOffsetMetadata(14, -1L, -1),
log.maybeConvertToOffsetMetadata(14));
+ }
+
+ @Test
+ public void testGetFirstBatchTimestampForSegments() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+
+ List<LogSegment> segments = new ArrayList<>();
+ LogSegment seg1 = LogTestUtils.createSegment(1, logDir, 10, mockTime);
+ LogSegment seg2 = LogTestUtils.createSegment(2, logDir, 10, mockTime);
+ segments.add(seg1);
+ segments.add(seg2);
+
+ List<Long> timestamps = new
ArrayList<>(log.getFirstBatchTimestampForSegments(segments));
+ assertEquals(List.of(Long.MAX_VALUE, Long.MAX_VALUE), timestamps);
+
+ seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new
SimpleRecord(1000L, "one".getBytes())));
+ seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new
SimpleRecord(2000L, "two".getBytes())));
+
+ timestamps = new
ArrayList<>(log.getFirstBatchTimestampForSegments(segments));
+ assertEquals(List.of(1000L, 2000L), timestamps);
+
+ seg1.close();
+ seg2.close();
+ }
+
+ @Test
+ public void
testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+ UnifiedLog log = createLog(logDir, logConfig, true);
+ OffsetResultHolder result =
log.fetchOffsetByTimestamp(mockTime.milliseconds(), Optional.empty());
+ assertEquals(new OffsetResultHolder(Optional.empty(),
Optional.empty()), result);
+ }
+
+ @Test
+ public void testStaleProducerEpochReturnsRecoverableErrorForTV1Clients()
throws IOException {
+ // Producer epoch gets incremented (coordinator fail over, completed
transaction, etc.)
+ // and client has stale cached epoch. Fix prevents fatal
InvalidTxnStateException.
+
+ ProducerStateManagerConfig psmConfig = new
ProducerStateManagerConfig(86400000, true);
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+ long producerId = 123L;
+ short oldEpoch = 5;
+ short newEpoch = 6;
+
+ // Step 1: Simulate a scenario where producer epoch was incremented to
fence the producer
+ MemoryRecords previousRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, newEpoch, 0,
+ new SimpleRecord("previous-key".getBytes(),
"previous-value".getBytes()));
+ VerificationGuard previousGuard =
log.maybeStartTransactionVerification(producerId, 0, newEpoch, false); // TV1 =
supportsEpochBump = false
+ log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), previousGuard,
+ TransactionVersion.TV_1.featureLevel());
+
+ // Complete the transaction normally (commits do update producer state
with current epoch)
+ MemoryRecords commitMarker = MemoryRecords.withEndTransactionMarker(
+ producerId, newEpoch, new
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+ log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
+ TransactionVersion.TV_1.featureLevel());
+
+ // Step 2: TV1 client tries to write with stale cached epoch (before
learning about epoch increment)
+ MemoryRecords staleEpochRecords =
MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, oldEpoch, 0,
+ new SimpleRecord("stale-epoch-key".getBytes(),
"stale-epoch-value".getBytes()));
+
+ // Step 3: Verify our fix - should get InvalidProducerEpochException
(recoverable), not InvalidTxnStateException (fatal)
+ InvalidProducerEpochException exception =
assertThrows(InvalidProducerEpochException.class, () -> {
+ VerificationGuard staleGuard =
log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false);
+ log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), staleGuard,
+ TransactionVersion.TV_1.featureLevel());
+ });
+
+ // Verify the error message indicates epoch mismatch
+ assertTrue(exception.getMessage().contains("smaller than the last seen
epoch"));
+ assertTrue(exception.getMessage().contains(String.valueOf(oldEpoch)));
+ assertTrue(exception.getMessage().contains(String.valueOf(newEpoch)));
+ }
+
+ @Test
+ public void testStaleProducerEpochReturnsRecoverableErrorForTV2Clients()
throws IOException {
+ // Check producer epoch FIRST - if stale, return recoverable error
before verification checks.
+
+ ProducerStateManagerConfig psmConfig = new
ProducerStateManagerConfig(86400000, true);
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+ long producerId = 456L;
+ short originalEpoch = 3;
+ short bumpedEpoch = 4;
+
+ // Step 1: Start transaction with epoch 3 (before timeout)
+ MemoryRecords initialRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, originalEpoch, 0,
+ new SimpleRecord("ks-initial-key".getBytes(),
"ks-initial-value".getBytes()));
+ VerificationGuard initialGuard =
log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true); //
TV2 = supportsEpochBump = true
+ log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), initialGuard,
+ TransactionVersion.TV_2.featureLevel());
+
+ // Step 2: Coordinator times out and aborts transaction
+ // TV2 (KIP-890): Coordinator bumps epoch from 3 -> 4 and sends abort
marker with epoch 4
+ MemoryRecords abortMarker = MemoryRecords.withEndTransactionMarker(
+ producerId, bumpedEpoch, new
EndTransactionMarker(ControlRecordType.ABORT, 0));
+ log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
+ TransactionVersion.TV_2.featureLevel());
+
+ // Step 3: TV2 transactional producer tries to append with stale epoch
(timeout recovery scenario)
+ MemoryRecords staleEpochRecords =
MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, originalEpoch, 0,
+ new SimpleRecord("ks-resume-key".getBytes(),
"ks-resume-value".getBytes()));
+
+ // Step 4: Verify our fix works for TV2 - should get
InvalidProducerEpochException (recoverable), not InvalidTxnStateException
(fatal)
+ InvalidProducerEpochException exception =
assertThrows(InvalidProducerEpochException.class, () -> {
+ VerificationGuard staleGuard =
log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true); //
TV2 = supportsEpochBump = true
+ log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), staleGuard,
+ TransactionVersion.TV_2.featureLevel());
+ });
+
+ // Verify the error message indicates epoch mismatch (3 < 4)
+ assertTrue(exception.getMessage().contains("smaller than the last seen
epoch"));
+
assertTrue(exception.getMessage().contains(String.valueOf(originalEpoch)));
+
assertTrue(exception.getMessage().contains(String.valueOf(bumpedEpoch)));
+ }
}