This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 9b9b071f8c7 [fix][broker] Fix ExtensibleLoadManagerImpl stuck Assigning bundle state after broker restart (#25379)
9b9b071f8c7 is described below

commit 9b9b071f8c7faaab8cfc411a2c125fe1df5b4cb7
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Mar 21 15:04:21 2026 +0200

    [fix][broker] Fix ExtensibleLoadManagerImpl stuck Assigning bundle state after broker restart (#25379)
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../channel/ServiceUnitStateChannelImpl.java       |  5 ++
 .../channel/ServiceUnitStateChannelTest.java       | 60 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 0e7f15e64db..1c758e0f817 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -763,6 +763,11 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         if (state.equals(Owned) && isTargetBroker(data.dstBroker())) {
             pulsar.getNamespaceService()
                     
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, 
serviceUnit));
+        } else if (state.equals(Assigning) && 
isTargetBroker(data.dstBroker())) {
+            // If this broker is the assignment target and the Assigning event 
was published before this
+            // broker's channel finished starting (e.g., after a restart), 
handle it now so ownership
+            // is resolved immediately rather than waiting for the ownership 
monitor (up to 60s).
+            handleAssignEvent(serviceUnit, data);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 5baf91f2d5d..6381a2851a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -2014,6 +2014,66 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
         channel2.cleanOwnerships();
     }
 
+    @Test(priority = 24)
+    public void testHandleExistingResolvesAssigningStateOnChannelRestart()
+            throws Exception {
+        // Regression test for: handleExisting() must immediately resolve an 
Assigning state
+        // targeting this broker to Owned, simulating the broker-restart 
recovery scenario.
+        //
+        // When a broker restarts, its ServiceUnitStateChannel calls 
handleExisting() for each
+        // entry in the table view during start(). Without the fix, Assigning 
states were silently
+        // ignored, leaving bundles stuck until the ownership monitor rescued 
them after
+        // inFlightStateWaitingTimeInMillis (default 30s). The fix is verified 
by asserting
+        // that Owned state appears within 15s — shorter than the 30s monitor 
threshold —
+        // which proves handleExisting() drove the resolution, not the 
ownership monitor.
+
+        // Case 1: Assigning targeting brokerId1 with no source broker
+        //         (fresh assignment after a Free override when no broker was 
available)
+        String assigningBundle1 = 
"public/test-existing-assigning1/0xfffffff0_0xffffffff";
+        var assigningData1 = new ServiceUnitStateData(Assigning, brokerId1, 
null, 1);
+
+        // Case 2: Assigning targeting brokerId1 with a source broker
+        //         (transfer interrupted mid-flight by broker restart)
+        String assigningBundle2 = 
"public/test-existing-assigning2/0xfffffff0_0xffffffff";
+        var assigningData2 = new ServiceUnitStateData(Assigning, brokerId1, 
brokerId2, 1);
+
+        // Pre-populate the Assigning states in the tableview while channels 
are disabled.
+        // This is required for the metadata store implementation: the 
conflict resolver
+        // checks that the existing versionId == (new versionId - 1), so 
Owned(v=2) is
+        // only accepted when Assigning(v=1) is already stored. Without 
pre-population,
+        // shouldKeepLeft(null, Owned(v=2)) returns true (conflict) and the 
put is silently
+        // dropped, leaving the bundle stuck in the Init state.
+        try {
+            disableChannels();
+            overrideTableViews(assigningBundle1, assigningData1);
+            overrideTableViews(assigningBundle2, assigningData2);
+        } finally {
+            enableChannels();
+        }
+
+        var handleExistingMethod = ServiceUnitStateChannelImpl.class
+                .getDeclaredMethod("handleExisting", String.class, 
ServiceUnitStateData.class);
+        handleExistingMethod.setAccessible(true);
+
+        // Simulate restart: handleExisting() is called by 
ServiceUnitStateTableView.start() for
+        // each entry present in the tableview snapshot when the channel 
starts up.
+        handleExistingMethod.invoke(channel1, assigningBundle1, 
assigningData1);
+        handleExistingMethod.invoke(channel1, assigningBundle2, 
assigningData2);
+
+        try {
+            // Both bundles must reach Owned state within 15s (< 
inFlightStateWaitingTimeInMillis 30s).
+            // Without the fix, the tableview state would remain Assigning 
until the monitor runs at ~30s.
+            Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(200, 
TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> {
+                        assertEquals(Owned, 
state(getTableView(channel1).get(assigningBundle1)));
+                        assertEquals(Owned, 
state(getTableView(channel2).get(assigningBundle1)));
+                        assertEquals(Owned, 
state(getTableView(channel1).get(assigningBundle2)));
+                        assertEquals(Owned, 
state(getTableView(channel2).get(assigningBundle2)));
+                    });
+        } finally {
+            cleanTableViews();
+        }
+    }
 
     private static ConcurrentHashMap<String, 
CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {

Reply via email to