merlimat opened a new pull request, #25564:
URL: https://github.com/apache/pulsar/pull/25564
> **Stacked on top of #25559.** This PR is opened as a draft and will be
> rebased on master once #25559 lands. Reviewers: the new code in this
> PR is the single commit titled "PIP-468: Add scalable topic protocol
> commands and connection handling"; everything else comes from #25559.
## Summary
Second PR in the PIP-468 series. Introduces the wire protocol for the
scalable-topic client/broker interaction:
- New protocol commands on `PulsarApi.proto`:
`CommandScalableTopicLookup`, `CommandScalableTopicUpdate`,
`CommandScalableTopicClose`, `CommandScalableTopicSubscribe`,
`CommandScalableTopicSubscribeResponse`,
`CommandScalableTopicAssignmentUpdate`.
- `Commands` factory methods for building each message (including the
error variants for lookup/subscribe).
- `PulsarDecoder` dispatch entries for all six new message types.
- Broker-side `DagWatchSession` — resolves per-segment broker
addresses, watches the metadata store for DAG changes, and pushes
`ScalableTopicUpdate` frames on the wire. Tied to the transport
connection — closed when the connection drops.
- `ServerCnx` handlers for `ScalableTopicLookup` and
`ScalableTopicClose`.
- Client-side `DagWatchSession` interface (callback surface for the V5
client that lands later).
- `ClientCnx` handler for `ScalableTopicUpdate` that dispatches to the
registered session.
- `ConsumerSession.sendAssignmentUpdate` wired through to the new
command sender (previously a placeholder no-op).
## Tests
Three new unit-test classes:
- `CommandsScalableTopicTest` (8 tests) — every new factory method
roundtrips through the full wire frame; uses
`BaseCommand.materialize()` so the buffer can be safely released
before assertions.
- `ConsumerSessionTest` (15 tests) — covers identity/equality,
attach/markDisconnected, grace-timer lifecycle, all three
`sendAssignmentUpdate` branches (connected, disconnected, null
command sender), and the `toProto` static helper.
- `DagWatchSessionTest` (12 tests) — session identity, idempotent
close, `start()` failure on missing topic, metadata-store listener
registration, notification filtering (wrong path / Deleted /
post-close), `pushUpdate` serializing the correct DAG to the
connection, and post-close/no-broker-map edge cases.
The heavy `NamespaceService` lookup path inside `start()` /
`buildResponse` is deferred to broker integration tests — mocking the
full namespace stack here is more noise than signal.
## Test plan
- [x] `./gradlew :pulsar-common:test --tests
"org.apache.pulsar.common.protocol.CommandsScalableTopicTest"`
— 8/8 pass.
- [x] `./gradlew :pulsar-broker:test --tests
"org.apache.pulsar.broker.service.scalable.*"`
— 61/61 pass (5 test classes, including the two existing ones
from the previous PR).
- [x] `./gradlew :pulsar-broker:checkstyleMain :pulsar-broker:checkstyleTest
:pulsar-common:checkstyleMain :pulsar-common:checkstyleTest`
— all clean.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]