This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f766ce54e69 KAFKA-20346 LogCleaner#cleaners should be thread-safe 
(#21847)
f766ce54e69 is described below

commit f766ce54e693ea94ce16b00bf820d0bcd11685fe
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Apr 6 17:58:15 2026 +0800

    KAFKA-20346 LogCleaner#cleaners should be thread-safe (#21847)
    
    `LogCleaner#cleaners` can be accessed concurrently by two threads, the
    `kafka-0-metadata-loader-event-handler` thread and the JMX reader thread,
    so we should use `CopyOnWriteArrayList` to avoid concurrency issues.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/storage/internals/log/LogCleaner.java    |  4 +-
 .../internals/log/LogCleanerIntegrationTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
index 19f973c29fb..72acdf3cfc1 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
@@ -35,12 +35,12 @@ import java.io.File;
 import java.io.IOException;
 import java.security.DigestException;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -140,7 +140,7 @@ public class LogCleaner implements BrokerReconfigurable {
     private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
     private final LogDirFailureChannel logDirFailureChannel;
     private final Time time;
-    private final List<CleanerThread> cleaners = new ArrayList<>();
+    private final List<CleanerThread> cleaners = new CopyOnWriteArrayList<>();
 
     /**
      * Log cleaner configuration which may be dynamically updated.
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
index 7abf4b63265..a8c65daf7ff 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
@@ -64,6 +64,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.LongStream;
@@ -591,6 +592,48 @@ public class LogCleanerIntegrationTest {
             "log should have been compacted: startSize=" + startSize + " 
compactedSize=" + compactedSize);
     }
 
+    @Test
+    public void testGaugeReadsAreNotAffectedByReconfigure() throws Exception {
+        cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS, 
MIN_COMPACTION_LAG, SEGMENT_SIZE);
+        cleaner.startup();
+
+        AbstractConfig config1Thread = makeReconfigureConfig(1);
+        AbstractConfig config2Thread = makeReconfigureConfig(2);
+
+        var checkError = CompletableFuture.runAsync(() -> {
+            var endtime = System.currentTimeMillis() + 
Duration.ofSeconds(5).toMillis();
+            while (System.currentTimeMillis() < endtime) {
+                cleaner.maxOverCleanerThreads(t -> 
t.lastStats().bufferUtilization());
+                cleaner.deadThreadCount();
+            }
+        });
+
+        var updateCleaner = CompletableFuture.runAsync(() -> {
+            var useOne = true;
+            var endtime = System.currentTimeMillis() + 
Duration.ofSeconds(5).toMillis();
+            while (System.currentTimeMillis() < endtime) {
+                AbstractConfig oldCfg = useOne ? config2Thread : config1Thread;
+                AbstractConfig newCfg = useOne ? config1Thread : config2Thread;
+                cleaner.reconfigure(oldCfg, newCfg);
+                useOne = !useOne;
+            }
+        });
+
+        checkError.join();
+        updateCleaner.join();
+    }
+
+    private AbstractConfig makeReconfigureConfig(int numThreads) {
+        // Extend CleanerConfig.CONFIG_DEF with message.max.bytes, which 
CleanerConfig(AbstractConfig)
+        // reads via ServerConfigs.MESSAGE_MAX_BYTES_CONFIG but which is not 
part of CleanerConfig's own ConfigDef.
+        ConfigDef configDef = new ConfigDef(CleanerConfig.CONFIG_DEF)
+                .define("message.max.bytes", ConfigDef.Type.INT,
+                        DEFAULT_MAX_MESSAGE_SIZE, ConfigDef.Importance.MEDIUM, 
"");
+        Map<String, Object> props = new HashMap<>();
+        props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, numThreads);
+        return new AbstractConfig(configDef, props);
+    }
+
     private void checkLastCleaned(String topic, int partitionId, long 
firstDirty) throws InterruptedException {
         // wait until cleaning up to base_offset, note that cleaning happens 
only when "log dirty ratio" is higher than
         // TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG

Reply via email to