lhotari commented on PR #25642:
URL: https://github.com/apache/pulsar/pull/25642#issuecomment-4359524285

   **Code review (Claude Code)**
   
   Posting three correctness concerns surfaced by a local review with Claude 
Code.
   
   ### 1. [BUG] Backoff reset is ineffective if a poll is already scheduled
   
   In `SubscriptionCoordinator` (`registerConsumer`, `onLayoutChange`, 
`markSegmentsDrained`):
   
   ```java
   drainBackoff.reset();
   ensureDrainPollerRunning();
   ```
   
   `ensureDrainPollerRunning` is a no-op when `drainPollTask` is still 
scheduled and not yet cancelled/done. If the poller had backed off to the 
15-minute cap during a quiet period, a fresh EARLIEST consumer joining (or a 
layout change, or an observed drain) will still wait up to 15 minutes for the 
next drain check — directly contradicting the comment (*"kick off drain checks 
at the shortest delay"*) and the docstring on `DEFAULT_DRAIN_INITIAL_DELAY` 
(*"a freshly subscribed EARLIEST consumer doesn't wait"*). Fix: cancel the 
existing scheduled task before re-arming, or have these progress events 
explicitly reschedule with the reset delay.
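   
   A minimal sketch of the first option (cancel, then re-arm), to make the 
intended behaviour concrete. `DrainPollerSketch`, `onProgress`, 
`simulateBackoffAtCap` and `lastScheduledDelayMs` are hypothetical names, and a 
plain doubling delay stands in for `drainBackoff`; none of this is taken from 
the PR's actual code.
   
   ```java
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   // Hypothetical stand-in for the coordinator's drain poller, not the PR's code.
   class DrainPollerSketch {
       private final ScheduledExecutorService scheduler;
       private final long initialDelayMs;
       private final long maxDelayMs;
       private long nextDelayMs;                 // plays the role of drainBackoff
       private long lastScheduledDelayMs = -1;   // recorded only for illustration
       private ScheduledFuture<?> drainPollTask;
   
       DrainPollerSketch(ScheduledExecutorService scheduler, long initialDelayMs, long maxDelayMs) {
           this.scheduler = scheduler;
           this.initialDelayMs = initialDelayMs;
           this.maxDelayMs = maxDelayMs;
           this.nextDelayMs = initialDelayMs;
       }
   
       // Same shape as the reviewed method: a no-op while a task is still pending.
       synchronized void ensureDrainPollerRunning() {
           if (drainPollTask != null && !drainPollTask.isDone()) {
               return;
           }
           lastScheduledDelayMs = nextDelayMs;
           drainPollTask = scheduler.schedule(this::pollDrainStatus, nextDelayMs, TimeUnit.MILLISECONDS);
           nextDelayMs = Math.min(nextDelayMs * 2, maxDelayMs);
       }
   
       // Progress event: reset the backoff AND drop the pending long-delay task,
       // so the next poll is scheduled at the initial delay.
       synchronized void onProgress() {
           nextDelayMs = initialDelayMs;
           if (drainPollTask != null) {
               drainPollTask.cancel(false);
               drainPollTask = null;
           }
           ensureDrainPollerRunning();
       }
   
       private synchronized void pollDrainStatus() {
           drainPollTask = null;
           // The real drain check would run here; afterwards the poller re-arms.
           ensureDrainPollerRunning();
       }
   
       // Illustration only: pretend a quiet period pushed the backoff to its cap.
       synchronized void simulateBackoffAtCap() {
           nextDelayMs = maxDelayMs;
       }
   
       synchronized long lastScheduledDelayMs() {
           return lastScheduledDelayMs;
       }
   }
   ```
   
   With this shape, a progress event that arrives while a capped poll is 
pending replaces that task instead of waiting for it to fire.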
   
   ### 2. [BUG] `close()` can race with an in-flight poll and leak a scheduled 
task
   
   Also in `SubscriptionCoordinator`: `pollDrainStatus` sets `drainPollTask` 
to `null` at the top of its `synchronized` block, and the async `whenComplete` 
callback later calls `ensureDrainPollerRunning()`, which schedules a new task. 
If `close()` runs in that window, it sees `drainPollTask == null` and does 
nothing; the subsequent re-arm then schedules a task that fires after the 
controller has gone away. There is no `closed` flag checked in 
`ensureDrainPollerRunning` or in the `whenComplete` re-arm path. Fix: add a 
`closed` boolean (set in `close()`) and short-circuit both 
`ensureDrainPollerRunning` and the `whenComplete` re-arm on it.
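   
   A rough sketch of the `closed` guard, assuming the async drain check can be 
modelled as a `CompletableFuture`. `ClosablePollerSketch`, the `drainCheck` 
supplier and `scheduledCount` are hypothetical and exist only to make the race 
window reproducible; the real coordinator will differ.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Supplier;
   
   // Hypothetical stand-in for the coordinator's poll/close interaction.
   class ClosablePollerSketch {
       private final ScheduledExecutorService scheduler;
       private final Supplier<CompletableFuture<Boolean>> drainCheck; // assumed async drain check
       private final long delayMs;
       private ScheduledFuture<?> drainPollTask;
       private boolean closed;
       private int scheduledCount; // recorded only for illustration
   
       ClosablePollerSketch(ScheduledExecutorService scheduler,
                            Supplier<CompletableFuture<Boolean>> drainCheck,
                            long delayMs) {
           this.scheduler = scheduler;
           this.drainCheck = drainCheck;
           this.delayMs = delayMs;
       }
   
       synchronized void ensureDrainPollerRunning() {
           if (closed) {
               return; // never schedule once close() has run
           }
           if (drainPollTask != null && !drainPollTask.isDone()) {
               return;
           }
           drainPollTask = scheduler.schedule(this::pollDrainStatus, delayMs, TimeUnit.MILLISECONDS);
           scheduledCount++;
       }
   
       void pollDrainStatus() {
           synchronized (this) {
               if (closed) {
                   return;
               }
               drainPollTask = null; // the window: close() now sees no task to cancel
           }
           // The re-arm goes through ensureDrainPollerRunning(), which re-checks closed.
           drainCheck.get().whenComplete((drained, error) -> ensureDrainPollerRunning());
       }
   
       synchronized void close() {
           closed = true;
           if (drainPollTask != null) {
               drainPollTask.cancel(false);
               drainPollTask = null;
           }
       }
   
       synchronized int scheduledCount() {
           return scheduledCount;
       }
   }
   ```
   
   Because `closed` is read and written under the same monitor as 
`drainPollTask`, a callback that completes after `close()` cannot schedule a 
new task.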
   
   ### 3. [BUG] Restore path defaults every recovered subscription to STREAM, 
deadlocking CHECKPOINT/QUEUE on leader handoff
   
   In `ScalableTopicController.createCoordinator(String)`:
   
   ```java
   private SubscriptionCoordinator createCoordinator(String subscription) {
       return createCoordinator(subscription, ScalableConsumerType.STREAM);
   }
   ```
   
   `restoreSubscription` calls this — meaning after any controller restart / 
leader change, every restored subscription gets a coordinator with parent-drain 
enforcement, regardless of the consumer type that originally created it. The 
Javadoc on `registerConsumer` confirms the policy is then locked: *"The 
coordinator's setting is fixed at first registration; subsequent registers with 
a different type still work but won't change the ordering policy."*
   
   For a CHECKPOINT subscription on a topic with sealed parents, the parent 
will never report drained (CHECKPOINT doesn't create per-segment cursors — 
explicitly noted in the PR body), so children remain blocked indefinitely after 
a restart. The *"conservative default"* comment is inverted — STREAM is the 
*blocking* mode, so it's the least-conservative default in this context. Fix: 
persist the subscription type in metadata alongside the subscription so it can 
be restored faithfully, or use a `null` checker (no enforcement) on the restore 
path until a real consumer re-registers and reveals the type.
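   
   A sketch that combines both suggestions: persist the type when it is known, 
and fall back to no enforcement on restore when it is not. Everything here is 
illustrative: `RestoreSketch`, its nested `Coordinator`, the in-memory 
`persistedTypes` map (standing in for the metadata store) and the rule that 
only STREAM enforces parent drain are assumptions based on the description 
above, not the PR's code.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical model of the create/restore paths, not the PR's classes.
   class RestoreSketch {
       // Local stand-in mirroring the consumer types named in this review.
       enum ScalableConsumerType { STREAM, CHECKPOINT, QUEUE }
   
       static final class Coordinator {
           private ScalableConsumerType type; // null = unknown, no enforcement yet
   
           Coordinator(ScalableConsumerType type) {
               this.type = type;
           }
   
           // The first registration fixes the policy only if restore could not.
           synchronized void registerConsumer(ScalableConsumerType consumerType) {
               if (type == null) {
                   type = consumerType;
               }
           }
   
           // Assumption: only STREAM blocks children until the parent has drained.
           synchronized boolean enforcesParentDrain() {
               return type == ScalableConsumerType.STREAM;
           }
       }
   
       // Stand-in for subscription metadata that survives a controller restart.
       private final Map<String, ScalableConsumerType> persistedTypes = new ConcurrentHashMap<>();
   
       Coordinator createCoordinator(String subscription, ScalableConsumerType type) {
           persistedTypes.put(subscription, type); // persist alongside the subscription
           return new Coordinator(type);
       }
   
       Coordinator restoreSubscription(String subscription) {
           // Restore the recorded type; a missing entry yields null (no enforcement).
           return new Coordinator(persistedTypes.get(subscription));
       }
   }
   ```
   
   In this model a restored CHECKPOINT subscription never gets parent-drain 
enforcement, and a subscription with no recorded type stays unenforced until a 
consumer re-registers.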


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
