m1a2st commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2330253003


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1906,16 +1906,29 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea
 
     /**
      * If topic deletion is enabled, delete any local log segments that have either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local
-     * log segments that are before the log start offset
+     * retention or because the log size is > retentionSize. Empty cleanup.policy is the same as delete with
+     * infinite retention, so we only need to delete local segments if remote storage is enabled. Whether or
+     * not deletion is enabled, delete any local log segments that are before the log start offset
      */
     public int deleteOldSegments() throws IOException {
         if (config().delete) {
             return deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
-        } else {
+        } else if (config().compact) {
             return deleteLogStartOffsetBreachedSegments();
+        } else {
+            // If cleanup.policy is empty and remote storage is enabled, the local log segments will
+            // be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms
+            if (remoteLogEnabledAndRemoteCopyEnabled()) {
+                return deleteLogStartOffsetBreachedSegments() +
+                        deleteRetentionSizeBreachedSegments() +
+                        deleteRetentionMsBreachedSegments();
+            } else {
+                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
+                // log segments
+                return deleteLogStartOffsetBreachedSegments();

Review Comment:
   Agreed, we should allow an empty `cleanup.policy` when handling a `DeleteRecordsRequest`. I will make this change in the follow-up PR.
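
   For reference, the branching in the hunk above can be summarized as a small decision function. This is only a sketch for discussion, not code from the PR: the class `RetentionChecks`, the enum `Check`, and the method `checksFor` are hypothetical names, and the three booleans stand in for `config().delete`, `config().compact`, and `remoteLogEnabledAndRemoteCopyEnabled()`. It does not cover the `DeleteRecordsRequest` handling mentioned above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical illustration of which retention checks run for each cleanup.policy case.
public class RetentionChecks {

    // Stand-ins for the three delete*BreachedSegments() calls in the hunk.
    enum Check { LOG_START_OFFSET, RETENTION_SIZE, RETENTION_MS }

    static Set<Check> checksFor(boolean delete, boolean compact, boolean remoteCopyEnabled) {
        if (delete) {
            // cleanup.policy contains "delete": all three checks apply.
            return EnumSet.allOf(Check.class);
        }
        if (compact) {
            // compact only: just drop segments below the log start offset.
            return EnumSet.of(Check.LOG_START_OFFSET);
        }
        // Empty cleanup.policy: local retention applies only when remote copy is enabled.
        return remoteCopyEnabled
                ? EnumSet.allOf(Check.class)
                : EnumSet.of(Check.LOG_START_OFFSET);
    }

    public static void main(String[] args) {
        System.out.println("empty policy, remote on:  " + checksFor(false, false, true));
        System.out.println("empty policy, remote off: " + checksFor(false, false, false));
    }
}
```

   In every case the log-start-offset check still runs, which is the path a `DeleteRecordsRequest` relies on once the log start offset has been advanced.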