This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b45aa4e9159 [cleanup][broker] Clean up scalable topic type references
in ServerCnx (#25920)
b45aa4e9159 is described below
commit b45aa4e9159aa7719de56e031014b224b76af22b
Author: Ruimin MA <[email protected]>
AuthorDate: Wed Jun 3 22:10:04 2026 +0800
[cleanup][broker] Clean up scalable topic type references in ServerCnx
(#25920)
---
.../apache/pulsar/broker/service/ServerCnx.java | 38 +++++++++++++---------
1 file changed, 22 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8b7ec2134fb..af7e8930f36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,6 +100,9 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFo
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.scalable.ConsumerSession;
+import org.apache.pulsar.broker.service.scalable.DagWatchSession;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
@@ -137,6 +140,7 @@ import org.apache.pulsar.common.api.proto.CommandProducer;
import
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -145,6 +149,8 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
+import org.apache.pulsar.common.api.proto.CommandWatchScalableTopics;
+import org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.FeatureFlags;
@@ -155,6 +161,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
@@ -758,9 +765,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// --- Scalable topic lookup ---
- private final java.util.concurrent.ConcurrentHashMap<Long,
- org.apache.pulsar.broker.service.scalable.DagWatchSession>
dagWatchSessions =
- new java.util.concurrent.ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long,
+ DagWatchSession> dagWatchSessions =
+ new ConcurrentHashMap<>();
@Override
protected void handleCommandScalableTopicLookup(
@@ -823,7 +830,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
// Create a DagWatchSession that will send the initial
layout and watch for changes
- var session = new
org.apache.pulsar.broker.service.scalable.DagWatchSession(
+ var session = new DagWatchSession(
sessionId, topicName, this, resources, service);
dagWatchSessions.put(sessionId, session);
@@ -854,13 +861,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// --- Scalable topics namespace watcher ---
- private final java.util.concurrent.ConcurrentHashMap<Long,
-
org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
- scalableTopicsWatchers = new
java.util.concurrent.ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long,
+ ScalableTopicsWatcherSession>
+ scalableTopicsWatchers = new ConcurrentHashMap<>();
@Override
protected void handleCommandWatchScalableTopics(
- org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd)
{
+ CommandWatchScalableTopics cmd) {
checkArgument(state == State.Connected);
final long watchId = cmd.getWatchId();
@@ -898,7 +905,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
- org.apache.pulsar.broker.resources.ScalableTopicResources resources =
+ ScalableTopicResources resources =
service.getPulsar().getPulsarResources().getScalableTopicResources();
if (resources == null) {
log.warn("WatchScalableTopics rejected: scalable topic resources
not available");
@@ -917,8 +924,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ServerError.AuthorizationError, msg));
return;
}
- var session = new org.apache.pulsar.broker.service.scalable
- .ScalableTopicsWatcherSession(watchId,
namespaceName, propertyFilters,
+ var session = new ScalableTopicsWatcherSession(watchId,
namespaceName, propertyFilters,
clientHash, this, resources,
service.getPulsar().getExecutor());
scalableTopicsWatchers.put(watchId, session);
@@ -946,7 +952,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
protected void handleCommandWatchScalableTopicsClose(
- org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose
cmd) {
+ CommandWatchScalableTopicsClose cmd) {
// Same idempotent-close semantics as DAG watch / consumer close:
per-cnx
// session, originating subscribe was authorized at create time, no
per-call
// authz needed. Unknown watchId is a no-op.
@@ -991,13 +997,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
scalableConsumerRegistrations = new
java.util.concurrent.ConcurrentHashMap<>();
private record ScalableConsumerRegistrationRef(
- org.apache.pulsar.common.naming.TopicName topicName,
+ TopicName topicName,
String subscription,
String consumerName) {}
@Override
protected void handleCommandScalableTopicSubscribe(
- org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe
+ CommandScalableTopicSubscribe
commandScalableTopicSubscribe) {
checkArgument(state == State.Connected);
@@ -1006,7 +1012,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final String subscription =
commandScalableTopicSubscribe.getSubscription();
final String consumerName =
commandScalableTopicSubscribe.getConsumerName();
final long consumerId = commandScalableTopicSubscribe.getConsumerId();
- final org.apache.pulsar.common.api.proto.ScalableConsumerType
consumerType =
+ final ScalableConsumerType consumerType =
commandScalableTopicSubscribe.getConsumerType();
log.debug().attr("topic", topicStr).attr("subscription", subscription)
@@ -1064,7 +1070,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
scalableConsumerRegistrations.put(consumerId,
new
ScalableConsumerRegistrationRef(topicName, subscription, consumerName));
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
-
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+ ConsumerSession.toProto(assignment));
}, ctx.executor());
})
.exceptionally(ex -> {