This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bf459748671 [fix][test] Fix flaky ExtensibleLoadManagerImpl client
reconnection tests: PulsarClientException$AlreadyClosedException: Client
already closed (#25509)
bf459748671 is described below
commit bf459748671ce62f6cb18f0b9e3b68cc06abf4b4
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Apr 13 10:42:57 2026 +0800
[fix][test] Fix flaky ExtensibleLoadManagerImpl client reconnection tests:
PulsarClientException$AlreadyClosedException: Client already closed (#25509)
---
.../ExtensibleLoadManagerImplBaseTest.java | 52 +++++++++-------------
.../extensions/ExtensibleLoadManagerImplTest.java | 29 +++++++++---
...LoadManagerImplWithAdvertisedListenersTest.java | 47 ++++++++++++-------
3 files changed, 76 insertions(+), 52 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index d292e2ebc5b..f31f932e8a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -86,8 +85,28 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
return conf;
}
- protected ArrayList<PulsarClient> clients = new ArrayList<>();
- private final java.util.Map<PulsarClient, LookupService>
originalLookupServices = new java.util.HashMap<>();
+ /**
+ * Create fresh PulsarClient instances for use within a single test method.
+ * Each test creates and closes its own clients to avoid shared mutable
state
+ * that causes "Client already closed" flakiness.
+ */
+ protected List<PulsarClient> createTestClients(int count) throws Exception
{
+ List<PulsarClient> testClients = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ testClients.add(pulsarClient(lookupUrl.toString(), 100));
+ }
+ return testClients;
+ }
+
+ protected static void closeTestClients(List<PulsarClient> testClients) {
+ for (PulsarClient client : testClients) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
@DataProvider(name = "serviceUnitStateTableViewClassName")
public static Object[][] serviceUnitStateTableViewClassName() {
@@ -151,13 +170,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()), false);
lookupService = (LookupService)
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
-
- for (int i = 0; i < 4; i++) {
- PulsarClient client = pulsarClient(lookupUrl.toString(), 100);
- clients.add(client);
- originalLookupServices.put(client,
- (LookupService) FieldUtils.readDeclaredField(client,
"lookup", true));
- }
}
@SuppressWarnings("deprecation")
@@ -172,27 +184,15 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (PulsarClient client : clients) {
- futures.add(client.closeAsync());
- }
- futures.add(pulsar2.closeAsync());
-
if (additionalPulsarTestContext != null) {
additionalPulsarTestContext.close();
additionalPulsarTestContext = null;
}
super.internalCleanup();
- try {
- FutureUtil.waitForAll(futures).join();
- } catch (Throwable e) {
- // skip error
- }
pulsar1 = pulsar2 = null;
primaryLoadManager = secondaryLoadManager = null;
channel1 = channel2 = null;
lookupService = null;
-
}
@BeforeMethod(alwaysRun = true)
@@ -200,14 +200,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
admin.namespaces().unload(defaultTestNamespace);
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
- // Restore original lookup services for all shared clients to prevent
state leakage
- // between tests when a previous test fails before resetting spied
lookup services.
- for (PulsarClient client : clients) {
- LookupService original = originalLookupServices.get(client);
- if (original != null) {
- FieldUtils.writeDeclaredField(client, "lookup", original,
true);
- }
- }
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index af071f35b01..dd5c624dcdb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -534,8 +534,14 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
@Test(timeOut = 30_000, dataProvider =
"isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain
topicDomain, SubscriptionType subscriptionType)
throws Exception {
- testTransferClientReconnectionWithoutLookup(clients, topicDomain,
subscriptionType, defaultTestNamespace,
- admin, lookupUrl.toString(), pulsar1, pulsar2,
primaryLoadManager, secondaryLoadManager);
+ var testClients = createTestClients(4);
+ try {
+ testTransferClientReconnectionWithoutLookup(testClients,
topicDomain, subscriptionType,
+ defaultTestNamespace, admin, lookupUrl.toString(),
pulsar1, pulsar2,
+ primaryLoadManager, secondaryLoadManager);
+ } finally {
+ closeTestClients(testClients);
+ }
}
@Test(enabled = false)
@@ -691,8 +697,13 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
@Test(timeOut = 30 * 1000, dataProvider =
"isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType
subscriptionType) throws Exception {
- testUnloadClientReconnectionWithLookup(clients, topicDomain,
subscriptionType, defaultTestNamespace,
- admin, lookupUrl.toString(), pulsar1);
+ var testClients = createTestClients(1);
+ try {
+ testUnloadClientReconnectionWithLookup(testClients, topicDomain,
subscriptionType,
+ defaultTestNamespace, admin, lookupUrl.toString(),
pulsar1);
+ } finally {
+ closeTestClients(testClients);
+ }
}
@SuppressWarnings("deprecation")
@@ -714,6 +725,7 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
PulsarClient pulsarClient = null;
try {
pulsarClient = clients.get(0);
+ @Cleanup
var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
var consumerCount = subscriptionType == SubscriptionType.Exclusive
? 1 : 3;
@@ -788,8 +800,13 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws
Exception {
- testOptimizeUnloadDisable(clients, topicDomain, defaultTestNamespace,
admin, lookupUrl.toString(), pulsar1,
- pulsar2);
+ var testClients = createTestClients(1);
+ try {
+ testOptimizeUnloadDisable(testClients, topicDomain,
defaultTestNamespace, admin,
+ lookupUrl.toString(), pulsar1, pulsar2);
+ } finally {
+ closeTestClients(testClients);
+ }
}
@SuppressWarnings("deprecation")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
index 54079c71c66..ac1edb6af69 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
@@ -72,23 +72,33 @@ public class
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
@Test(timeOut = 30_000, dataProvider =
"isPersistentTopicSubscriptionTypeTest")
public void testTransferClientReconnectionWithoutLookup(TopicDomain
topicDomain, SubscriptionType subscriptionType)
throws Exception {
-
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(
- clients,
- topicDomain, subscriptionType,
- defaultTestNamespace, admin,
- brokerServiceUrl,
- pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
+ var testClients = createTestClients(4);
+ try {
+
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(
+ testClients,
+ topicDomain, subscriptionType,
+ defaultTestNamespace, admin,
+ brokerServiceUrl,
+ pulsar1, pulsar2, primaryLoadManager,
secondaryLoadManager);
+ } finally {
+ closeTestClients(testClients);
+ }
}
@Test(timeOut = 30 * 1000, dataProvider =
"isPersistentTopicSubscriptionTypeTest")
public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
SubscriptionType
subscriptionType) throws Exception {
- ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(
- clients,
- topicDomain, subscriptionType,
- defaultTestNamespace, admin,
- brokerServiceUrl,
- pulsar1);
+ var testClients = createTestClients(1);
+ try {
+
ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(
+ testClients,
+ topicDomain, subscriptionType,
+ defaultTestNamespace, admin,
+ brokerServiceUrl,
+ pulsar1);
+ } finally {
+ closeTestClients(testClients);
+ }
}
@DataProvider(name = "isPersistentTopicTest")
@@ -98,10 +108,15 @@ public class
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
@Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws
Exception {
- ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(
- clients,
- topicDomain, defaultTestNamespace, admin,
- brokerServiceUrl, pulsar1, pulsar2);
+ var testClients = createTestClients(1);
+ try {
+ ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(
+ testClients,
+ topicDomain, defaultTestNamespace, admin,
+ brokerServiceUrl, pulsar1, pulsar2);
+ } finally {
+ closeTestClients(testClients);
+ }
}
}