This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a919db3935 [fix][test] Whitelist SharedPulsarCluster threads in
thread leak detector (#25375)
7a919db3935 is described below
commit 7a919db39351981e33f8c42bb3cd2ef10ec90483
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 20 17:05:28 2026 -0700
[fix][test] Whitelist SharedPulsarCluster threads in thread leak detector
(#25375)
---
.../pulsar/tests/ThreadLeakDetectorListener.java | 23 +++++++++++++++++++++-
.../pulsar/broker/service/SharedPulsarCluster.java | 5 +++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
index 43f219ebc23..93baf491b48 100644
---
a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
+++
b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadLeakDetectorListener.java
@@ -59,7 +59,9 @@ public class ThreadLeakDetectorListener extends
BetweenTestClassesListenerAdapte
private static final boolean COLLECT_THREADDUMP =
Boolean.parseBoolean(System.getenv().getOrDefault("THREAD_LEAK_DETECTOR_COLLECT_THREADDUMP",
"true"));
- private Set<ThreadKey> capturedThreadKeys;
+ private static volatile ThreadLeakDetectorListener activeInstance;
+
+ private volatile Set<ThreadKey> capturedThreadKeys;
private static final Field THREAD_TARGET_FIELD;
static {
@@ -76,10 +78,29 @@ public class ThreadLeakDetectorListener extends
BetweenTestClassesListenerAdapte
@Override
public void onStart(ISuite suite) {
+ activeInstance = this;
// capture the initial set of threads
detectLeakedThreads(Collections.emptyList());
}
+ /**
+ * Re-captures the current set of threads as the baseline. This should be
called after
+ * shared infrastructure (e.g., a JVM-wide singleton cluster) has been
fully initialized,
+ * so that its threads are not reported as leaks of the first test class
that triggers
+ * the initialization.
+ */
+ public static void resetCapturedThreads() {
+ ThreadLeakDetectorListener listener = activeInstance;
+ if (listener != null) {
+ listener.capturedThreadKeys = Collections.unmodifiableSet(
+ ThreadUtils.getAllThreads().stream()
+ .filter(thread -> !shouldSkipThread(thread))
+ .map(ThreadKey::of)
+ .collect(Collectors.<ThreadKey,
Set<ThreadKey>>toCollection(
+ LinkedHashSet::new)));
+ }
+ }
+
@Override
protected void onBetweenTestClasses(List<ITestClass> testClasses) {
detectLeakedThreads(testClasses);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index 41ff4eb20cd..d67c8194edf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+import org.apache.pulsar.tests.ThreadLeakDetectorListener;
/**
* JVM-wide singleton that manages a lightweight Pulsar cluster for
integration tests.
@@ -185,6 +186,10 @@ public class SharedPulsarCluster {
log.info("SharedPulsarCluster started. broker={} web={}",
pulsarService.getBrokerServiceUrl(),
pulsarService.getWebServiceAddress());
+
+ // Reset the thread leak detector baseline so that threads created by
+ // this shared cluster are not reported as leaks of the first test
class
+ ThreadLeakDetectorListener.resetCapturedThreads();
}
private void close() throws Exception {