This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 9b9b071f8c7 [fix][broker] Fix ExtensibleLoadManagerImpl stuck
Assigning bundle state after broker restart (#25379)
9b9b071f8c7 is described below
commit 9b9b071f8c7faaab8cfc411a2c125fe1df5b4cb7
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Mar 21 15:04:21 2026 +0200
[fix][broker] Fix ExtensibleLoadManagerImpl stuck Assigning bundle state
after broker restart (#25379)
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../channel/ServiceUnitStateChannelImpl.java | 5 ++
.../channel/ServiceUnitStateChannelTest.java | 60 ++++++++++++++++++++++
2 files changed, 65 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 0e7f15e64db..1c758e0f817 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -763,6 +763,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (state.equals(Owned) && isTargetBroker(data.dstBroker())) {
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar,
serviceUnit));
+ } else if (state.equals(Assigning) &&
isTargetBroker(data.dstBroker())) {
+ // If this broker is the assignment target and the Assigning event was published before this
+ // broker's channel finished starting (e.g., after a restart), handle it now so ownership
+ // is resolved immediately rather than waiting for the ownership monitor (up to 60s).
+ handleAssignEvent(serviceUnit, data);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 5baf91f2d5d..6381a2851a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -2014,6 +2014,66 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
channel2.cleanOwnerships();
}
+ @Test(priority = 24)
+ public void testHandleExistingResolvesAssigningStateOnChannelRestart()
+ throws Exception {
+ // Regression test for: handleExisting() must immediately resolve an Assigning state
+ // targeting this broker to Owned, simulating the broker-restart recovery scenario.
+ //
+ // When a broker restarts, its ServiceUnitStateChannel calls handleExisting() for each
+ // entry in the table view during start(). Without the fix, Assigning states were silently
+ // ignored, leaving bundles stuck until the ownership monitor rescued them after
+ // inFlightStateWaitingTimeInMillis (default 30s). The fix is verified by asserting
+ // that Owned state appears within 15s — shorter than the 30s monitor threshold —
+ // which proves handleExisting() drove the resolution, not the ownership monitor.
+
+ // Case 1: Assigning targeting brokerId1 with no source broker
+ // (fresh assignment after a Free override when no broker was available)
+ String assigningBundle1 =
"public/test-existing-assigning1/0xfffffff0_0xffffffff";
+ var assigningData1 = new ServiceUnitStateData(Assigning, brokerId1,
null, 1);
+
+ // Case 2: Assigning targeting brokerId1 with a source broker
+ // (transfer interrupted mid-flight by broker restart)
+ String assigningBundle2 =
"public/test-existing-assigning2/0xfffffff0_0xffffffff";
+ var assigningData2 = new ServiceUnitStateData(Assigning, brokerId1,
brokerId2, 1);
+
+ // Pre-populate the Assigning states in the tableview while channels are disabled.
+ // This is required for the metadata store implementation: the conflict resolver
+ // checks that the existing versionId == (new versionId - 1), so Owned(v=2) is
+ // only accepted when Assigning(v=1) is already stored. Without pre-population,
+ // shouldKeepLeft(null, Owned(v=2)) returns true (conflict) and the put is silently
+ // dropped, leaving the bundle stuck in the Init state.
+ try {
+ disableChannels();
+ overrideTableViews(assigningBundle1, assigningData1);
+ overrideTableViews(assigningBundle2, assigningData2);
+ } finally {
+ enableChannels();
+ }
+
+ var handleExistingMethod = ServiceUnitStateChannelImpl.class
+ .getDeclaredMethod("handleExisting", String.class,
ServiceUnitStateData.class);
+ handleExistingMethod.setAccessible(true);
+
+ // Simulate restart: handleExisting() is called by ServiceUnitStateTableView.start() for
+ // each entry present in the tableview snapshot when the channel starts up.
+ handleExistingMethod.invoke(channel1, assigningBundle1,
assigningData1);
+ handleExistingMethod.invoke(channel1, assigningBundle2,
assigningData2);
+
+ try {
+ // Both bundles must reach Owned state within 15s (< inFlightStateWaitingTimeInMillis 30s).
+ // Without the fix, the tableview state would remain Assigning until the monitor runs at ~30s.
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(200,
TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ assertEquals(Owned,
state(getTableView(channel1).get(assigningBundle1)));
+ assertEquals(Owned,
state(getTableView(channel2).get(assigningBundle1)));
+ assertEquals(Owned,
state(getTableView(channel1).get(assigningBundle2)));
+ assertEquals(Owned,
state(getTableView(channel2).get(assigningBundle2)));
+ });
+ } finally {
+ cleanTableViews();
+ }
+ }
private static ConcurrentHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {