This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2e289e64808 [improve][broker] PIP-380: Support-setting-up-specific-namespaces-to-skipping-the-load-shedding (#23549)
2e289e64808 is described below

commit 2e289e6480801a0e0e694d1b1661e3984d675b30
Author: Kai Wang <[email protected]>
AuthorDate: Tue Jun 10 14:35:04 2025 +0800

    [improve][broker] PIP-380: Support-setting-up-specific-namespaces-to-skipping-the-load-shedding (#23549)
    
    (cherry picked from commit 0f9ea181b084907ec8cb3d25535f7c6e3d2ffdc2)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++++
 .../extensions/ExtensibleLoadManagerImpl.java      | 28 +++++++++++++++-
 .../loadbalance/extensions/models/TopKBundles.java | 11 ++++--
 .../extensions/scheduler/TransferShedder.java      |  9 +++++
 .../RoundRobinBrokerSelectionStrategy.java         | 37 ++++++++++++++++++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 39 ++++++++++++++++++++--
 .../extensions/ExtensibleLoadManagerImplTest.java  | 35 +++++++++++++++++++
 .../extensions/models/TopKBundlesTest.java         | 22 ++++++++++++
 .../extensions/scheduler/TransferShedderTest.java  | 19 +++++++++++
 9 files changed, 201 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dede4543fc3..820c6b515e5 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3101,6 +3101,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "The namespaces to be excluded from load shedding"
+    )
+    private Set<String> loadBalancerSheddingExcludedNamespaces = new 
HashSet<>();
+
     @FieldContext(
             category = CATEGORY_LOAD_BALANCER,
             doc = "Time to wait before fixing any stuck in-flight service unit 
states. "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index ef29d7d9a74..af95df60174 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -84,6 +84,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactor
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
 import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
+import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.RoundRobinBrokerSelectionStrategy;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
 import org.apache.pulsar.broker.namespace.LookupOptions;
@@ -161,6 +162,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     @Getter
     private final BrokerSelectionStrategy brokerSelectionStrategy;
 
+    private final BrokerSelectionStrategy 
sheddingExcludedNamespaceSelectionStrategy;
+
     @Getter
     private final List<BrokerFilter> brokerFilterPipeline;
 
@@ -254,6 +257,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
         this.brokerFilterPipeline.add(new BrokerVersionFilter());
         this.brokerSelectionStrategy = createBrokerSelectionStrategy();
+        this.sheddingExcludedNamespaceSelectionStrategy = new 
RoundRobinBrokerSelectionStrategy();
     }
 
     public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
@@ -636,11 +640,33 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                             return Optional.empty();
                         }
                         Set<String> candidateBrokers = 
availableBrokerCandidates.keySet();
-                        return 
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
+                        return 
getBrokerSelectionStrategy(bundle).select(candidateBrokers, bundle, context);
                     });
                 });
     }
 
+    /**
+     * For shedding-excluded namespaces, use RoundRobinBrokerSelector to assign the ownership.
+     * This spreads the assignment more evenly, because these bundles will not be automatically
+     * rebalanced to another broker unless they are manually unloaded.
+     *
+     * @param bundle the bundle to assign
+     * @return the broker selection strategy
+     */
+    private BrokerSelectionStrategy getBrokerSelectionStrategy(ServiceUnitId 
bundle) {
+
+        Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
+
+        var namespace = NamespaceBundle.getBundleNamespace(bundle.toString());
+        if (sheddingExcludedNamespaces.contains(namespace)) {
+            if (debug(conf, log)) {
+                log.info("Use round robin broker selector for {}", bundle);
+            }
+            return sheddingExcludedNamespaceSelectionStrategy;
+        }
+        return brokerSelectionStrategy;
+    }
+
     @Override
     public CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
         return getOwnershipAsync(topic, bundleUnit)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index 9c6e9634178..481e907d044 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
@@ -68,8 +69,10 @@ public class TopKBundles {
     public void update(Map<String, NamespaceBundleStats> bundleStats, int 
topk) {
         arr.clear();
         try {
+            var conf = pulsar.getConfiguration();
             var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
-                    
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
+                    conf.isLoadBalancerSheddingBundlesWithPoliciesEnabled();
+            Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
             for (var etr : bundleStats.entrySet()) {
                 String bundle = etr.getKey();
                 var stat = etr.getValue();
@@ -79,12 +82,16 @@ public class TopKBundles {
                     continue;
                 }
                 // TODO: do not filter system topic while shedding
-                if 
(NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle)))
 {
+                String namespace = NamespaceBundle.getBundleNamespace(bundle);
+                if (NamespaceService.isSystemServiceNamespace(namespace)) {
                     continue;
                 }
                 if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && 
hasPolicies(bundle)) {
                     continue;
                 }
+                if (sheddingExcludedNamespaces.contains(namespace)) {
+                    continue;
+                }
                 arr.add(etr);
             }
             var topKBundlesLoadData = loadData.getTopBundlesLoadData();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index b5255f2713a..18555bc18fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -493,6 +493,7 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                 }
 
                 int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
+                Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
                 for (var e : maxBrokerTopBundlesLoadData) {
                     String bundle = e.bundleName();
                     if (channel != null && !channel.isOwner(bundle, 
maxBroker)) {
@@ -502,6 +503,14 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                         }
                         continue;
                     }
+                    final String namespaceName = 
NamespaceBundle.getBundleNamespace(bundle);
+                    if (sheddingExcludedNamespaces.contains(namespaceName)) {
+                        if (debugMode) {
+                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+                                    + " Bundle namespace has been found in 
sheddingExcludedNamespaces", bundle));
+                        }
+                        continue;
+                    }
                     if (recentlyUnloadedBundles.containsKey(bundle)) {
                         if (debugMode) {
                             log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
new file mode 100644
index 00000000000..2f356ec1f5e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import java.util.Optional;
+import java.util.Set;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Simple Round Robin Broker Selection Strategy.
+ */
+public class RoundRobinBrokerSelectionStrategy implements 
BrokerSelectionStrategy {
+    private final RoundRobinBrokerSelector selector = new 
RoundRobinBrokerSelector();
+
+    @Override
+    public Optional<String> select(Set<String> brokers, ServiceUnitId bundle, 
LoadManagerContext context) {
+        return selector.selectBroker(brokers, null, null, 
context.brokerConfiguration());
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 1f549cbb66e..a9d7ddd78e0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -152,6 +152,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // Strategy used to determine where new topics should be placed.
     private ModularLoadManagerStrategy placementStrategy;
 
+    private ModularLoadManagerStrategy 
sheddingExcludedNamespaceSelectionStrategy;
+
     // Policies used to determine which brokers are available for particular 
namespaces.
     private SimpleResourceAllocationPolicies policies;
 
@@ -252,6 +254,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
 
         placementStrategy = ModularLoadManagerStrategy.create(conf);
+        sheddingExcludedNamespaceSelectionStrategy = new 
RoundRobinBrokerSelector();
         policies = new SimpleResourceAllocationPolicies(pulsar);
         filterPipeline.add(new BrokerLoadManagerClassFilter());
         filterPipeline.add(new BrokerVersionFilter());
@@ -641,6 +644,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         final Map<String, Long> recentlyUnloadedBundles = 
loadData.getRecentlyUnloadedBundles();
         recentlyUnloadedBundles.keySet().removeIf(e -> 
recentlyUnloadedBundles.get(e) < timeout);
 
+        Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
         final Multimap<String, String> bundlesToUnload = 
loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
 
         bundlesToUnload.asMap().forEach((broker, bundles) -> {
@@ -648,6 +652,13 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
             bundles.forEach(bundle -> {
                 final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                 final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                if (sheddingExcludedNamespaces.contains(namespaceName)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Skipping load shedding for namespace 
{}",
+                                
loadSheddingStrategy.getClass().getSimpleName(), namespaceName);
+                    }
+                    return;
+                }
                 if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, 
broker)) {
                     return;
                 }
@@ -931,8 +942,22 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                         brokerTopicLoadingPredicate);
             }
 
-            // Choose a broker among the potentially smaller filtered list, 
when possible
-            Optional<String> broker = 
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+            Optional<String> broker;
+            // For shedding-excluded namespaces, use RoundRobinBrokerSelector to assign the ownership.
+            // This spreads the assignment more evenly, because these bundles will not be automatically
+            // rebalanced to another broker unless they are manually unloaded.
+            Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
+            String namespaceNameFromBundleName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+            if 
(sheddingExcludedNamespaces.contains(namespaceNameFromBundleName)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Use round robin broker selector for {}", 
bundle);
+                }
+                broker = sheddingExcludedNamespaceSelectionStrategy
+                        .selectBroker(brokerCandidateCache, data, loadData, 
conf);
+            } else {
+                // Choose a broker among the potentially smaller filtered 
list, when possible
+                broker = placementStrategy.selectBroker(brokerCandidateCache, 
data, loadData, conf);
+            }
             if (log.isDebugEnabled()) {
                 log.debug("Selected broker {} from candidate brokers {}", 
broker, brokerCandidateCache);
             }
@@ -1139,7 +1164,15 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
      */
     private int selectTopKBundle() {
         bundleArr.clear();
-        bundleArr.addAll(loadData.getBundleData().entrySet());
+        Set<String> sheddingExcludedNamespaces = 
conf.getLoadBalancerSheddingExcludedNamespaces();
+        for (Map.Entry<String, BundleData> entry : 
loadData.getBundleData().entrySet()) {
+            String bundle = entry.getKey();
+            String namespace = NamespaceBundle.getBundleNamespace(bundle);
+            if (sheddingExcludedNamespaces.contains(namespace)) {
+                continue;
+            }
+            bundleArr.add(entry);
+        }
 
         int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
                 .getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 97921488002..65d017499fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -206,6 +206,41 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertEquals(webServiceUrl.get().toString(), 
brokerLookupData.get().getWebServiceUrl());
     }
 
+    // Test that the load manager will use round-robin assignment
+    // if the namespace is in loadBalancerSheddingExcludedNamespaces.
+    @Test
+    public void testSelectBrokerForSheddingExcludedNamespaces() throws 
Exception {
+        
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of(defaultTestNamespace));
+        try {
+            Pair<TopicName, NamespaceBundle> topicAndBundle =
+                    getBundleIsNotOwnByChangeEventTopic("test-topic" + 
UUID.randomUUID());
+            NamespaceBundle bundle1 = topicAndBundle.getRight();
+            Optional<BrokerLookupData> brokerLookupData1 = 
primaryLoadManager.assign(Optional.empty(), bundle1,
+                    LookupOptions.builder().build()).get();
+            assertTrue(brokerLookupData1.isPresent());
+            log.info("Assign the bundle1 {} to {}", bundle1, 
brokerLookupData1);
+
+            String webServiceUrl1 = brokerLookupData1.get().getWebServiceUrl();
+
+            Pair<TopicName, NamespaceBundle> topicAndBundle2 =
+                    getBundleIsNotOwnByChangeEventTopic("test-topic-" + 
UUID.randomUUID());
+
+            while 
(topicAndBundle2.getRight().toString().equals(topicAndBundle.getRight().toString())
+                    || 
primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
topicAndBundle2.getRight()).get()) {
+                topicAndBundle2 = 
getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());
+            }
+            NamespaceBundle bundle2 = topicAndBundle2.getRight();
+            Optional<BrokerLookupData> brokerLookupData2 = 
primaryLoadManager.assign(Optional.empty(), bundle2,
+                    LookupOptions.builder().build()).get();
+            assertTrue(brokerLookupData2.isPresent());
+            log.info("Assign the bundle2 {} to {}", bundle2, 
brokerLookupData2);
+            String webServiceUrl2 = brokerLookupData2.get().getWebServiceUrl();
+            assertNotEquals(webServiceUrl1, webServiceUrl2);
+        } finally {
+            
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of());
+        }
+    }
+
     @Test
     public void testLookupOptions() throws Exception {
         Pair<TopicName, NamespaceBundle> topicAndBundle =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
index 0f6ae9b2629..2be3108c638 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
@@ -136,6 +137,27 @@ public class TopKBundlesTest {
         assertEquals(top0.bundleName(), bundle1);
     }
 
+    @Test
+    public void testSheddingExcludedNamespaces() {
+        Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
+        var topKBundles = new TopKBundles(pulsar);
+        
pulsar.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of("my-tenant/my-namespace2"));
+        NamespaceBundleStats stats1 = new NamespaceBundleStats();
+        stats1.msgRateIn = 500;
+        bundleStats.put("my-tenant/my-namespace2/0x00000000_0x0FFFFFFF", 
stats1);
+
+        NamespaceBundleStats stats2 = new NamespaceBundleStats();
+        stats2.msgRateIn = 10000;
+        stats2.msgThroughputOut = 10;
+        bundleStats.put(bundle1, stats2);
+
+        topKBundles.update(bundleStats, 2);
+
+        assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 
1);
+        var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0);
+        assertEquals(top0.bundleName(), bundle1);
+    }
+
     @Test
     public void testZeroMsgThroughputBundleStats() {
         Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 716e1f2a2a2..a144d84fd7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -621,6 +621,25 @@ public class TransferShedderTest {
         assertEquals(counter.getLoadStd(), setupLoadStd, delta);
     }
 
+    @Test
+    public void testSheddingExcludedNamespaces() {
+        UnloadCounter counter = new UnloadCounter();
+        TransferShedder transferShedder = new TransferShedder(counter);
+        var ctx = setupContext();
+        ctx.brokerConfiguration().setLoadBalancerSheddingExcludedNamespaces(
+                Set.of("my-tenant/my-namespaceE", "my-tenant/my-namespaceD"));
+
+        var res = transferShedder.findBundlesForUnloading(ctx, new 
HashMap<>(), Map.of());
+        var expected = new HashSet<UnloadDecision>();
+        expected.add(new UnloadDecision(new Unload("broker3:8080",
+                "my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
+                Optional.of("broker1:8080")),
+                Success, Overloaded));
+        assertEquals(res, expected);
+        assertEquals(counter.getLoadAvg(), setupLoadAvg);
+        assertEquals(counter.getLoadStd(), setupLoadStd);
+    }
+
     @Test
     public void testGetAvailableBrokersFailed() {
         UnloadCounter counter = new UnloadCounter();

Reply via email to