This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3b92e856404d772244b517df848ddef0b856f034
Author: void-ptr974 <[email protected]>
AuthorDate: Mon Apr 13 10:42:57 2026 +0800

    [fix][test] Fix flaky ExtensibleLoadManagerImpl client reconnection tests: 
PulsarClientException$AlreadyClosedException: Client already closed (#25509)
    
    (cherry picked from commit bf459748671ce62f6cb18f0b9e3b68cc06abf4b4)
---
 .../ExtensibleLoadManagerImplBaseTest.java         | 52 +++++++++-------------
 .../extensions/ExtensibleLoadManagerImplTest.java  | 29 +++++++++---
 ...LoadManagerImplWithAdvertisedListenersTest.java | 47 ++++++++++++-------
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 4474fed0a82..55e9b8d6baf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -86,8 +85,28 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
         return conf;
     }
 
-    protected ArrayList<PulsarClient> clients = new ArrayList<>();
-    private final java.util.Map<PulsarClient, LookupService> 
originalLookupServices = new java.util.HashMap<>();
+    /**
+     * Create fresh PulsarClient instances for use within a single test method.
+     * Each test creates and closes its own clients to avoid shared mutable 
state
+     * that causes "Client already closed" flakiness.
+     */
+    protected List<PulsarClient> createTestClients(int count) throws Exception 
{
+        List<PulsarClient> testClients = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            testClients.add(pulsarClient(lookupUrl.toString(), 100));
+        }
+        return testClients;
+    }
+
+    protected static void closeTestClients(List<PulsarClient> testClients) {
+        for (PulsarClient client : testClients) {
+            try {
+                client.close();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
 
     @DataProvider(name = "serviceUnitStateTableViewClassName")
     public static Object[][] serviceUnitStateTableViewClassName() {
@@ -151,13 +170,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
         
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
                 Sets.newHashSet(this.conf.getClusterName()));
         lookupService = (LookupService) 
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
-
-        for (int i = 0; i < 4; i++) {
-            PulsarClient client = pulsarClient(lookupUrl.toString(), 100);
-            clients.add(client);
-            originalLookupServices.put(client,
-                    (LookupService) FieldUtils.readDeclaredField(client, 
"lookup", true));
-        }
     }
 
     private static PulsarClient pulsarClient(String url, int intervalInMillis) 
throws PulsarClientException {
@@ -171,27 +183,15 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
     @Override
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (PulsarClient client : clients) {
-            futures.add(client.closeAsync());
-        }
-        futures.add(pulsar2.closeAsync());
-
         if (additionalPulsarTestContext != null) {
             additionalPulsarTestContext.close();
             additionalPulsarTestContext = null;
         }
         super.internalCleanup();
-        try {
-            FutureUtil.waitForAll(futures).join();
-        } catch (Throwable e) {
-            // skip error
-        }
         pulsar1 = pulsar2 = null;
         primaryLoadManager = secondaryLoadManager = null;
         channel1 = channel2 = null;
         lookupService = null;
-
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -199,14 +199,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
         admin.namespaces().unload(defaultTestNamespace);
         reset(primaryLoadManager, secondaryLoadManager);
         FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, 
true);
-        // Restore original lookup services for all shared clients to prevent 
state leakage
-        // between tests when a previous test fails before resetting spied 
lookup services.
-        for (PulsarClient client : clients) {
-            LookupService original = originalLookupServices.get(client);
-            if (original != null) {
-                FieldUtils.writeDeclaredField(client, "lookup", original, 
true);
-            }
-        }
         pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
         pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 382d792a23a..94c60356b4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -500,8 +500,14 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     @Test(timeOut = 30_000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testTransferClientReconnectionWithoutLookup(TopicDomain 
topicDomain, SubscriptionType subscriptionType)
             throws Exception {
-        testTransferClientReconnectionWithoutLookup(clients, topicDomain, 
subscriptionType, defaultTestNamespace,
-                admin, lookupUrl.toString(), pulsar1, pulsar2, 
primaryLoadManager, secondaryLoadManager);
+        var testClients = createTestClients(4);
+        try {
+            testTransferClientReconnectionWithoutLookup(testClients, 
topicDomain, subscriptionType,
+                    defaultTestNamespace, admin, lookupUrl.toString(), 
pulsar1, pulsar2,
+                    primaryLoadManager, secondaryLoadManager);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
     @Test(enabled = false)
@@ -657,8 +663,13 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
     @Test(timeOut = 30 * 1000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
                                                        SubscriptionType 
subscriptionType) throws Exception {
-        testUnloadClientReconnectionWithLookup(clients, topicDomain, 
subscriptionType, defaultTestNamespace,
-                admin, lookupUrl.toString(), pulsar1);
+        var testClients = createTestClients(1);
+        try {
+            testUnloadClientReconnectionWithLookup(testClients, topicDomain, 
subscriptionType,
+                    defaultTestNamespace, admin, lookupUrl.toString(), 
pulsar1);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
     @Test(enabled = false)
@@ -679,6 +690,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         PulsarClient pulsarClient = null;
         try {
             pulsarClient = clients.get(0);
+            @Cleanup
             var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
 
             var consumerCount = subscriptionType == SubscriptionType.Exclusive 
? 1 : 3;
@@ -753,8 +765,13 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
 
     @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
     public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws 
Exception {
-        testOptimizeUnloadDisable(clients, topicDomain, defaultTestNamespace, 
admin, lookupUrl.toString(), pulsar1,
-                pulsar2);
+        var testClients = createTestClients(1);
+        try {
+            testOptimizeUnloadDisable(testClients, topicDomain, 
defaultTestNamespace, admin,
+                    lookupUrl.toString(), pulsar1, pulsar2);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
     @Test(enabled = false)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
index 54079c71c66..ac1edb6af69 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java
@@ -72,23 +72,33 @@ public class 
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
     @Test(timeOut = 30_000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testTransferClientReconnectionWithoutLookup(TopicDomain 
topicDomain, SubscriptionType subscriptionType)
             throws Exception {
-        
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(
-                clients,
-                topicDomain, subscriptionType,
-                defaultTestNamespace, admin,
-                brokerServiceUrl,
-                pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager);
+        var testClients = createTestClients(4);
+        try {
+            
ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup(
+                    testClients,
+                    topicDomain, subscriptionType,
+                    defaultTestNamespace, admin,
+                    brokerServiceUrl,
+                    pulsar1, pulsar2, primaryLoadManager, 
secondaryLoadManager);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
     @Test(timeOut = 30 * 1000, dataProvider = 
"isPersistentTopicSubscriptionTypeTest")
     public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain,
                                                        SubscriptionType 
subscriptionType) throws Exception {
-        ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(
-                clients,
-                topicDomain, subscriptionType,
-                defaultTestNamespace, admin,
-                brokerServiceUrl,
-                pulsar1);
+        var testClients = createTestClients(1);
+        try {
+            
ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup(
+                    testClients,
+                    topicDomain, subscriptionType,
+                    defaultTestNamespace, admin,
+                    brokerServiceUrl,
+                    pulsar1);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
     @DataProvider(name = "isPersistentTopicTest")
@@ -98,10 +108,15 @@ public class 
ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens
 
     @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest")
     public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws 
Exception {
-        ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(
-                clients,
-                topicDomain, defaultTestNamespace, admin,
-                brokerServiceUrl, pulsar1, pulsar2);
+        var testClients = createTestClients(1);
+        try {
+            ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable(
+                    testClients,
+                    topicDomain, defaultTestNamespace, admin,
+                    brokerServiceUrl, pulsar1, pulsar2);
+        } finally {
+            closeTestClients(testClients);
+        }
     }
 
 }

Reply via email to