merlimat opened a new pull request, #25622:
URL: https://github.com/apache/pulsar/pull/25622

   ## Summary
   
   Add `CheckpointConsumerBuilder.consumerGroup(String)`. When set, consumers 
in the same group share the topic's segments via the broker's 
`SubscriptionCoordinator` (the same wire path `StreamConsumer` already uses); 
when unset, the consumer keeps its original unmanaged behavior of reading every 
active segment independently.
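To illustrate the difference between the two modes, here is a minimal, self-contained model. It is not Pulsar code. `SegmentAssignmentModel`, its methods, and the round-robin policy are hypothetical stand-ins, because the PR does not describe how `SubscriptionCoordinator` actually divides segments among members. The model shows only the properties stated above: members of a group get disjoint subsets that together cover every segment, while unmanaged consumers each read everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of segment distribution; NOT the broker's SubscriptionCoordinator.
public class SegmentAssignmentModel {

    // Managed mode (consumerGroup set): each segment goes to exactly one member.
    // Round-robin is an assumed policy chosen only for illustration.
    static Map<String, List<Long>> assignManaged(List<Long> segments, List<String> members) {
        Map<String, List<Long>> result = new TreeMap<>();
        if (members.isEmpty()) {
            return result; // nobody to assign to
        }
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        for (int i = 0; i < segments.size(); i++) {
            String owner = members.get(i % members.size());
            result.get(owner).add(segments.get(i));
        }
        return result;
    }

    // Unmanaged mode (no consumerGroup): every consumer independently reads every segment.
    static Map<String, List<Long>> assignUnmanaged(List<Long> segments, List<String> consumers) {
        Map<String, List<Long>> result = new TreeMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>(segments));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> segments = List.of(0L, 1L, 2L, 3L);
        List<String> members = List.of("c1", "c2");
        System.out.println(assignManaged(segments, members));   // {c1=[0, 2], c2=[1, 3]}
        System.out.println(assignUnmanaged(segments, members)); // {c1=[0, 1, 2, 3], c2=[0, 1, 2, 3]}
    }
}
```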
   
   Unlike `StreamConsumer`, joining a group does **not** cause the broker to 
persist a cursor — each `CheckpointConsumer` still resumes from the 
`Checkpoint` the application provides at create time. The group only shapes 
which segments each member reads. Once every member of a group goes away and 
the controller's grace timer fires, the persisted registration is deleted; the 
broker holds no leftover state for the group.
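The lifecycle described above can be modeled with a small registry: members come and go, the registration survives a temporarily empty group, and it is deleted only once the grace timer fires. Everything here is hypothetical (`GroupRegistry`, the explicit `nowMillis` clock parameter, the fixed grace duration) and is not the controller's actual implementation. The model stores membership only, never a read position, matching the point that each `CheckpointConsumer` resumes from its own client-side `Checkpoint`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a consumer-group registration with a grace timer.
// Only membership is tracked; no cursors are persisted.
public class GroupRegistry {
    private final long graceMillis;
    private final Map<String, Set<String>> members = new HashMap<>();
    // When each group became empty; absent while the group still has members.
    private final Map<String, Long> emptySince = new HashMap<>();

    public GroupRegistry(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    public void join(String group, String consumer) {
        members.computeIfAbsent(group, g -> new HashSet<>()).add(consumer);
        emptySince.remove(group); // a (re)join cancels any pending grace timer
    }

    public void leave(String group, String consumer, long nowMillis) {
        Set<String> set = members.get(group);
        if (set != null && set.remove(consumer) && set.isEmpty()) {
            emptySince.put(group, nowMillis); // last member left: start the grace timer
        }
    }

    // Called periodically; deletes registrations whose grace period has fully elapsed.
    public void tick(long nowMillis) {
        emptySince.entrySet().removeIf(e -> {
            if (nowMillis - e.getValue() >= graceMillis) {
                members.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    public boolean isRegistered(String group) {
        return members.containsKey(group);
    }
}
```

Because a rejoin clears the pending timer, a member that reconnects within the grace period keeps the group alive, while a group that stays empty leaves nothing behind.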
   
   ### Implementation
   - `ScalableCheckpointConsumer` refactored to take a generic segment source — 
either a `DagWatchClient` (unmanaged, active + sealed) or a 
`ScalableConsumerClient` (managed, controller-driven). The unmanaged path now 
also subscribes to sealed segments so a checkpoint taken before a split or 
merge still resumes correctly: the reader on each sealed parent picks up from 
the saved position and drains its remainder.
   - Read loop stops cleanly on `AlreadyClosed` (segment removed from 
assignment) and on `TopicTerminated` (sealed segment fully drained), closing 
and dropping the reader; on transient failures, the next read is re-issued on 
the v4 client's executor rather than recursing, so the stack doesn't grow 
unboundedly.
   - For checkpoints with no position for a segment (e.g. the child of a 
post-checkpoint split or merge), `resolveStartPosition` now defaults to 
EARLIEST instead of LATEST — every message in such a segment is newer than the 
checkpoint by construction.
   - `V5RandomIds` extracted as a small util shared by 
`CheckpointConsumerBuilderV5` and `StreamConsumerBuilderV5` for generating 
short alphanumeric default consumer names.
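The read-loop rules in the list above can be sketched with `CompletableFuture`. This is an illustration under stated assumptions, not the PR's code. `ReadLoopSketch`, `SegmentReader`, and the two exception classes are hypothetical local stand-ins; in the v4 client the corresponding errors are the `PulsarClientException.AlreadyClosedException` and `PulsarClientException.TopicTerminatedException` subclasses, and the `Executor` parameter stands in for the client's executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class ReadLoopSketch {

    // Hypothetical stand-ins for the v4 client's PulsarClientException subclasses.
    public static class AlreadyClosedException extends RuntimeException {}
    public static class TopicTerminatedException extends RuntimeException {}

    // Hypothetical minimal view of a reader on one segment.
    public interface SegmentReader {
        CompletableFuture<String> readNext();
        void close();
    }

    // Per-segment read loop applying the stop/retry rules from the PR description.
    public static class ReadLoop {
        private final SegmentReader reader;
        private final Executor executor;     // stands in for the v4 client's executor
        private final Consumer<String> sink; // receives each message read
        private final CompletableFuture<Void> stopped = new CompletableFuture<>();

        public ReadLoop(SegmentReader reader, Executor executor, Consumer<String> sink) {
            this.reader = reader;
            this.executor = executor;
            this.sink = sink;
        }

        // Starts reading; the returned future completes once the loop has stopped for good.
        public CompletableFuture<Void> start() {
            readOnce();
            return stopped;
        }

        private void readOnce() {
            reader.readNext().whenComplete((msg, err) -> {
                if (err == null) {
                    sink.accept(msg);
                    // Issue the next read directly; this sketch assumes reads normally
                    // complete asynchronously on an I/O thread.
                    readOnce();
                    return;
                }
                // Failures from dependent stages arrive wrapped in CompletionException.
                Throwable cause = err instanceof CompletionException ? err.getCause() : err;
                if (cause instanceof AlreadyClosedException
                        || cause instanceof TopicTerminatedException) {
                    // Segment removed from assignment, or sealed segment fully drained.
                    reader.close();
                    stopped.complete(null);
                } else {
                    // Transient failure: retry from the executor instead of recursing here.
                    executor.execute(this::readOnce);
                }
            });
        }
    }
}
```

A production loop would likely also add backoff between transient retries; the sketch omits it to keep the stop/retry split visible.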
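The `resolveStartPosition` default described in the list above amounts to a lookup with a fallback. In this sketch the `Position` record, the `long` segment IDs, and the `Map`-based checkpoint are hypothetical simplifications rather than the real `Checkpoint` API; `EARLIEST` is modeled as a sentinel offset.

```java
import java.util.Map;

public class StartPositionSketch {

    // Hypothetical position type: a saved offset, or the EARLIEST sentinel.
    record Position(long offset) {
        static final Position EARLIEST = new Position(-1);
    }

    // Segments absent from the checkpoint (e.g. children of a split or merge that
    // happened after the checkpoint was taken) start at EARLIEST, since every message
    // they contain was written after the checkpoint.
    static Position resolveStartPosition(Map<Long, Position> checkpoint, long segmentId) {
        return checkpoint.getOrDefault(segmentId, Position.EARLIEST);
    }

    public static void main(String[] args) {
        Map<Long, Position> checkpoint = Map.of(1L, new Position(42));
        System.out.println(resolveStartPosition(checkpoint, 1L)); // Position[offset=42]
        System.out.println(resolveStartPosition(checkpoint, 7L)); // Position[offset=-1]
    }
}
```

Defaulting to LATEST here would silently skip every message a new child segment received between the split or merge and the resume.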
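A short alphanumeric name generator along the lines of `V5RandomIds` might look like the following. The class name `RandomIdSketch`, the alphabet, and the use of `SecureRandom` are assumptions; the PR does not specify the helper's length or character set.

```java
import java.security.SecureRandom;

public class RandomIdSketch {
    private static final String ALPHABET =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    // Returns a random alphanumeric string of the given length,
    // e.g. for use as a default consumer name.
    static String randomAlphanumeric(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}
```

Keeping this in one shared helper means both builders produce default names in the same format.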
   
   ### Tests (`V5CheckpointConsumerGroupTest`, 7 scenarios)
   - Two consumers in a group split segments disjointly.
   - Scaling from one to two consumers.
   - Resume from a client-provided checkpoint (single segment, per-message 
key/value assertions).
   - After every group member disconnects and the grace period elapses, the 
broker stats report no consumers (no leftover metadata).
   - `receiveMulti` returns up to N messages within a timeout; every produced 
message is surfaced exactly once.
   - Resume across a split: a checkpoint taken on the original single segment, 
resumed after the split, replays the parent's tail and every child message.
   - Resume across a merge: a checkpoint taken on two parents, resumed after the 
merge, replays both parents' tails and every message in the merged child.
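Two of these scenarios (the disjoint split and `receiveMulti`) reduce to the same check on what each consumer received: no message is delivered more than once, and the union equals the set of produced messages. A hypothetical helper for that check, not taken from `V5CheckpointConsumerGroupTest`, could look like this; it assumes message payloads are unique.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeliveryChecks {

    // True if every produced message was received exactly once across all consumers:
    // no duplicates within or across consumers, nothing missing, nothing unexpected.
    // Assumes produced payloads are unique.
    static boolean exactlyOnce(Collection<String> produced, List<List<String>> receivedPerConsumer) {
        Set<String> seen = new HashSet<>();
        for (List<String> received : receivedPerConsumer) {
            for (String msg : received) {
                if (!seen.add(msg)) {
                    return false; // delivered twice
                }
            }
        }
        return seen.equals(new HashSet<>(produced));
    }
}
```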
   
   ## Test plan
   - [x] `./gradlew :pulsar-broker:test --tests 
"org.apache.pulsar.client.api.v5.V5CheckpointConsumerGroupTest"` (all 7 pass).
   
   ## Matching PR
   - [ ] No matching PR.
   
   ### area/client

