This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4644dec02df383e1c3d335e1aed7223070ac8a7b
Author: Kai Wang <[email protected]>
AuthorDate: Thu Apr 23 04:47:27 2026 +0800

    [improve][broker] Use full bundle name for namespace bundle destination 
affinity in ModularLoadManagerImpl (#25518)
    
    (cherry picked from commit db94b9534f0900a5ec4eb9fa5191296513aa0575)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  6 ++++-
 .../impl/ModularLoadManagerWrapper.java            |  3 +--
 .../impl/ModularLoadManagerImplTest.java           | 30 +++++++++++++++++++---
 3 files changed, 33 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d79e8cd757f..9c26fc75cf3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1096,6 +1096,7 @@ public abstract class NamespacesBase extends 
AdminResource {
         if (StringUtils.isBlank(destinationBroker)) {
             return CompletableFuture.completedFuture(null);
         }
+
         return pulsar().getLoadManager().get().getAvailableBrokersAsync()
                 .thenCompose(brokers -> {
                     if (!brokers.contains(destinationBroker)) {
@@ -1116,8 +1117,11 @@ public abstract class NamespacesBase extends 
AdminResource {
                     if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
                         return;
                     }
+                    final String bundleName = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                            .getBundle(namespaceName.toString(), bundleRange)
+                            .toString();
                     // For ExtensibleLoadManager, this operation will be 
ignored.
-                    
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, 
destinationBroker);
+                    
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleName, 
destinationBroker);
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index c8d81bda1bc..b5bb4d52717 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -64,8 +64,7 @@ public class ModularLoadManagerWrapper implements LoadManager 
{
 
     @Override
     public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId 
serviceUnit) {
-        String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
-        String affinityBroker = 
loadManager.setNamespaceBundleAffinity(bundleRange, null);
+        String affinityBroker = 
loadManager.setNamespaceBundleAffinity(serviceUnit.toString(), null);
         if (!StringUtils.isBlank(affinityBroker)) {
             return Optional.of(buildBrokerResourceUnit(affinityBroker));
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 577a8c19485..f6061204b62 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -282,14 +282,18 @@ public class ModularLoadManagerImplTest {
         }
     }
 
-    private NamespaceBundle makeBundle(final String property, final String 
cluster, final String namespace) {
-        return nsFactory.getBundle(NamespaceName.get(property, cluster, 
namespace),
+    private NamespaceBundle makeBundle(NamespaceName nsname) {
+        return nsFactory.getBundle(nsname,
                 Range.range(NamespaceBundles.FULL_LOWER_BOUND, 
BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
                         BoundType.CLOSED));
     }
 
+    private NamespaceBundle makeBundle(final String tenant, final String 
namespace) {
+        return makeBundle(NamespaceName.get(tenant, namespace));
+    }
+
     private NamespaceBundle makeBundle(final String all) {
-        return makeBundle(all, all, all);
+        return makeBundle(NamespaceName.get(all, all, all));
     }
 
     private String mockBundleName(final int i) {
@@ -414,6 +418,26 @@ public class ModularLoadManagerImplTest {
         Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload);
     }
 
+    @Test
+    public void testBrokerAffinityLookupUsesFullBundleName() throws Exception {
+        String affinityBroker1 = "affinity-broker-1";
+        String affinityBroker2 = "affinity-broker-2";
+        NamespaceBundle bundle1 = makeBundle("tenant-1", "ns-1");
+        NamespaceBundle bundle2 = makeBundle("tenant-1", "ns-2");
+
+        LoadManager wrapper = pulsar1.getLoadManager().get();
+        wrapper.setNamespaceBundleAffinity(bundle1.toString(), 
affinityBroker1);
+        wrapper.setNamespaceBundleAffinity(bundle2.toString(), 
affinityBroker2);
+
+        Optional<ResourceUnit> leastLoadedBroker1 = 
wrapper.getLeastLoaded(bundle1);
+        Optional<ResourceUnit> leastLoadedBroker2 = 
wrapper.getLeastLoaded(bundle2);
+
+        Assert.assertTrue(leastLoadedBroker1.isPresent());
+        Assert.assertTrue(leastLoadedBroker2.isPresent());
+        Assert.assertEquals(leastLoadedBroker1.get().getResourceId(), 
affinityBroker1);
+        Assert.assertEquals(leastLoadedBroker2.get().getResourceId(), 
affinityBroker2);
+    }
+
     /**
      * It verifies that once broker owns max-number of topics: load-manager 
doesn't allocates new bundles to that broker
      * unless all the brokers are in same state.

Reply via email to