This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5288178e50 [fix][test] Fix thread leaks in multiple tests and
KinesisSink (#25376)
d5288178e50 is described below
commit d5288178e500558fee1aa20e4a058ee7c5954dde
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 20 17:04:27 2026 -0700
[fix][test] Fix thread leaks in multiple tests and KinesisSink (#25376)
---
.../zookeeper/PrometheusMetricsProviderConfigTest.java | 14 +++++++++-----
.../broker/admin/BrokerEndpointsAuthorizationTest.java | 4 ++++
.../broker/admin/ClusterEndpointsAuthorizationTest.java | 4 ++++
.../broker/admin/TenantEndpointsAuthorizationTest.java | 4 ++++
.../apache/pulsar/client/impl/ClientInterruptTest.java | 4 ++++
.../java/org/apache/pulsar/io/kinesis/KinesisSink.java | 3 +++
.../org/apache/pulsar/io/redis/sink/RedisSinkTest.java | 15 +++++++++------
7 files changed, 37 insertions(+), 11 deletions(-)
diff --git
a/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
b/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
index f076a15db93..d6bda2a9da4 100644
---
a/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
+++
b/jetty-upgrade/zookeeper-prometheus-metrics/src/test/java/org/apache/pulsar/metrics/prometheus/zookeeper/PrometheusMetricsProviderConfigTest.java
@@ -56,11 +56,15 @@ public class PrometheusMetricsProviderConfigTest {
public void testValidConfig() throws MetricsProviderLifeCycleException {
CollectorRegistry.defaultRegistry.clear();
PrometheusMetricsProvider provider = new PrometheusMetricsProvider();
- Properties configuration = new Properties();
- configuration.setProperty("httpHost", "0.0.0.0");
- configuration.setProperty("httpPort", "0");
- provider.configure(configuration);
- provider.start();
+ try {
+ Properties configuration = new Properties();
+ configuration.setProperty("httpHost", "0.0.0.0");
+ configuration.setProperty("httpPort", "0");
+ provider.configure(configuration);
+ provider.start();
+ } finally {
+ provider.stop();
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
index bc94fc85309..44d33d60e0c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerEndpointsAuthorizationTest.java
@@ -85,6 +85,10 @@ public class BrokerEndpointsAuthorizationTest extends
MockedPulsarStandalone {
superUserAdmin.close();
superUserAdmin = null;
}
+ if (nobodyAdmin != null) {
+ nobodyAdmin.close();
+ nobodyAdmin = null;
+ }
spyAuthorizationService = null;
orignalAuthorizationService = null;
super.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
index d6ad8207bd5..43a320f6d35 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
@@ -95,6 +95,10 @@ public class ClusterEndpointsAuthorizationTest extends
MockedPulsarStandalone {
superUserAdmin.close();
superUserAdmin = null;
}
+ if (nobodyAdmin != null) {
+ nobodyAdmin.close();
+ nobodyAdmin = null;
+ }
spyAuthorizationService = null;
orignalAuthorizationService = null;
super.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
index 0d454abf41b..93fac55e89a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java
@@ -90,6 +90,10 @@ public class TenantEndpointsAuthorizationTest extends
MockedPulsarStandalone {
superUserAdmin.close();
superUserAdmin = null;
}
+ if (nobodyAdmin != null) {
+ nobodyAdmin.close();
+ nobodyAdmin = null;
+ }
spyAuthorizationService = null;
orignalAuthorizationService = null;
super.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
index d6a86fc7959..73ea4bea404 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java
@@ -70,6 +70,10 @@ public class ClientInterruptTest extends
ProducerConsumerBase {
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
super.internalCleanup();
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index db23972905a..abd216c0c70 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -144,6 +144,9 @@ public class KinesisSink extends AbstractAwsConnector
implements Sink<GenericObj
@Override
public void close() {
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdownNow();
+ }
if (kinesisProducer != null) {
kinesisProducer.flush();
kinesisProducer.destroy();
diff --git
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 3afcece76b5..8b7ec02516b 100644
---
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -70,13 +70,16 @@ public class RedisSinkTest {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
sink.open(configs, sinkContext);
- // write should success.
- sink.write(record);
- log.info("executed write");
-
- // sleep to wait backend flush complete
- Thread.sleep(1000);
+ try {
+ // write should success.
+ sink.write(record);
+ log.info("executed write");
+ // sleep to wait backend flush complete
+ Thread.sleep(1000);
+ } finally {
+ sink.close();
+ }
}
private Record<byte[]> build(String topic, String key, String value) {