This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b45aa4e9159 [cleanup][broker] Clean up scalable topic type references 
in ServerCnx (#25920)
b45aa4e9159 is described below

commit b45aa4e9159aa7719de56e031014b224b76af22b
Author: Ruimin MA <[email protected]>
AuthorDate: Wed Jun 3 22:10:04 2026 +0800

    [cleanup][broker] Clean up scalable topic type references in ServerCnx 
(#25920)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 38 +++++++++++++---------
 1 file changed, 22 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8b7ec2134fb..af7e8930f36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,6 +100,9 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFo
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.scalable.ConsumerSession;
+import org.apache.pulsar.broker.service.scalable.DagWatchSession;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
@@ -137,6 +140,7 @@ import org.apache.pulsar.common.api.proto.CommandProducer;
 import 
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
 import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
 import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSeek;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -145,6 +149,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
 import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
+import org.apache.pulsar.common.api.proto.CommandWatchScalableTopics;
+import org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
@@ -155,6 +161,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.TxnAction;
@@ -758,9 +765,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     // --- Scalable topic lookup ---
 
-    private final java.util.concurrent.ConcurrentHashMap<Long,
-            org.apache.pulsar.broker.service.scalable.DagWatchSession> 
dagWatchSessions =
-            new java.util.concurrent.ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long,
+            DagWatchSession> dagWatchSessions =
+            new ConcurrentHashMap<>();
 
     @Override
     protected void handleCommandScalableTopicLookup(
@@ -823,7 +830,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         return;
                     }
                     // Create a DagWatchSession that will send the initial 
layout and watch for changes
-                    var session = new 
org.apache.pulsar.broker.service.scalable.DagWatchSession(
+                    var session = new DagWatchSession(
                             sessionId, topicName, this, resources, service);
                     dagWatchSessions.put(sessionId, session);
 
@@ -854,13 +861,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     // --- Scalable topics namespace watcher ---
 
-    private final java.util.concurrent.ConcurrentHashMap<Long,
-            
org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
-            scalableTopicsWatchers = new 
java.util.concurrent.ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long,
+            ScalableTopicsWatcherSession>
+            scalableTopicsWatchers = new ConcurrentHashMap<>();
 
     @Override
     protected void handleCommandWatchScalableTopics(
-            org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd) 
{
+            CommandWatchScalableTopics cmd) {
         checkArgument(state == State.Connected);
 
         final long watchId = cmd.getWatchId();
@@ -898,7 +905,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
-        org.apache.pulsar.broker.resources.ScalableTopicResources resources =
+        ScalableTopicResources resources =
                 
service.getPulsar().getPulsarResources().getScalableTopicResources();
         if (resources == null) {
             log.warn("WatchScalableTopics rejected: scalable topic resources 
not available");
@@ -917,8 +924,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 ServerError.AuthorizationError, msg));
                         return;
                     }
-                    var session = new org.apache.pulsar.broker.service.scalable
-                            .ScalableTopicsWatcherSession(watchId, 
namespaceName, propertyFilters,
+                    var session = new ScalableTopicsWatcherSession(watchId, 
namespaceName, propertyFilters,
                                     clientHash, this, resources, 
service.getPulsar().getExecutor());
                     scalableTopicsWatchers.put(watchId, session);
 
@@ -946,7 +952,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     protected void handleCommandWatchScalableTopicsClose(
-            org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose 
cmd) {
+            CommandWatchScalableTopicsClose cmd) {
         // Same idempotent-close semantics as DAG watch / consumer close: 
per-cnx
         // session, originating subscribe was authorized at create time, no 
per-call
         // authz needed. Unknown watchId is a no-op.
@@ -991,13 +997,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             scalableConsumerRegistrations = new 
java.util.concurrent.ConcurrentHashMap<>();
 
     private record ScalableConsumerRegistrationRef(
-            org.apache.pulsar.common.naming.TopicName topicName,
+            TopicName topicName,
             String subscription,
             String consumerName) {}
 
     @Override
     protected void handleCommandScalableTopicSubscribe(
-            org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe
+            CommandScalableTopicSubscribe
                     commandScalableTopicSubscribe) {
         checkArgument(state == State.Connected);
 
@@ -1006,7 +1012,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final String subscription = 
commandScalableTopicSubscribe.getSubscription();
         final String consumerName = 
commandScalableTopicSubscribe.getConsumerName();
         final long consumerId = commandScalableTopicSubscribe.getConsumerId();
-        final org.apache.pulsar.common.api.proto.ScalableConsumerType 
consumerType =
+        final ScalableConsumerType consumerType =
                 commandScalableTopicSubscribe.getConsumerType();
 
         log.debug().attr("topic", topicStr).attr("subscription", subscription)
@@ -1064,7 +1070,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 scalableConsumerRegistrations.put(consumerId,
                                         new 
ScalableConsumerRegistrationRef(topicName, subscription, consumerName));
                                 
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
-                                        
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+                                        ConsumerSession.toProto(assignment));
                             }, ctx.executor());
                 })
                 .exceptionally(ex -> {

Reply via email to