showuon commented on code in PR #16765:
URL: https://github.com/apache/kafka/pull/16765#discussion_r1704051456


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4188,11 +4188,77 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }
 
+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteCopyDisabled(): Unit = {
+    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, 
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+          fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    // Given 10 segments of 1 message each
+    for (_ <- 0 until 10) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(10, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // add remoteCopyDisabled = true
+    val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteCopyDisabled 
= true)
+    log.updateConfig(copyDisabledLogConfig)
+
+    // No local logs will be deleted even though local retention bytes is 1 
because there are still logs in remote storage
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // simulate the remote logs are all deleted due to retention policy
+    log.updateLogStartOffsetFromRemoteTier(2)
+    assertEquals(8, log.logSegments.size())
+    assertEquals(2, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // try to delete local logs again, 3 segments will be deleted this time 
because log start offset == local log start offset,
+    // which means no remote storage is empty. We'll treat this log as local 
logs and use retention.bytes for retention policy.

Review Comment:
   Thanks @kamalcph !
   I was trying to start a new discussion in KIP-950 to change the design 
(again...), then, when I re-read the KIP, I found the local log retention 
change was originally defined:
   
   > When tiered storage is disabled or becomes read-only on a topic, the local 
retention configuration becomes irrelevant, and all data expiration follows the 
topic-wide retention configuration exclusively.
   
   With that, I think all we need to do is to make the document clear, and 
follow the original design. I've updated the PR accordingly. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to