This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ceb36f98f1 KAFKA-19752 Move UnifiedLogTest to storage module (#21761)
6ceb36f98f1 is described below
commit 6ceb36f98f1af6868251c0d89f9aca1250bd0ccf
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Mar 20 01:12:39 2026 +0800
KAFKA-19752 Move UnifiedLogTest to storage module (#21761)
This part moves the tests from
testFetchEarliestPendingUploadTimestampNoRemoteStorage through
testTransactionIndexUpdated.
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 792 +-----------------
.../kafka/storage/internals/log/LogTestUtils.java | 81 ++
.../storage/internals/log/UnifiedLogTest.java | 907 ++++++++++++++++++++-
3 files changed, 991 insertions(+), 789 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b01375e59ae..65a41b2579c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -25,39 +25,32 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.record.internal.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.{ListOffsetsRequest,
ListOffsetsResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
-import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
+import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory,
DelayedRemoteListOffsets}
-import org.apache.kafka.server.storage.log.{FetchIsolation,
UnexpectedAppendOffsetException}
+import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{MockTime, Scheduler}
-import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
-import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot,
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason,
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
-import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyLong}
-import org.mockito.Mockito.{doAnswer, doThrow, spy}
-import org.apache.kafka.raft.KRaftConfigs
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doThrow, spy}
import java.io._
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util
-import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
-import java.util.{Optional, Properties}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.Optional
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@@ -91,728 +84,6 @@ class UnifiedLogTest {
}
}
- private def createKafkaConfigWithRLM: KafkaConfig = {
- val props = new Properties()
- props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
- props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
- props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
- props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
- props.setProperty("listeners", "CONTROLLER://:9093")
- props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
- props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
"true")
- props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
-
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
- // set log reader threads number to 2
- props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
- KafkaConfig.fromProps(props)
- }
-
- @Test
- def testFetchEarliestPendingUploadTimestampNoRemoteStorage(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200,
indexIntervalBytes = 1)
- val log = createLog(logDir, logConfig)
-
- // Test initial state before any records
- assertFetchOffsetBySpecialTimestamp(log, None, new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-
- // Append records
- val _ = prepareLogWithSequentialRecords(log, recordCount = 2)
-
- // Test state after records are appended
- assertFetchOffsetBySpecialTimestamp(log, None, new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
- }
-
- @Test
- def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = {
- val logStartOffset = 0
- val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
- val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
- val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
- val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
-
- doAnswer(ans => {
- val timestamp = ans.getArgument(1).asInstanceOf[Long]
- Optional.of(timestamp)
- .filter(_ == timestampAndEpochs.head.timestamp)
- .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L,
Optional.of(timestampAndEpochs.head.leaderEpoch)))
-
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
- anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
- // Offset 0 (first timestamp) is in remote storage and deleted locally.
Offset 1 (second timestamp) is in local storage.
- log.updateLocalLogStartOffset(1)
- log.updateHighestOffsetInRemoteStorage(0)
-
- // In the assertions below we test that offset 0 (first timestamp) is only
in remote and offset 1 (second timestamp) is in local storage.
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
- }
-
- @Test
- def
testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion(): Unit
= {
- val logStartOffset = 0
- val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
- val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
- val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
- val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
-
- // Offsets upto 1 are in remote storage
- doAnswer(ans => {
- val timestamp = ans.getArgument(1).asInstanceOf[Long]
- Optional.of(
- timestamp match {
- case x if x == firstTimestamp => new TimestampAndOffset(x, 0L,
Optional.of(firstLeaderEpoch))
- case x if x == secondTimestamp => new TimestampAndOffset(x, 1L,
Optional.of(secondLeaderEpoch))
- case _ => null
- }
- )
-
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
- anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
- // Offsets 0, 1 (first and second timestamps) are in remote storage and
not deleted locally.
- log.updateLocalLogStartOffset(0)
- log.updateHighestOffsetInRemoteStorage(1)
-
- // In the assertions below we test that offset 0 (first timestamp) and
offset 1 (second timestamp) are on both remote and local storage
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(thirdLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
- }
-
- @Test
- def testFetchEarliestPendingUploadTimestampNoSegmentsUploaded(): Unit = {
- val logStartOffset = 0
- val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
- val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
- val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
- val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
-
- // No offsets are in remote storage
- doAnswer(_ => Optional.empty[TimestampAndOffset]())
-
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
- anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
- // Offsets 0, 1, 2 (first, second and third timestamps) are in local
storage only and not uploaded to remote storage.
- log.updateLocalLogStartOffset(0)
- log.updateHighestOffsetInRemoteStorage(-1)
-
- // In the assertions below we test that offset 0 (first timestamp), offset
1 (second timestamp) and offset 2 (third timestamp) are only on the local
storage.
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
- ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
- }
-
- @Test
- def testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote():
Unit = {
- val logStartOffset = 100
- val (remoteLogManager: RemoteLogManager, log: UnifiedLog,
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
- val (firstTimestamp, firstLeaderEpoch) =
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
- val (secondTimestamp, secondLeaderEpoch) =
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
- val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp,
timestampAndEpochs(2).leaderEpoch)
-
- // Offsets 100, 101, 102 (first, second and third timestamps) are in local
storage and not uploaded to remote storage.
- // Tiered storage copy was disabled and then enabled again, because of
which the remote log segments are deleted but
- // the highest offset in remote storage has become stale
- doAnswer(_ => Optional.empty[TimestampAndOffset]())
-
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
- anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
- log.updateLocalLogStartOffset(100)
- log.updateHighestOffsetInRemoteStorage(50)
-
- // In the assertions below we test that offset 100 (first timestamp),
offset 101 (second timestamp) and offset 102 (third timestamp) are only on the
local storage.
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))),
firstTimestamp)
- assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new
TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))),
secondTimestamp)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L,
Optional.empty()),
- ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L,
Optional.of(thirdLeaderEpoch)),
- ListOffsetsRequest.LATEST_TIMESTAMP)
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L,
Optional.of(firstLeaderEpoch)),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
- }
-
- private def prepare(logStartOffset: Int): (RemoteLogManager, UnifiedLog,
Seq[TimestampAndEpoch]) = {
- val config: KafkaConfig = createKafkaConfigWithRLM
- val purgatory = new
DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets",
config.brokerId)
- val remoteLogManager = spy(new
RemoteLogManager(config.remoteLogManagerConfig,
- 0,
- logDir.getAbsolutePath,
- "clusterId",
- mockTime,
- _ => Optional.empty[UnifiedLog](),
- (_, _) => {},
- brokerTopicStats,
- new Metrics(),
- Optional.empty))
- remoteLogManager.setDelayedOperationPurgatory(purgatory)
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200,
indexIntervalBytes = 1, remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, logStartOffset = logStartOffset,
remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
-
- // Verify earliest pending upload offset for empty log
- assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset,
Optional.empty()),
- ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-
- val timestampAndEpochs = prepareLogWithSequentialRecords(log, recordCount
= 3)
- (remoteLogManager, log, timestampAndEpochs)
- }
-
- /**
- * Test the Log truncate operations
- */
- @Test
- def testTruncateTo(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds)
- val setSize = createRecords.sizeInBytes
- val msgPerSeg = 10
- val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
-
- // create a log
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
- for (_ <- 1 to msgPerSeg)
- log.appendAsLeader(createRecords, 0)
-
- assertEquals(1, log.numberOfSegments, "There should be exactly 1
segments.")
- assertEquals(msgPerSeg, log.logEndOffset, "Log end offset should be equal
to number of messages")
-
- val lastOffset = log.logEndOffset
- val size = log.size
- log.truncateTo(log.logEndOffset) // keep the entire log
- assertEquals(lastOffset, log.logEndOffset, "Should not change offset")
- assertEquals(size, log.size, "Should not change log size")
- log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
- assertEquals(lastOffset, log.logEndOffset, "Should not change offset but
should log error")
- assertEquals(size, log.size, "Should not change log size")
- log.truncateTo(msgPerSeg/2) // truncate somewhere in between
- assertEquals(log.logEndOffset, msgPerSeg/2, "Should change offset")
- assertTrue(log.size < size, "Should change log size")
- log.truncateTo(0) // truncate the entire log
- assertEquals(0, log.logEndOffset, "Should change offset")
- assertEquals(0, log.size, "Should change log size")
-
- for (_ <- 1 to msgPerSeg)
- log.appendAsLeader(createRecords, 0)
-
- assertEquals(log.logEndOffset, lastOffset, "Should be back to original
offset")
- assertEquals(log.size, size, "Should be back to original size")
- log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1),
Optional.empty)
- assertEquals(log.logEndOffset, lastOffset - (msgPerSeg - 1), "Should
change offset")
- assertEquals(log.size, 0, "Should change log size")
-
- for (_ <- 1 to msgPerSeg)
- log.appendAsLeader(createRecords, 0)
-
- assertTrue(log.logEndOffset > msgPerSeg, "Should be ahead of to original
offset")
- assertEquals(size, log.size, "log size should be same as before")
- log.truncateTo(0) // truncate before first start offset in the log
- assertEquals(0, log.logEndOffset, "Should change offset")
- assertEquals(log.size, 0, "Should change log size")
- }
-
- /**
- * Verify that when we truncate a log the index of the last segment is
resized to the max index size to allow more appends
- */
- @Test
- def testIndexResizingAtTruncation(): Unit = {
- val setSize = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds).sizeInBytes
- val msgPerSeg = 10
- val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize,
indexIntervalBytes = setSize - 1)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
- for (i<- 1 to msgPerSeg)
- log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds + i), 0)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
- mockTime.sleep(msgPerSeg)
- for (i<- 1 to msgPerSeg)
- log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds + i), 0)
- assertEquals(2, log.numberOfSegments, "There should be exactly 2 segment.")
- val expectedEntries = msgPerSeg - 1
-
- assertEquals(expectedEntries,
log.logSegments.asScala.toList.head.offsetIndex.maxEntries,
- s"The index of the first segment should have $expectedEntries entries")
- assertEquals(expectedEntries,
log.logSegments.asScala.toList.head.timeIndex.maxEntries,
- s"The time index of the first segment should have $expectedEntries
entries")
-
- log.truncateTo(0)
- assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
- assertEquals(log.config.maxIndexSize/8,
log.logSegments.asScala.toList.head.offsetIndex.maxEntries,
- "The index of segment 1 should be resized to maxIndexSize")
- assertEquals(log.config.maxIndexSize/12,
log.logSegments.asScala.toList.head.timeIndex.maxEntries,
- "The time index of segment 1 should be resized to maxIndexSize")
-
- mockTime.sleep(msgPerSeg)
- for (i<- 1 to msgPerSeg)
- log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds + i), 0)
- assertEquals(1, log.numberOfSegments,
- "There should be exactly 1 segment.")
- }
-
- /**
- * Test that deleted files are deleted after the appropriate time.
- */
- @Test
- def testAsyncDelete(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes,
timestamp = mockTime.milliseconds - 1000L)
- val asyncDeleteMs = 1000
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes =
10000,
- retentionMs = 999, fileDeleteDelayMs =
asyncDeleteMs)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 100)
- log.appendAsLeader(createRecords, 0)
-
- // files should be renamed
- val segments = log.logSegments.asScala.toArray
- val oldFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndexFile)
-
- log.updateHighWatermark(log.logEndOffset)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
-
- assertEquals(1, log.numberOfSegments, "Only one segment should remain.")
-
assertTrue(segments.forall(_.log.file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
&&
-
segments.forall(_.offsetIndexFile.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)),
- "All log and index files should end in .deleted")
- assertTrue(segments.forall(_.log.file.exists) &&
segments.forall(_.offsetIndexFile.exists),
- "The .deleted files should still be there.")
- assertTrue(oldFiles.forall(!_.exists), "The original file should be gone.")
-
- // when enough time passes the files should be deleted
- val deletedFiles = segments.map(_.log.file) ++
segments.map(_.offsetIndexFile)
- mockTime.sleep(asyncDeleteMs + 1)
- assertTrue(deletedFiles.forall(!_.exists), "Files should all be gone.")
- }
-
- @Test
- def testAppendMessageWithNullPayload(): Unit = {
- val log = createLog(logDir, new LogConfig(new Properties))
- log.appendAsLeader(TestUtils.singletonRecords(value = null), 0)
- val head = LogTestUtils.readLog(log, 0,
4096).records.records.iterator.next()
- assertEquals(0, head.offset)
- assertFalse(head.hasValue, "Message payload should be null.")
- }
-
- @Test
- def testAppendWithOutOfOrderOffsetsThrowsException(): Unit = {
- val log = createLog(logDir, new LogConfig(new Properties))
-
- val epoch = 0
- val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
- val buffer = ByteBuffer.allocate(512)
- for (offset <- appendOffsets) {
- val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
Compression.NONE,
- TimestampType.LOG_APPEND_TIME,
offset, mockTime.milliseconds(),
- 1L, 0, 0, false, epoch)
- builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
- builder.close()
- }
- buffer.flip()
- val memoryRecords = MemoryRecords.readableRecords(buffer)
-
- assertThrows(
- classOf[OffsetsOutOfOrderException],
- () => log.appendAsFollower(memoryRecords, epoch)
- )
- }
-
- @Test
- def testAppendBelowExpectedOffsetThrowsException(): Unit = {
- val log = createLog(logDir, new LogConfig(new Properties))
- val records = (0 until 2).map(id => new
SimpleRecord(id.toString.getBytes)).toArray
- records.foreach(record =>
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), 0))
-
- val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
- val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
- for (magic <- magicVals; compressionType <- compressionTypes) {
- val compression = Compression.of(compressionType).build()
- val invalidRecord = MemoryRecords.withRecords(magic, compression, new
SimpleRecord(1.toString.getBytes))
- assertThrows(
- classOf[UnexpectedAppendOffsetException],
- () => log.appendAsFollower(invalidRecord, Int.MaxValue),
- () => s"Magic=$magic, compressionType=$compressionType"
- )
- }
- }
-
- @Test
- def testAppendEmptyLogBelowLogStartOffsetThrowsException(): Unit = {
- createEmptyLogs(logDir, 7)
- val log = createLog(logDir, new LogConfig(new Properties),
brokerTopicStats = brokerTopicStats)
- assertEquals(7L, log.logStartOffset)
- assertEquals(7L, log.logEndOffset)
-
- val firstOffset = 4L
- val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0,
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
- val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
- for (magic <- magicVals; compressionType <- compressionTypes) {
- val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes,
"v1".getBytes),
- new SimpleRecord("k2".getBytes,
"v2".getBytes),
- new SimpleRecord("k3".getBytes,
"v3".getBytes)),
- magicValue = magic, codec =
Compression.of(compressionType).build(),
- baseOffset = firstOffset)
-
- val exception = assertThrows(
- classOf[UnexpectedAppendOffsetException],
- () => log.appendAsFollower(batch, Int.MaxValue)
- )
- assertEquals(firstOffset, exception.firstOffset, s"Magic=$magic,
compressionType=$compressionType, UnexpectedAppendOffsetException#firstOffset")
- assertEquals(firstOffset + 2, exception.lastOffset, s"Magic=$magic,
compressionType=$compressionType, UnexpectedAppendOffsetException#lastOffset")
- }
- }
-
- @Test
- def testAppendWithNoTimestamp(): Unit = {
- val log = createLog(logDir, new LogConfig(new Properties))
- log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
- new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes,
"value".getBytes)), 0)
- }
-
- @Test
- def testAppendToOrReadFromLogInFailedLogDir(): Unit = {
- val pid = 1L
- val epoch = 0.toShort
- val log = createLog(logDir, new LogConfig(new Properties))
- log.appendAsLeader(TestUtils.singletonRecords(value = null), 0)
- assertEquals(0, LogTestUtils.readLog(log, 0,
4096).records.records.iterator.next().offset)
- val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch,
mockTime)
- append(10)
- // Kind of a hack, but renaming the index to a directory ensures that the
append
- // to the index will fail.
- log.activeSegment.txnIndex.renameTo(log.dir)
- assertThrows(classOf[KafkaStorageException], () =>
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT,
- mockTime.milliseconds(), coordinatorEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel()))
- assertThrows(classOf[KafkaStorageException], () =>
log.appendAsLeader(TestUtils.singletonRecords(value = null), 0))
- assertThrows(classOf[KafkaStorageException], () =>
LogTestUtils.readLog(log, 0, 4096).records.records.iterator.next().offset)
- }
-
- @Test
- def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000,
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), 5)
- assertEquals(Optional.of(5), log.latestEpoch)
-
- // Ensure that after a directory rename, the epoch cache is written to the
right location
- val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), 10)
- assertEquals(Optional.of(10), log.latestEpoch)
- assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
- assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists())
- }
-
- @Test
- def testTopicIdTransfersAfterDirectoryRename(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000,
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig)
-
- // Write a topic ID to the partition metadata file to ensure it is
transferred correctly.
- val topicId = Uuid.randomUuid()
- log.assignTopicId(topicId)
-
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), 5)
- assertEquals(Optional.of(5), log.latestEpoch)
-
- // Ensure that after a directory rename, the partition metadata file is
written to the right location.
- val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), 10)
- assertEquals(Optional.of(10), log.latestEpoch)
- assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
- assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
-
- // Check the topic ID remains in memory and was copied correctly.
- assertTrue(log.topicId.isPresent)
- assertEquals(topicId, log.topicId.get)
- assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
- }
-
- @Test
- def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000,
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig)
-
- // Write a topic ID to the partition metadata file to ensure it is
transferred correctly.
- val topicId = Uuid.randomUuid()
- log.partitionMetadataFile.get.record(topicId)
-
- // Ensure that after a directory rename, the partition metadata file is
written to the right location.
- val tp = UnifiedLog.parseTopicPartitionName(log.dir)
- log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
- assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
- assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
-
- // Check the file holds the correct contents.
- assertTrue(log.partitionMetadataFile.get.exists())
- assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
- }
-
- @Test
- def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000,
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
- val log = createLog(logDir, logConfig)
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes()))), 5)
- assertEquals(Optional.of(5), log.leaderEpochCache.latestEpoch)
-
- log.appendAsFollower(
- TestUtils.records(
- List(
- new SimpleRecord("foo".getBytes())
- ),
- baseOffset = 1L,
- magicValue = RecordVersion.V1.value
- ),
- 5
- )
- assertEquals(Optional.empty, log.leaderEpochCache.latestEpoch)
- }
-
- @Test
- def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000")
- logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
- logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
- val logConfig = new LogConfig(logProps)
- val log = createLog(logDir, logConfig)
- log.appendAsLeaderWithRecordVersion(TestUtils.records(List(new
SimpleRecord("bar".getBytes())),
- magicValue = RecordVersion.V1.value), 5, RecordVersion.V1)
- assertTrue(log.latestEpoch.isEmpty)
-
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord("foo".getBytes())),
- magicValue = RecordVersion.V2.value), 5)
- assertEquals(5, log.latestEpoch.get)
- }
-
- @Test
- def testSplitOnOffsetOverflow(): Unit = {
- // create a log such that one log segment has offsets that overflow, and
call the split API on that segment
- val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1,
fileDeleteDelayMs = 1000)
- val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig)
- assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must
have offset overflow")
-
- val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
-
- // split the segment with overflow
- log.splitOverflowedSegment(segmentWithOverflow)
-
- // assert we were successfully able to split the segment
- assertEquals(4, log.numberOfSegments)
- UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
-
- // verify we do not have offset overflow anymore
- assertFalse(LogTestUtils.hasOffsetOverflow(log))
- }
-
- @Test
- def testDegenerateSegmentSplit(): Unit = {
- // This tests a scenario where all of the batches appended to a segment
have overflowed.
- // When we split the overflowed segment, only one new segment will be
created.
-
- val overflowOffset = Int.MaxValue + 1L
- val batch1 = MemoryRecords.withRecords(overflowOffset, Compression.NONE, 0,
- new SimpleRecord("a".getBytes))
- val batch2 = MemoryRecords.withRecords(overflowOffset + 1,
Compression.NONE, 0,
- new SimpleRecord("b".getBytes))
-
- testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L,
List(batch1, batch2))
- }
-
- @Test
- def testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset(): Unit = {
- // Degenerate case where the only batch in the segment overflows. In this
scenario,
- // the first offset of the batch is valid, but the last overflows.
-
- val firstBatchBaseOffset = Int.MaxValue - 1
- val records = MemoryRecords.withRecords(firstBatchBaseOffset,
Compression.NONE, 0,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes),
- new SimpleRecord("c".getBytes))
-
- testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L,
List(records))
- }
-
- private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long,
records: List[MemoryRecords]): Unit = {
- val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
- // Need to create the offset files explicitly to avoid triggering segment
recovery to truncate segment.
- Files.createFile(LogFileUtils.offsetIndexFile(logDir,
segmentBaseOffset).toPath)
- Files.createFile(LogFileUtils.timeIndexFile(logDir,
segmentBaseOffset).toPath)
- records.foreach(segment.append)
- segment.close()
-
- val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1,
fileDeleteDelayMs = 1000)
- val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
-
- val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse
{
- throw new AssertionError("Failed to create log with a segment which has
overflowed offsets")
- }
-
- val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
- log.splitOverflowedSegment(segmentWithOverflow)
-
- assertEquals(1, log.numberOfSegments)
-
- val firstBatchBaseOffset = records.head.batches.asScala.head.baseOffset
- assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset)
- UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
-
- assertFalse(LogTestUtils.hasOffsetOverflow(log))
- }
-
- def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache
-
- @Test
- def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val lastOffset = 50L
-
- val producerEpoch = 0.toShort
- val producerId = 15L
- val appendProducer = LogTestUtils.appendTransactionalAsLeader(log,
producerId, producerEpoch, mockTime)
-
- // Thread 1 writes single-record transactions and attempts to read them
- // before they have been aborted, and then aborts them
- val txnWriteAndReadLoop: Callable[Int] = () => {
- var nonEmptyReads = 0
- while (log.logEndOffset < lastOffset) {
- val currentLogEndOffset = log.logEndOffset
-
- appendProducer(1)
-
- val readInfo = log.read(currentLogEndOffset, Int.MaxValue,
FetchIsolation.TXN_COMMITTED, false)
-
- if (readInfo.records.sizeInBytes() > 0)
- nonEmptyReads += 1
-
- LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId,
producerEpoch, ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
- }
- nonEmptyReads
- }
-
- // Thread 2 watches the log and updates the high watermark
- val hwUpdateLoop: Runnable = () => {
- while (log.logEndOffset < lastOffset) {
- log.updateHighWatermark(log.logEndOffset)
- }
- }
-
- val executor = Executors.newFixedThreadPool(2)
- try {
- executor.submit(hwUpdateLoop)
-
- val future = executor.submit(txnWriteAndReadLoop)
- val nonEmptyReads = future.get()
-
- assertEquals(0, nonEmptyReads)
- } finally {
- executor.shutdownNow()
- }
- }
-
-
- @Test
- def testTransactionIndexUpdated(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val epoch = 0.toShort
-
- val pid1 = 1L
- val pid2 = 2L
- val pid3 = 3L
- val pid4 = 4L
-
- val appendPid1 = LogTestUtils.appendTransactionalAsLeader(log, pid1,
epoch, mockTime)
- val appendPid2 = LogTestUtils.appendTransactionalAsLeader(log, pid2,
epoch, mockTime)
- val appendPid3 = LogTestUtils.appendTransactionalAsLeader(log, pid3,
epoch, mockTime)
- val appendPid4 = LogTestUtils.appendTransactionalAsLeader(log, pid4,
epoch, mockTime)
-
- // mix transactional and non-transactional data
- appendPid1(5) // nextOffset: 5
- LogTestUtils.appendNonTransactionalAsLeader(log, 3) // 8
- appendPid2(2) // 10
- appendPid1(4) // 14
- appendPid3(3) // 17
- LogTestUtils.appendNonTransactionalAsLeader(log, 2) // 19
- appendPid1(10) // 29
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch,
ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel()) // 30
- appendPid2(6) // 36
- appendPid4(3) // 39
- LogTestUtils.appendNonTransactionalAsLeader(log, 10) // 49
- appendPid3(9) // 58
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid3, epoch,
ControlRecordType.COMMIT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel()) // 59
- appendPid4(8) // 67
- appendPid2(7) // 74
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch,
ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel()) // 75
- LogTestUtils.appendNonTransactionalAsLeader(log, 10) // 85
- appendPid4(4) // 89
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid4, epoch,
ControlRecordType.COMMIT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel()) // 90
-
- val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
- val expectedTransactions = List(
- new AbortedTxn(pid1, 0L, 29L, 8L),
- new AbortedTxn(pid2, 8L, 74L, 36L)
- )
- assertEquals(expectedTransactions, abortedTransactions)
-
- // Verify caching of the segment position of the first unstable offset
- log.updateHighWatermark(30L)
- assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
-
- log.updateHighWatermark(75L)
- assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
-
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
@Test
def testTransactionIndexUpdatedThroughReplication(): Unit = {
@@ -2443,53 +1714,6 @@ class UnifiedLogTest {
log
}
- private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog,
LogSegment) = {
- LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
-
- val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
- val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse
{
- throw new AssertionError("Failed to create log with a segment which has
overflowed offsets")
- }
-
- (log, segmentWithOverflow)
- }
-
- private def assertFetchOffsetByTimestamp(log: UnifiedLog,
remoteLogManagerOpt: Option[RemoteLogManager], expected:
Option[TimestampAndOffset], timestamp: Long): Unit = {
- val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
- val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp,
remoteOffsetReader)
- assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
- offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
- assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
-
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
- assertEquals(expected.get,
offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
- }
-
- private def assertFetchOffsetBySpecialTimestamp(log: UnifiedLog,
remoteLogManagerOpt: Option[RemoteLogManager], expected: TimestampAndOffset,
timestamp: Long): Unit = {
- val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
- val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp,
remoteOffsetReader)
- assertEquals(new OffsetResultHolder(expected), offsetResultHolder)
- }
-
- private def getRemoteOffsetReader(remoteLogManagerOpt: Option[Any]):
Optional[AsyncOffsetReader] = {
- remoteLogManagerOpt match {
- case Some(remoteLogManager) =>
Optional.of(remoteLogManager.asInstanceOf[AsyncOffsetReader])
- case None => Optional.empty[AsyncOffsetReader]()
- }
- }
-
- private def prepareLogWithSequentialRecords(log: UnifiedLog, recordCount:
Int): Seq[TimestampAndEpoch] = {
- val firstTimestamp = mockTime.milliseconds()
-
- (0 until recordCount).map { i =>
- val timestampAndEpoch = TimestampAndEpoch(firstTimestamp + i, i)
- log.appendAsLeader(
- TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
timestamp = timestampAndEpoch.timestamp),
- timestampAndEpoch.leaderEpoch
- )
- timestampAndEpoch
- }
- }
-
case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
@Test
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 92a1e8cf0d9..e9efb9e5af7 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -30,17 +30,21 @@ import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.RequestLocal;
+import org.apache.kafka.server.storage.log.FetchIsolation;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.LongFunction;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -196,6 +200,10 @@ public class LogTestUtils {
return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
}
+ public static MemoryRecords records(List<SimpleRecord> records, byte
magicValue, long baseOffset) {
+ return records(records, magicValue, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
public static void deleteProducerSnapshotFiles(File logDir) {
Stream.of(logDir.listFiles())
.filter(f -> f.isFile() &&
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
@@ -206,6 +214,79 @@ public class LogTestUtils {
return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f
-> f.offset).sorted().toList();
}
+ public static FetchDataInfo readLog(UnifiedLog log,
+ long startOffset,
+ int maxLength,
+ FetchIsolation isolation,
+ boolean minOneMessage) throws
IOException {
+ return log.read(startOffset, maxLength, isolation, minOneMessage);
+ }
+
+ public static FetchDataInfo readLog(UnifiedLog log, long startOffset, int
maxLength) throws IOException {
+ return readLog(log, startOffset, maxLength, FetchIsolation.LOG_END,
true);
+ }
+
+ public static boolean hasOffsetOverflow(UnifiedLog log) {
+ return firstOverflowSegment(log).isPresent();
+ }
+
+ public static Optional<LogSegment> firstOverflowSegment(UnifiedLog log) {
+ for (LogSegment segment : log.logSegments()) {
+ for (RecordBatch batch : segment.log().batches()) {
+ if (batch.lastOffset() > segment.baseOffset() +
Integer.MAX_VALUE || batch.baseOffset() < segment.baseOffset()) {
+ return Optional.of(segment);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ public static FileRecords rawSegment(File logDir, long baseOffset) throws
IOException {
+ return FileRecords.open(LogFileUtils.logFile(logDir, baseOffset));
+ }
+
+ /**
+ * Initialize the given log directory with a set of segments, one of which
will have an
+ * offset which overflows the segment
+ */
+ public static void initializeLogDirWithOverflowedSegment(File logDir)
throws IOException {
+ long nextOffset = 0L;
+ nextOffset = writeNormalSegment(logDir, nextOffset);
+ nextOffset = writeOverflowSegment(logDir, nextOffset);
+ writeNormalSegment(logDir, nextOffset);
+ }
+
+ private static long writeSampleBatches(File logDir, long baseOffset,
FileRecords segment) throws IOException {
+ LongFunction<SimpleRecord> record = offset -> {
+ byte[] data = Long.toString(offset).getBytes();
+ return new SimpleRecord(data, data);
+ };
+ segment.append(MemoryRecords.withRecords(baseOffset, Compression.NONE,
0,
+ record.apply(baseOffset)));
+ segment.append(MemoryRecords.withRecords(baseOffset + 1,
Compression.NONE, 0,
+ record.apply(baseOffset + 1),
+ record.apply(baseOffset + 2)));
+ segment.append(MemoryRecords.withRecords(baseOffset +
Integer.MAX_VALUE - 1, Compression.NONE, 0,
+ record.apply(baseOffset + Integer.MAX_VALUE - 1)));
+ // Need to create the offset files explicitly to avoid triggering
segment recovery to truncate segment.
+ Files.createFile(LogFileUtils.offsetIndexFile(logDir,
baseOffset).toPath());
+ Files.createFile(LogFileUtils.timeIndexFile(logDir,
baseOffset).toPath());
+ return baseOffset + Integer.MAX_VALUE;
+ }
+
+ private static long writeNormalSegment(File logDir, long baseOffset)
throws IOException {
+ try (FileRecords segment = rawSegment(logDir, baseOffset)) {
+ return writeSampleBatches(logDir, baseOffset, segment);
+ }
+ }
+
+ private static long writeOverflowSegment(File logDir, long baseOffset)
throws IOException {
+ try (FileRecords segment = rawSegment(logDir, baseOffset)) {
+ long nextOffset = writeSampleBatches(logDir, baseOffset, segment);
+ return writeSampleBatches(logDir, nextOffset, segment);
+ }
+ }
+
public static void appendNonTransactionalAsLeader(UnifiedLog log, int
numRecords) throws IOException {
List<SimpleRecord> simpleRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index a0a207c512a..a9aa710c701 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.internal.CompressionType;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.DefaultRecordBatch;
import org.apache.kafka.common.record.internal.FileRecords;
@@ -40,6 +42,7 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.record.internal.RecordVersion;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -60,6 +63,8 @@ import
org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.utils.Throttler;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
@@ -72,8 +77,10 @@ import com.yammer.metrics.core.Meter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -92,7 +99,11 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@@ -101,7 +112,9 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -128,8 +141,9 @@ public class UnifiedLogTest {
private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
private final ProducerStateManagerConfig producerStateManagerConfig = new
ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
private final List<UnifiedLog> logsToClose = new ArrayList<>();
-
+ private RemoteLogManager remoteLogManager;
private UnifiedLog log;
+ private List<TimestampAndEpoch> timestampAndEpochs;
@AfterEach
public void tearDown() throws IOException {
@@ -2391,6 +2405,11 @@ public class UnifiedLogTest {
producerStateManagerConfig, true, Optional.empty(),
remoteStorageSystemEnable);
}
+ private UnifiedLog createLog(File dir, LogConfig config, long
recoveryPoint) throws IOException {
+ return createLog(dir, config, 0L, recoveryPoint, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, true, Optional.empty(), false);
+ }
+
private UnifiedLog createLog(
File dir,
LogConfig config,
@@ -2445,10 +2464,6 @@ public class UnifiedLogTest {
return log;
}
- public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
- return singletonRecords(value, key, Compression.NONE,
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
- }
-
public static MemoryRecords singletonRecords(byte[] value, long timestamp)
{
return singletonRecords(value, null, Compression.NONE, timestamp,
RecordBatch.CURRENT_MAGIC_VALUE);
}
@@ -3407,6 +3422,888 @@ public class UnifiedLogTest {
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.of(remoteLogManager)));
}
+ @Test
+ public void testFetchEarliestPendingUploadTimestampNoRemoteStorage()
throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Test initial state before any records
+ assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1,
Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
+ // Append records
+ prepareLogWithSequentialRecords(log, 2);
+
+ // Test state after records are appended
+ assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1,
Optional.of(-1)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampWithRemoteStorage()
throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ if (timestamp == firstTimestamp) {
+ return Optional.of(new
FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
+ }
+ return Optional.empty();
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offset 0 (first timestamp) is in remote storage and deleted
locally. Offset 1 (second timestamp) is in local storage.
+ log.updateLocalLogStartOffset(1);
+ log.updateHighestOffsetInRemoteStorage(0);
+
+ // In the assertions below we test that offset 0 (first timestamp) is
only in remote and offset 1 (second timestamp) is in local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp,
1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void
testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion()
throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ // Offsets upto 1 are in remote storage
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ if (timestamp == firstTimestamp) {
+ return Optional.of(new
FileRecords.TimestampAndOffset(timestamp, 0L, Optional.of(firstLeaderEpoch)));
+ } else if (timestamp == secondTimestamp) {
+ return Optional.of(new
FileRecords.TimestampAndOffset(timestamp, 1L, Optional.of(secondLeaderEpoch)));
+ }
+ return Optional.empty();
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offsets 0, 1 (first and second timestamps) are in remote storage
and not deleted locally.
+ log.updateLocalLogStartOffset(0);
+ log.updateHighestOffsetInRemoteStorage(1);
+
+ // In the assertions below we test that offset 0 (first timestamp) and
offset 1 (second timestamp) are on both remote and local storage
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp,
1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+ @Test
+ public void testFetchEarliestPendingUploadTimestampNoSegmentsUploaded()
throws Exception {
+ int logStartOffset = 0;
+ prepare(logStartOffset);
+
+ long firstTimestamp = timestampAndEpochs.get(0).timestamp;
+ int firstLeaderEpoch = timestampAndEpochs.get(0).leaderEpoch;
+ long secondTimestamp = timestampAndEpochs.get(1).timestamp;
+ int secondLeaderEpoch = timestampAndEpochs.get(1).leaderEpoch;
+ int thirdLeaderEpoch = timestampAndEpochs.get(2).leaderEpoch;
+
+ // No offsets are in remote storage
+ doAnswer(ans ->
Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()),
+ anyLong(),
+ anyLong(),
+ eq(log.leaderEpochCache()));
+
+ // Offsets 0, 1, 2 (first, second and third timestamps) are in local
storage only and not uploaded to remote storage.
+ log.updateLocalLogStartOffset(0);
+ log.updateHighestOffsetInRemoteStorage(-1);
+
+ // In the assertions below we test that offset 0 (first timestamp),
offset 1 (second timestamp) and offset 2 (third timestamp) are only on the
local storage.
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch))), firstTimestamp);
+ assertFetchOffsetByTimestamp(log, Optional.of(remoteLogManager),
+ Optional.of(new FileRecords.TimestampAndOffset(secondTimestamp,
1L, Optional.of(secondLeaderEpoch))), secondTimestamp);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L,
Optional.of(-1)),
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L,
Optional.of(thirdLeaderEpoch)),
+ ListOffsetsRequest.LATEST_TIMESTAMP);
+ assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+ new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch)),
+ ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+ }
+
+    /**
+     * Checks timestamp and special-timestamp lookups when the highest offset recorded for remote
+     * storage (50) is stale, i.e. below the local log start offset (100), and the remote log
+     * manager is stubbed to find no offset for any timestamp.
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote() throws Exception {
+        int startOffset = 100;
+        prepare(startOffset);
+
+        TimestampAndEpoch first = timestampAndEpochs.get(0);
+        TimestampAndEpoch second = timestampAndEpochs.get(1);
+        TimestampAndEpoch third = timestampAndEpochs.get(2);
+        Optional<RemoteLogManager> remoteLogManagerOpt = Optional.of(remoteLogManager);
+
+        // Offsets 100, 101, 102 (first, second and third timestamps) are in local storage and not uploaded to remote storage.
+        // Tiered storage copy was disabled and then enabled again, because of which the remote log segments are deleted but
+        // the highest offset in remote storage has become stale
+        doAnswer(ans -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+            eq(log.topicPartition()),
+            anyLong(),
+            anyLong(),
+            eq(log.leaderEpochCache()));
+
+        log.updateLocalLogStartOffset(100);
+        log.updateHighestOffsetInRemoteStorage(50);
+
+        // Expected answer for every "earliest" flavour: the first local offset with the first leader epoch.
+        FileRecords.TimestampAndOffset earliestLocal = new FileRecords.TimestampAndOffset(
+            ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(first.leaderEpoch()));
+
+        // In the assertions below we test that offset 100 (first timestamp), offset 101 (second timestamp)
+        // and offset 102 (third timestamp) are only on the local storage.
+        assertFetchOffsetByTimestamp(log, remoteLogManagerOpt,
+            Optional.of(new FileRecords.TimestampAndOffset(first.timestamp(), 100L, Optional.of(first.leaderEpoch()))),
+            first.timestamp());
+        assertFetchOffsetByTimestamp(log, remoteLogManagerOpt,
+            Optional.of(new FileRecords.TimestampAndOffset(second.timestamp(), 101L, Optional.of(second.leaderEpoch()))),
+            second.timestamp());
+        assertFetchOffsetBySpecialTimestamp(log, remoteLogManagerOpt,
+            earliestLocal,
+            ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, remoteLogManagerOpt,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()),
+            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, remoteLogManagerOpt,
+            earliestLocal,
+            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, remoteLogManagerOpt,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(third.leaderEpoch())),
+            ListOffsetsRequest.LATEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, remoteLogManagerOpt,
+            earliestLocal,
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+ /**
+ * Test the Log truncate operations
+ */
+ @Test
+ public void testTruncateTo() throws IOException {
+ MemoryRecords createRecords = singletonRecords("test".getBytes(),
mockTime.milliseconds());
+ int setSize = createRecords.sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10
messages
+
+ // create a log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segments.");
+ assertEquals(msgPerSeg, log.logEndOffset(), "Log end offset should be
equal to number of messages");
+
+ long lastOffset = log.logEndOffset();
+ long size = log.size();
+ log.truncateTo(log.logEndOffset()); // keep the entire log
+ assertEquals(lastOffset, log.logEndOffset(), "Should not change
offset");
+ assertEquals(size, log.size(), "Should not change log size");
+ log.truncateTo(log.logEndOffset() + 1); // try to truncate beyond
lastOffset
+ assertEquals(lastOffset, log.logEndOffset(), "Should not change offset
but should log error");
+ assertEquals(size, log.size(), "Should not change log size");
+ log.truncateTo(msgPerSeg / 2); // truncate somewhere in between
+ assertEquals(msgPerSeg / 2, log.logEndOffset(), "Should change
offset");
+ assertTrue(log.size() < size, "Should change log size");
+ log.truncateTo(0); // truncate the entire log
+ assertEquals(0, log.logEndOffset(), "Should change offset");
+ assertEquals(0, log.size(), "Should change log size");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertEquals(lastOffset, log.logEndOffset(), "Should be back to
original offset");
+ assertEquals(size, log.size(), "Should be back to original size");
+ log.truncateFullyAndStartAt(log.logEndOffset() - (msgPerSeg - 1),
Optional.empty());
+ assertEquals(lastOffset - (msgPerSeg - 1), log.logEndOffset(), "Should
change offset");
+ assertEquals(0, log.size(), "Should change log size");
+
+ for (int i = 0; i < msgPerSeg; i++)
+ log.appendAsLeader(singletonRecords("test".getBytes(),
mockTime.milliseconds()), 0);
+
+ assertTrue(log.logEndOffset() > msgPerSeg, "Should be ahead of to
original offset");
+ assertEquals(size, log.size(), "log size should be same as before");
+ log.truncateTo(0); // truncate before first start offset in the log
+ assertEquals(0, log.logEndOffset(), "Should change offset");
+ assertEquals(0, log.size(), "Should change log size");
+ }
+
+    /**
+     * Verify that when we truncate a log the index of the last segment is resized to the max index size
+     * to allow more appends. The final round of appends must again fit in a single segment.
+     */
+    @Test
+    public void testIndexResizingAtTruncation() throws IOException {
+        int batchSize = singletonRecords("test".getBytes(), mockTime.milliseconds()).sizeInBytes();
+        int msgPerSeg = 10;
+        // each segment will be 10 messages
+        int segmentSize = msgPerSeg * batchSize;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(segmentSize)
+            .indexIntervalBytes(batchSize - 1)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        for (int n = 1; n <= msgPerSeg; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + n), 0);
+        }
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        mockTime.sleep(msgPerSeg);
+        for (int n = 1; n <= msgPerSeg; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + n), 0);
+        }
+        assertEquals(2, log.numberOfSegments(), "There should be exactly 2 segment.");
+        int expectedEntries = msgPerSeg - 1;
+
+        LogSegment firstSegment = new ArrayList<>(log.logSegments()).get(0);
+        assertEquals(expectedEntries, firstSegment.offsetIndex().maxEntries(),
+            "The index of the first segment should have " + expectedEntries + " entries");
+        assertEquals(expectedEntries, firstSegment.timeIndex().maxEntries(),
+            "The time index of the first segment should have " + expectedEntries + " entries");
+
+        log.truncateTo(0);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+        // The divisors 8 and 12 are taken as-is from the expectations for the offset and time index.
+        LogSegment remainingSegment = new ArrayList<>(log.logSegments()).get(0);
+        assertEquals(log.config().maxIndexSize / 8, remainingSegment.offsetIndex().maxEntries(),
+            "The index of segment 1 should be resized to maxIndexSize");
+        assertEquals(log.config().maxIndexSize / 12, remainingSegment.timeIndex().maxEntries(),
+            "The time index of segment 1 should be resized to maxIndexSize");
+
+        mockTime.sleep(msgPerSeg);
+        for (int n = 1; n <= msgPerSeg; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + n), 0);
+        }
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+    }
+
+    /**
+     * Test that deleted files are deleted after the appropriate time: segments removed by retention
+     * are first renamed with the deleted-file suffix and only disappear once the file delete delay
+     * has elapsed on the mock clock.
+     */
+    @Test
+    public void testAsyncDelete() throws IOException {
+        int asyncDeleteMs = 1000;
+        int batchSize = singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L).sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(batchSize * 5)
+            .segmentIndexBytes(1000)
+            .indexIntervalBytes(10000)
+            .retentionMs(999)
+            .fileDeleteDelayMs(asyncDeleteMs)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int appended = 0; appended < 100; appended++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L), 0);
+        }
+
+        // remember the current file names: files should be renamed
+        List<LogSegment> segments = new ArrayList<>(log.logSegments());
+        List<File> oldFiles = new ArrayList<>();
+        segments.forEach(segment -> {
+            oldFiles.add(segment.log().file());
+            oldFiles.add(segment.offsetIndexFile());
+        });
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+
+        assertEquals(1, log.numberOfSegments(), "Only one segment should remain.");
+        boolean allRenamed = segments.stream().allMatch(s ->
+            s.log().file().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)
+                && s.offsetIndexFile().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX));
+        assertTrue(allRenamed, "All log and index files should end in .deleted");
+        boolean allStillPresent = segments.stream().allMatch(s ->
+            s.log().file().exists() && s.offsetIndexFile().exists());
+        assertTrue(allStillPresent, "The .deleted files should still be there.");
+        assertTrue(oldFiles.stream().noneMatch(File::exists), "The original file should be gone.");
+
+        // when enough time passes the files should be deleted
+        List<File> deletedFiles = new ArrayList<>();
+        segments.forEach(segment -> {
+            deletedFiles.add(segment.log().file());
+            deletedFiles.add(segment.offsetIndexFile());
+        });
+        mockTime.sleep(asyncDeleteMs + 1);
+        assertTrue(deletedFiles.stream().noneMatch(File::exists), "Files should all be gone.");
+    }
+
+    /**
+     * Appends a record whose key and value are both null and checks that it is readable at
+     * offset 0 and reports no value.
+     */
+    @Test
+    public void testAppendMessageWithNullPayload() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        MemoryRecords nullPayload = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(null, null));
+        log.appendAsLeader(nullPayload, 0);
+
+        Record firstRecord = LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next();
+        assertEquals(0, firstRecord.offset());
+        assertFalse(firstRecord.hasValue(), "Message payload should be null.");
+    }
+
+    /**
+     * Writes five single-record batches with base offsets 0, 1, 3, 2, 4 into one buffer and
+     * expects the follower append to be rejected with OffsetsOutOfOrderException.
+     */
+    @Test
+    public void testAppendWithOutOfOrderOffsetsThrowsException() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        int leaderEpoch = 0;
+        ByteBuffer batchBuffer = ByteBuffer.allocate(512);
+        // 3 precedes 2 on purpose: this is the ordering violation under test
+        for (long baseOffset : new long[] {0L, 1L, 3L, 2L, 4L}) {
+            MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(batchBuffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, baseOffset, mockTime.milliseconds(), 1L, (short) 0, 0, false, leaderEpoch);
+            batchBuilder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+            batchBuilder.close();
+        }
+        batchBuffer.flip();
+        MemoryRecords outOfOrderRecords = MemoryRecords.readableRecords(batchBuffer);
+
+        assertThrows(OffsetsOutOfOrderException.class, () -> log.appendAsFollower(outOfOrderRecords, leaderEpoch));
+    }
+
+    /**
+     * Argument source for the parameterized append tests: every combination of the magic values
+     * V0, V1, V2 with the compression types NONE and LZ4, ordered by magic first.
+     */
+    private static Stream<Arguments> magicAndCompressionTypes() {
+        byte[] magics = {RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2};
+        CompressionType[] compressionTypes = {CompressionType.NONE, CompressionType.LZ4};
+
+        List<Arguments> combinations = new ArrayList<>();
+        for (byte magic : magics) {
+            for (CompressionType compressionType : compressionTypes) {
+                combinations.add(Arguments.of(magic, compressionType));
+            }
+        }
+        return combinations.stream();
+    }
+
+    /**
+     * After two leader appends, a follower append of a batch built without an explicit base
+     * offset must fail with UnexpectedAppendOffsetException for every magic/compression pair.
+     */
+    @ParameterizedTest(name = "magic={0}, compressionType={1}")
+    @MethodSource("magicAndCompressionTypes")
+    public void testAppendBelowExpectedOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        for (int id = 0; id < 2; id++) {
+            SimpleRecord record = new SimpleRecord(Integer.toString(id).getBytes());
+            log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), 0);
+        }
+
+        MemoryRecords invalidRecord = MemoryRecords.withRecords(
+            magic,
+            Compression.of(compressionType).build(),
+            new SimpleRecord(Integer.toString(1).getBytes()));
+
+        assertThrows(UnexpectedAppendOffsetException.class, () -> log.appendAsFollower(invalidRecord, Integer.MAX_VALUE));
+    }
+
+    /**
+     * With an empty log whose start and end offsets are both 7, a follower append of a
+     * three-record batch starting at offset 4 must be rejected, and the exception must report
+     * the batch's first and last offsets.
+     */
+    @ParameterizedTest(name = "magic={0}, compressionType={1}")
+    @MethodSource("magicAndCompressionTypes")
+    public void testAppendEmptyLogBelowLogStartOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+        createEmptyLogs(logDir, 7);
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        assertEquals(7L, log.logStartOffset());
+        assertEquals(7L, log.logEndOffset());
+
+        long firstOffset = 4L;
+        List<SimpleRecord> threeRecords = List.of(
+            new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+            new SimpleRecord("k2".getBytes(), "v2".getBytes()),
+            new SimpleRecord("k3".getBytes(), "v3".getBytes()));
+        MemoryRecords batch = LogTestUtils.records(
+            threeRecords,
+            magic, Compression.of(compressionType).build(),
+            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+            firstOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+
+        UnexpectedAppendOffsetException thrown = assertThrows(
+            UnexpectedAppendOffsetException.class,
+            () -> log.appendAsFollower(batch, Integer.MAX_VALUE));
+        assertEquals(firstOffset, thrown.firstOffset, "UnexpectedAppendOffsetException#firstOffset");
+        assertEquals(firstOffset + 2, thrown.lastOffset, "UnexpectedAppendOffsetException#lastOffset");
+    }
+
+    /**
+     * Appends a record carrying RecordBatch.NO_TIMESTAMP; the test passes as long as the append
+     * does not throw.
+     */
+    @Test
+    public void testAppendWithNoTimestamp() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        SimpleRecord withoutTimestamp = new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes(), "value".getBytes());
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, withoutTimestamp), 0);
+    }
+
+    /**
+     * Forces a failure while appending an end-transaction marker and then checks that that append
+     * and the later plain append and read all throw KafkaStorageException.
+     */
+    @Test
+    public void testAppendToOrReadFromLogInFailedLogDir() throws IOException {
+        long producerId = 1L;
+        short producerEpoch = 0;
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0);
+        assertEquals(0, LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
+
+        Consumer<Integer> appendTransactional = LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch, mockTime);
+        appendTransactional.accept(10);
+
+        // Kind of a hack, but renaming the index to a directory ensures that the append
+        // to the index will fail.
+        log.activeSegment().txnIndex().renameTo(log.dir());
+
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, producerEpoch, ControlRecordType.ABORT,
+                mockTime.milliseconds(), 1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertThrows(KafkaStorageException.class,
+            () -> log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0));
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
+    }
+
+    /**
+     * Renames the log directory to its delete-directory name and checks that the leader epoch
+     * checkpoint then exists in the new directory and not in the original one.
+     */
+    @Test
+    public void testWriteLeaderEpochCheckpointAfterDirectoryRename() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(1000)
+            .indexIntervalBytes(1)
+            .maxMessageBytes(64 * 1024)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        MemoryRecords beforeRename = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(beforeRename, 5);
+        assertEquals(Optional.of(5), log.latestEpoch());
+
+        // Ensure that after a directory rename, the epoch cache is written to the right location
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        log.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true);
+
+        MemoryRecords afterRename = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(afterRename, 10);
+        assertEquals(Optional.of(10), log.latestEpoch());
+        assertTrue(LeaderEpochCheckpointFile.newFile(log.dir()).exists());
+        assertFalse(LeaderEpochCheckpointFile.newFile(logDir).exists());
+    }
+
+    /**
+     * Assigns a topic ID, renames the log directory, and checks that the partition metadata file
+     * lives in the new directory and that the topic ID matches both in memory and on disk.
+     */
+    @Test
+    public void testTopicIdTransfersAfterDirectoryRename() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(1000)
+            .indexIntervalBytes(1)
+            .maxMessageBytes(64 * 1024)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+        Uuid assignedTopicId = Uuid.randomUuid();
+        log.assignTopicId(assignedTopicId);
+
+        MemoryRecords beforeRename = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(beforeRename, 5);
+        assertEquals(Optional.of(5), log.latestEpoch());
+
+        // Ensure that after a directory rename, the partition metadata file is written to the right location.
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        log.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true);
+
+        MemoryRecords afterRename = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(afterRename, 10);
+        assertEquals(Optional.of(10), log.latestEpoch());
+        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+        assertFalse(PartitionMetadataFile.newFile(logDir).exists());
+
+        // Check the topic ID remains in memory and was copied correctly.
+        assertTrue(log.topicId().isPresent());
+        assertEquals(assignedTopicId, log.topicId().get());
+        assertEquals(assignedTopicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    /**
+     * Records a topic ID directly on the partition metadata file, renames the log directory, and
+     * checks that the file exists in the new directory with the recorded topic ID.
+     */
+    @Test
+    public void testTopicIdFlushesBeforeDirectoryRename() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(1000)
+            .indexIntervalBytes(1)
+            .maxMessageBytes(64 * 1024)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+        Uuid recordedTopicId = Uuid.randomUuid();
+        log.partitionMetadataFile().get().record(recordedTopicId);
+
+        // Ensure that after a directory rename, the partition metadata file is written to the right location.
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        String deleteDirName = UnifiedLog.logDeleteDirName(topicPartition);
+        log.renameDir(deleteDirName, true);
+        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+        assertFalse(PartitionMetadataFile.newFile(logDir).exists());
+
+        // Check the file holds the correct contents.
+        assertTrue(log.partitionMetadataFile().get().exists());
+        assertEquals(recordedTopicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    /**
+     * After a leader append with epoch 5 populates the leader epoch cache, a follower append of
+     * records built with RecordVersion.V1 must leave the cache without a latest epoch.
+     */
+    @Test
+    public void testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(1000)
+            .indexIntervalBytes(1)
+            .maxMessageBytes(64 * 1024)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        MemoryRecords currentFormat = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(currentFormat, 5);
+        assertEquals(Optional.of(5), log.leaderEpochCache().latestEpoch());
+
+        MemoryRecords oldFormat = LogTestUtils.records(
+            List.of(new SimpleRecord("foo".getBytes())), RecordVersion.V1.value, 1L);
+        log.appendAsFollower(oldFormat, 5);
+        assertEquals(Optional.empty(), log.leaderEpochCache().latestEpoch());
+    }
+
+    /**
+     * A V1-format append must leave the log without a latest epoch; a subsequent append in the
+     * default format with leader epoch 5 must make epoch 5 the latest epoch.
+     */
+    @Test
+    public void testLeaderEpochCacheCreatedAfterMessageFormatUpgrade() throws IOException {
+        Properties overrides = new Properties();
+        overrides.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000");
+        overrides.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+        overrides.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536");
+        UnifiedLog log = createLog(logDir, new LogConfig(overrides));
+
+        MemoryRecords oldFormat = LogTestUtils.records(
+            List.of(new SimpleRecord("bar".getBytes())), RecordBatch.MAGIC_VALUE_V1, 0L);
+        log.appendAsLeaderWithRecordVersion(oldFormat, 5, RecordVersion.V1);
+        assertTrue(log.latestEpoch().isEmpty());
+
+        MemoryRecords currentFormat = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())));
+        log.appendAsLeader(currentFormat, 5);
+        assertEquals(5, log.latestEpoch().get());
+    }
+
+    /**
+     * Splits a segment whose offsets overflow and checks that the log then has four segments,
+     * the same records as before, and no remaining offset overflow.
+     */
+    @Test
+    public void testSplitOnOffsetOverflow() throws IOException {
+        // create a log such that one log segment has offsets that overflow, and call the split API on that segment
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .indexIntervalBytes(1)
+            .fileDeleteDelayMs(1000)
+            .build();
+        LogSegment overflowedSegment = createLogWithOverflow(logConfig);
+        assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must have offset overflow");
+
+        List<Record> recordsBeforeSplit = allRecords(log);
+
+        // split the segment with overflow
+        log.splitOverflowedSegment(overflowedSegment);
+
+        // assert we were successfully able to split the segment
+        assertEquals(4, log.numberOfSegments());
+        verifyRecordsInLog(log, recordsBeforeSplit);
+
+        // verify we do not have offset overflow anymore
+        assertFalse(LogTestUtils.hasOffsetOverflow(log));
+    }
+
+    /**
+     * This tests a scenario where all of the batches appended to a segment have overflowed.
+     * When we split the overflowed segment, only one new segment will be created.
+     */
+    @Test
+    public void testDegenerateSegmentSplit() throws IOException {
+        // first offset that is past Integer.MAX_VALUE
+        long firstOverflowOffset = (long) Integer.MAX_VALUE + 1;
+        List<MemoryRecords> overflowedBatches = List.of(
+            MemoryRecords.withRecords(firstOverflowOffset, Compression.NONE, 0,
+                new SimpleRecord("a".getBytes())),
+            MemoryRecords.withRecords(firstOverflowOffset + 1, Compression.NONE, 0,
+                new SimpleRecord("b".getBytes())));
+
+        testDegenerateSplitSegmentWithOverflow(0L, overflowedBatches);
+    }
+
+ @Test
+ public void testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset()
throws IOException {
+ // Degenerate case where the only batch in the segment overflows. In
this scenario,
+ // the first offset of the batch is valid, but the last overflows.
+
+ long firstBatchBaseOffset = (long) Integer.MAX_VALUE - 1;
+ MemoryRecords records =
MemoryRecords.withRecords(firstBatchBaseOffset, Compression.NONE, 0,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes()),
+ new SimpleRecord("c".getBytes()));
+
+ testDegenerateSplitSegmentWithOverflow(0L, List.of(records));
+ }
+
+ @Test
+ public void testReadCommittedWithConcurrentHighWatermarkUpdates() throws
Exception {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024 * 1024 * 5)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long lastOffset = 50L;
+
+ short producerEpoch = 0;
+ long producerId = 15L;
+ Consumer<Integer> appendProducer =
LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch,
mockTime);
+
+ // Thread 1 writes single-record transactions and attempts to read them
+ // before they have been aborted, and then aborts them
+ Callable<Integer> txnWriteAndReadLoop = () -> {
+ int nonEmptyReads = 0;
+ while (log.logEndOffset() < lastOffset) {
+ long currentLogEndOffset = log.logEndOffset();
+
+ appendProducer.accept(1);
+
+ FetchDataInfo readInfo = log.read(currentLogEndOffset,
Integer.MAX_VALUE, FetchIsolation.TXN_COMMITTED, false);
+
+ if (readInfo.records.sizeInBytes() > 0)
+ nonEmptyReads += 1;
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId,
producerEpoch, ControlRecordType.ABORT,
+ mockTime.milliseconds(), 0, 0,
TransactionVersion.TV_0.featureLevel());
+ }
+ return nonEmptyReads;
+ };
+
+ // Thread 2 watches the log and updates the high watermark
+ Runnable hwUpdateLoop = () -> assertDoesNotThrow(() -> {
+ while (log.logEndOffset() < lastOffset) {
+ log.updateHighWatermark(log.logEndOffset());
+ }
+ });
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ executor.submit(hwUpdateLoop);
+
+ Future<Integer> future = executor.submit(txnWriteAndReadLoop);
+ int nonEmptyReads = future.get();
+
+ assertEquals(0, nonEmptyReads);
+ } finally {
+ executor.shutdownNow();
+ assertDoesNotThrow(() -> executor.awaitTermination(60,
TimeUnit.SECONDS));
+ }
+ }
+
+    /**
+     * Interleaves transactional appends from four producers with non-transactional appends, ends
+     * the transactions of producers 1 and 2 with ABORT and those of 3 and 4 with COMMIT, and checks
+     * the aborted-transaction entries collected from the segments' transaction indexes as well as
+     * the first unstable offset as the high watermark advances.
+     */
+    @Test
+    public void testTransactionIndexUpdated() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(1024 * 1024 * 5)
+            .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        short producerEpoch = 0;
+
+        long producer1 = 1L;
+        long producer2 = 2L;
+        long producer3 = 3L;
+        long producer4 = 4L;
+
+        Consumer<Integer> txnAppend1 = LogTestUtils.appendTransactionalAsLeader(log, producer1, producerEpoch, mockTime);
+        Consumer<Integer> txnAppend2 = LogTestUtils.appendTransactionalAsLeader(log, producer2, producerEpoch, mockTime);
+        Consumer<Integer> txnAppend3 = LogTestUtils.appendTransactionalAsLeader(log, producer3, producerEpoch, mockTime);
+        Consumer<Integer> txnAppend4 = LogTestUtils.appendTransactionalAsLeader(log, producer4, producerEpoch, mockTime);
+
+        // mix transactional and non-transactional data; the trailing comment is the next offset after each step
+        txnAppend1.accept(5); // nextOffset: 5
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3); // 8
+        txnAppend2.accept(2); // 10
+        txnAppend1.accept(4); // 14
+        txnAppend3.accept(3); // 17
+        LogTestUtils.appendNonTransactionalAsLeader(log, 2); // 19
+        txnAppend1.accept(10); // 29
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producer1, producerEpoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 30
+        txnAppend2.accept(6); // 36
+        txnAppend4.accept(3); // 39
+        LogTestUtils.appendNonTransactionalAsLeader(log, 10); // 49
+        txnAppend3.accept(9); // 58
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producer3, producerEpoch, ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 59
+        txnAppend4.accept(8); // 67
+        txnAppend2.accept(7); // 74
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producer2, producerEpoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 75
+        LogTestUtils.appendNonTransactionalAsLeader(log, 10); // 85
+        txnAppend4.accept(4); // 89
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producer4, producerEpoch, ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 90
+
+        List<AbortedTxn> expectedAborted = List.of(
+            new AbortedTxn(producer1, 0L, 29L, 8L),
+            new AbortedTxn(producer2, 8L, 74L, 36L)
+        );
+        List<AbortedTxn> actualAborted = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            actualAborted.addAll(segment.txnIndex().allAbortedTxns());
+        }
+        assertEquals(expectedAborted, actualAborted);
+
+        // Verify caching of the segment position of the first unstable offset
+        log.updateHighWatermark(30L);
+        assertCachedFirstUnstableOffset(log, 8L);
+
+        log.updateHighWatermark(75L);
+        assertCachedFirstUnstableOffset(log, 36L);
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    /**
+     * Creates, for each given base offset, an empty log file and an empty offset index file in
+     * the given directory.
+     *
+     * @param dir     directory in which the files are created
+     * @param offsets base offsets to create files for
+     * @throws IOException if a file cannot be created
+     */
+    private void createEmptyLogs(File dir, int... offsets) throws IOException {
+        for (int baseOffset : offsets) {
+            File logFile = LogFileUtils.logFile(dir, baseOffset);
+            Files.createFile(logFile.toPath());
+            File indexFile = LogFileUtils.offsetIndexFile(dir, baseOffset);
+            Files.createFile(indexFile.toPath());
+        }
+    }
+
+    /**
+     * Collects every record of every batch of every segment of the log, in iteration order.
+     *
+     * @param log the log to read
+     * @return all records found in the log's segments
+     */
+    private static List<Record> allRecords(UnifiedLog log) {
+        List<Record> collected = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            for (RecordBatch batch : segment.log().batches()) {
+                for (Record record : batch) {
+                    collected.add(record);
+                }
+            }
+        }
+        return collected;
+    }
+
+    /**
+     * Asserts that the records currently in the log equal the expected list, in order.
+     *
+     * @param log             the log to inspect
+     * @param expectedRecords the records the log must contain
+     */
+    private static void verifyRecordsInLog(UnifiedLog log, List<Record> expectedRecords) {
+        List<Record> actualRecords = allRecords(log);
+        assertEquals(expectedRecords, actualRecords);
+    }
+
+    /**
+     * Initializes the log directory with an overflowed segment, opens it into the {@code log}
+     * field, and returns the first segment reported as overflowed.
+     *
+     * @param logConfig configuration used to open the log
+     * @return the first segment with overflowed offsets
+     * @throws AssertionError if the opened log has no overflowed segment
+     */
+    private LogSegment createLogWithOverflow(LogConfig logConfig) throws IOException {
+        LogTestUtils.initializeLogDirWithOverflowedSegment(logDir);
+        log = createLog(logDir, logConfig, Long.MAX_VALUE);
+
+        var firstOverflowed = LogTestUtils.firstOverflowSegment(log);
+        return firstOverflowed.orElseThrow(
+            () -> new AssertionError("Failed to create log with a segment which has overflowed offsets"));
+    }
+
+    /**
+     * Writes the given batches into a raw segment file, opens the directory as a log, splits the
+     * overflowed segment, and checks that exactly one segment remains, that it starts at the base
+     * offset of the first batch, and that no records were lost.
+     *
+     * @param segmentBaseOffset base offset used for the raw segment and its index files
+     * @param records           batches to append to the raw segment, in order
+     */
+    private void testDegenerateSplitSegmentWithOverflow(long segmentBaseOffset, List<MemoryRecords> records) throws IOException {
+        try (FileRecords rawSegment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)) {
+            // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
+            Files.createFile(LogFileUtils.offsetIndexFile(logDir, segmentBaseOffset).toPath());
+            Files.createFile(LogFileUtils.timeIndexFile(logDir, segmentBaseOffset).toPath());
+            for (MemoryRecords batch : records) {
+                rawSegment.append(batch);
+            }
+        }
+
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .indexIntervalBytes(1)
+            .fileDeleteDelayMs(1000)
+            .build();
+        UnifiedLog overflowLog = createLog(logDir, logConfig, Long.MAX_VALUE);
+
+        LogSegment overflowedSegment = LogTestUtils.firstOverflowSegment(overflowLog).orElseThrow(
+            () -> new AssertionError("Failed to create log with a segment which has overflowed offsets"));
+
+        List<Record> recordsBeforeSplit = allRecords(overflowLog);
+        overflowLog.splitOverflowedSegment(overflowedSegment);
+
+        assertEquals(1, overflowLog.numberOfSegments());
+
+        long expectedBaseOffset = records.get(0).batches().iterator().next().baseOffset();
+        assertEquals(expectedBaseOffset, overflowLog.activeSegment().baseOffset());
+        verifyRecordsInLog(overflowLog, recordsBeforeSplit);
+
+        assertFalse(LogTestUtils.hasOffsetOverflow(overflowLog));
+    }
+
+    /**
+     * A timestamp paired with a leader epoch.
+     *
+     * @param timestamp   the timestamp value
+     * @param leaderEpoch the leader epoch value
+     */
+    private record TimestampAndEpoch(long timestamp, int leaderEpoch) {
+    }
+
+    /**
+     * Shared fixture for the remote-storage offset tests: installs a spied RemoteLogManager in
+     * {@code remoteLogManager}, opens a remote-storage-enabled log at the given start offset in
+     * {@code log}, checks the earliest pending upload offset of the still-empty log, and finally
+     * appends three records whose timestamps and epochs are stored in {@code timestampAndEpochs}.
+     *
+     * @param logStartOffset start offset the log is created with
+     */
+    private void prepare(int logStartOffset) throws IOException {
+        RemoteLogManagerConfig rlmConfig = createRemoteLogManagerConfig();
+        DelayedOperationPurgatory<DelayedRemoteListOffsets> listOffsetsPurgatory =
+            new DelayedOperationPurgatory<>("RemoteListOffsets", 0);
+        RemoteLogManager realManager = new RemoteLogManager(
+            rlmConfig,
+            0,
+            logDir.getAbsolutePath(),
+            "clusterId",
+            mockTime,
+            tp -> Optional.empty(),
+            (tp, l) -> { },
+            brokerTopicStats,
+            new Metrics(),
+            Optional.empty()
+        );
+        remoteLogManager = spy(realManager);
+        remoteLogManager.setDelayedOperationPurgatory(listOffsetsPurgatory);
+
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(200)
+            .indexIntervalBytes(1)
+            .remoteLogStorageEnable(true)
+            .build();
+
+        log = createLog(logDir, logConfig, logStartOffset, 0L, brokerTopicStats,
+            mockTime.scheduler, mockTime, producerStateManagerConfig, true, Optional.empty(), true);
+
+        // Verify earliest pending upload offset for empty log
+        FileRecords.TimestampAndOffset expectedForEmptyLog = new FileRecords.TimestampAndOffset(
+            ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset, Optional.empty());
+        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager),
+            expectedForEmptyLog,
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
+        timestampAndEpochs = prepareLogWithSequentialRecords(log, 3);
+    }
+
+    /**
+     * Appends {@code recordCount} single-record batches as leader. The i-th batch (0-based) uses
+     * the timestamp "current mock time + i" and the leader epoch i.
+     *
+     * @param log         the log to append to
+     * @param recordCount number of batches to append
+     * @return the timestamp and leader epoch used for each batch, in append order
+     */
+    private List<TimestampAndEpoch> prepareLogWithSequentialRecords(UnifiedLog log, int recordCount) throws IOException {
+        long baseTimestamp = mockTime.milliseconds();
+        List<TimestampAndEpoch> appended = new ArrayList<>();
+
+        for (int epoch = 0; epoch < recordCount; epoch++) {
+            long timestamp = baseTimestamp + epoch;
+            TimestampAndEpoch entry = new TimestampAndEpoch(timestamp, epoch);
+            log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), timestamp), entry.leaderEpoch());
+            appended.add(entry);
+        }
+
+        return appended;
+    }
+
+ private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log,
+
Optional<RemoteLogManager> remoteLogManagerOpt,
+
FileRecords.TimestampAndOffset expected,
+ long timestamp) {
+ Optional<AsyncOffsetReader> remoteOffsetReader =
remoteLogManagerOpt.map(rlm -> rlm);
+ OffsetResultHolder offsetResultHolder =
log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
+ assertEquals(new OffsetResultHolder(expected), offsetResultHolder);
+ }
+
+ private void assertFetchOffsetByTimestamp(UnifiedLog log,
+ Optional<RemoteLogManager>
remoteLogManagerOpt,
+
Optional<FileRecords.TimestampAndOffset> expected,
+ long timestamp) throws
Exception {
+ Optional<AsyncOffsetReader> remoteOffsetReader =
remoteLogManagerOpt.map(rlm -> rlm);
+ OffsetResultHolder offsetResultHolder =
log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
+ assertTrue(offsetResultHolder.futureHolderOpt().isPresent());
+ offsetResultHolder.futureHolderOpt().get().taskFuture().get(1,
TimeUnit.SECONDS);
+
assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().isDone());
+
assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().get().hasTimestampAndOffset());
+ assertEquals(expected.get(),
offsetResultHolder.futureHolderOpt().get().taskFuture().get().timestampAndOffset().orElse(null));
+ }
+
+    /**
+     * Asserts that the producer state manager holds a first unstable offset equal to
+     * {@code expectedOffset}, that it carries more than the message offset alone, and that it
+     * passes {@code assertValidLogOffsetMetadata}.
+     *
+     * @param log            the log whose producer state manager is inspected
+     * @param expectedOffset expected message offset of the first unstable offset
+     */
+    private void assertCachedFirstUnstableOffset(UnifiedLog log, long expectedOffset) throws IOException {
+        var cachedOffset = log.producerStateManager().firstUnstableOffset();
+        assertTrue(cachedOffset.isPresent());
+
+        LogOffsetMetadata offsetMetadata = cachedOffset.get();
+        assertEquals(expectedOffset, offsetMetadata.messageOffset);
+        assertFalse(offsetMetadata.messageOffsetOnly());
+        assertValidLogOffsetMetadata(log, offsetMetadata);
+    }
+
private void assertFetchOffsetByTimestamp(RemoteLogManager
remoteLogManager,
FileRecords.TimestampAndOffset
expected,
long timestamp,