This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 88fcfa6b425 [feat][broker] PIP-466: Add supports_scalable_topics
feature flag and broker config (#25616)
88fcfa6b425 is described below
commit 88fcfa6b425c47f830cc690a70ef6cfe31ec4d33
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 29 19:03:01 2026 +0300
[feat][broker] PIP-466: Add supports_scalable_topics feature flag and
broker config (#25616)
---
.../org/apache/pulsar/broker/ServiceConfiguration.java | 9 +++++++++
.../pulsar/broker/service/PulsarCommandSender.java | 3 ++-
.../pulsar/broker/service/PulsarCommandSenderImpl.java | 5 +++--
.../org/apache/pulsar/broker/service/ServerCnx.java | 17 ++++++++++++++++-
.../org/apache/pulsar/client/api/ClientErrorsTest.java | 4 ++--
.../org/apache/pulsar/client/api/MockBrokerService.java | 2 +-
.../apache/pulsar/client/api/PulsarClientException.java | 3 ++-
.../apache/pulsar/client/impl/v5/DagWatchClient.java | 7 +++++++
.../pulsar/client/impl/v5/ScalableConsumerClient.java | 7 +++++++
.../java/org/apache/pulsar/client/impl/ClientCnx.java | 4 ++++
.../org/apache/pulsar/common/protocol/Commands.java | 15 ++++++++++-----
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
.../org/apache/pulsar/proxy/server/ProxyConnection.java | 5 +++--
13 files changed, 67 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b03af49c56e..d62cb1085ce 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1289,6 +1289,15 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean enableBrokerTopicListWatcher = true;
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_POLICIES,
+ doc = "Enables the scalable-topics V5 API on this broker. When
disabled, "
+ + "the broker advertises supports_scalable_topics=false in
CommandConnected "
+ + "feature flags and rejects scalable-topic commands from
clients."
+ )
+ private boolean scalableTopicsEnabled = true;
+
@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 3a8ad10f8c6..d98fc358583 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -68,7 +68,8 @@ public interface PulsarCommandSender {
void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error,
String errorMessage);
- void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize,
boolean supportsTopicWatchers);
+ void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize,
boolean supportsTopicWatchers,
+ boolean supportsScalableTopics);
void sendLookupResponse(String brokerServiceUrl, String
brokerServiceUrlTls, boolean authoritative,
CommandLookupTopicResponse.LookupType response,
long requestId,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 5e65b8a5571..f4998183999 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -174,9 +174,10 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
}
@Override
- public void sendConnectedResponse(int clientProtocolVersion, int
maxMessageSize, boolean supportsTopicWatchers) {
+ public void sendConnectedResponse(int clientProtocolVersion, int
maxMessageSize, boolean supportsTopicWatchers,
+ boolean supportsScalableTopics) {
BaseCommand command = Commands.newConnectedCommand(
- clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
+ clientProtocolVersion, maxMessageSize, supportsTopicWatchers,
supportsScalableTopics);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
writeAndFlush(outBuf);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cc09a3109e6..4f130e28961 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -216,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
private final boolean enableTopicListWatcher;
+ private final boolean scalableTopicsEnabled;
private final int maxSubscriptionPatternLength;
private final TopicListService topicListService;
private final BrokerInterceptor brokerInterceptor;
@@ -383,6 +384,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.maxTopicListInFlightLimiter =
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
this.enableSubscriptionPatternEvaluation =
conf.isEnableBrokerSideSubscriptionPatternEvaluation();
this.enableTopicListWatcher = conf.isEnableBrokerTopicListWatcher();
+ this.scalableTopicsEnabled = conf.isScalableTopicsEnabled();
this.maxSubscriptionPatternLength =
conf.getSubscriptionPatternMaxLength();
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation,
maxSubscriptionPatternLength);
@@ -759,6 +761,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug().attr("topic", topicStr).attr("sessionId", sessionId)
.log("Received ScalableTopicLookup");
+ if (!scalableTopicsEnabled) {
+ ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
ServerError.NotAllowedError,
+ "Scalable topics are disabled on this broker"));
+ return;
+ }
+
final TopicName topicName;
try {
topicName = TopicName.get(topicStr);
@@ -850,6 +858,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.attr("consumerName", consumerName).attr("requestId",
requestId)
.log("Received ScalableTopicSubscribe");
+ if (!scalableTopicsEnabled) {
+ getCommandSender().sendScalableTopicSubscribeError(requestId,
ServerError.NotAllowedError,
+ "Scalable topics are disabled on this broker");
+ return;
+ }
+
final TopicName topicName;
try {
topicName = TopicName.get(topicStr);
@@ -1117,7 +1131,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
maybeScheduleAuthenticationCredentialsRefresh();
}
- writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, enableTopicListWatcher));
+ writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, enableTopicListWatcher,
+ scalableTopicsEnabled));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
log.debug()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index b2a5a65d813..f34efa6b2b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -771,7 +771,7 @@ public class ClientErrorsTest {
AtomicBoolean msgSent = new AtomicBoolean();
mockBrokerService.setHandleConnect((ctx, connect) -> {
channelCtx.set(ctx);
-
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false,
false));
if (numOfConnections.incrementAndGet() == 2) {
// close the cnx immediately when trying to connect the 2nd
time
ctx.channel().close();
@@ -812,7 +812,7 @@ public class ClientErrorsTest {
CountDownLatch latch = new CountDownLatch(1);
mockBrokerService.setHandleConnect((ctx, connect) -> {
channelCtx.set(ctx);
-
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false,
false));
if (numOfConnections.incrementAndGet() == 2) {
// close the cnx immediately when trying to connect the 2nd
time
ctx.channel().close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index 58cb247f90a..16efd02ad7c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -145,7 +145,7 @@ public class MockBrokerService {
return;
}
// default
-
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false,
false));
}
@Override
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 5005549f476..b4d9a232b14 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -743,7 +743,8 @@ public class PulsarClientException extends IOException {
* "supports_topic_watchers" was introduced at "2.11" and is no longer
supported, so skip this enum.
*/
public enum FailedFeatureCheck {
- SupportsGetPartitionedMetadataWithoutAutoCreation;
+ SupportsGetPartitionedMetadataWithoutAutoCreation,
+ SupportsScalableTopics;
}
/**
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 632692fd91c..b13828e7561 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -74,6 +74,13 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
v4Client.getConnection(topicName.toString())
.thenAccept(cnx -> {
this.cnx = cnx;
+ if (!cnx.isSupportsScalableTopics()) {
+ initialLayoutFuture.completeExceptionally(
+ new
PulsarClientException.FeatureNotSupportedException(
+ "Broker does not support scalable
topics",
+
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+ return;
+ }
// Register this session to receive updates
cnx.registerDagWatchSession(sessionId, this);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index cceaa3b3edf..08c3643c601 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -126,6 +126,13 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
})
.thenAccept(cnx -> {
this.cnx = cnx;
+ if (!cnx.isSupportsScalableTopics()) {
+ initialAssignmentFuture.completeExceptionally(
+ new
PulsarClientException.FeatureNotSupportedException(
+ "Broker does not support scalable
topics",
+
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+ return;
+ }
cnx.registerScalableConsumerSession(consumerId, this);
long requestId = v4Client.newRequestId();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 75131f0af1d..f45fc2c91f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -219,6 +219,8 @@ public class ClientCnx extends PulsarHandler {
private boolean brokerSupportsReplDedupByLidAndEid;
@Getter
private boolean supportsTopicWatcherReconcile;
+ @Getter
+ private boolean supportsScalableTopics;
/** Idle stat. **/
@Getter
@@ -435,6 +437,8 @@ public class ClientCnx extends PulsarHandler {
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsReplDedupByLidAndEid();
supportsTopicWatcherReconcile =
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatcherReconcile();
+ supportsScalableTopics =
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsScalableTopics();
// set remote protocol version to the correct version before we
complete the connection future
setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 227aae5ee17..523f47b4805 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -305,12 +305,14 @@ public class Commands {
return cmd;
}
- public static ByteBuf newConnected(int clientProtocoVersion, boolean
supportsTopicWatchers) {
- return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE,
supportsTopicWatchers);
+ public static ByteBuf newConnected(int clientProtocoVersion, boolean
supportsTopicWatchers,
+ boolean supportsScalableTopics) {
+ return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE,
supportsTopicWatchers,
+ supportsScalableTopics);
}
public static BaseCommand newConnectedCommand(int clientProtocolVersion,
int maxMessageSize,
- boolean
supportsTopicWatchers) {
+ boolean
supportsTopicWatchers, boolean supportsScalableTopics) {
BaseCommand cmd = localCmd(Type.CONNECTED);
CommandConnected connected = cmd.setConnected()
.setServerVersion("Pulsar Server" +
PulsarVersion.getVersion());
@@ -330,11 +332,14 @@ public class Commands {
connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true);
connected.setFeatureFlags().setSupportsTopicWatcherReconcile(supportsTopicWatchers);
+
connected.setFeatureFlags().setSupportsScalableTopics(supportsScalableTopics);
return cmd;
}
- public static ByteBuf newConnected(int clientProtocolVersion, int
maxMessageSize, boolean supportsTopicWatchers) {
- return serializeWithSize(newConnectedCommand(clientProtocolVersion,
maxMessageSize, supportsTopicWatchers));
+ public static ByteBuf newConnected(int clientProtocolVersion, int
maxMessageSize, boolean supportsTopicWatchers,
+ boolean supportsScalableTopics) {
+ return serializeWithSize(newConnectedCommand(clientProtocolVersion,
maxMessageSize, supportsTopicWatchers,
+ supportsScalableTopics));
}
public static ByteBuf newAuthChallenge(String authMethod, AuthData
brokerData, int clientProtocolVersion) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 114d083e12f..195fc597ecc 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -316,6 +316,7 @@ message FeatureFlags {
optional bool supports_get_partitioned_metadata_without_auto_creation = 5
[default = false];
optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
optional bool supports_topic_watcher_reconcile = 7 [default = false];
+ optional bool supports_scalable_topics = 8 [default = false];
}
message CommandConnected {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2dbd26b00cb..bfdfb83f8cc 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -454,7 +454,7 @@ public class ProxyConnection extends PulsarHandler {
state = State.ProxyLookupRequests;
lookupProxyHandler = service.newLookupProxyHandler(this);
startAuthRefreshTaskIfNotStarted();
- final ByteBuf msg =
Commands.newConnected(protocolVersionToAdvertise, false);
+ final ByteBuf msg =
Commands.newConnected(protocolVersionToAdvertise, false, false);
writeAndFlush(msg);
}
}
@@ -467,7 +467,8 @@ public class ProxyConnection extends PulsarHandler {
int maxMessageSize =
connected.hasMaxMessageSize() ?
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
final ByteBuf msg =
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
- connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers());
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers(),
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsScalableTopics());
writeAndFlush(msg);
// Start auth refresh task only if we are not forwarding
authorization credentials
if
(!service.getConfiguration().isForwardAuthorizationCredentials()) {