This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2accf437172 [fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638)
2accf437172 is described below
commit 2accf437172cef87776973335138c151bb26bd1a
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 06:09:07 2026 -0700
[fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638)
---
.../extensions/ExtensibleLoadManagerImplTest.java | 348 +++++++--------------
1 file changed, 113 insertions(+), 235 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 35459a25230..6b55543ba91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1337,270 +1337,148 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
@Test(priority = 200)
public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
+ // Make pulsar1 the leader so primaryLoadManager is the syncer-running
broker.
+ makePrimaryAsLeader();
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer");
- TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();
- String topic = topicName.toString();
-
- String lookupResultBefore1 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResultBefore2 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResultBefore1, lookupResultBefore2);
+ String topic = topicAndBundle.getLeft().toString();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
.readOnly(false)
.loadTopicsInBundle(false).build();
- Optional<URL> webServiceUrlBefore1 =
- pulsar1.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrlBefore1.isPresent());
- Optional<URL> webServiceUrlBefore2 =
- pulsar2.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrlBefore2.isPresent());
- assertEquals(webServiceUrlBefore2.get().toString(),
webServiceUrlBefore1.get().toString());
-
-
- String syncerTyp =
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
- ? "SystemTopicToMetadataStoreSyncer" :
"MetadataStoreToSystemTopicSyncer";
+ String ownershipBefore =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
+ Optional<URL> webUrlBefore =
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webUrlBefore.isPresent());
+
+ // === Phase 1: enable the syncer and verify it activates on the
leader only ===
+ // The syncer is started inside ExtensibleLoadManagerImpl.monitor()
when the
+ // dynamic config is enabled and the broker is the channel owner.
Calling
+ // monitor() directly avoids forcing a leader transition (which
serializes
+ // playLeader() behind playFollower() on the single-threaded load
manager
+ // executor and was the root cause of repeated 30s+ flakes here).
+ String syncerType =
+
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+ ? "SystemTopicToMetadataStoreSyncer" :
"MetadataStoreToSystemTopicSyncer";
pulsar.getAdminClient().brokers()
-
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer",
syncerTyp);
- // Wait for the dynamic config to propagate to both brokers before
triggering
- // leader transitions. Without this, the leader callback may see the
old config
- // and skip activating the syncer.
+
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer",
syncerType);
Awaitility.await().untilAsserted(() ->
assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- Awaitility.await().untilAsserted(() ->
-
assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- makeSecondaryAsLeader();
- makePrimaryAsLeader();
- // playLeader()/playFollower() run on a single-threaded
loadManagerExecutor, so
- // playLeader() on the new leader can be queued behind a still-running
playFollower()
- // from the prior demotion. Under CI load this serial chain plus the
syncer.start()
- // work (which opens both table views and runs
syncExistingItems/syncTailItems)
- // can take longer than 30s, so use a more generous timeout here.
- Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ primaryLoadManager.monitor();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
- Awaitility.await().atMost(60, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
- .isActive()));
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
-
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
-
defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName());
- try (var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf)) {
- // start pulsar3 with ServiceUnitStateTableViewImpl
- @Cleanup
- var pulsar3 = additionalPulsarTestContext.getPulsarService();
-
- String lookupResult1 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult2 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult3 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult1, lookupResult2);
- assertEquals(lookupResult1, lookupResult3);
- assertEquals(lookupResult1, lookupResultBefore1);
-
- Optional<URL> webServiceUrl1 =
- pulsar1.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl1.isPresent());
-
- Optional<URL> webServiceUrl2 =
- pulsar2.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl2.isPresent());
- assertEquals(webServiceUrl2.get().toString(),
webServiceUrl1.get().toString());
-
- Optional<URL> webServiceUrl3 =
- pulsar3.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl3.isPresent());
- assertEquals(webServiceUrl3.get().toString(),
webServiceUrl1.get().toString());
-
- assertEquals(webServiceUrl3.get().toString(),
webServiceUrlBefore1.get().toString());
-
- List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2,
pulsar3);
- for (PulsarService pulsarService : pulsarServices) {
- // Test lookup heartbeat namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService,
- pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
- }
- // Test lookup SLA namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService,
- pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
+
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+
+ // === Phase 2: add a 3rd broker using the OTHER table view impl ===
+ // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3
deliberately
+ // uses the other one so the test exercises cross-impl lookups
regardless of
+ // which parametrization we're running.
+ String otherClassName =
+
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+ ?
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
+ : ServiceUnitStateTableViewImpl.class.getName();
+
+ ServiceConfiguration crossImplConf = getDefaultConf();
+ crossImplConf.setAllowAutoTopicCreation(true);
+ crossImplConf.setForceDeleteNamespaceAllowed(true);
+
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
+
+ try (var crossImplCtx =
createAdditionalPulsarTestContext(crossImplConf)) {
+ var pulsar3 = crossImplCtx.getPulsarService();
+
+ // All three brokers (across both impls) must agree on topic
ownership.
+
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
+
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic),
ownershipBefore);
+ Optional<URL> webUrlPulsar3 =
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webUrlPulsar3.isPresent());
+ assertEquals(webUrlPulsar3.get().toString(),
webUrlBefore.get().toString());
+
+ // SLA monitor and heartbeat lookups must agree across impls in
every direction.
+ List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3);
+ for (PulsarService viewer : brokers) {
+ for (PulsarService owner : brokers) {
+ assertLookupHeartbeatOwner(viewer, owner.getBrokerId(),
owner.getBrokerServiceUrl());
+ assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(),
owner.getBrokerServiceUrl());
}
}
- // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl
- ServiceConfiguration conf = getDefaultConf();
- conf.setAllowAutoTopicCreation(true);
- conf.setForceDeleteNamespaceAllowed(true);
-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadManagerServiceUnitStateTableViewClassName(
-
ServiceUnitStateMetadataStoreTableViewImpl.class.getName());
- try (var additionPulsarTestContext =
createAdditionalPulsarTestContext(conf)) {
- @Cleanup
- var pulsar4 = additionPulsarTestContext.getPulsarService();
-
- Set<String> availableCandidates = Sets.newHashSet(
- pulsar1.getBrokerServiceUrl(),
- pulsar2.getBrokerServiceUrl(),
- pulsar3.getBrokerServiceUrl(),
- pulsar4.getBrokerServiceUrl());
- String lookupResult4 =
pulsar4.getAdminClient().lookups().lookupTopic(topic);
- assertTrue(availableCandidates.contains(lookupResult4));
-
- String lookupResult5 =
pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult6 =
pulsar2.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult7 =
pulsar3.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult4, lookupResult5);
- assertEquals(lookupResult4, lookupResult6);
- assertEquals(lookupResult4, lookupResult7);
- assertEquals(lookupResult4, lookupResultBefore1);
-
-
- Pair<TopicName, NamespaceBundle> topicAndBundle2 =
-
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2");
- String topic2 = topicAndBundle2.getLeft().toString();
-
- String lookupResult8 =
pulsar1.getAdminClient().lookups().lookupTopic(topic2);
- String lookupResult9 =
pulsar2.getAdminClient().lookups().lookupTopic(topic2);
- String lookupResult10 =
pulsar3.getAdminClient().lookups().lookupTopic(topic2);
- String lookupResult11 =
pulsar4.getAdminClient().lookups().lookupTopic(topic2);
- assertEquals(lookupResult9, lookupResult8);
- assertEquals(lookupResult10, lookupResult8);
- assertEquals(lookupResult11, lookupResult8);
-
- Set<String> availableWebUrlCandidates = Sets.newHashSet(
- pulsar1.getWebServiceAddress(),
- pulsar2.getWebServiceAddress(),
- pulsar3.getWebServiceAddress(),
- pulsar4.getWebServiceAddress());
-
- webServiceUrl1 =
- pulsar1.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl1.isPresent());
-
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
-
- webServiceUrl2 =
- pulsar2.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl2.isPresent());
- assertEquals(webServiceUrl2.get().toString(),
webServiceUrl1.get().toString());
-
- webServiceUrl3 =
- pulsar3.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl3.isPresent());
-
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
- var webServiceUrl4 =
- pulsar4.getNamespaceService().getWebServiceUrl(bundle,
options);
- assertTrue(webServiceUrl4.isPresent());
- assertEquals(webServiceUrl4.get().toString(),
webServiceUrl1.get().toString());
- assertEquals(webServiceUrl4.get().toString(),
webServiceUrlBefore1.get().toString());
-
- pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
- for (PulsarService pulsarService : pulsarServices) {
- // Test lookup heartbeat namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService,
- pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
- }
- // Test lookup SLA namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService,
- pulsar.getBrokerId(),
pulsar.getBrokerServiceUrl());
- }
- }
- // Simulate broker going offline: clean ownerships first (as
disableBroker() does),
- // then unregister from the broker registry.
- var wrapper = (ExtensibleLoadManagerWrapper)
pulsar4.getLoadManager().get();
- var loadManager4 = spy((ExtensibleLoadManagerImpl)
- FieldUtils.readField(wrapper, "loadManager", true));
- ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
- FieldUtils.readField(loadManager4,
"serviceUnitStateChannel", true);
- channel4.cleanOwnerships();
- // Simulate broker going offline by removing it from the
broker registry.
- // Set state to Closed BEFORE deleting the ZK node to prevent
the notification
- // handler's session-expiry recovery from auto-re-registering
broker4.
- // In production, the PulsarService shuts down after
unregister(), so the handler
- // never fires. In tests, the service stays running, creating
a race.
- var registry4 = (BrokerRegistryImpl)
loadManager4.getBrokerRegistry();
- registry4.state.set(BrokerRegistryImpl.State.Closed);
- String brokerZkPath = "/loadbalance/brokers/" +
pulsar4.getBrokerId();
- pulsar4.getLocalMetadataStore().delete(brokerZkPath,
Optional.empty()).get();
-
- NamespaceName slaMonitorNamespace =
- getSLAMonitorNamespace(pulsar4.getBrokerId(),
pulsar.getConfiguration());
- String slaMonitorTopic =
slaMonitorNamespace.getPersistentTopicName("test");
- // Wait for ownership to be reassigned after broker
unregistration
- String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
- String lookupResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(lookupResult);
- assertNotEquals(lookupResult, pulsar4BrokerUrl);
- });
- String result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- log.info().attr("topic", slaMonitorTopic).attr("owner", result)
- .log("Namespace is re-owned");
-
- Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
- .topic(slaMonitorTopic).create();
- producer.send("t1");
-
- // Test re-register broker and check the lookup result
- // Reset state to Started to allow re-registration.
- registry4.state.set(BrokerRegistryImpl.State.Started);
- registry4.registerAsync().get();
-
- // Wait for ownership to be reassigned back to pulsar4 after
re-registration
- Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
- String reRegResult =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- assertNotNull(reRegResult);
- assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
- });
- result =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
- log.info().attr("topic", slaMonitorTopic).attr("owner", result)
- .log("Namespace is re-owned");
+ // === Phase 3: simulate the cross-impl broker going offline ===
+ // Its SLA namespace must reassign to a remaining broker, and the
ownership
+ // change must propagate through the syncer to brokers using the
other impl.
+ var wrapper3 = (ExtensibleLoadManagerWrapper)
pulsar3.getLoadManager().get();
+ var loadManager3 = (ExtensibleLoadManagerImpl)
+ FieldUtils.readField(wrapper3, "loadManager", true);
+ ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
+ FieldUtils.readField(loadManager3,
"serviceUnitStateChannel", true);
+ channel3.cleanOwnerships();
+ // Set state to Closed BEFORE deleting the ZK node to prevent the
notification
+ // handler's session-expiry recovery from auto-re-registering
broker3. In
+ // production the PulsarService shuts down after unregister(), so
the handler
+ // never fires; in tests the service stays running and creates a
race.
+ var registry3 = (BrokerRegistryImpl)
loadManager3.getBrokerRegistry();
+ registry3.state.set(BrokerRegistryImpl.State.Closed);
+ pulsar3.getLocalMetadataStore()
+ .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(),
Optional.empty()).get();
+
+ String slaMonitorTopic =
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
+ .getPersistentTopicName("test");
+ String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ String reassigned =
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+ assertNotNull(reassigned);
+ assertNotEquals(reassigned, pulsar3BrokerUrl);
+ });
- producer.send("t2");
- Producer<String> producer1 =
pulsar.getClient().newProducer(Schema.STRING)
- .topic(slaMonitorTopic).create();
- producer1.send("t3");
+ // Send a message while the topic is owned by the reassigned
broker; this must
+ // remain durable when ownership migrates back below.
+ @Cleanup
+ Producer<String> producer =
pulsar.getClient().newProducer(Schema.STRING)
+ .topic(slaMonitorTopic).create();
+ producer.send("offline");
+
+ // === Phase 4: re-register the broker and verify ownership
returns ===
+ registry3.state.set(BrokerRegistryImpl.State.Started);
+ registry3.registerAsync().get();
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
+
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
+ pulsar3.getBrokerServiceUrl()));
+
+ // Same producer reconnects to the new owner; a fresh producer
also works.
+ producer.send("after-reconnect");
+ @Cleanup
+ Producer<String> producer2 =
pulsar.getClient().newProducer(Schema.STRING)
+ .topic(slaMonitorTopic).create();
+ producer2.send("from-new-producer");
- producer.close();
- producer1.close();
- @Cleanup
- Consumer<String> consumer =
pulsar.getClient().newConsumer(Schema.STRING)
- .topic(slaMonitorTopic)
-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionName("test")
- .subscribe();
- // receive message t1 t2 t3
- assertEquals(consumer.receive().getValue(), "t1");
- assertEquals(consumer.receive().getValue(), "t2");
- assertEquals(consumer.receive().getValue(), "t3");
- }
+ @Cleanup
+ Consumer<String> consumer =
pulsar.getClient().newConsumer(Schema.STRING)
+ .topic(slaMonitorTopic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .subscribe();
+ assertEquals(consumer.receive().getValue(), "offline");
+ assertEquals(consumer.receive().getValue(), "after-reconnect");
+ assertEquals(consumer.receive().getValue(), "from-new-producer");
}
+ // === Phase 5: disable the syncer and verify it deactivates ===
pulsar.getAdminClient().brokers()
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
- // Wait for config deletion to propagate before leader transition
Awaitility.await().untilAsserted(() ->
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- Awaitility.await().untilAsserted(() ->
-
assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
- makeSecondaryAsLeader();
- Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ primaryLoadManager.monitor();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() ->
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
.isActive()));
- Awaitility.await().atMost(60, TimeUnit.SECONDS)
- .untilAsserted(() ->
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
- .isActive()));
+
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
}
private void assertLookupHeartbeatOwner(PulsarService pulsar,