satishd commented on code in PR #16765:
URL: https://github.com/apache/kafka/pull/16765#discussion_r1704933466
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1478,7 +1485,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// Segments are eligible for deletion when:
// 1. they are uploaded to the remote storage
// 2. log-start-offset was incremented higher than the largest offset
in the candidate segment
- if (remoteLogEnabled()) {
+ if (remoteLogEnabledAndRemoteCopyEnabled()) {
Review Comment:
Good to add a comment here that we fall back to `log.retention` configs for
local log segments when `remote.log.copy.disable` is true.
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4188,11 +4188,77 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
+ @Test
+ def testRetentionOnLocalLogDeletionWhenRemoteCopyDisabled(): Unit = {
+ def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+ val segmentBytes = createRecords.sizeInBytes()
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes,
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+ // Given 10 segments of 1 message each
+ for (_ <- 0 until 10) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(10, log.logSegments.size)
+
+ log.updateHighWatermark(log.logEndOffset)
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ log.deleteOldSegments()
+ assertEquals(8, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // add remoteCopyDisabled = true
+ val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteCopyDisabled
= true)
+ log.updateConfig(copyDisabledLogConfig)
+
+ // No local logs will be deleted even though local retention bytes is 1
because there are still logs in remote storage
+ log.deleteOldSegments()
+ assertEquals(8, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // simulate the remote logs are all deleted due to retention policy
+ log.updateLogStartOffsetFromRemoteTier(2)
+ assertEquals(8, log.logSegments.size())
+ assertEquals(2, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // try to delete local logs again, 3 segments will be deleted this time
because log start offset == local log start offset,
+ // which means remote storage is empty. We'll treat this log as local
logs and use retention.bytes for retention policy.
Review Comment:
@showuon Do you plan to add the required local.log and log retention configs
for setting `remote.log.copy.disable` to true?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]