This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 79bd60285f0ca17f8a6acb722a854c40ed8ef220 Author: Matteo Merli <[email protected]> AuthorDate: Mon Mar 30 18:06:51 2026 -0700 [fix][test] Fix flaky testLoadBalancerServiceUnitTableViewSyncer (#25427) (cherry picked from commit 0c285f4cd8ed4f43fc20f5be3a5f64196aea2efd) --- .../ExtensibleLoadManagerImplBaseTest.java | 14 +++- .../extensions/ExtensibleLoadManagerImplTest.java | 91 +++++++++++++++++----- 2 files changed, 83 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 4dedba4f995..b50081de89e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -87,6 +87,7 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ } protected ArrayList<PulsarClient> clients = new ArrayList<>(); + private final java.util.Map<PulsarClient, LookupService> originalLookupServices = new java.util.HashMap<>(); @DataProvider(name = "serviceUnitStateTableViewClassName") public static Object[][] serviceUnitStateTableViewClassName() { @@ -152,7 +153,10 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); for (int i = 0; i < 4; i++) { - clients.add(pulsarClient(lookupUrl.toString(), 100)); + PulsarClient client = pulsarClient(lookupUrl.toString(), 100); + clients.add(client); + originalLookupServices.put(client, + (LookupService) FieldUtils.readDeclaredField(client, "lookup", true)); } } @@ -195,6 +199,14 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ 
admin.namespaces().unload(defaultTestNamespace); reset(primaryLoadManager, secondaryLoadManager); FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); + // Restore original lookup services for all shared clients to prevent state leakage + // between tests when a previous test fails before resetting spied lookup services. + for (PulsarClient client : clients) { + LookupService original = originalLookupServices.get(client); + if (original != null) { + FieldUtils.writeDeclaredField(client, "lookup", original, true); + } + } pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 82fed30cb35..c99443ac137 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -88,6 +88,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl; @@ -771,7 +772,9 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase for (var consumer : consumers) { 
consumer.close(); } - resetLookupService(pulsarClient, lookup.getLeft()); + if (lookup != null) { + resetLookupService(pulsarClient, lookup.getLeft()); + } } } @@ -1239,31 +1242,54 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } - // Check if the broker is available + // Simulate broker going offline: clean ownerships first (as disableBroker() does), + // then unregister from the broker registry. var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); var loadManager4 = spy((ExtensibleLoadManagerImpl) FieldUtils.readField(wrapper, "loadManager", true)); - loadManager4.getBrokerRegistry().unregister(); + ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel) + FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true); + channel4.cleanOwnerships(); + // Simulate broker going offline by removing it from the broker registry. + // Set state to Closed BEFORE deleting the ZK node to prevent the notification + // handler's session-expiry recovery from auto-re-registering broker4. + // In production, the PulsarService shuts down after unregister(), so the handler + // never fires. In tests, the service stays running, creating a race. 
+ var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry(); + registry4.state.set(BrokerRegistryImpl.State.Closed); + String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId(); + pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get(); NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + // Wait for ownership to be reassigned after broker unregistration + String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(lookupResult); + assertNotEquals(lookupResult, pulsar4BrokerUrl); + }); String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertNotEquals(result, pulsar4.getBrokerServiceUrl()); Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) .topic(slaMonitorTopic).create(); producer.send("t1"); // Test re-register broker and check the lookup result - loadManager4.getBrokerRegistry().registerAsync().get(); - + // Reset state to Started to allow re-registration. 
+ registry4.state.set(BrokerRegistryImpl.State.Started); + registry4.registerAsync().get(); + + // Wait for ownership to be reassigned back to pulsar4 after re-registration + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(reRegResult); + assertEquals(reRegResult, pulsar4.getBrokerServiceUrl()); + }); result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertEquals(result, pulsar4.getBrokerServiceUrl()); producer.send("t2"); Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING) @@ -1327,10 +1353,10 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); makeSecondaryAsLeader(); makePrimaryAsLeader(); - Awaitility.await() + Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - Awaitility.await() + Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); ServiceConfiguration defaultConf = getDefaultConf(); @@ -1462,31 +1488,54 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } - // Check if the broker is available + // Simulate broker going offline: clean ownerships first (as disableBroker() does), + // then unregister from the broker registry. 
var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); var loadManager4 = spy((ExtensibleLoadManagerImpl) FieldUtils.readField(wrapper, "loadManager", true)); - loadManager4.getBrokerRegistry().unregister(); + ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel) + FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true); + channel4.cleanOwnerships(); + // Simulate broker going offline by removing it from the broker registry. + // Set state to Closed BEFORE deleting the ZK node to prevent the notification + // handler's session-expiry recovery from auto-re-registering broker4. + // In production, the PulsarService shuts down after unregister(), so the handler + // never fires. In tests, the service stays running, creating a race. + var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry(); + registry4.state.set(BrokerRegistryImpl.State.Closed); + String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId(); + pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get(); NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + // Wait for ownership to be reassigned after broker unregistration + String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(lookupResult); + assertNotEquals(lookupResult, pulsar4BrokerUrl); + }); String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertNotEquals(result, pulsar4.getBrokerServiceUrl()); Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) .topic(slaMonitorTopic).create(); 
producer.send("t1"); // Test re-register broker and check the lookup result - loadManager4.getBrokerRegistry().registerAsync().get(); - + // Reset state to Started to allow re-registration. + registry4.state.set(BrokerRegistryImpl.State.Started); + registry4.registerAsync().get(); + + // Wait for ownership to be reassigned back to pulsar4 after re-registration + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(reRegResult); + assertEquals(reRegResult, pulsar4.getBrokerServiceUrl()); + }); result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertEquals(result, pulsar4.getBrokerServiceUrl()); producer.send("t2"); Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING) @@ -1516,10 +1565,10 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase Awaitility.await().untilAsserted(() -> assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); makeSecondaryAsLeader(); - Awaitility.await() + Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - Awaitility.await() + Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); }
