This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 79bd60285f0ca17f8a6acb722a854c40ed8ef220
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 30 18:06:51 2026 -0700

    [fix][test] Fix flaky testLoadBalancerServiceUnitTableViewSyncer (#25427)
    
    (cherry picked from commit 0c285f4cd8ed4f43fc20f5be3a5f64196aea2efd)
---
 .../ExtensibleLoadManagerImplBaseTest.java         | 14 +++-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 91 +++++++++++++++++-----
 2 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 4dedba4f995..b50081de89e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -87,6 +87,7 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
     }
 
     protected ArrayList<PulsarClient> clients = new ArrayList<>();
+    private final java.util.Map<PulsarClient, LookupService> originalLookupServices = new java.util.HashMap<>();
 
     @DataProvider(name = "serviceUnitStateTableViewClassName")
     public static Object[][] serviceUnitStateTableViewClassName() {
@@ -152,7 +153,10 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
         lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
 
         for (int i = 0; i < 4; i++) {
-            clients.add(pulsarClient(lookupUrl.toString(), 100));
+            PulsarClient client = pulsarClient(lookupUrl.toString(), 100);
+            clients.add(client);
+            originalLookupServices.put(client,
+                    (LookupService) FieldUtils.readDeclaredField(client, "lookup", true));
         }
     }
 
@@ -195,6 +199,14 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
         admin.namespaces().unload(defaultTestNamespace);
         reset(primaryLoadManager, secondaryLoadManager);
         FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
+        // Restore original lookup services for all shared clients to prevent state leakage
+        // between tests when a previous test fails before resetting spied lookup services.
+        for (PulsarClient client : clients) {
+            LookupService original = originalLookupServices.get(client);
+            if (original != null) {
+                FieldUtils.writeDeclaredField(client, "lookup", original, true);
+            }
+        }
         pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
         pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 82fed30cb35..c99443ac137 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl;
@@ -771,7 +772,9 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
             for (var consumer : consumers) {
                 consumer.close();
             }
-            resetLookupService(pulsarClient, lookup.getLeft());
+            if (lookup != null) {
+                resetLookupService(pulsarClient, lookup.getLeft());
+            }
         }
     }
 
@@ -1239,31 +1242,54 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
                                 pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                     }
                 }
-                // Check if the broker is available
+                // Simulate broker going offline: clean ownerships first (as disableBroker() does),
+                // then unregister from the broker registry.
                 var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get();
                 var loadManager4 = spy((ExtensibleLoadManagerImpl)
                         FieldUtils.readField(wrapper, "loadManager", true));
-                loadManager4.getBrokerRegistry().unregister();
+                ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
+                        FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true);
+                channel4.cleanOwnerships();
+                // Simulate broker going offline by removing it from the broker registry.
+                // Set state to Closed BEFORE deleting the ZK node to prevent the notification
+                // handler's session-expiry recovery from auto-re-registering broker4.
+                // In production, the PulsarService shuts down after unregister(), so the handler
+                // never fires. In tests, the service stays running, creating a race.
+                var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry();
+                registry4.state.set(BrokerRegistryImpl.State.Closed);
+                String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId();
+                pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get();
 
                 NamespaceName slaMonitorNamespace =
                         getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration());
                 String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
+                // Wait for ownership to be reassigned after broker unregistration
+                String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
+                Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(lookupResult);
+                    assertNotEquals(lookupResult, pulsar4BrokerUrl);
+                });
                 String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(result);
                 log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
-                assertNotEquals(result, pulsar4.getBrokerServiceUrl());
 
                 Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING)
                         .topic(slaMonitorTopic).create();
                 producer.send("t1");
 
                 // Test re-register broker and check the lookup result
-                loadManager4.getBrokerRegistry().registerAsync().get();
-
+                // Reset state to Started to allow re-registration.
+                registry4.state.set(BrokerRegistryImpl.State.Started);
+                registry4.registerAsync().get();
+
+                // Wait for ownership to be reassigned back to pulsar4 after re-registration
+                Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(reRegResult);
+                    assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
+                });
                 result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(result);
                 log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
-                assertEquals(result, pulsar4.getBrokerServiceUrl());
 
                 producer.send("t2");
                 Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING)
@@ -1327,10 +1353,10 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
                 assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
         makeSecondaryAsLeader();
         makePrimaryAsLeader();
-        Awaitility.await()
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        Awaitility.await()
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
         ServiceConfiguration defaultConf = getDefaultConf();
@@ -1462,31 +1488,54 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
                                 pulsar.getBrokerId(), pulsar.getBrokerServiceUrl());
                     }
                 }
-                // Check if the broker is available
+                // Simulate broker going offline: clean ownerships first (as disableBroker() does),
+                // then unregister from the broker registry.
                 var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get();
                 var loadManager4 = spy((ExtensibleLoadManagerImpl)
                         FieldUtils.readField(wrapper, "loadManager", true));
-                loadManager4.getBrokerRegistry().unregister();
+                ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
+                        FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true);
+                channel4.cleanOwnerships();
+                // Simulate broker going offline by removing it from the broker registry.
+                // Set state to Closed BEFORE deleting the ZK node to prevent the notification
+                // handler's session-expiry recovery from auto-re-registering broker4.
+                // In production, the PulsarService shuts down after unregister(), so the handler
+                // never fires. In tests, the service stays running, creating a race.
+                var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry();
+                registry4.state.set(BrokerRegistryImpl.State.Closed);
+                String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId();
+                pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get();
 
                 NamespaceName slaMonitorNamespace =
                         getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration());
                 String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test");
+                // Wait for ownership to be reassigned after broker unregistration
+                String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
+                Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(lookupResult);
+                    assertNotEquals(lookupResult, pulsar4BrokerUrl);
+                });
                 String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(result);
                 log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
-                assertNotEquals(result, pulsar4.getBrokerServiceUrl());
 
                 Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING)
                         .topic(slaMonitorTopic).create();
                 producer.send("t1");
 
                 // Test re-register broker and check the lookup result
-                loadManager4.getBrokerRegistry().registerAsync().get();
-
+                // Reset state to Started to allow re-registration.
+                registry4.state.set(BrokerRegistryImpl.State.Started);
+                registry4.registerAsync().get();
+
+                // Wait for ownership to be reassigned back to pulsar4 after re-registration
+                Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(reRegResult);
+                    assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
+                });
                 result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(result);
                 log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result);
-                assertEquals(result, pulsar4.getBrokerServiceUrl());
 
                 producer.send("t2");
                 Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING)
@@ -1516,10 +1565,10 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
         Awaitility.await().untilAsserted(() ->
                 assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
         makeSecondaryAsLeader();
-        Awaitility.await()
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        Awaitility.await()
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
     }

Reply via email to