merlimat opened a new pull request, #25642:
URL: https://github.com/apache/pulsar/pull/25642
## Summary
The Stream and Checkpoint consumer subscriptions are controller-driven: their
segment assignment comes from `SubscriptionCoordinator`. Previously
`computeAssignment` handed out only active segments, so a fresh EARLIEST
consumer joining after a split couldn't read messages produced before the
split: those live on the now-sealed parent and were silently orphaned. The
QueueConsumer fix in #25611 already extended the queue flow to active+sealed
segments; this PR brings StreamConsumer / CheckpointConsumer in line and adds
a per-subscription parent-drain ordering guarantee on top.
## Behavior
- Sealed segments are always included in the assignment. The client drains them
  via the existing per-segment v4 receive loop / Reader; an already-drained
  segment yields `TopicTerminated` immediately, and the v4 consumer/Reader
  closes.
- An active child of a split / merge is held back until *every* parent in the
  DAG has been drained for this subscription. Without this, we'd hand a consumer
  the child of a just-split segment immediately, which breaks per-key ordering
  against unread messages still sitting in the parent. Initial active segments
  (no parents) are unaffected.
- Parent-drain ordering only applies to **STREAM** consumers, since they're the
  ones that need per-key ordering across a split. **CHECKPOINT** consumers track
  position client-side via `Checkpoint` and never create per-segment
  subscription cursors (they read via Readers), so a per-subscription drain
  check would never report their parents as drained and would block their
  children indefinitely. **QUEUE** consumers are shared and accept out-of-order
  delivery by design. The coordinator skips the ordering filter for both.
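A minimal sketch of the assignment rule above, assuming a simplified layout model: `Segment`, `ConsumerType` and `AssignmentFilter.eligible` are hypothetical names for illustration, not the coordinator's real types. The sketch checks only the direct parents recorded on each segment and takes the drained set as a plain input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified segment model; the real layout/DAG types differ.
record Segment(long id, boolean sealed, List<Long> parentIds) {}

// Hypothetical mirror of the consumer types discussed above.
enum ConsumerType { STREAM, CHECKPOINT, QUEUE }

final class AssignmentFilter {
    private AssignmentFilter() {}

    /**
     * Returns the segments eligible for assignment to one subscription.
     * Sealed segments are always included so their backlog can be drained.
     * For STREAM only, an active segment is held back until all of its
     * (direct) parents are in drainedIds; initial segments have no parents,
     * so containsAll on an empty list lets them through.
     */
    static List<Segment> eligible(List<Segment> layout, ConsumerType type, Set<Long> drainedIds) {
        boolean enforceParentDrain = type == ConsumerType.STREAM;
        List<Segment> result = new ArrayList<>();
        for (Segment s : layout) {
            if (s.sealed()) {
                result.add(s); // always hand out sealed segments
            } else if (!enforceParentDrain || drainedIds.containsAll(s.parentIds())) {
                result.add(s); // active: no ordering filter, or every parent drained
            }
        }
        return result;
    }
}
```

With segment 0 sealed and split into active children 1 and 2, a STREAM subscription initially gets only segment 0; once 0 is in the drained set, all three are eligible. CHECKPOINT and QUEUE subscriptions get all three immediately.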
## Implementation
- New `SegmentDrainChecker` interface: an async predicate for "is segment X
  drained for subscription S".
- `SubscriptionCoordinator` gets the checker and an exponential `Backoff` (2s
  initial, 15min max). It tracks `drainedSegmentIds` in memory and runs a
  self-rescheduling poll that queries every undrained sealed segment and
  rebalances when state changes. The backoff resets on every progress event
  (drain detected, layout change, fresh consumer registration); the poller
  stops entirely once all sealed segments are drained.
- `ScalableTopicController.isSegmentDrained` calls the new admin endpoint
  `GET /segments/{tenant}/{ns}/{topic}/{descriptor}/subscription/{sub}/backlog`,
  which the v4 admin routes to the segment topic's owner, so it works whether or
  not the controller and the segment are colocated. A 404 (topic / subscription
  not yet loaded) is treated as "not drained, retry on the next poll".
- `ScalableConsumerType` is plumbed from `ServerCnx` through
  `ScalableTopicService` → `ScalableTopicController` → `SubscriptionCoordinator`.
  The controller passes a real drain checker only for STREAM; for CHECKPOINT /
  QUEUE the coordinator gets a `null` checker and skips parent-drain entirely.
- `ScalableTopicController.close` stops every coordinator's drain poller.
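The poll cadence described above (2s initial, 15min max, reset on progress) can be sketched with a small exponential backoff. `SimpleBackoff` is a hypothetical stand-in written for illustration, not Pulsar's `Backoff` class, and it assumes the delay doubles after each poll, which the description implies but does not spell out.

```java
import java.time.Duration;

// Hypothetical stand-in for the coordinator's exponential Backoff; not Pulsar's class.
// Assumption: the delay doubles after each poll and is capped at max.
final class SimpleBackoff {
    private final Duration initial;
    private final Duration max;
    private Duration current;

    SimpleBackoff(Duration initial, Duration max) {
        this.initial = initial;
        this.max = max;
        this.current = initial;
    }

    /** Returns the delay before the next poll, then doubles it (capped at max). */
    Duration next() {
        Duration delay = current;
        Duration doubled = current.multipliedBy(2);
        current = doubled.compareTo(max) > 0 ? max : doubled;
        return delay;
    }

    /** Called on a progress event: drain detected, layout change, new consumer. */
    void reset() {
        current = initial;
    }
}
```

With `new SimpleBackoff(Duration.ofSeconds(2), Duration.ofMinutes(15))` the delays run 2s, 4s, 8s, and so on up to 512s, then stay at 15min until `reset()` brings them back to 2s. Resetting on progress keeps the poller responsive right after something changes, while an idle subscription settles into cheap, infrequent checks.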
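One poll iteration might look like the following. The sketch assumes `SegmentDrainChecker` exposes a single method returning `CompletableFuture<Boolean>`; the method name `isDrained`, the `long` segment id and the `DrainPoller` class are hypothetical. Rescheduling with the backoff and the rebalance call are left out. A failed check is folded into "not drained", mirroring how the description treats a 404.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Assumed shape of the async predicate "is segment X drained for subscription S".
@FunctionalInterface
interface SegmentDrainChecker {
    CompletableFuture<Boolean> isDrained(long segmentId, String subscription);
}

// Hypothetical single poll step; the real coordinator also reschedules and rebalances.
final class DrainPoller {
    private final SegmentDrainChecker checker;
    private final String subscription;
    // In-memory record of sealed segments already confirmed drained.
    private final Set<Long> drainedSegmentIds = ConcurrentHashMap.newKeySet();

    DrainPoller(SegmentDrainChecker checker, String subscription) {
        this.checker = checker;
        this.subscription = subscription;
    }

    /**
     * Queries every sealed segment not yet known to be drained.
     * Completes with true if at least one segment became drained (a progress event).
     */
    CompletableFuture<Boolean> pollOnce(List<Long> sealedSegmentIds) {
        List<CompletableFuture<Boolean>> checks = sealedSegmentIds.stream()
                .filter(id -> !drainedSegmentIds.contains(id))
                .map(id -> checker.isDrained(id, subscription)
                        // A failed check counts as "not drained, retry on the next poll".
                        .exceptionally(ex -> false)
                        // add() returns true only the first time an id is recorded.
                        .thenApply(drained -> drained && drainedSegmentIds.add(id)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(checks.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> checks.stream().anyMatch(f -> f.join()));
    }

    /** True once every sealed segment is drained, i.e. the poller could stop. */
    boolean allDrained(List<Long> sealedSegmentIds) {
        return drainedSegmentIds.containsAll(sealedSegmentIds);
    }
}
```

Returning a progress flag leaves the decision to reset the backoff and rebalance with the caller, which fits the "rebalance when state changes" rule; `allDrained` is the condition under which the poller would stop.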
## Tests
### `V5StreamConsumerDagReplayTest` (new)
- `testEarliestSubscribePostSplitReadsSealedBacklog`: a fresh EARLIEST consumer
  reads the pre-split sealed backlog AND the post-split active segments.
- `testEarliestSubscribeAfterTwoSplitsReadsAllBacklog`: two-deep DAG; a fresh
  consumer reads every message across both sealed generations.
- `testSealedBacklogNotRedeliveredAfterFirstConsumerDrains`: a second consumer
  joining after the first has drained sees nothing (its cursor is at the end of
  the sealed segment).
### `V5CheckpointConsumerDagReplayTest` (new)
Mirrors the Stream tests for both the unmanaged and managed (consumerGroup)
paths, covering single-split and two-split scenarios. Four tests total.
### `SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained` (new)
Controllable mock checker; asserts children of segment 0 are excluded until
segment 0 is marked drained, then included after the next poll.
### Matching PR(s) in forked repositories
- area/broker
- area/test