kamalcph commented on code in PR #16765:
URL: https://github.com/apache/kafka/pull/16765#discussion_r1702764279


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4188,11 +4188,77 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }
 
+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteCopyDisabled(): Unit = {
+    def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+          fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    // Given 10 segments of 1 message each
+    for (_ <- 0 until 10) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(10, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // add remoteCopyDisabled = true
+    val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteCopyDisabled = true)
+    log.updateConfig(copyDisabledLogConfig)
+
+    // No local logs will be deleted even though local retention bytes is 1 because there are still logs in remote storage
+    log.deleteOldSegments()
+    assertEquals(8, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // simulate the remote logs are all deleted due to retention policy
+    log.updateLogStartOffsetFromRemoteTier(2)
+    assertEquals(8, log.logSegments.size())
+    assertEquals(2, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // try to delete local logs again, 3 segments will be deleted this time because log start offset == local log start offset,
+    // which means no remote storage is empty. We'll treat this log as local logs and use retention.bytes for retention policy.

Review Comment:
   Log-start-offset (LSO) == local-log-start-offset (LLSO) does not mean that 
the remote logs are **always** empty for the partition. Assume that the user 
configured the remote and local retention values to be the same, so that the 
only effect of tiering is the segment copy to the remote storage, and then 
disabled the remote copy feature.
   
   In this case, the remote and local storage overlap on all the passive 
segments. With the `hasRemoteLogs` check, we might trigger the local and 
remote segment retention in parallel. But it will still work, as we are 
moving the log-start-offset on deletion of logs from either the local or the 
remote storage. We can either update the comments in the test or leave them 
as is.
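
   To make the overlap case concrete, here is a minimal toy sketch. It is not 
Kafka code: `Segment`, `TieredLogView` and `LsoVsLlsoSketch` are hypothetical 
names invented for illustration, and the log-start-offset is simply derived 
from the earliest segment in either tier, which is a simplification of how 
`UnifiedLog` tracks it.

```scala
// Toy model only: these types are hypothetical and are NOT Kafka classes.
final case class Segment(baseOffset: Long, endOffset: Long)

final case class TieredLogView(local: Seq[Segment], remote: Seq[Segment]) {
  // Local-log-start-offset: base offset of the earliest local segment.
  // Assumes `local` is non-empty (there is always an active segment).
  def localLogStartOffset: Long = local.map(_.baseOffset).min

  // Log-start-offset: earliest offset still available in either tier.
  def logStartOffset: Long = (local ++ remote).map(_.baseOffset).min

  def remoteIsEmpty: Boolean = remote.isEmpty
}

object LsoVsLlsoSketch {
  // Ten single-record segments, mirroring the shape of the test above.
  val segments: Seq[Segment] = (0L until 10L).map(o => Segment(o, o))

  // Case 1: local and remote retention were equal, so no local segment was
  // removed after upload. Every passive (non-active) segment is in both tiers.
  val overlapping: TieredLogView =
    TieredLogView(local = segments, remote = segments.init)

  // Case 2: the situation the test comment describes. The first two segments
  // were tiered, removed locally, and have since expired from remote storage.
  val remoteExpired: TieredLogView =
    TieredLogView(local = segments.drop(2), remote = Seq.empty)

  def main(args: Array[String]): Unit = {
    for ((name, view) <- Seq("overlapping" -> overlapping, "remoteExpired" -> remoteExpired)) {
      println(s"$name: LSO=${view.logStartOffset} LLSO=${view.localLogStartOffset} " +
        s"remoteIsEmpty=${view.remoteIsEmpty}")
    }
  }
}
```

   In both cases LSO == LLSO holds, but only in the second one is the remote 
storage actually empty, so the equality alone cannot be read as "no remote 
logs".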


