This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f766ce54e69 KAFKA-20346 LogCleaner#cleaners should be thread-safe
(#21847)
f766ce54e69 is described below
commit f766ce54e693ea94ce16b00bf820d0bcd11685fe
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Apr 6 17:58:15 2026 +0800
KAFKA-20346 LogCleaner#cleaners should be thread-safe (#21847)
`LogCleaner#cleaners` can be accessed by two threads, the
`kafka-0-metadata-loader-event-handler` thread and the JMX reader thread, so we
should use a `CopyOnWriteArrayList` to avoid concurrency issues.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/storage/internals/log/LogCleaner.java | 4 +-
.../internals/log/LogCleanerIntegrationTest.java | 43 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
index 19f973c29fb..72acdf3cfc1 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
@@ -35,12 +35,12 @@ import java.io.File;
import java.io.IOException;
import java.security.DigestException;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -140,7 +140,7 @@ public class LogCleaner implements BrokerReconfigurable {
private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
private final LogDirFailureChannel logDirFailureChannel;
private final Time time;
- private final List<CleanerThread> cleaners = new ArrayList<>();
+ private final List<CleanerThread> cleaners = new CopyOnWriteArrayList<>();
/**
* Log cleaner configuration which may be dynamically updated.
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
index 7abf4b63265..a8c65daf7ff 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerIntegrationTest.java
@@ -64,6 +64,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.LongStream;
@@ -591,6 +592,48 @@ public class LogCleanerIntegrationTest {
"log should have been compacted: startSize=" + startSize + "
compactedSize=" + compactedSize);
}
+ @Test
+ public void testGaugeReadsAreNotAffectedByReconfigure() throws Exception {
+ cleaner = makeCleaner(TOPIC_PARTITIONS, CLEANER_BACKOFF_MS,
MIN_COMPACTION_LAG, SEGMENT_SIZE);
+ cleaner.startup();
+
+ AbstractConfig config1Thread = makeReconfigureConfig(1);
+ AbstractConfig config2Thread = makeReconfigureConfig(2);
+
+ var checkError = CompletableFuture.runAsync(() -> {
+ var endtime = System.currentTimeMillis() +
Duration.ofSeconds(5).toMillis();
+ while (System.currentTimeMillis() < endtime) {
+ cleaner.maxOverCleanerThreads(t ->
t.lastStats().bufferUtilization());
+ cleaner.deadThreadCount();
+ }
+ });
+
+ var updateCleaner = CompletableFuture.runAsync(() -> {
+ var useOne = true;
+ var endtime = System.currentTimeMillis() +
Duration.ofSeconds(5).toMillis();
+ while (System.currentTimeMillis() < endtime) {
+ AbstractConfig oldCfg = useOne ? config2Thread : config1Thread;
+ AbstractConfig newCfg = useOne ? config1Thread : config2Thread;
+ cleaner.reconfigure(oldCfg, newCfg);
+ useOne = !useOne;
+ }
+ });
+
+ checkError.join();
+ updateCleaner.join();
+ }
+
+ private AbstractConfig makeReconfigureConfig(int numThreads) {
+ // Extend CleanerConfig.CONFIG_DEF with message.max.bytes, which
CleanerConfig(AbstractConfig)
+ // reads via ServerConfigs.MESSAGE_MAX_BYTES_CONFIG but which is not
part of CleanerConfig's own ConfigDef.
+ ConfigDef configDef = new ConfigDef(CleanerConfig.CONFIG_DEF)
+ .define("message.max.bytes", ConfigDef.Type.INT,
+ DEFAULT_MAX_MESSAGE_SIZE, ConfigDef.Importance.MEDIUM,
"");
+ Map<String, Object> props = new HashMap<>();
+ props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, numThreads);
+ return new AbstractConfig(configDef, props);
+ }
+
private void checkLastCleaned(String topic, int partitionId, long
firstDirty) throws InterruptedException {
// wait until cleaning up to base_offset, note that cleaning happens
only when "log dirty ratio" is higher than
// TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG