This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5288178e50 [fix][test] Fix thread leaks in multiple tests and KinesisSink (#25376)
d5288178e50 is described below

commit d5288178e500558fee1aa20e4a058ee7c5954dde
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 20 17:04:27 2026 -0700

    [fix][test] Fix thread leaks in multiple tests and KinesisSink (#25376)
---
 .../zookeeper/PrometheusMetricsProviderConfigTest.java    | 14 +++++++++-----
 .../broker/admin/BrokerEndpointsAuthorizationTest.java    |  4 ++++
 .../broker/admin/ClusterEndpointsAuthorizationTest.java   |  4 ++++
 .../broker/admin/TenantEndpointsAuthorizationTest.java    |  4 ++++
 .../apache/pulsar/client/impl/ClientInterruptTest.java    |  4 ++++
 .../java/org/apache/pulsar/io/kinesis/KinesisSink.java    |  3 +++
 .../org/apache/pulsar/io/redis/sink/RedisSinkTest.java    | 15 +++++++++------
 7 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java b/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
index f076a15db93..d6bda2a9da4 100644
--- a/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
+++ b/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
@@ -56,11 +56,15 @@ public class PrometheusMetricsProviderConfigTest {
     public void testValidConfig() throws MetricsProviderLifeCycleException {
         CollectorRegistry.defaultRegistry.clear();
         PrometheusMetricsProvider provider = new PrometheusMetricsProvider();
-        Properties configuration = new Properties();
-        configuration.setProperty("httpHost", "0.0.0.0");
-        configuration.setProperty("httpPort", "0");
-        provider.configure(configuration);
-        provider.start();
+        try {
+            Properties configuration = new Properties();
+            configuration.setProperty("httpHost", "0.0.0.0");
+            configuration.setProperty("httpPort", "0");
+            provider.configure(configuration);
+            provider.start();
+        } finally {
+            provider.stop();
+        }
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
index bc94fc85309..44d33d60e0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
@@ -85,6 +85,10 @@ public class BrokerEndpointsAuthorizationTest extends MockedPulsarStandalone {
             superUserAdmin.close();
             superUserAdmin = null;
         }
+        if (nobodyAdmin != null) {
+            nobodyAdmin.close();
+            nobodyAdmin = null;
+        }
         spyAuthorizationService = null;
         orignalAuthorizationService = null;
         super.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
index d6ad8207bd5..43a320f6d35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
@@ -95,6 +95,10 @@ public class ClusterEndpointsAuthorizationTest extends MockedPulsarStandalone {
             superUserAdmin.close();
             superUserAdmin = null;
         }
+        if (nobodyAdmin != null) {
+            nobodyAdmin.close();
+            nobodyAdmin = null;
+        }
         spyAuthorizationService = null;
         orignalAuthorizationService = null;
         super.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
index 0d454abf41b..93fac55e89a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
@@ -90,6 +90,10 @@ public class TenantEndpointsAuthorizationTest extends MockedPulsarStandalone {
             superUserAdmin.close();
             superUserAdmin = null;
         }
+        if (nobodyAdmin != null) {
+            nobodyAdmin.close();
+            nobodyAdmin = null;
+        }
         spyAuthorizationService = null;
         orignalAuthorizationService = null;
         super.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
index d6a86fc7959..73ea4bea404 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
@@ -70,6 +70,10 @@ public class ClientInterruptTest extends ProducerConsumerBase {
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
         super.internalCleanup();
     }
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index db23972905a..abd216c0c70 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -144,6 +144,9 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObj
 
     @Override
     public void close() {
+        if (scheduledExecutor != null) {
+            scheduledExecutor.shutdownNow();
+        }
         if (kinesisProducer != null) {
             kinesisProducer.flush();
             kinesisProducer.destroy();
diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 3afcece76b5..8b7ec02516b 100644
--- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -70,13 +70,16 @@ public class RedisSinkTest {
         SinkContext sinkContext = Mockito.mock(SinkContext.class);
         sink.open(configs, sinkContext);
 
-        // write should success.
-        sink.write(record);
-        log.info("executed write");
-
-        // sleep to wait backend flush complete
-        Thread.sleep(1000);
+        try {
+            // write should success.
+            sink.write(record);
+            log.info("executed write");
 
+            // sleep to wait backend flush complete
+            Thread.sleep(1000);
+        } finally {
+            sink.close();
+        }
     }
 
     private Record<byte[]> build(String topic, String key, String value) {

Reply via email to