merlimat opened a new pull request, #25642:
URL: https://github.com/apache/pulsar/pull/25642

   ## Summary
   
   The Stream and Checkpoint consumer subscriptions are controller-driven: their
   segment assignment comes from `SubscriptionCoordinator`. Previously
   `computeAssignment` handed out only active segments, so a fresh EARLIEST consumer
   joining after a split couldn't read messages produced before the split: those
   live on the now-sealed parent and were silently orphaned. The QueueConsumer fix
   in #25611 already extended the queue flow to active+sealed; this PR brings
   StreamConsumer / CheckpointConsumer in line and adds a per-subscription parent-drain
   ordering guarantee on top.
   
   ## Behavior
   
   - Sealed segments are always included in the assignment. The client drains them
     via the existing per-segment v4 receive loop / Reader; an already-drained
     segment yields `TopicTerminated` immediately and the v4 consumer/Reader closes.
   
   - An active child of a split / merge is held back until *every* parent in the
     DAG has been drained for this subscription. Without this we'd hand a consumer
     the child of a just-split segment immediately, which breaks per-key ordering
     against unread messages still sitting in the parent. Initial active segments
     (no parents) are unaffected.
   
   - Parent-drain ordering only applies to **STREAM** consumers, since they're the ones
     that need per-key ordering across a split. **CHECKPOINT** consumers track
     position client-side via `Checkpoint` and never create per-segment subscription
     cursors (they read via Readers), so the ordering machinery would block their
     children indefinitely. **QUEUE** consumers are shared and accept out-of-order
     delivery by design. The coordinator skips the ordering filter for both
     CHECKPOINT and QUEUE.
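
The assignment rules above can be sketched as a small filter. This is only an illustration under stated assumptions, not the PR's `computeAssignment`: the `AssignmentSketch` and `Segment` types and the `applyParentDrainOrdering` flag are hypothetical names, and the sketch checks direct parents only, since the description does not say whether the real check walks further up the DAG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class AssignmentSketch {
    /** Hypothetical segment descriptor: id, sealed flag, and direct parent ids. */
    static final class Segment {
        final long id;
        final boolean sealed;
        final List<Long> parents;

        Segment(long id, boolean sealed, List<Long> parents) {
            this.id = id;
            this.sealed = sealed;
            this.parents = parents;
        }
    }

    /**
     * Returns the ids of the segments that may be handed to consumers.
     * applyParentDrainOrdering is assumed true for STREAM and false for
     * CHECKPOINT / QUEUE.
     */
    static List<Long> assignable(List<Segment> segments,
                                 Set<Long> drainedSegmentIds,
                                 boolean applyParentDrainOrdering) {
        List<Long> result = new ArrayList<>();
        for (Segment s : segments) {
            if (s.sealed) {
                // Sealed segments are always included so their backlog can be drained.
                result.add(s.id);
            } else if (!applyParentDrainOrdering
                    || drainedSegmentIds.containsAll(s.parents)) {
                // Active segments with no parents pass trivially; children wait
                // until every (direct) parent is drained for this subscription.
                result.add(s.id);
            }
        }
        return result;
    }
}
```

With a sealed segment 0 and two active children, a STREAM subscription would see only segment 0 until it is reported drained, while a CHECKPOINT or QUEUE subscription would see all three segments immediately.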
   
   ## Implementation
   
   - New `SegmentDrainChecker` interface: an async predicate for "is segment X drained
     for subscription S".
   
   - `SubscriptionCoordinator` gets the checker + an exponential `Backoff` (2s
     initial, 15min max). Tracks `drainedSegmentIds` in-memory, runs a self-rescheduling
     poll that queries every undrained sealed segment and rebalances when state
     changes. The backoff resets on every progress event (drain detected, layout
     change, fresh consumer registration); the poller stops entirely once all
     sealed segments are drained.
   
   - `ScalableTopicController.isSegmentDrained` calls the new admin endpoint
     `GET /segments/{tenant}/{ns}/{topic}/{descriptor}/subscription/{sub}/backlog`,
     which the v4 admin routes to the segment topic's owner, so it works whether the
     controller and the segment are colocated or not. A 404 (topic / subscription not yet
     loaded) is treated as "not drained, retry on the next poll".
   
   - `ScalableConsumerType` is plumbed from `ServerCnx` through `ScalableTopicService`
     → `ScalableTopicController` → `SubscriptionCoordinator`. The controller
     passes a real drain checker only for STREAM; for CHECKPOINT / QUEUE the
     coordinator gets a `null` checker and skips parent-drain entirely.
   
   - `ScalableTopicController.close` stops every coordinator's drain poller.
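
The drain poll described above could look roughly like the following. This is a simplified sketch, not the code in the PR: the `SegmentDrainChecker` method signature, the `DrainPollSketch` class, and `pollOnce` are assumptions made for illustration; the backoff is plain doubling without jitter; and the blocking `join()` stands in for what would presumably be asynchronous chaining and real rescheduling in the coordinator.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class DrainPollSketch {
    /** Assumed shape of the async predicate; the real signature is not shown in the PR text. */
    interface SegmentDrainChecker {
        CompletableFuture<Boolean> isDrained(long segmentId, String subscription);
    }

    static final long INITIAL_DELAY_MS = 2_000L;        // 2s initial
    static final long MAX_DELAY_MS = 15 * 60 * 1_000L;  // 15min max

    private final SegmentDrainChecker checker;
    private final String subscription;
    private final Set<Long> sealedSegmentIds;
    private final Set<Long> drainedSegmentIds = new HashSet<>();
    private long nextDelayMs = INITIAL_DELAY_MS;

    DrainPollSketch(SegmentDrainChecker checker, String subscription, Set<Long> sealedSegmentIds) {
        this.checker = checker;
        this.subscription = subscription;
        this.sealedSegmentIds = new HashSet<>(sealedSegmentIds);
    }

    Set<Long> drainedSegmentIds() {
        return drainedSegmentIds;
    }

    /**
     * Runs one poll round over every undrained sealed segment.
     * Returns the delay in ms before the next round, or -1 once all
     * sealed segments are drained and the poller can stop.
     */
    long pollOnce() {
        boolean progress = false;
        for (long id : sealedSegmentIds) {
            if (drainedSegmentIds.contains(id)) {
                continue;
            }
            boolean drained = checker.isDrained(id, subscription)
                    .exceptionally(ex -> false)  // e.g. a 404: not drained, retry next poll
                    .join();
            if (drained) {
                drainedSegmentIds.add(id);
                progress = true;  // the caller would trigger a rebalance here
            }
        }
        if (drainedSegmentIds.containsAll(sealedSegmentIds)) {
            return -1;
        }
        if (progress) {
            nextDelayMs = INITIAL_DELAY_MS;  // reset the backoff on progress
        }
        long delay = nextDelayMs;
        nextDelayMs = Math.min(nextDelayMs * 2, MAX_DELAY_MS);
        return delay;
    }
}
```

In this sketch a STREAM coordinator would own one such poller, while a CHECKPOINT / QUEUE coordinator (which gets a `null` checker) would never create one. Layout changes and fresh consumer registrations, which also reset the backoff per the description, are left out for brevity.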
   
   ## Tests
   
   ### `V5StreamConsumerDagReplayTest` (new)
   
   - `testEarliestSubscribePostSplitReadsSealedBacklog`: fresh EARLIEST consumer
     reads pre-split sealed backlog AND post-split active.
   - `testEarliestSubscribeAfterTwoSplitsReadsAllBacklog` — two-deep DAG; fresh
     consumer reads every message across both sealed generations.
   - `testSealedBacklogNotRedeliveredAfterFirstConsumerDrains` — second consumer
     joining after the first drained sees nothing (cursor at end on the sealed
     segment).
   
   ### `V5CheckpointConsumerDagReplayTest` (new)
   
   Mirror of the Stream test for both unmanaged and managed (consumerGroup) paths,
   covering single-split and two-split scenarios. Four tests total.
   
   ### `SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained` (new)
   
   Controllable mock checker; asserts children of segment 0 are excluded until
   segment 0 is marked drained, then included after the next poll.
   
   ### Matching PR(s) in forked repositories
   
   - area/broker
   - area/test

