This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4c7b101e3c5b7c38596320971df97d69f15e1dea Author: Kai Wang <[email protected]> AuthorDate: Thu Apr 23 04:47:27 2026 +0800 [improve][broker] Use full bundle name for namespace bundle destination affinity in ModularLoadManagerImpl (#25518) (cherry picked from commit db94b9534f0900a5ec4eb9fa5191296513aa0575) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 6 ++++- .../impl/ModularLoadManagerWrapper.java | 3 +-- .../impl/ModularLoadManagerImplTest.java | 30 +++++++++++++++++++--- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d79e8cd757f..9c26fc75cf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1096,6 +1096,7 @@ public abstract class NamespacesBase extends AdminResource { if (StringUtils.isBlank(destinationBroker)) { return CompletableFuture.completedFuture(null); } + return pulsar().getLoadManager().get().getAvailableBrokersAsync() .thenCompose(brokers -> { if (!brokers.contains(destinationBroker)) { @@ -1116,8 +1117,11 @@ public abstract class NamespacesBase extends AdminResource { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) { return; } + final String bundleName = pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundle(namespaceName.toString(), bundleRange) + .toString(); // For ExtensibleLoadManager, this operation will be ignored. - pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); + pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleName, destinationBroker); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index c8d81bda1bc..b5bb4d52717 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -64,8 +64,7 @@ public class ModularLoadManagerWrapper implements LoadManager { @Override public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) { - String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); - String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); + String affinityBroker = loadManager.setNamespaceBundleAffinity(serviceUnit.toString(), null); if (!StringUtils.isBlank(affinityBroker)) { return Optional.of(buildBrokerResourceUnit(affinityBroker)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 577a8c19485..f6061204b62 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -282,14 +282,18 @@ public class ModularLoadManagerImplTest { } } - private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) { - return nsFactory.getBundle(NamespaceName.get(property, cluster, namespace), + private NamespaceBundle makeBundle(NamespaceName nsname) { + return nsFactory.getBundle(nsname, Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED)); } + private NamespaceBundle makeBundle(final String tenant, final String namespace) { + return makeBundle(NamespaceName.get(tenant, namespace)); + } + private NamespaceBundle makeBundle(final String all) { - return makeBundle(all, all, all); + return makeBundle(NamespaceName.get(all, all, all)); } private String mockBundleName(final int i) { @@ -414,6 +418,26 @@ public class ModularLoadManagerImplTest { Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload); } + @Test + public void testBrokerAffinityLookupUsesFullBundleName() throws Exception { + String affinityBroker1 = "affinity-broker-1"; + String affinityBroker2 = "affinity-broker-2"; + NamespaceBundle bundle1 = makeBundle("tenant-1", "ns-1"); + NamespaceBundle bundle2 = makeBundle("tenant-1", "ns-2"); + + LoadManager wrapper = pulsar1.getLoadManager().get(); + wrapper.setNamespaceBundleAffinity(bundle1.toString(), affinityBroker1); + wrapper.setNamespaceBundleAffinity(bundle2.toString(), affinityBroker2); + + Optional<ResourceUnit> leastLoadedBroker1 = wrapper.getLeastLoaded(bundle1); + Optional<ResourceUnit> leastLoadedBroker2 = wrapper.getLeastLoaded(bundle2); + + Assert.assertTrue(leastLoadedBroker1.isPresent()); + Assert.assertTrue(leastLoadedBroker2.isPresent()); + Assert.assertEquals(leastLoadedBroker1.get().getResourceId(), affinityBroker1); + Assert.assertEquals(leastLoadedBroker2.get().getResourceId(), affinityBroker2); + } + /** * It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker * unless all the brokers are in same state.
