This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c285f4cd8e [fix][test] Fix flaky testLoadBalancerServiceUnitTableViewSyncer (#25427)
0c285f4cd8e is described below
commit 0c285f4cd8ed4f43fc20f5be3a5f64196aea2efd
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 30 18:06:51 2026 -0700
[fix][test] Fix flaky testLoadBalancerServiceUnitTableViewSyncer (#25427)
---
build.gradle.kts | 1 +
.../ExtensibleLoadManagerImplBaseTest.java | 14 +++-
.../extensions/ExtensibleLoadManagerImplTest.java | 91 +++++++++++++++++-----
3 files changed, 84 insertions(+), 22 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index b2949af6591..e14c968cc49 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -92,6 +92,7 @@ idea {
}
}
}
+
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 71ebaf39448..d292e2ebc5b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -87,6 +87,7 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
}
protected ArrayList<PulsarClient> clients = new ArrayList<>();
+ private final java.util.Map<PulsarClient, LookupService>
originalLookupServices = new java.util.HashMap<>();
@DataProvider(name = "serviceUnitStateTableViewClassName")
public static Object[][] serviceUnitStateTableViewClassName() {
@@ -152,7 +153,10 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
lookupService = (LookupService)
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
for (int i = 0; i < 4; i++) {
- clients.add(pulsarClient(lookupUrl.toString(), 100));
+ PulsarClient client = pulsarClient(lookupUrl.toString(), 100);
+ clients.add(client);
+ originalLookupServices.put(client,
+ (LookupService) FieldUtils.readDeclaredField(client,
"lookup", true));
}
}
@@ -196,6 +200,14 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
admin.namespaces().unload(defaultTestNamespace);
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService,
true);
+ // Restore original lookup services for all shared clients to prevent
state leakage
+ // between tests when a previous test fails before resetting spied
lookup services.
+ for (PulsarClient client : clients) {
+ LookupService original = originalLookupServices.get(client);
+ if (original != null) {
+ FieldUtils.writeDeclaredField(client, "lookup", original,
true);
+ }
+ }
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 368bcba4a60..af071f35b01 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -88,6 +88,7 @@ import
org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl;
@@ -774,7 +775,9 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
for (var consumer : consumers) {
consumer.close();
}
- resetLookupService(pulsarClient, lookup.getLeft());
+ if (lookup != null) {
+ resetLookupService(pulsarClient, lookup.getLeft());
+ }
}
}
@@ -1243,31 +1246,54 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
}
}
- // Check if the broker is available
+ // Simulate broker going offline: clean ownerships first (as
disableBroker() does),
+ // then remove the broker's entry from the broker registry directly,
without calling unregister().
var wrapper = (ExtensibleLoadManagerWrapper)
pulsar4.getLoadManager().get();
var loadManager4 = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
- loadManager4.getBrokerRegistry().unregister();
+ ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
+ FieldUtils.readField(loadManager4,
"serviceUnitStateChannel", true);
+ channel4.cleanOwnerships();
+ // Simulate broker going offline by removing it from the
broker registry.
+ // Set state to Closed BEFORE deleting the ZK node to prevent
the notification
+ // handler's session-expiry recovery from auto-re-registering
broker4.
+ // In production, the PulsarService shuts down after
unregister(), so the handler
+ // never fires. In tests, the service stays running, creating
a race.
+ var registry4 = (BrokerRegistryImpl)
loadManager4.getBrokerRegistry();
+ registry4.state.set(BrokerRegistryImpl.State.Closed);
+ String brokerZkPath = "/loadbalance/brokers/" +
pulsar4.getBrokerId();
+ pulsar4.getLocalMetadataStore().delete(brokerZkPath,
Optional.empty()).get();
NamespaceName slaMonitorNamespace =
getSLAMonitorNamespace(pulsar4.getBrokerId(),
pulsar.getConfiguration());
String slaMonitorTopic =
slaMonitorNamespace.getPersistentTopicName("test");
+ // Wait for ownership to be reassigned after broker
unregistration
+ String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String lookupResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(lookupResult);
+ assertNotEquals(lookupResult, pulsar4BrokerUrl);
+ });
String result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic,
result);
- assertNotEquals(result, pulsar4.getBrokerServiceUrl());
Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
.topic(slaMonitorTopic).create();
producer.send("t1");
// Test re-register broker and check the lookup result
- loadManager4.getBrokerRegistry().registerAsync().get();
-
+ // Reset state to Started to allow re-registration.
+ registry4.state.set(BrokerRegistryImpl.State.Started);
+ registry4.registerAsync().get();
+
+ // Wait for ownership to be reassigned back to pulsar4 after
re-registration
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String reRegResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(reRegResult);
+ assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
+ });
result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic,
result);
- assertEquals(result, pulsar4.getBrokerServiceUrl());
producer.send("t2");
Producer<String> producer1 =
pulsar.getClient().newProducer(Schema.STRING)
@@ -1331,10 +1357,10 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
makeSecondaryAsLeader();
makePrimaryAsLeader();
- Awaitility.await()
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
- Awaitility.await()
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
ServiceConfiguration defaultConf = getDefaultConf();
@@ -1466,31 +1492,54 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
}
}
- // Check if the broker is available
+ // Simulate broker going offline: clean ownerships first (as
disableBroker() does),
+ // then remove the broker's entry from the broker registry directly,
without calling unregister().
var wrapper = (ExtensibleLoadManagerWrapper)
pulsar4.getLoadManager().get();
var loadManager4 = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
- loadManager4.getBrokerRegistry().unregister();
+ ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
+ FieldUtils.readField(loadManager4,
"serviceUnitStateChannel", true);
+ channel4.cleanOwnerships();
+ // Simulate broker going offline by removing it from the
broker registry.
+ // Set state to Closed BEFORE deleting the ZK node to prevent
the notification
+ // handler's session-expiry recovery from auto-re-registering
broker4.
+ // In production, the PulsarService shuts down after
unregister(), so the handler
+ // never fires. In tests, the service stays running, creating
a race.
+ var registry4 = (BrokerRegistryImpl)
loadManager4.getBrokerRegistry();
+ registry4.state.set(BrokerRegistryImpl.State.Closed);
+ String brokerZkPath = "/loadbalance/brokers/" +
pulsar4.getBrokerId();
+ pulsar4.getLocalMetadataStore().delete(brokerZkPath,
Optional.empty()).get();
NamespaceName slaMonitorNamespace =
getSLAMonitorNamespace(pulsar4.getBrokerId(),
pulsar.getConfiguration());
String slaMonitorTopic =
slaMonitorNamespace.getPersistentTopicName("test");
+ // Wait for ownership to be reassigned after broker
unregistration
+ String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String lookupResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(lookupResult);
+ assertNotEquals(lookupResult, pulsar4BrokerUrl);
+ });
String result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic,
result);
- assertNotEquals(result, pulsar4.getBrokerServiceUrl());
Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
.topic(slaMonitorTopic).create();
producer.send("t1");
// Test re-register broker and check the lookup result
- loadManager4.getBrokerRegistry().registerAsync().get();
-
+ // Reset state to Started to allow re-registration.
+ registry4.state.set(BrokerRegistryImpl.State.Started);
+ registry4.registerAsync().get();
+
+ // Wait for ownership to be reassigned back to pulsar4 after
re-registration
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String reRegResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(reRegResult);
+ assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
+ });
result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(result);
log.info("{} Namespace is re-owned by {}", slaMonitorTopic,
result);
- assertEquals(result, pulsar4.getBrokerServiceUrl());
producer.send("t2");
Producer<String> producer1 =
pulsar.getClient().newProducer(Schema.STRING)
@@ -1520,10 +1569,10 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
Awaitility.await().untilAsserted(() ->
assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
makeSecondaryAsLeader();
- Awaitility.await()
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
- Awaitility.await()
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
}