This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db94b9534f0 [improve][broker] Use full bundle name for namespace
bundle destination affinity in ModularLoadManagerImpl (#25518)
db94b9534f0 is described below
commit db94b9534f0900a5ec4eb9fa5191296513aa0575
Author: Kai Wang <[email protected]>
AuthorDate: Thu Apr 23 04:47:27 2026 +0800
[improve][broker] Use full bundle name for namespace bundle destination
affinity in ModularLoadManagerImpl (#25518)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 +++++-
.../loadbalance/impl/ModularLoadManagerWrapper.java | 3 +--
.../loadbalance/impl/ModularLoadManagerImplTest.java | 20 ++++++++++++++++++++
3 files changed, 26 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 429f7f52b0e..76ecfd05d38 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1457,6 +1457,7 @@ public abstract class NamespacesBase extends
AdminResource {
if (StringUtils.isBlank(destinationBroker)) {
return CompletableFuture.completedFuture(null);
}
+
return pulsar().getLoadManager().get().getAvailableBrokersAsync()
.thenCompose(brokers -> {
if (!brokers.contains(destinationBroker)) {
@@ -1480,8 +1481,11 @@ public abstract class NamespacesBase extends
AdminResource {
if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return;
}
+ final String bundleName =
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(namespaceName.toString(), bundleRange)
+ .toString();
// For ExtensibleLoadManager, this operation will be
ignored.
-
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange,
destinationBroker);
+
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleName,
destinationBroker);
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index c8d81bda1bc..b5bb4d52717 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -64,8 +64,7 @@ public class ModularLoadManagerWrapper implements LoadManager
{
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId
serviceUnit) {
- String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
- String affinityBroker =
loadManager.setNamespaceBundleAffinity(bundleRange, null);
+ String affinityBroker =
loadManager.setNamespaceBundleAffinity(serviceUnit.toString(), null);
if (!StringUtils.isBlank(affinityBroker)) {
return Optional.of(buildBrokerResourceUnit(affinityBroker));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 988d3eeb7e8..c3b5a02e5ac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -415,6 +415,26 @@ public class ModularLoadManagerImplTest {
Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload);
}
+ @Test
+ public void testBrokerAffinityLookupUsesFullBundleName() throws Exception {
+ String affinityBroker1 = "affinity-broker-1";
+ String affinityBroker2 = "affinity-broker-2";
+ NamespaceBundle bundle1 = makeBundle("tenant-1", "ns-1");
+ NamespaceBundle bundle2 = makeBundle("tenant-1", "ns-2");
+
+ LoadManager wrapper = pulsar1.getLoadManager().get();
+ wrapper.setNamespaceBundleAffinity(bundle1.toString(),
affinityBroker1);
+ wrapper.setNamespaceBundleAffinity(bundle2.toString(),
affinityBroker2);
+
+ Optional<ResourceUnit> leastLoadedBroker1 =
wrapper.getLeastLoaded(bundle1);
+ Optional<ResourceUnit> leastLoadedBroker2 =
wrapper.getLeastLoaded(bundle2);
+
+ Assert.assertTrue(leastLoadedBroker1.isPresent());
+ Assert.assertTrue(leastLoadedBroker2.isPresent());
+ Assert.assertEquals(leastLoadedBroker1.get().getResourceId(),
affinityBroker1);
+ Assert.assertEquals(leastLoadedBroker2.get().getResourceId(),
affinityBroker2);
+ }
+
/**
* It verifies that once broker owns max-number of topics: load-manager
doesn't allocates new bundles to that broker
* unless all the brokers are in same state.