junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2326299192


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1906,16 +1906,29 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea
 
     /**
      * If topic deletion is enabled, delete any local log segments that have either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local
-     * log segments that are before the log start offset
+     * retention or because the log size is > retentionSize. Empty cleanup.policy is the same as delete with
+     * infinite retention, so we only need to delete local segments if remote storage is enabled. Whether or
+     * not deletion is enabled, delete any local log segments that are before the log start offset
      */
     public int deleteOldSegments() throws IOException {
         if (config().delete) {
             return deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
-        } else {
+        } else if (config().compact) {
             return deleteLogStartOffsetBreachedSegments();
+        } else {
+            // If cleanup.policy is empty and remote storage is enabled, the local log segments will
+            // be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms
+            if (remoteLogEnabledAndRemoteCopyEnabled()) {
+                return deleteLogStartOffsetBreachedSegments() +
+                        deleteRetentionSizeBreachedSegments() +
+                        deleteRetentionMsBreachedSegments();
+            } else {
+                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
+                // log segments

Review Comment:
   > we should not delete any local log segments
   
   This is a bit inaccurate. We probably want to say "we should not delete any 
local log segments unless the log start offset advances through deleteRecords".
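   To make the suggested wording concrete, the branch structure in the hunk above can be modelled as a small stand-alone sketch. Everything here is hypothetical (`RetentionDecisionSketch`, `Policy`, `Check`, `checksFor` are illustration-only names, not Kafka classes). The last branch is not visible in the quoted diff, so the sketch assumes it still runs the log-start-offset check, which is what the proposed comment implies.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the branches in deleteOldSegments(); this is not Kafka code.
class RetentionDecisionSketch {
    enum Policy { DELETE, COMPACT }
    enum Check { LOG_START_OFFSET, RETENTION_SIZE, RETENTION_MS }

    // Returns which deletion checks would run for a given cleanup.policy and remote-copy setting.
    static Set<Check> checksFor(Set<Policy> cleanupPolicy, boolean remoteCopyEnabled) {
        if (cleanupPolicy.contains(Policy.DELETE)) {
            // delete: all three checks, as in the first branch of the diff
            return EnumSet.allOf(Check.class);
        } else if (cleanupPolicy.contains(Policy.COMPACT)) {
            // compact only: only segments below the log start offset
            return EnumSet.of(Check.LOG_START_OFFSET);
        } else if (remoteCopyEnabled) {
            // empty policy with remote copy: local retention settings apply
            return EnumSet.allOf(Check.class);
        } else {
            // ASSUMPTION: empty policy without remote storage still honours a log start
            // offset that was advanced through deleteRecords; the diff is cut off here.
            return EnumSet.of(Check.LOG_START_OFFSET);
        }
    }

    public static void main(String[] args) {
        Set<Policy> emptyPolicy = EnumSet.noneOf(Policy.class);
        System.out.println("empty, remote off: " + checksFor(emptyPolicy, false));
        System.out.println("empty, remote on:  " + checksFor(emptyPolicy, true));
    }
}
```

   Under that assumption, the empty-policy case without remote storage behaves like the compact-only case, which is why "should not delete any local log segments" is too strong as written.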



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -716,26 +716,27 @@ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
-        final int targetInterceptor = 3;
+        final int targetInterceptor = 1;
 
         try {
             Properties props = new Properties();
             props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
             props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,  MockConsumerInterceptor.class.getName() + ", "
-                    + MockConsumerInterceptor.class.getName() + ", "
-                    + MockConsumerInterceptor.class.getName());
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    CloseInterceptor.class.getName() + "," + MockConsumerInterceptor.class.getName());
 
             MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
 
             assertThrows(KafkaException.class, () -> newConsumer(
                     props, new StringDeserializer(), new StringDeserializer()));
 
-            assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
-            assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.CONFIG_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
 
+            assertEquals(1, CloseInterceptor.CLOSE_COUNT.get());

Review Comment:
   This is an existing issue. So, the interceptors are called in reverse order?
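   For readers following the test's expectations (one configure call fails, yet both the failing instance and the earlier `CloseInterceptor` report a close), here is a minimal sketch of the general "close everything created so far on failure" pattern. It is not the Kafka client's implementation: `PluginLoaderSketch`, `Plugin` and `configureAll` are hypothetical names, and the close order used here (creation order) is an arbitrary choice, not a statement about the order Kafka uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of closing already-created instances when configuration fails.
class PluginLoaderSketch {
    interface Plugin extends AutoCloseable {
        void configure();

        @Override
        void close();
    }

    // Configures each candidate in order; if one throws, closes every instance
    // tracked so far (including the one that failed) and rethrows the original error.
    static List<Plugin> configureAll(List<Plugin> candidates) {
        List<Plugin> created = new ArrayList<>();
        try {
            for (Plugin plugin : candidates) {
                created.add(plugin);   // track first so a failing instance is also closed
                plugin.configure();
            }
            return created;
        } catch (RuntimeException failure) {
            for (Plugin plugin : created) {
                try {
                    plugin.close();
                } catch (RuntimeException closeError) {
                    // keep the original failure as the primary exception
                    failure.addSuppressed(closeError);
                }
            }
            throw failure;
        }
    }
}
```

   With a list of two plugins where the second throws from `configure()`, both instances are closed exactly once, which mirrors the counts asserted in the test above.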



##########
docs/upgrade.html:
##########
@@ -113,6 +113,35 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
     <li>
         The <code>num.replica.fetchers</code> config has a new lower bound of 1.
     </li>
+    <li>
+        Improvements have been made to the validation rules and default values of LIST-type configurations
+        (<a href="https://cwiki.apache.org/confluence/x/HArXF">KIP-1161</a>).
+        <ul>
+            <li>
+                LIST-type configurations now enforce stricter validation:
+                <ul>
+                    <li>Null values are no longer accepted for most LIST-type configurations, except those that explicitly
+                        allow a null default value or where a null value has a well-defined semantic meaning.</li>
+                    <li>Duplicate entries within the same list are no longer permitted.</li>
+                    <li>Empty lists are no longer allowed, except in configurations where an empty list has a well-defined
+                        semantic meaning.</li>
+                </ul>
+            </li>
+            <li>
+                Several configurations have been reclassified from STRING-type to LIST-type to better reflect their
+                intended use as comma-separated values.
+            </li>
+            <li>
+                Default values for certain configurations have been adjusted to ensure better consistency with related
+                settings.
+            </li>
+            <li>
+                The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the

Review Comment:
   "The <code>cleanup.policy</code> is empty" => "If <code>cleanup.policy</code> is empty"
   
   Also, could we add "cleanup.policy supports empty, which means infinite retention."?
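   As a rough illustration of the three validation rules listed in the hunk above (null, duplicates, empty), the sketch below shows one way such checks could be expressed. It is not the KIP-1161 implementation: `ListConfigValidatorSketch`, `validate`, and the `allowNull` / `allowEmpty` flags are hypothetical names chosen for this example.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validator mirroring the documented LIST-type rules; not Kafka's ConfigDef.
class ListConfigValidatorSketch {

    // Throws IllegalArgumentException if the list breaks one of the documented rules.
    static void validate(String name, List<String> values, boolean allowNull, boolean allowEmpty) {
        if (values == null) {
            if (!allowNull) {
                throw new IllegalArgumentException(name + ": null is not allowed");
            }
            return; // null is accepted for this config, nothing more to check
        }
        if (values.isEmpty() && !allowEmpty) {
            throw new IllegalArgumentException(name + ": empty list is not allowed");
        }
        Set<String> seen = new HashSet<>();
        for (String value : values) {
            if (!seen.add(value)) {
                throw new IllegalArgumentException(name + ": duplicate entry '" + value + "'");
            }
        }
    }
}
```

   In this sketch, `cleanup.policy` would be validated with `allowEmpty = true`, matching the note that an empty value means infinite retention, while a duplicate such as `delete,delete` would be rejected.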



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.