lhotari opened a new pull request, #25616:
URL: https://github.com/apache/pulsar/pull/25616
Closes #25597
PIP: PIP-466
### Motivation
PIP-466 introduced six new wire commands for the V5 scalable-topics API
(`SCALABLE_TOPIC_LOOKUP`, `SCALABLE_TOPIC_UPDATE`, `SCALABLE_TOPIC_CLOSE`,
`SCALABLE_TOPIC_SUBSCRIBE`, `SCALABLE_TOPIC_SUBSCRIBE_RESPONSE`,
`SCALABLE_TOPIC_ASSIGNMENT_UPDATE`, types 70–75 in `BaseCommand`). Until now
these commands were emitted by the V5 client unconditionally — there was no
negotiation that the peer broker actually understood them, and no operator
switch to disable the server-side handlers.
This PR adds the missing capability bit and operator switch:
1. A wire capability bit so a V5 client can detect a peer that does not speak
scalable topics and fail fast with a clear error instead of writing bytes
that the broker will silently drop or close the connection on.
2. A broker-side master switch (default on) so an operator can disable
scalable topics on a particular broker; that broker then advertises the
bit as `false` so clients route around it (and rejects any straggler
scalable-topic command it receives — defense-in-depth).
The pattern mirrors the existing `supports_topic_watchers` /
`enableBrokerTopicListWatcher` flow exactly — no new infrastructure.
### Modifications
**Wire / config**
- `PulsarApi.proto`: new `optional bool supports_scalable_topics = 8` field on
`FeatureFlags` (default `false`).
- `ServiceConfiguration`: new `scalableTopicsEnabled = true` (default on) in
`CATEGORY_POLICIES`.
**Broker (advertises + enforces)**
- `Commands.newConnected*` / `newConnectedCommand`: new trailing
`boolean supportsScalableTopics` parameter, which sets the bit on the
response's `FeatureFlags`.
- `ServerCnx`: caches `conf.isScalableTopicsEnabled()` and threads it into
`Commands.newConnected(...)`. When the switch is disabled,
`handleCommandScalableTopicLookup` and `handleCommandScalableTopicSubscribe`
reject the command with `ServerError.NotAllowedError`. The close handler is
left as a silent no-op since close is best-effort.
- `PulsarCommandSender` / `PulsarCommandSenderImpl`: interface and impl
signatures extended to match.
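The advertise-and-enforce behavior described in this list can be sketched roughly as follows. This is a minimal, self-contained model and not the actual `ServerCnx` code: `BrokerGateSketch`, its method names, and the local `ServerError` enum are hypothetical stand-ins. Only the behavior (cache the config value, advertise it, reject lookup and subscribe when disabled, leave close as a no-op) is taken from the description above.

```java
import java.util.Optional;

// Hypothetical stand-in for the broker connection handler; not the real ServerCnx.
final class BrokerGateSketch {
    // Local stand-in for the one error code this sketch needs.
    enum ServerError { NotAllowedError }

    // Cached once at construction, mirroring how the config value is cached.
    private final boolean scalableTopicsEnabled;

    BrokerGateSketch(boolean scalableTopicsEnabled) {
        this.scalableTopicsEnabled = scalableTopicsEnabled;
    }

    // Value the broker would advertise as supports_scalable_topics.
    boolean advertisedFlag() {
        return scalableTopicsEnabled;
    }

    // An empty result means the command may proceed to the real handler.
    Optional<ServerError> checkLookup() {
        return rejectIfDisabled();
    }

    Optional<ServerError> checkSubscribe() {
        return rejectIfDisabled();
    }

    // Close is best-effort, so it is never answered with an error.
    Optional<ServerError> checkClose() {
        return Optional.empty();
    }

    private Optional<ServerError> rejectIfDisabled() {
        return scalableTopicsEnabled
                ? Optional.empty()
                : Optional.of(ServerError.NotAllowedError);
    }
}
```

With the switch off, `new BrokerGateSketch(false).checkLookup()` yields `NotAllowedError` while `checkClose()` stays empty, which matches the defense-in-depth behavior for stragglers.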
**Proxy + tests**
- `ProxyConnection`: propagates the broker's bit through the forwarded
`CommandConnected` (mirrors the existing `supportsTopicWatchers`
forwarding).
- `MockBrokerService` and `ClientErrorsTest`: updated to the new 3-arg
`newConnected` signature.
**Client (consumes + fails fast)**
- `ClientCnx`: caches the bit from `CommandConnected.feature_flags` into
`@Getter private boolean supportsScalableTopics`.
- `PulsarClientException.FailedFeatureCheck`: gains `SupportsScalableTopics`.
- `DagWatchClient.start` and `ScalableConsumerClient.start`: now check
`cnx.isSupportsScalableTopics()` after acquiring the connection and fail
the future with `FeatureNotSupportedException(SupportsScalableTopics)`
before writing any wire bytes.
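The client-side fail-fast check can be illustrated with a small sketch. The names here (`ClientFailFastSketch`, `Connection`, `FeatureNotSupported`, the `write` counter) are hypothetical and stand in for the real client classes; the sketch only assumes what the list above states, namely that the bit is checked after the connection is acquired and the future fails before anything is written.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the client-side check; not the real DagWatchClient.
final class ClientFailFastSketch {
    // Stand-in for FeatureNotSupportedException(SupportsScalableTopics).
    static final class FeatureNotSupported extends RuntimeException {
        FeatureNotSupported(String feature) {
            super("Broker does not support feature: " + feature);
        }
    }

    // Stand-in for the connection: the cached capability bit plus a write counter.
    static final class Connection {
        final boolean supportsScalableTopics;
        int commandsWritten = 0;

        Connection(boolean supportsScalableTopics) {
            this.supportsScalableTopics = supportsScalableTopics;
        }

        void write(String command) {
            commandsWritten++;
        }
    }

    // Checks the bit first; writes a command only when the peer supports it.
    static CompletableFuture<Void> start(Connection cnx) {
        if (!cnx.supportsScalableTopics) {
            return CompletableFuture.failedFuture(
                    new FeatureNotSupported("SupportsScalableTopics"));
        }
        cnx.write("SCALABLE_TOPIC_LOOKUP");
        return CompletableFuture.completedFuture(null);
    }
}
```

Against a connection whose bit is `false`, the returned future completes exceptionally and `commandsWritten` stays at zero, so no scalable-topic bytes reach a broker that cannot handle them.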
The default-on path is unchanged: broker default is `true` → bit is `true`
on the wire → V5 client check is a no-op. Disabling `scalableTopicsEnabled`
on a broker both advertises `false` to clients (so they fail fast) and
rejects any scalable-topic command that arrives anyway.
### Verifying this change
- [x] Make sure that the change passes the CI checks.
This change is a wire-additive default-on feature flag; no new tests are
added in this PR. The default-on path leaves all existing V5 behavior
unchanged. Targeted tests for the disabled-broker path (client receives
`FeatureNotSupportedException(SupportsScalableTopics)`) and the
defense-in-depth path (broker rejects scalable-topic commands with
`NotAllowedError` even when a client bypasses the capability check) are
intended as a follow-up.
### Does this pull request potentially affect one of the following parts:
- [x] The default values of configurations — adds `scalableTopicsEnabled`
defaulting to `true` (preserves current behavior).
- [x] The binary protocol: adds `FeatureFlags.supports_scalable_topics`
(optional, default `false`). Wire-additive and safe across mixed-version
clusters: old brokers will not set the bit, so V5 clients will refuse
to send scalable-topic commands; old clients never read or set the bit.
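The mixed-version reasoning can be made concrete with a small model. `MixedVersionSketch` and its methods are hypothetical; the `Optional<Boolean>` stands in for an optional wire field that an old broker never sets, and the only assumption is that an unset optional bool with default `false` reads as `false`.

```java
import java.util.Optional;

// Hypothetical model of how a V5 client interprets the advertised bit.
final class MixedVersionSketch {
    // An unset optional bool with default false reads as false.
    static boolean effectiveFlag(Optional<Boolean> wireValue) {
        return wireValue.orElse(false);
    }

    // A V5 client sends scalable-topic commands only when the flag is true.
    static boolean clientMaySend(Optional<Boolean> wireValue) {
        return effectiveFlag(wireValue);
    }
}
```

The three cases are: an old broker (`Optional.empty()`) and a new broker with the switch off (`Optional.of(false)`) both make the client refuse to send, and only a new broker with the switch on (`Optional.of(true)`) lets the commands through.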