This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 88fcfa6b425 [feat][broker] PIP-466: Add supports_scalable_topics 
feature flag and broker config (#25616)
88fcfa6b425 is described below

commit 88fcfa6b425c47f830cc690a70ef6cfe31ec4d33
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 29 19:03:01 2026 +0300

    [feat][broker] PIP-466: Add supports_scalable_topics feature flag and 
broker config (#25616)
---
 .../org/apache/pulsar/broker/ServiceConfiguration.java  |  9 +++++++++
 .../pulsar/broker/service/PulsarCommandSender.java      |  3 ++-
 .../pulsar/broker/service/PulsarCommandSenderImpl.java  |  5 +++--
 .../org/apache/pulsar/broker/service/ServerCnx.java     | 17 ++++++++++++++++-
 .../org/apache/pulsar/client/api/ClientErrorsTest.java  |  4 ++--
 .../org/apache/pulsar/client/api/MockBrokerService.java |  2 +-
 .../apache/pulsar/client/api/PulsarClientException.java |  3 ++-
 .../apache/pulsar/client/impl/v5/DagWatchClient.java    |  7 +++++++
 .../pulsar/client/impl/v5/ScalableConsumerClient.java   |  7 +++++++
 .../java/org/apache/pulsar/client/impl/ClientCnx.java   |  4 ++++
 .../org/apache/pulsar/common/protocol/Commands.java     | 15 ++++++++++-----
 pulsar-common/src/main/proto/PulsarApi.proto            |  1 +
 .../org/apache/pulsar/proxy/server/ProxyConnection.java |  5 +++--
 13 files changed, 67 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b03af49c56e..d62cb1085ce 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1289,6 +1289,15 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean enableBrokerTopicListWatcher = true;
 
+    @FieldContext(
+            dynamic = false,
+            category = CATEGORY_POLICIES,
+            doc = "Enables the scalable-topics V5 API on this broker. When 
disabled, "
+                    + "the broker advertises supports_scalable_topics=false in 
CommandConnected "
+                    + "feature flags and rejects scalable-topic commands from 
clients."
+    )
+    private boolean scalableTopicsEnabled = true;
+
     @FieldContext(
             dynamic = false,
             category = CATEGORY_POLICIES,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 3a8ad10f8c6..d98fc358583 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -68,7 +68,8 @@ public interface PulsarCommandSender {
 
     void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error, 
String errorMessage);
 
-    void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize, 
boolean supportsTopicWatchers);
+    void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize, 
boolean supportsTopicWatchers,
+                               boolean supportsScalableTopics);
 
     void sendLookupResponse(String brokerServiceUrl, String 
brokerServiceUrlTls, boolean authoritative,
                             CommandLookupTopicResponse.LookupType response, 
long requestId,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 5e65b8a5571..f4998183999 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -174,9 +174,10 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     }
 
     @Override
-    public void sendConnectedResponse(int clientProtocolVersion, int 
maxMessageSize, boolean supportsTopicWatchers) {
+    public void sendConnectedResponse(int clientProtocolVersion, int 
maxMessageSize, boolean supportsTopicWatchers,
+                                      boolean supportsScalableTopics) {
         BaseCommand command = Commands.newConnectedCommand(
-                clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
+                clientProtocolVersion, maxMessageSize, supportsTopicWatchers, 
supportsScalableTopics);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         writeAndFlush(outBuf);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cc09a3109e6..4f130e28961 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -216,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private final boolean enableSubscriptionPatternEvaluation;
     private final boolean enableTopicListWatcher;
+    private final boolean scalableTopicsEnabled;
     private final int maxSubscriptionPatternLength;
     private final TopicListService topicListService;
     private final BrokerInterceptor brokerInterceptor;
@@ -383,6 +384,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.maxTopicListInFlightLimiter = 
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
         this.enableSubscriptionPatternEvaluation = 
conf.isEnableBrokerSideSubscriptionPatternEvaluation();
         this.enableTopicListWatcher = conf.isEnableBrokerTopicListWatcher();
+        this.scalableTopicsEnabled = conf.isScalableTopicsEnabled();
         this.maxSubscriptionPatternLength = 
conf.getSubscriptionPatternMaxLength();
         this.topicListService = new TopicListService(pulsar, this,
                 enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength);
@@ -759,6 +761,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         log.debug().attr("topic", topicStr).attr("sessionId", sessionId)
                 .log("Received ScalableTopicLookup");
 
+        if (!scalableTopicsEnabled) {
+            ctx.writeAndFlush(Commands.newScalableTopicError(sessionId, 
ServerError.NotAllowedError,
+                    "Scalable topics are disabled on this broker"));
+            return;
+        }
+
         final TopicName topicName;
         try {
             topicName = TopicName.get(topicStr);
@@ -850,6 +858,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 .attr("consumerName", consumerName).attr("requestId", 
requestId)
                 .log("Received ScalableTopicSubscribe");
 
+        if (!scalableTopicsEnabled) {
+            getCommandSender().sendScalableTopicSubscribeError(requestId, 
ServerError.NotAllowedError,
+                    "Scalable topics are disabled on this broker");
+            return;
+        }
+
         final TopicName topicName;
         try {
             topicName = TopicName.get(topicStr);
@@ -1117,7 +1131,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
             maybeScheduleAuthenticationCredentialsRefresh();
         }
-        writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, enableTopicListWatcher));
+        writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, enableTopicListWatcher,
+                scalableTopicsEnabled));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
         log.debug()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index b2a5a65d813..f34efa6b2b6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -771,7 +771,7 @@ public class ClientErrorsTest {
         AtomicBoolean msgSent = new AtomicBoolean();
         mockBrokerService.setHandleConnect((ctx, connect) -> {
             channelCtx.set(ctx);
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false, 
false));
             if (numOfConnections.incrementAndGet() == 2) {
                 // close the cnx immediately when trying to connect the 2nd 
time
                 ctx.channel().close();
@@ -812,7 +812,7 @@ public class ClientErrorsTest {
         CountDownLatch latch = new CountDownLatch(1);
         mockBrokerService.setHandleConnect((ctx, connect) -> {
             channelCtx.set(ctx);
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false, 
false));
             if (numOfConnections.incrementAndGet() == 2) {
                 // close the cnx immediately when trying to connect the 2nd 
time
                 ctx.channel().close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index 58cb247f90a..16efd02ad7c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -145,7 +145,7 @@ public class MockBrokerService {
                 return;
             }
             // default
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false));
+            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false, 
false));
         }
 
         @Override
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 5005549f476..b4d9a232b14 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -743,7 +743,8 @@ public class PulsarClientException extends IOException {
      * "supports_topic_watchers" was introduced at "2.11" and is no longer 
supported, so skip this enum.
      */
     public enum FailedFeatureCheck {
-        SupportsGetPartitionedMetadataWithoutAutoCreation;
+        SupportsGetPartitionedMetadataWithoutAutoCreation,
+        SupportsScalableTopics;
     }
 
     /**
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index 632692fd91c..b13828e7561 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -74,6 +74,13 @@ final class DagWatchClient implements DagWatchSession, 
AutoCloseable {
         v4Client.getConnection(topicName.toString())
                 .thenAccept(cnx -> {
                     this.cnx = cnx;
+                    if (!cnx.isSupportsScalableTopics()) {
+                        initialLayoutFuture.completeExceptionally(
+                                new 
PulsarClientException.FeatureNotSupportedException(
+                                        "Broker does not support scalable 
topics",
+                                        
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+                        return;
+                    }
                     // Register this session to receive updates
                     cnx.registerDagWatchSession(sessionId, this);
 
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index cceaa3b3edf..08c3643c601 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -126,6 +126,13 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
                 })
                 .thenAccept(cnx -> {
                     this.cnx = cnx;
+                    if (!cnx.isSupportsScalableTopics()) {
+                        initialAssignmentFuture.completeExceptionally(
+                                new 
PulsarClientException.FeatureNotSupportedException(
+                                        "Broker does not support scalable 
topics",
+                                        
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+                        return;
+                    }
                     cnx.registerScalableConsumerSession(consumerId, this);
 
                     long requestId = v4Client.newRequestId();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 75131f0af1d..f45fc2c91f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -219,6 +219,8 @@ public class ClientCnx extends PulsarHandler {
     private boolean brokerSupportsReplDedupByLidAndEid;
     @Getter
     private boolean supportsTopicWatcherReconcile;
+    @Getter
+    private boolean supportsScalableTopics;
 
     /** Idle stat. **/
     @Getter
@@ -435,6 +437,8 @@ public class ClientCnx extends PulsarHandler {
             connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsReplDedupByLidAndEid();
         supportsTopicWatcherReconcile =
             connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatcherReconcile();
+        supportsScalableTopics =
+            connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsScalableTopics();
 
         // set remote protocol version to the correct version before we 
complete the connection future
         setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 227aae5ee17..523f47b4805 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -305,12 +305,14 @@ public class Commands {
         return cmd;
     }
 
-    public static ByteBuf newConnected(int clientProtocoVersion,  boolean 
supportsTopicWatchers) {
-        return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE, 
supportsTopicWatchers);
+    public static ByteBuf newConnected(int clientProtocoVersion, boolean 
supportsTopicWatchers,
+                                       boolean supportsScalableTopics) {
+        return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE, 
supportsTopicWatchers,
+                supportsScalableTopics);
     }
 
     public static BaseCommand newConnectedCommand(int clientProtocolVersion, 
int maxMessageSize,
-                                                  boolean 
supportsTopicWatchers) {
+                                                  boolean 
supportsTopicWatchers, boolean supportsScalableTopics) {
         BaseCommand cmd = localCmd(Type.CONNECTED);
         CommandConnected connected = cmd.setConnected()
                 .setServerVersion("Pulsar Server" + 
PulsarVersion.getVersion());
@@ -330,11 +332,14 @@ public class Commands {
         
connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true);
         connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true);
         
connected.setFeatureFlags().setSupportsTopicWatcherReconcile(supportsTopicWatchers);
+        
connected.setFeatureFlags().setSupportsScalableTopics(supportsScalableTopics);
         return cmd;
     }
 
-    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize,  boolean supportsTopicWatchers) {
-        return serializeWithSize(newConnectedCommand(clientProtocolVersion, 
maxMessageSize, supportsTopicWatchers));
+    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize, boolean supportsTopicWatchers,
+                                       boolean supportsScalableTopics) {
+        return serializeWithSize(newConnectedCommand(clientProtocolVersion, 
maxMessageSize, supportsTopicWatchers,
+                supportsScalableTopics));
     }
 
     public static ByteBuf newAuthChallenge(String authMethod, AuthData 
brokerData, int clientProtocolVersion) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 114d083e12f..195fc597ecc 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -316,6 +316,7 @@ message FeatureFlags {
   optional bool supports_get_partitioned_metadata_without_auto_creation = 5 
[default = false];
   optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
   optional bool supports_topic_watcher_reconcile = 7 [default = false];
+  optional bool supports_scalable_topics = 8 [default = false];
 }
 
 message CommandConnected {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2dbd26b00cb..bfdfb83f8cc 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -454,7 +454,7 @@ public class ProxyConnection extends PulsarHandler {
             state = State.ProxyLookupRequests;
             lookupProxyHandler = service.newLookupProxyHandler(this);
             startAuthRefreshTaskIfNotStarted();
-            final ByteBuf msg = 
Commands.newConnected(protocolVersionToAdvertise, false);
+            final ByteBuf msg = 
Commands.newConnected(protocolVersionToAdvertise, false, false);
             writeAndFlush(msg);
         }
     }
@@ -467,7 +467,8 @@ public class ProxyConnection extends PulsarHandler {
             int maxMessageSize =
                     connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
             final ByteBuf msg = 
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
-                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers());
+                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers(),
+                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsScalableTopics());
             writeAndFlush(msg);
             // Start auth refresh task only if we are not forwarding 
authorization credentials
             if 
(!service.getConfiguration().isForwardAuthorizationCredentials()) {

Reply via email to