kamalcph commented on code in PR #16765:
URL: https://github.com/apache/kafka/pull/16765#discussion_r1703936217


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4188,11 +4188,77 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }
 
+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteCopyDisabled(): Unit = {
+    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, 
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+          fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    // Given 10 segments of 1 message each
+    for (_ <- 0 until 10) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(10, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // add remoteCopyDisabled = true
+    val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteCopyDisabled 
= true)
+    log.updateConfig(copyDisabledLogConfig)
+
+    // No local logs will be deleted even though local retention bytes is 1 
because there are still logs in remote storage
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // simulate the remote logs are all deleted due to retention policy
+    log.updateLogStartOffsetFromRemoteTier(2)
+    assertEquals(8, log.logSegments.size())
+    assertEquals(2, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // try to delete local logs again, 3 segments will be deleted this time 
because log start offset == local log start offset,
+    // which means remote storage is empty. We'll treat this log as a local 
log and use retention.bytes for the retention policy.

Review Comment:
   Can we add a validation to ensure that the remote and local log retention 
values (time and size) are the same when remote copy gets disabled?
   
   With this, local and remote log deletion become independent of each 
other, and the user will be aware of the data retention settings on the local 
disk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to