This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3b92e856404d772244b517df848ddef0b856f034 Author: void-ptr974 <[email protected]> AuthorDate: Mon Apr 13 10:42:57 2026 +0800 [fix][test] Fix flaky ExtensibleLoadManagerImpl client reconnection tests: PulsarClientException$AlreadyClosedException: Client already closed (#25509) (cherry picked from commit bf459748671ce62f6cb18f0b9e3b68cc06abf4b4) --- .../ExtensibleLoadManagerImplBaseTest.java | 52 +++++++++------------- .../extensions/ExtensibleLoadManagerImplTest.java | 29 +++++++++--- ...LoadManagerImplWithAdvertisedListenersTest.java | 47 ++++++++++++------- 3 files changed, 76 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 4474fed0a82..55e9b8d6baf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -86,8 +85,28 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ return conf; } - protected ArrayList<PulsarClient> clients = new ArrayList<>(); - private final java.util.Map<PulsarClient, LookupService> originalLookupServices = new java.util.HashMap<>(); + /** + * Create fresh PulsarClient instances for use within a single test method. + * Each test creates and closes its own clients to avoid shared mutable state + * that causes "Client already closed" flakiness. + */ + protected List<PulsarClient> createTestClients(int count) throws Exception { + List<PulsarClient> testClients = new ArrayList<>(); + for (int i = 0; i < count; i++) { + testClients.add(pulsarClient(lookupUrl.toString(), 100)); + } + return testClients; + } + + protected static void closeTestClients(List<PulsarClient> testClients) { + for (PulsarClient client : testClients) { + try { + client.close(); + } catch (Exception e) { + // ignore + } + } + } @DataProvider(name = "serviceUnitStateTableViewClassName") public static Object[][] serviceUnitStateTableViewClassName() { @@ -151,13 +170,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, Sets.newHashSet(this.conf.getClusterName())); lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); - - for (int i = 0; i < 4; i++) { - PulsarClient client = pulsarClient(lookupUrl.toString(), 100); - clients.add(client); - originalLookupServices.put(client, - (LookupService) FieldUtils.readDeclaredField(client, "lookup", true)); - } } private static PulsarClient pulsarClient(String url, int intervalInMillis) throws PulsarClientException { @@ -171,27 +183,15 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ @Override @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - List<CompletableFuture<Void>> futures = new ArrayList<>(); - for (PulsarClient client : clients) { - futures.add(client.closeAsync()); - } - futures.add(pulsar2.closeAsync()); - if (additionalPulsarTestContext != null) { additionalPulsarTestContext.close(); additionalPulsarTestContext = null; } super.internalCleanup(); - try { - FutureUtil.waitForAll(futures).join(); - } catch (Throwable e) { - // skip error - } pulsar1 = pulsar2 = null; primaryLoadManager = secondaryLoadManager = null; channel1 = channel2 = null; lookupService = null; - } @BeforeMethod(alwaysRun = true) @@ -199,14 +199,6 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ admin.namespaces().unload(defaultTestNamespace); reset(primaryLoadManager, secondaryLoadManager); FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); - // Restore original lookup services for all shared clients to prevent state leakage - // between tests when a previous test fails before resetting spied lookup services. - for (PulsarClient client : clients) { - LookupService original = originalLookupServices.get(client); - if (original != null) { - FieldUtils.writeDeclaredField(client, "lookup", original, true); - } - } pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 382d792a23a..94c60356b4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -500,8 +500,14 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { - testTransferClientReconnectionWithoutLookup(clients, topicDomain, subscriptionType, defaultTestNamespace, - admin, lookupUrl.toString(), pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + var testClients = createTestClients(4); + try { + testTransferClientReconnectionWithoutLookup(testClients, topicDomain, subscriptionType, + defaultTestNamespace, admin, lookupUrl.toString(), pulsar1, pulsar2, + primaryLoadManager, secondaryLoadManager); + } finally { + closeTestClients(testClients); + } } @Test(enabled = false) @@ -657,8 +663,13 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { - testUnloadClientReconnectionWithLookup(clients, topicDomain, subscriptionType, defaultTestNamespace, - admin, lookupUrl.toString(), pulsar1); + var testClients = createTestClients(1); + try { + testUnloadClientReconnectionWithLookup(testClients, topicDomain, subscriptionType, + defaultTestNamespace, admin, lookupUrl.toString(), pulsar1); + } finally { + closeTestClients(testClients); + } } @Test(enabled = false) @@ -679,6 +690,7 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase PulsarClient pulsarClient = null; try { pulsarClient = clients.get(0); + @Cleanup var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); var consumerCount = subscriptionType == SubscriptionType.Exclusive ? 1 : 3; @@ -753,8 +765,13 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception { - testOptimizeUnloadDisable(clients, topicDomain, defaultTestNamespace, admin, lookupUrl.toString(), pulsar1, - pulsar2); + var testClients = createTestClients(1); + try { + testOptimizeUnloadDisable(testClients, topicDomain, defaultTestNamespace, admin, + lookupUrl.toString(), pulsar1, pulsar2); + } finally { + closeTestClients(testClients); + } } @Test(enabled = false) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java index 54079c71c66..ac1edb6af69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithAdvertisedListenersTest.java @@ -72,23 +72,33 @@ public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens @Test(timeOut = 30_000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testTransferClientReconnectionWithoutLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { - ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup( - clients, - topicDomain, subscriptionType, - defaultTestNamespace, admin, - brokerServiceUrl, - pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + var testClients = createTestClients(4); + try { + ExtensibleLoadManagerImplTest.testTransferClientReconnectionWithoutLookup( + testClients, + topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1, pulsar2, primaryLoadManager, secondaryLoadManager); + } finally { + closeTestClients(testClients); + } } @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicSubscriptionTypeTest") public void testUnloadClientReconnectionWithLookup(TopicDomain topicDomain, SubscriptionType subscriptionType) throws Exception { - ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup( - clients, - topicDomain, subscriptionType, - defaultTestNamespace, admin, - brokerServiceUrl, - pulsar1); + var testClients = createTestClients(1); + try { + ExtensibleLoadManagerImplTest.testUnloadClientReconnectionWithLookup( + testClients, + topicDomain, subscriptionType, + defaultTestNamespace, admin, + brokerServiceUrl, + pulsar1); + } finally { + closeTestClients(testClients); + } } @DataProvider(name = "isPersistentTopicTest") @@ -98,10 +108,15 @@ public class ExtensibleLoadManagerImplWithAdvertisedListenersTest extends Extens @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception { - ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable( - clients, - topicDomain, defaultTestNamespace, admin, - brokerServiceUrl, pulsar1, pulsar2); + var testClients = createTestClients(1); + try { + ExtensibleLoadManagerImplTest.testOptimizeUnloadDisable( + testClients, + topicDomain, defaultTestNamespace, admin, + brokerServiceUrl, pulsar1, pulsar2); + } finally { + closeTestClients(testClients); + } } }
