satishd commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1546167963
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata]
= None
+ @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
Review Comment:
nit: You can leave it at the earliest place for this field as it is not
really needed for this change.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata]
= None
+ @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
+
+ @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+
/* Keep track of the current high watermark in order to ensure that segments
containing offsets at or above it are
* not eligible for deletion. This means that the active segment is only
eligible for deletion if the high watermark
* equals the log end offset (which may never happen for a partition under
consistent load). This is needed to
* prevent the log start offset (which is exposed in fetch responses) from
getting ahead of the high watermark.
*/
- @volatile private var highWatermarkMetadata: LogOffsetMetadata = new
LogOffsetMetadata(logStartOffset)
-
- @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
-
- @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+ @volatile private var highWatermarkMetadata: LogOffsetMetadata = new
LogOffsetMetadata(_localLogStartOffset)
Review Comment:
There won't be any effect with this change as `_localLogStartOffset` is
initialized with `logStartOffset`. But it is good to keep
`_localLogStartOffset` for consistency and relevance of this field.
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -318,6 +318,80 @@ class UnifiedLogTest {
assertHighWatermark(4L)
}
+ @Test
+ def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024,
remoteLogStorageEnable = true)
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+ val leaderEpoch = 0
+
+ def assertHighWatermark(offset: Long): Unit = {
+ assertEquals(offset, log.highWatermark)
+ assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+ }
+
+ // High watermark initialized to 0
+ assertHighWatermark(0L)
+
+ var offset = 0L
+ for(_ <- 0 until 50) {
+ val records = TestUtils.singletonRecords("test".getBytes())
+ val info = log.appendAsLeader(records, leaderEpoch)
+ offset = info.lastOffset
+ if (offset != 0 && offset % 10 == 0)
+ log.roll()
+ }
+ assertEquals(5, log.logSegments.size)
+
+ // High watermark not changed by append
+ assertHighWatermark(0L)
+
+ // Update high watermark as leader
+ log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+ assertHighWatermark(50L)
+ assertEquals(50L, log.logEndOffset)
+
+ // Cannot update high watermark past the log end offset
+ log.updateHighWatermark(60L)
+ assertHighWatermark(50L)
+
+ // simulate calls to upload 3 segments to remote storage and remove them
from local-log.
+ log.updateHighestOffsetInRemoteStorage(30)
+ log.maybeIncrementLocalLogStartOffset(31L,
LogStartOffsetIncrementReason.SegmentDeletion)
+ log.deleteOldSegments()
+ assertEquals(2, log.logSegments.size)
+ assertEquals(31L, log.localLogStartOffset())
+ assertHighWatermark(50L)
+
+ // simulate one remote-log segment deletion
+ val logStartOffset = 11L
+ log.maybeIncrementLogStartOffset(logStartOffset,
LogStartOffsetIncrementReason.SegmentDeletion)
+ assertEquals(11, log.logStartOffset)
+
+ // Updating the HW below the log-start-offset / local-log-start-offset is
not allowed. HW should reset to local-log-start-offset.
+ log.updateHighWatermark(new LogOffsetMetadata(5L))
+ assertHighWatermark(31L)
+ // Updating the HW between log-start-offset and local-log-start-offset is
not allowed. HW should reset to local-log-start-offset.
+ log.updateHighWatermark(new LogOffsetMetadata(25L))
+ assertHighWatermark(31L)
+ // Updating the HW between local-log-start-offset and log-end-offset is
allowed.
+ log.updateHighWatermark(new LogOffsetMetadata(32L))
+ assertHighWatermark(32L)
+ assertEquals(11L, log.logStartOffset)
+ assertEquals(31L, log.localLogStartOffset())
+
+ // Truncating the logs to below the local-log-start-offset, should update
the high watermark
Review Comment:
Good to see covering the truncation scenarios also.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]