merlimat opened a new pull request, #25612: URL: https://github.com/apache/pulsar/pull/25612
## Summary The V5 StreamConsumer was naively subscribing to every active segment with the v4 Exclusive subscription type, so a second consumer on the same subscription collided on every segment and failed. Plug the missing piece: the broker has `SubscriptionCoordinator` and `ConsumerSession` infrastructure that assigns segments to consumers and rebalances on join/leave. The V5 client now participates in that protocol. ### Wire layer (pulsar-common / pulsar-client) - `Commands.newScalableTopicSubscribe(...)` builder for the new client→broker command. - `ClientCnx` handlers for `ScalableTopicSubscribeResponse` (one-shot, matched by request id) and `ScalableTopicAssignmentUpdate` (push, matched by consumer id), plus a `scalableConsumerSessions` registry that mirrors `dagWatchSessions`. - `PulsarClientImpl.newConsumerId()` exposed publicly so V5 can mint consumer ids that the broker uses as the assignment-update routing key. - `ScalableConsumerSession` callback interface used by `ClientCnx` to dispatch pushed assignments back to the V5 layer. ### V5 (pulsar-client-v5) - `ScalableConsumerClient`: per-consumer session that resolves the controller-leader broker URL via a one-shot DAG-watch lookup, opens a v4 connection to that broker directly, sends the subscribe command, awaits the initial `ScalableConsumerAssignment`, and dispatches pushed updates as a sorted `ActiveSegment` list. Falls back to the configured service URL when the controller URL isn't yet advertised (leader election still in flight). Mirrors `DagWatchClient` in shape. - `StreamConsumerBuilderV5`: uses `ScalableConsumerClient` instead of `DagWatchClient`. Generates a default `consumerName` (full UUID) when the user didn't set one — it's the registration key with the controller, must be unique. - `ScalableStreamConsumer`: refactored to implement `AssignmentChangeListener`. Subscribes only to assigned segments; closes v4 consumers for segments removed by a rebalance so the Exclusive lock is released for whoever the controller just gave them to. ### Tests (pulsar-broker) - `V5MultipleConsumersTest.testStreamConsumersSplitSegmentsAcrossConsumers`: two stream consumers on the same subscription must split a 4-segment topic between them — both receive a non-empty disjoint share, every produced message arrives exactly once across the consumer set. - `V5MultipleConsumersTest` also covers QueueConsumer Shared semantics (load-balanced) and CheckpointConsumer (each independently sees the full stream). - `V5ConcurrentProducersConsumersTest`: 4 producers + 3 QueueConsumers on a 4-segment topic, sustained-throughput sanity check (no drop, no dup, every consumer pulls a non-trivial share). CheckpointConsumer integration with the coordinator is a separate change. ## Test plan - [x] `./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.client.api.v5.V5MultipleConsumersTest\"` - [x] `./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.client.api.v5.V5ConcurrentProducersConsumersTest\"` - [x] `./gradlew :pulsar-broker:test --tests \"org.apache.pulsar.client.api.v5.V5StreamConsumerBasicTest\"` ## Matching PR - [ ] No matching PR. ### area/client -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
