This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db94b9534f0 [improve][broker] Use full bundle name for namespace 
bundle destination affinity in ModularLoadManagerImpl (#25518)
db94b9534f0 is described below

commit db94b9534f0900a5ec4eb9fa5191296513aa0575
Author: Kai Wang <[email protected]>
AuthorDate: Thu Apr 23 04:47:27 2026 +0800

    [improve][broker] Use full bundle name for namespace bundle destination 
affinity in ModularLoadManagerImpl (#25518)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java     |  6 +++++-
 .../loadbalance/impl/ModularLoadManagerWrapper.java  |  3 +--
 .../loadbalance/impl/ModularLoadManagerImplTest.java | 20 ++++++++++++++++++++
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 429f7f52b0e..76ecfd05d38 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1457,6 +1457,7 @@ public abstract class NamespacesBase extends 
AdminResource {
         if (StringUtils.isBlank(destinationBroker)) {
             return CompletableFuture.completedFuture(null);
         }
+
         return pulsar().getLoadManager().get().getAvailableBrokersAsync()
                 .thenCompose(brokers -> {
                     if (!brokers.contains(destinationBroker)) {
@@ -1480,8 +1481,11 @@ public abstract class NamespacesBase extends 
AdminResource {
                     if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
                         return;
                     }
+                    final String bundleName = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                            .getBundle(namespaceName.toString(), bundleRange)
+                            .toString();
                     // For ExtensibleLoadManager, this operation will be 
ignored.
-                    
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+                    
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleName, 
destinationBroker);
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index c8d81bda1bc..b5bb4d52717 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -64,8 +64,7 @@ public class ModularLoadManagerWrapper implements LoadManager 
{
 
     @Override
     public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId 
serviceUnit) {
-        String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
-        String affinityBroker = 
loadManager.setNamespaceBundleAffinity(bundleRange, null);
+        String affinityBroker = 
loadManager.setNamespaceBundleAffinity(serviceUnit.toString(), null);
         if (!StringUtils.isBlank(affinityBroker)) {
             return Optional.of(buildBrokerResourceUnit(affinityBroker));
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 988d3eeb7e8..c3b5a02e5ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -415,6 +415,26 @@ public class ModularLoadManagerImplTest {
         Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload);
     }
 
+    @Test
+    public void testBrokerAffinityLookupUsesFullBundleName() throws Exception {
+        String affinityBroker1 = "affinity-broker-1";
+        String affinityBroker2 = "affinity-broker-2";
+        NamespaceBundle bundle1 = makeBundle("tenant-1", "ns-1");
+        NamespaceBundle bundle2 = makeBundle("tenant-1", "ns-2");
+
+        LoadManager wrapper = pulsar1.getLoadManager().get();
+        wrapper.setNamespaceBundleAffinity(bundle1.toString(), 
affinityBroker1);
+        wrapper.setNamespaceBundleAffinity(bundle2.toString(), 
affinityBroker2);
+
+        Optional<ResourceUnit> leastLoadedBroker1 = 
wrapper.getLeastLoaded(bundle1);
+        Optional<ResourceUnit> leastLoadedBroker2 = 
wrapper.getLeastLoaded(bundle2);
+
+        Assert.assertTrue(leastLoadedBroker1.isPresent());
+        Assert.assertTrue(leastLoadedBroker2.isPresent());
+        Assert.assertEquals(leastLoadedBroker1.get().getResourceId(), 
affinityBroker1);
+        Assert.assertEquals(leastLoadedBroker2.get().getResourceId(), 
affinityBroker2);
+    }
+
     /**
      * It verifies that once broker owns max-number of topics: load-manager 
doesn't allocates new bundles to that broker
      * unless all the brokers are in same state.

Reply via email to