lhotari opened a new pull request, #25379:
URL: https://github.com/apache/pulsar/pull/25379

   ### Motivation
   
   When using `ExtensibleLoadManagerImpl`, a restarted broker can end up with a namespace bundle stuck in `Assigning` state for **60 seconds or more** after it comes back online, blocking all topic lookups for that bundle during that window.
   
   #### Root cause
   
   `ServiceUnitStateChannelImpl.handleExisting()` is called during channel 
startup to replay the current table view state. It only handles `Owned` entries:
   
   ```java
   private void handleExisting(String serviceUnit, ServiceUnitStateData data) {
       ServiceUnitState state = state(data);
       if (state.equals(Owned) && isTargetBroker(data.dstBroker())) {
           pulsar.getNamespaceService()
                   .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
       }
       // Assigning state: silently ignored ← bug
   }
   ```
   
   If a broker restarts while a bundle is in `Assigning` state with that broker as the destination, `handleExisting` ignores it. The live `handleEvent` path (which processes newly published messages) won't deliver it either, because the message was published before this broker subscribed to the topic stream. The result: the target broker never acts on the `Assigning` state, and the bundle stays stuck until the ownership monitor intervenes.
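
The gap can be reproduced with a toy model. This is a hypothetical sketch, not Pulsar's `TableView` or channel API: `ToyTableView`, `start`, and the `actions` log are invented for illustration. Entries put before a listener is registered are seen only by the startup replay, so an `Assigning` entry published while the broker was down reaches only the original `handleExisting()`, which ignores it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical toy model (not Pulsar's TableView API): entries put before a
// listener is registered are visible only through the startup snapshot replay.
public class ReplayGapDemo {
    enum State { Assigning, Owned }

    record Data(State state, String dstBroker) {}

    static final class ToyTableView {
        private final Map<String, Data> entries = new LinkedHashMap<>();
        private final List<BiConsumer<String, Data>> listeners = new ArrayList<>();

        void put(String key, Data value) {
            entries.put(key, value);
            // Only listeners registered at this moment see the update.
            listeners.forEach(l -> l.accept(key, value));
        }

        // Startup: replay the current snapshot, then receive only later updates.
        void start(BiConsumer<String, Data> existing, BiConsumer<String, Data> live) {
            entries.forEach(existing);
            listeners.add(live);
        }
    }

    static final String SELF = "broker3";
    static final List<String> actions = new ArrayList<>();

    // Mirrors the original handleExisting(): only Owned entries are acted on.
    static void handleExisting(String serviceUnit, Data data) {
        if (data.state() == State.Owned && SELF.equals(data.dstBroker())) {
            actions.add("onOwned:" + serviceUnit);
        }
    }

    // Mirrors handleEvent(): the live path does act on Assigning for this broker.
    static void handleEvent(String serviceUnit, Data data) {
        if (data.state() == State.Assigning && SELF.equals(data.dstBroker())) {
            actions.add("publishOwned:" + serviceUnit);
        }
    }

    public static void main(String[] args) {
        ToyTableView view = new ToyTableView();
        view.put("ns/early", new Data(State.Assigning, SELF)); // published while broker3 restarts
        view.start(ReplayGapDemo::handleExisting, ReplayGapDemo::handleEvent);
        view.put("ns/late", new Data(State.Assigning, SELF));  // published after subscribing
        System.out.println(actions); // ns/early is never actioned
    }
}
```

Only `ns/late` produces an action; `ns/early` sits in the snapshot where neither path acts on it.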
   
   #### Why 60 seconds?
   
   The fallback is `monitorOwnerships()`, which runs every `loadBalancerServiceUnitStateMonitorIntervalInSeconds` (default: **60s**). It detects the stuck `Assigning` state once it has been stale for longer than `loadBalancerInFlightServiceUnitStateWaitingTimeInMillis` (default: 30s) and calls `overrideOwnership()` to forcibly resolve it. Because a monitor run that happens before the state is 30s old does nothing, resolution can take a full 60-second monitor cycle plus processing overhead, or more: 69 seconds in the failure below.
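
The timing can be checked with a small hypothetical model (not Pulsar code; `MonitorDelay`, `resolvedAt`, and `worstCaseDelay` are illustrative names). It assumes the monitor fires on a fixed-rate schedule, overrides an in-flight state only once it is strictly older than the waiting time, and ignores processing overhead.

```java
// Hypothetical model, not Pulsar code: a fixed-rate monitor that overrides an
// in-flight state only once it is strictly older than the waiting time.
public final class MonitorDelay {

    // Returns the time of the first monitor run, on the schedule containing
    // anyTickSec (a run at or before assignedAtSec), that sees the state as stale.
    static long resolvedAt(long assignedAtSec, long anyTickSec, long intervalSec, long waitingSec) {
        if (intervalSec <= 0) {
            throw new IllegalArgumentException("intervalSec must be positive");
        }
        long tick = anyTickSec;
        while (tick - assignedAtSec <= waitingSec) { // not stale yet: wait for the next run
            tick += intervalSec;
        }
        return tick;
    }

    // Worst-case delay over every whole-second phase of the monitor schedule,
    // with the Assigning state published at t = 0.
    static long worstCaseDelay(long intervalSec, long waitingSec) {
        long worst = 0;
        for (long phase = 0; phase < intervalSec; phase++) {
            worst = Math.max(worst, resolvedAt(0, -phase, intervalSec, waitingSec));
        }
        return worst;
    }

    public static void main(String[] args) {
        // Defaults quoted above: 60s monitor interval, 30000ms (30s) waiting time.
        System.out.println("worst case: " + worstCaseDelay(60, 30) + "s");
        // A schedule whose runs land 9s and 69s after the Assigning state was published.
        System.out.println("example: " + resolvedAt(0, -51, 60, 30) + "s");
    }
}
```

Under these assumptions the worst case is 90 s: a run landing exactly 30 s after the assignment just misses the staleness threshold, so the next run 60 s later does the override. A schedule with runs 9 s and 69 s after the assignment reproduces the 69 s delay observed below.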
   
   #### Observed failure
   
   This was identified by investigating a flaky failure of 
`ClusterMigrationTest.testClusterMigrationWithReplicationBacklog` in CI. After 
`broker3.restart()`, the bundle `pulsar/migrationNs/0x00000000_0xffffffff` 
entered `Assigning` state at `22:47:10` and wasn't resolved to `Owned` until 
`22:48:19` — a 69-second delay. The test's 60-second `Awaitility` timeout 
expired just before ownership resolved:
   
   ```
   22:47:10  Overriding inactiveBroker:localhost:36873 ... to overrideData:ServiceUnitStateData[state=Free...]
   22:47:10  [Assigning state published for pulsar/migrationNs/0x00000000_0xffffffff]
   22:47:10  cancel the lookup request for pulsar/migrationNs/0x00000000_0xffffffff when receiving Assigning
   ...69 seconds of ServiceUnitNotReadyException / replicator backoff...
   22:48:19  [Owned state finally published]
   22:48:19  Re-Sending 1 messages to server  ← replicator reconnects, too late
   ```
   
   ### Modifications
   
   In `handleExisting()`, add handling for `Assigning` states where this broker 
is the target, delegating to the existing `handleAssignEvent()` which publishes 
the `Owned` state:
   
   ```java
   } else if (state.equals(Assigning) && isTargetBroker(data.dstBroker())) {
       // If this broker is the assignment target and the Assigning event was published before this
       // broker's channel finished starting (e.g., after a restart), handle it now so ownership
       // is resolved immediately rather than waiting for the ownership monitor (up to 60s).
       handleAssignEvent(serviceUnit, data);
   }
   ```
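
A simplified, hypothetical model of the patched dispatch makes the three cases explicit. The `Data` record, the `actions` log, and the stand-in `handleAssignEvent()` (which only records what the real method would publish) are illustrative, not Pulsar's `ServiceUnitStateChannelImpl`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified model of the patched handleExisting(); the types and
// the "actions" log are illustrative stand-ins, not Pulsar classes.
public class FixedReplayDemo {
    enum State { Assigning, Owned }

    record Data(State state, String dstBroker) {}

    private final String self;
    final List<String> actions = new ArrayList<>();

    FixedReplayDemo(String self) {
        this.self = self;
    }

    boolean isTargetBroker(String broker) {
        return self.equals(broker);
    }

    // Stand-in for handleAssignEvent(): the real method publishes the Owned state.
    void handleAssignEvent(String serviceUnit, Data data) {
        actions.add("publishOwned:" + serviceUnit);
    }

    void handleExisting(String serviceUnit, Data data) {
        if (data.state() == State.Owned && isTargetBroker(data.dstBroker())) {
            actions.add("onOwned:" + serviceUnit);
        } else if (data.state() == State.Assigning && isTargetBroker(data.dstBroker())) {
            // The fix: act on a replayed Assigning entry that targets this broker.
            handleAssignEvent(serviceUnit, data);
        }
    }

    public static void main(String[] args) {
        FixedReplayDemo broker3 = new FixedReplayDemo("broker3");
        broker3.handleExisting("ns/a", new Data(State.Assigning, "broker3")); // acted on at startup
        broker3.handleExisting("ns/b", new Data(State.Assigning, "broker1")); // another broker's bundle
        broker3.handleExisting("ns/c", new Data(State.Owned, "broker3"));     // already rescued by the monitor
        System.out.println(broker3.actions);
    }
}
```

A replayed `Assigning` entry for this broker is now acted on, one targeting another broker is ignored, and an entry the monitor already moved to `Owned` takes the pre-existing path.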
   
   #### Why this is safe
   
   - **No channel state guard needed**: `pubAsync` is `tableview.put()` with no 
`channelState` check. The existing `handleExisting` for `Owned` already 
performs side effects at this stage.
   - **`stateChangeListeners` are notified asynchronously**: `notifyOnCompletion` resolves only after `pubAsync` completes, by which time `channelState = Started`.
   - **No double-delivery**: `handleExisting` processes the initial snapshot; 
`handleEvent` processes messages published after the consumer subscribed. The 
same `Assigning` message is never delivered to both.
   - **Idempotent**: If `monitorOwnerships` already rescued the bundle before 
this broker's tableview initializes, `handleExisting` sees `Owned` (not 
`Assigning`) and the existing path handles it normally.
   - **Scoped by `isTargetBroker`**: Only the assigned destination broker 
responds; all others ignore it — identical to the `handleEvent` path.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is already covered by existing tests, such as 
`ClusterMigrationTest.testClusterMigrationWithReplicationBacklog` which was the 
test that exposed this bug.
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   - [x] `doc-not-needed`
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/lhotari/pulsar/pull/new/fix/extensible-load-manager-assigning-state-on-restart

