merlimat opened a new pull request, #25612:
URL: https://github.com/apache/pulsar/pull/25612

   ## Summary
   
   The V5 StreamConsumer was naively subscribing to every active segment with 
the v4 Exclusive subscription type, so a second consumer on the same 
subscription collided on every segment and failed. This PR plugs the missing 
piece: the broker has `SubscriptionCoordinator` and `ConsumerSession` 
infrastructure that assigns segments to consumers and rebalances on join/leave, 
and the V5 client now participates in that protocol.
   
   ### Wire layer (pulsar-common / pulsar-client)
   - `Commands.newScalableTopicSubscribe(...)` builder for the new 
client→broker command.
   - `ClientCnx` handlers for `ScalableTopicSubscribeResponse` (one-shot, 
matched by request id) and `ScalableTopicAssignmentUpdate` (push, matched by 
consumer id), plus a `scalableConsumerSessions` registry that mirrors 
`dagWatchSessions`.
   - `PulsarClientImpl.newConsumerId()` exposed publicly so V5 can mint 
consumer ids that the broker uses as the assignment-update routing key.
   - `ScalableConsumerSession` callback interface used by `ClientCnx` to 
dispatch pushed assignments back to the V5 layer.
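
The two matching modes above (one-shot responses keyed by request id, pushed updates keyed by consumer id) can be illustrated with a small standalone sketch. This is not the `ClientCnx` code from the PR: the class `SessionDispatchSketch`, its method names, and the use of `Long` segment ids are all hypothetical and only meant to show the shape of the two registries.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for connection-level dispatch; not the real ClientCnx. */
public class SessionDispatchSketch {
    // One-shot responses: keyed by request id, removed as soon as they complete.
    private final Map<Long, CompletableFuture<List<Long>>> pendingSubscribes =
            new ConcurrentHashMap<>();
    // Pushed updates: keyed by consumer id, kept for the lifetime of the session.
    private final Map<Long, Consumer<List<Long>>> sessions = new ConcurrentHashMap<>();

    /** Registers both the pending request and the long-lived session callback. */
    public CompletableFuture<List<Long>> subscribe(long requestId, long consumerId,
                                                   Consumer<List<Long>> onUpdate) {
        CompletableFuture<List<Long>> future = new CompletableFuture<>();
        pendingSubscribes.put(requestId, future);
        sessions.put(consumerId, onUpdate);
        return future;
    }

    /** Completes the one-shot future matched by request id. */
    public void handleSubscribeResponse(long requestId, List<Long> initialSegments) {
        CompletableFuture<List<Long>> future = pendingSubscribes.remove(requestId);
        if (future != null) {
            future.complete(initialSegments);
        }
    }

    /** Routes a pushed assignment to the session matched by consumer id. */
    public void handleAssignmentUpdate(long consumerId, List<Long> segments) {
        Consumer<List<Long>> session = sessions.get(consumerId);
        if (session != null) {
            session.accept(segments); // unknown consumer ids are ignored
        }
    }

    /** Drops the session so later pushes for this consumer id are ignored. */
    public void closeSession(long consumerId) {
        sessions.remove(consumerId);
    }
}
```

In this sketch the request-id entry is removed on first use while the consumer-id entry stays registered until the session is closed, which is the distinction the bullet above draws between the response and the update.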
   
   ### V5 (pulsar-client-v5)
   - `ScalableConsumerClient`: per-consumer session that resolves the 
controller-leader broker URL via a one-shot DAG-watch lookup, opens a v4 
connection to that broker directly, sends the subscribe command, awaits the 
initial `ScalableConsumerAssignment`, and dispatches pushed updates as a sorted 
`ActiveSegment` list. Falls back to the configured service URL when the 
controller URL isn't yet advertised (leader election still in flight). Mirrors 
`DagWatchClient` in shape.
   - `StreamConsumerBuilderV5`: uses `ScalableConsumerClient` instead of 
`DagWatchClient`. Generates a default `consumerName` (a full UUID) when the user 
didn't set one; the name is the registration key with the controller, so it 
must be unique.
   - `ScalableStreamConsumer`: refactored to implement 
`AssignmentChangeListener`. Subscribes only to assigned segments; closes v4 
consumers for segments removed by a rebalance so the Exclusive lock is released 
for whoever the controller just gave them to.
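
The rebalance handling described for `ScalableStreamConsumer` boils down to a set difference between the segments currently held and the segments in the new assignment. A minimal sketch, assuming segments can be identified by a `Long` id; `AssignmentDiffSketch` and its methods are hypothetical names, not code from this PR:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper showing the held-vs-assigned diff; not the PR's implementation. */
public class AssignmentDiffSketch {

    /** Segments held now but absent from the new assignment: close their v4 consumers. */
    public static Set<Long> toClose(Set<Long> held, Set<Long> assigned) {
        Set<Long> result = new TreeSet<>(held);
        result.removeAll(assigned);
        return result;
    }

    /** Segments in the new assignment that are not yet held: subscribe to these. */
    public static Set<Long> toOpen(Set<Long> held, Set<Long> assigned) {
        Set<Long> result = new TreeSet<>(assigned);
        result.removeAll(held);
        return result;
    }
}
```

For example, a consumer holding segments {0, 1, 2, 3} that receives the assignment {2, 3, 4} would close 0 and 1, keep 2 and 3 untouched, and open 4.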
   
   ### Tests (pulsar-broker)
   - `V5MultipleConsumersTest.testStreamConsumersSplitSegmentsAcrossConsumers`: 
two stream consumers on the same subscription must split a 4-segment topic 
between them: both receive a non-empty, disjoint share, and every produced 
message arrives exactly once across the consumer set.
   - `V5MultipleConsumersTest` also covers QueueConsumer Shared semantics 
(load-balanced) and CheckpointConsumer (each independently sees the full 
stream).
   - `V5ConcurrentProducersConsumersTest`: 4 producers + 3 QueueConsumers on a 
4-segment topic, sustained-throughput sanity check (no drop, no dup, every 
consumer pulls a non-trivial share).
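
The invariants these tests assert (non-empty shares, no duplicates, no drops) can be expressed as a single check over what each consumer received. The following is an illustrative sketch only, assuming messages are identified by a `String` id; `ExactlyOnceCheckSketch` is a hypothetical name and is not taken from the test classes above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical checker for the split invariants; not the actual test code. */
public class ExactlyOnceCheckSketch {

    /**
     * Returns true when every consumer received at least one message and every
     * produced id was received exactly once across the whole consumer set.
     */
    public static boolean splitIsValid(Set<String> produced,
                                       List<List<String>> receivedPerConsumer) {
        Set<String> seen = new HashSet<>();
        for (List<String> received : receivedPerConsumer) {
            if (received.isEmpty()) {
                return false; // a consumer got an empty share
            }
            for (String id : received) {
                if (!seen.add(id)) {
                    return false; // duplicate delivery, within or across consumers
                }
            }
        }
        return seen.equals(produced); // nothing dropped, nothing unexpected
    }
}
```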
   
   CheckpointConsumer integration with the coordinator is a separate change.
   
   ## Test plan
   - [x] `./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5MultipleConsumersTest"`
   - [x] `./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5ConcurrentProducersConsumersTest"`
   - [x] `./gradlew :pulsar-broker:test --tests "org.apache.pulsar.client.api.v5.V5StreamConsumerBasicTest"`
   
   ## Matching PR
   - [ ] No matching PR.
   
   ### area/client

