merlimat opened a new pull request, #25622:
URL: https://github.com/apache/pulsar/pull/25622
## Summary

Add `CheckpointConsumerBuilder.consumerGroup(String)`. When a group is set, consumers in the same group share the topic's segments through the broker's `SubscriptionCoordinator` (the same wire path `StreamConsumer` already uses). When it is unset, the consumer keeps its original unmanaged behavior and reads every active segment independently.

Unlike `StreamConsumer`, joining a group does **not** make the broker persist a cursor: each `CheckpointConsumer` still resumes from the `Checkpoint` the application provides at creation time. The group only determines which segments each member reads. Once every member of a group has gone away and the controller's grace timer fires, the persisted group registration is deleted, so the broker holds no leftover state for the group.

### Implementation

- `ScalableCheckpointConsumer` is refactored to take a generic segment source: either a `DagWatchClient` (unmanaged; active and sealed segments) or a `ScalableConsumerClient` (managed; assignments driven by the controller). The unmanaged path now also subscribes to sealed segments, so a checkpoint taken before a split or merge still resumes correctly: the reader on each sealed parent starts from the saved position and drains the rest of that segment.
- The read loop stops cleanly on `AlreadyClosed` (the segment was removed from the assignment) and on `TopicTerminated` (a sealed segment has been fully drained), closing and dropping the reader in both cases. After a transient failure, the next read is scheduled on the v4 client's executor, so the call stack does not grow unboundedly.
- When a checkpoint has no position for a segment (e.g. a child created by a split or merge after the checkpoint was taken), `resolveStartPosition` now defaults to EARLIEST instead of LATEST: by construction, every message in such a segment is newer than the checkpoint.
- `V5RandomIds` is extracted as a small utility shared by `CheckpointConsumerBuilderV5` and `StreamConsumerBuilderV5` to generate short alphanumeric default consumer names.

### Tests (`V5CheckpointConsumerGroupTest`, 7 scenarios)

- Two consumers in a group split the segments disjointly.
- Scaling from one consumer to two.
- Resuming from a client-provided checkpoint (single segment, with per-message key/value assertions).
- After the connections of all group members drop and the grace period elapses, broker stats report no consumers (no leftover metadata).
- `receiveMulti` returns up to N messages within a timeout, and every produced message is delivered exactly once.
- Resume across a split: a checkpoint taken on the original single segment, resumed after the split, replays the parent's tail and every child message.
- Resume across a merge: a checkpoint taken on the two parents, resumed after the merge, replays both parents' tails and every message in the merged child.

## Test plan

- [x] `./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5CheckpointConsumerGroupTest"`: all 7 pass.

## Matching PR

- [ ] No matching PR.

### area/client
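### Sketch: start-position fallback and read-loop termination

Two of the implementation points in the Summary, the EARLIEST fallback in `resolveStartPosition` and the read loop's split between terminal and transient failures, can be illustrated with a minimal, self-contained Java sketch. It is not the PR's code. `CheckpointSketch`, `Position`, `EARLIEST`, `SegmentReader`, `readLoop`, and the `AlreadyClosed`/`TopicTerminated` classes are hypothetical stand-ins (the terminal cases presumably correspond to Pulsar's `PulsarClientException.AlreadyClosedException` and `PulsarClientException.TopicTerminatedException`), and the `resolveStartPosition` signature shown here is assumed.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch, NOT the Pulsar client's real classes: it models two
// behaviors described in the PR using simplified stand-in types.
final class CheckpointSketch {

    // Stand-in for a position inside one segment (not Pulsar's MessageId).
    record Position(long segmentId, long entryId) {}

    // Sentinel meaning "start from the first message of the segment".
    static final Position EARLIEST = new Position(-1, -1);

    // Assumed shape of resolveStartPosition: use the checkpointed position if
    // there is one; otherwise (e.g. a child segment created by a split or merge
    // after the checkpoint) start at EARLIEST, because every message in that
    // segment is newer than the checkpoint.
    static Position resolveStartPosition(Map<Long, Position> checkpoint, long segmentId) {
        return checkpoint.getOrDefault(segmentId, EARLIEST);
    }

    // Hypothetical stand-ins for the two terminal read failures.
    static class AlreadyClosed extends RuntimeException {}   // segment removed from assignment
    static class TopicTerminated extends RuntimeException {} // sealed segment fully drained

    // Hypothetical minimal per-segment reader.
    interface SegmentReader {
        CompletableFuture<String> readNext();
        void close();
    }

    // Reads until a terminal failure. Terminal failures close the reader and
    // invoke onStop; any other failure schedules the next read on `executor`
    // rather than calling readLoop directly from the failed callback.
    static void readLoop(SegmentReader reader, Executor executor,
                         Consumer<String> sink, Runnable onStop) {
        reader.readNext().whenComplete((msg, err) -> {
            if (err == null) {
                sink.accept(msg);
                // If readNext() returned an already-completed future, this callback
                // runs inline, so this call nests on the current stack.
                readLoop(reader, executor, sink, onStop);
                return;
            }
            // Futures derived from other stages wrap failures in CompletionException.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
            if (cause instanceof AlreadyClosed || cause instanceof TopicTerminated) {
                reader.close();
                onStop.run();
            } else {
                executor.execute(() -> readLoop(reader, executor, sink, onStop));
            }
        });
    }
}
```

Passing `Runnable::run` as the executor runs each retry inline, which is convenient in a test but forfeits the stack-depth benefit; with a real executor, such as one from `Executors.newSingleThreadExecutor()`, each retry starts on a fresh stack.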
