lhotari commented on PR #25642:
URL: https://github.com/apache/pulsar/pull/25642#issuecomment-4359524285
**Code review (Claude Code)**
Posting three correctness concerns surfaced by a local review with Claude
Code.
### 1. [BUG] Backoff reset is ineffective if a poll is already scheduled
In `SubscriptionCoordinator` (`registerConsumer`, `onLayoutChange`,
`markSegmentsDrained`):
```java
drainBackoff.reset();
ensureDrainPollerRunning();
```
`ensureDrainPollerRunning` is a no-op when `drainPollTask` is still
scheduled and not yet cancelled/done. If the poller had backed off to the
15-minute cap during a quiet period, a fresh EARLIEST consumer joining (or a
layout change, or an observed drain) will still wait up to 15 minutes for the
next drain check — directly contradicting the comment (*"kick off drain checks
at the shortest delay"*) and the docstring on `DEFAULT_DRAIN_INITIAL_DELAY`
(*"a freshly subscribed EARLIEST consumer doesn't wait"*). Fix: cancel the
existing scheduled task before re-arming, or have these progress events
explicitly reschedule with the reset delay.
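
A minimal sketch of the first suggested fix (cancel, then re-arm), not the PR's actual code. `DrainPollerSketch`, `onProgressEvent`, `backOffToCap`, `pendingDelayMs`, and the `generation` counter are hypothetical names introduced for illustration; only `drainPollTask`, `ensureDrainPollerRunning`, and `pollDrainStatus` mirror identifiers mentioned above, and the doubling backoff is an assumption about how the delay grows.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's drain poller; not the PR's code.
class DrainPollerSketch {
    private final ScheduledExecutorService scheduler;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long nextDelayMs;
    private long generation; // identifies the only task allowed to run
    private ScheduledFuture<?> drainPollTask;

    DrainPollerSketch(ScheduledExecutorService scheduler, long initialDelayMs, long maxDelayMs) {
        this.scheduler = scheduler;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.nextDelayMs = initialDelayMs;
    }

    // Regular re-arm path: keeps an already-pending task (the no-op described above).
    synchronized void ensureDrainPollerRunning() {
        if (drainPollTask != null && !drainPollTask.isDone()) {
            return;
        }
        final long gen = ++generation;
        drainPollTask = scheduler.schedule(() -> pollDrainStatus(gen), nextDelayMs, TimeUnit.MILLISECONDS);
        nextDelayMs = Math.min(nextDelayMs * 2, maxDelayMs); // assumed doubling backoff
    }

    // Progress-event path: reset the backoff AND replace the pending task.
    synchronized void onProgressEvent() {
        nextDelayMs = initialDelayMs;
        if (drainPollTask != null) {
            drainPollTask.cancel(false);
            drainPollTask = null;
        }
        ensureDrainPollerRunning();
    }

    // Illustration hook: pretend a long quiet period pushed the delay to the cap.
    synchronized void backOffToCap() {
        nextDelayMs = maxDelayMs;
    }

    // Remaining delay of the pending task in milliseconds, or -1 if none is pending.
    synchronized long pendingDelayMs() {
        return drainPollTask == null ? -1 : drainPollTask.getDelay(TimeUnit.MILLISECONDS);
    }

    private synchronized void pollDrainStatus(long gen) {
        if (gen != generation) {
            return; // superseded by a progress event that already re-armed
        }
        drainPollTask = null;
        // A real drain check would run here.
        ensureDrainPollerRunning();
    }
}
```

The generation check is there so that a poll which had already started when `onProgressEvent` cancelled it cannot re-arm a second task alongside the fresh one.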
### 2. [BUG] `close()` can race with an in-flight poll and leak a scheduled task
Also in `SubscriptionCoordinator`: `pollDrainStatus` sets `drainPollTask` to
`null` at the top of its `synchronized` block, and the async `whenComplete`
callback later calls `ensureDrainPollerRunning()`, which schedules a new task.
If `close()` runs in that window, it sees `drainPollTask == null` and does
nothing; the subsequent re-arm then schedules a task that fires after the
controller has gone away. There is no `closed` flag checked in
`ensureDrainPollerRunning` or in the `whenComplete` re-arm path. Fix: add a
`closed` boolean (set in `close()`) and short-circuit
`ensureDrainPollerRunning` / the `whenComplete` re-arm on it.
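
A minimal sketch of the `closed`-flag guard, under the assumption that the poll clears the field, runs an asynchronous check, and re-arms from `whenComplete` as described above. `ClosablePollerSketch`, the injected `drainCheck` supplier, and `hasPendingTask` are hypothetical and exist only to make the window reproducible; this is not the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical poller showing the closed-flag guard; not the PR's code.
class ClosablePollerSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final long delayMs;
    private final Supplier<CompletableFuture<Void>> drainCheck; // assumed async check
    private ScheduledFuture<?> drainPollTask;
    private boolean closed;

    ClosablePollerSketch(ScheduledExecutorService scheduler, long delayMs,
                         Supplier<CompletableFuture<Void>> drainCheck) {
        this.scheduler = scheduler;
        this.delayMs = delayMs;
        this.drainCheck = drainCheck;
    }

    synchronized void ensureDrainPollerRunning() {
        if (closed) {
            return; // guard: never schedule once close() has run
        }
        if (drainPollTask != null && !drainPollTask.isDone()) {
            return;
        }
        drainPollTask = scheduler.schedule(this::pollDrainStatus, delayMs, TimeUnit.MILLISECONDS);
    }

    // Same shape as described above: clear the field, then re-arm asynchronously.
    void pollDrainStatus() {
        synchronized (this) {
            if (closed) {
                return;
            }
            drainPollTask = null;
        }
        // The re-arm goes through ensureDrainPollerRunning(), which checks `closed`.
        drainCheck.get().whenComplete((result, error) -> ensureDrainPollerRunning());
    }

    @Override
    public synchronized void close() {
        closed = true;
        if (drainPollTask != null) {
            drainPollTask.cancel(false);
            drainPollTask = null;
        }
    }

    synchronized boolean hasPendingTask() {
        return drainPollTask != null && !drainPollTask.isDone();
    }
}
```

Because `closed` is read and written under the same monitor as `drainPollTask`, a `close()` that lands between the field being cleared and the `whenComplete` callback still prevents the late re-arm.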
### 3. [BUG] Restore path defaults every recovered subscription to STREAM, deadlocking CHECKPOINT/QUEUE on leader handoff
In `ScalableTopicController.createCoordinator(String)`:
```java
private SubscriptionCoordinator createCoordinator(String subscription) {
    return createCoordinator(subscription, ScalableConsumerType.STREAM);
}
```
`restoreSubscription` calls this — meaning after any controller restart /
leader change, every restored subscription gets a coordinator with parent-drain
enforcement, regardless of the consumer type that originally created it. The
Javadoc on `registerConsumer` confirms the policy is then locked: *"The
coordinator's setting is fixed at first registration; subsequent registers with
a different type still work but won't change the ordering policy."*
For a CHECKPOINT subscription on a topic with sealed parents, the parent
will never report drained (CHECKPOINT doesn't create per-segment cursors, as
explicitly noted in the PR body), so children remain blocked indefinitely after
a restart. The *"conservative default"* comment is inverted: STREAM is the
*blocking* mode, so it is the least conservative default in this context. Fix:
persist the subscription type in metadata alongside the subscription so it can
be restored faithfully, or use a `null` checker (no enforcement) on the restore
path until a real consumer re-registers and reveals the type.
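
A minimal sketch that combines both suggested options: restore from a persisted type when one exists, and otherwise leave the policy unset (no enforcement) until the first consumer registers. All names here (`ConsumerTypeSketch`, `CoordinatorSketch`, `restore`, `enforcesParentDrain`) are hypothetical, and the enum constants are assumed from the modes named in this comment; how the type would be stored in metadata is not shown.

```java
import java.util.Optional;

// Hypothetical mirror of the consumer types named in this review.
enum ConsumerTypeSketch { STREAM, CHECKPOINT, QUEUE }

// Hypothetical coordinator holding only the ordering policy; not the PR's code.
class CoordinatorSketch {
    // null means "type not yet known": no parent-drain enforcement.
    private ConsumerTypeSketch type;

    private CoordinatorSketch(ConsumerTypeSketch type) {
        this.type = type;
    }

    // Restore path: use the persisted type if present, otherwise stay unset
    // instead of defaulting to the blocking STREAM mode.
    static CoordinatorSketch restore(Optional<ConsumerTypeSketch> persistedType) {
        return new CoordinatorSketch(persistedType.orElse(null));
    }

    // The policy is fixed by the first registration that reveals a type;
    // later registrations with a different type do not change it.
    synchronized void registerConsumer(ConsumerTypeSketch consumerType) {
        if (type == null) {
            type = consumerType;
        }
    }

    // Only STREAM blocks children on parent drain in this sketch.
    synchronized boolean enforcesParentDrain() {
        return type == ConsumerTypeSketch.STREAM;
    }
}
```

With this shape, a restored CHECKPOINT or QUEUE subscription is never gated on a parent drain that cannot happen, while a subscription whose STREAM type was persisted keeps its ordering guarantee across a leader change.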