This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 597eb0daa2c [feat] PIP-468: Add scalable topic protocol commands and 
connection handling (#25564)
597eb0daa2c is described below

commit 597eb0daa2ca155cdf3d1bb0c2ff63c637e5ab41
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 14:46:54 2026 -0700

    [feat] PIP-468: Add scalable topic protocol commands and connection 
handling (#25564)
---
 .../pulsar/broker/service/PulsarCommandSender.java |  18 ++
 .../broker/service/PulsarCommandSenderImpl.java    |  17 ++
 .../apache/pulsar/broker/service/ServerCnx.java    | 176 +++++++++++
 .../broker/service/scalable/ConsumerSession.java   |  38 ++-
 .../broker/service/scalable/DagWatchSession.java   | 248 ++++++++++++++++
 .../service/scalable/ScalableTopicService.java     |  23 ++
 .../service/scalable/ConsumerSessionTest.java      | 299 +++++++++++++++++++
 .../service/scalable/DagWatchSessionTest.java      | 330 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  45 +++
 .../apache/pulsar/client/impl/DagWatchSession.java |  45 +++
 .../apache/pulsar/common/protocol/Commands.java    |  76 +++++
 .../pulsar/common/protocol/PulsarDecoder.java      |  63 ++++
 pulsar-common/src/main/proto/PulsarApi.proto       | 121 ++++++++
 .../common/protocol/CommandsScalableTopicTest.java | 234 +++++++++++++++
 14 files changed, 1729 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index ca80ca49d76..3a8ad10f8c6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -107,4 +108,21 @@ public interface PulsarCommandSender {
                                   List<String> newTopics, List<String> 
deletedTopics, String topicsHash,
                                   Function<Throwable, CompletableFuture<Void>>
                                                              
permitAcquireErrorHandler);
+
+    /**
+     * Send the response to a scalable-topic subscribe request on success. The 
caller is
+     * responsible for building the {@link ScalableConsumerAssignment}.
+     */
+    void sendScalableTopicSubscribeResponse(long requestId, 
ScalableConsumerAssignment assignment);
+
+    /**
+     * Send an error response to a scalable-topic subscribe request.
+     */
+    void sendScalableTopicSubscribeError(long requestId, ServerError error, 
String message);
+
+    /**
+     * Push a segment-assignment update to a previously-subscribed scalable 
consumer after
+     * a rebalance (peer added/removed, split, merge).
+     */
+    void sendScalableTopicAssignmentUpdate(long consumerId, 
ScalableConsumerAssignment assignment);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 41b182f8bda..5e65b8a5571 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -405,6 +405,23 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 command, permitAcquireErrorHandler);
     }
 
+    @Override
+    public void sendScalableTopicSubscribeResponse(long requestId,
+            org.apache.pulsar.common.api.proto.ScalableConsumerAssignment 
assignment) {
+        writeAndFlush(Commands.newScalableTopicSubscribeResponse(requestId, 
assignment));
+    }
+
+    @Override
+    public void sendScalableTopicSubscribeError(long requestId, ServerError 
error, String message) {
+        writeAndFlush(Commands.newScalableTopicSubscribeError(requestId, 
error, message));
+    }
+
+    @Override
+    public void sendScalableTopicAssignmentUpdate(long consumerId,
+            org.apache.pulsar.common.api.proto.ScalableConsumerAssignment 
assignment) {
+        writeAndFlush(Commands.newScalableTopicAssignmentUpdate(consumerId, 
assignment));
+    }
+
     private void writeAndFlush(ByteBuf outBuf) {
         NettyChannelUtil.writeAndFlushWithVoidPromise(cnx.ctx(), outBuf);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 569cbab1e07..cc09a3109e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -92,6 +92,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -134,6 +135,8 @@ import org.apache.pulsar.common.api.proto.CommandNewTxn;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.CommandProducer;
 import 
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
 import org.apache.pulsar.common.api.proto.CommandSeek;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -470,6 +473,36 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
         });
         this.topicListService.inactivate();
+
+        // Close any outstanding scalable-topic DAG watch sessions held by 
this connection.
+        dagWatchSessions.values().forEach(session -> {
+            try {
+                session.close();
+            } catch (Exception e) {
+                log.warn().exceptionMessage(e).log("Error closing DAG watch 
session on connection close");
+            }
+        });
+        dagWatchSessions.clear();
+
+        // Notify the scalable-topic controller that this connection's 
scalable consumers
+        // have dropped. The controller marks them disconnected and starts the 
grace-period
+        // timer; if they reconnect in time, their assignment is preserved.
+        if (!scalableConsumerRegistrations.isEmpty()) {
+            var scalableTopicService = service.getScalableTopicService();
+            if (scalableTopicService != null) {
+                scalableConsumerRegistrations.values().forEach(ref -> {
+                    try {
+                        scalableTopicService.onConsumerDisconnect(
+                                ref.topicName(), ref.subscription(), 
ref.consumerName());
+                    } catch (Exception e) {
+                        log.warn().attr("consumerName", 
ref.consumerName()).exceptionMessage(e)
+                                .log("Error notifying scalable controller of 
consumer disconnect");
+                    }
+                });
+            }
+            scalableConsumerRegistrations.clear();
+        }
+
         this.service.getPulsarStats().recordConnectionClose();
 
         // complete possible pending connection check future
@@ -709,6 +742,149 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
     }
 
+    // --- Scalable topic lookup ---
+
+    private final java.util.concurrent.ConcurrentHashMap<Long,
+            org.apache.pulsar.broker.service.scalable.DagWatchSession> 
dagWatchSessions =
+            new java.util.concurrent.ConcurrentHashMap<>();
+
+    @Override
+    protected void handleCommandScalableTopicLookup(
+            CommandScalableTopicLookup commandScalableTopicLookup) {
+        checkArgument(state == State.Connected);
+
+        final long sessionId = commandScalableTopicLookup.getSessionId();
+        final String topicStr = commandScalableTopicLookup.getTopic();
+
+        log.debug().attr("topic", topicStr).attr("sessionId", sessionId)
+                .log("Received ScalableTopicLookup");
+
+        final TopicName topicName;
+        try {
+            topicName = TopicName.get(topicStr);
+        } catch (Exception e) {
+            log.warn().attr("topic", topicStr).log("Invalid topic name in 
ScalableTopicLookup");
+            ctx.close();
+            return;
+        }
+
+        if (!this.service.getPulsar().isRunning()) {
+            log.warn("ScalableTopicLookup rejected: broker not ready");
+            ctx.close();
+            return;
+        }
+
+        ScalableTopicResources resources = 
service.getPulsar().getPulsarResources()
+                .getScalableTopicResources();
+        if (resources == null) {
+            log.warn("ScalableTopicLookup rejected: scalable topic resources 
not available");
+            ctx.close();
+            return;
+        }
+
+        // Create a DagWatchSession that will send the initial layout and 
watch for changes
+        var session = new 
org.apache.pulsar.broker.service.scalable.DagWatchSession(
+                sessionId, topicName, this, resources, service);
+        dagWatchSessions.put(sessionId, session);
+
+        session.start()
+                .thenAcceptAsync(session::pushUpdate, ctx.executor())
+                .exceptionally(ex -> {
+                    Throwable cause = ex.getCause() != null ? ex.getCause() : 
ex;
+                    log.warn().attr("topic", topicName).exception(cause)
+                            .log("ScalableTopicLookup failed");
+                    dagWatchSessions.remove(sessionId);
+                    session.close();
+                    ctx.executor().execute(() ->
+                        
ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+                                ServerError.TopicNotFound, cause.getMessage()))
+                    );
+                    return null;
+                });
+    }
+
+    @Override
+    protected void handleCommandScalableTopicClose(
+            CommandScalableTopicClose commandScalableTopicClose) {
+        checkArgument(state == State.Connected);
+
+        final long sessionId = commandScalableTopicClose.getSessionId();
+
+        log.debug().attr("sessionId", sessionId).log("Received 
ScalableTopicClose");
+
+        var session = dagWatchSessions.remove(sessionId);
+        if (session != null) {
+            session.close();
+        }
+    }
+
+    // --- Scalable consumer registrations ---
+
+    /**
+     * Tracks the scalable-topic consumer registrations held by this 
connection, so that
+     * when the TCP connection drops we can notify the owning {@link
+     * org.apache.pulsar.broker.service.scalable.ScalableTopicController} of 
every
+     * disconnected consumer in bulk. Keyed by the protocol-level {@code 
consumerId}.
+     */
+    private final java.util.concurrent.ConcurrentHashMap<Long, 
ScalableConsumerRegistrationRef>
+            scalableConsumerRegistrations = new 
java.util.concurrent.ConcurrentHashMap<>();
+
+    private record ScalableConsumerRegistrationRef(
+            org.apache.pulsar.common.naming.TopicName topicName,
+            String subscription,
+            String consumerName) {}
+
+    @Override
+    protected void handleCommandScalableTopicSubscribe(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe
+                    commandScalableTopicSubscribe) {
+        checkArgument(state == State.Connected);
+
+        final long requestId = commandScalableTopicSubscribe.getRequestId();
+        final String topicStr = commandScalableTopicSubscribe.getTopic();
+        final String subscription = 
commandScalableTopicSubscribe.getSubscription();
+        final String consumerName = 
commandScalableTopicSubscribe.getConsumerName();
+        final long consumerId = commandScalableTopicSubscribe.getConsumerId();
+
+        log.debug().attr("topic", topicStr).attr("subscription", subscription)
+                .attr("consumerName", consumerName).attr("requestId", 
requestId)
+                .log("Received ScalableTopicSubscribe");
+
+        final TopicName topicName;
+        try {
+            topicName = TopicName.get(topicStr);
+        } catch (Exception e) {
+            getCommandSender().sendScalableTopicSubscribeError(requestId,
+                    ServerError.InvalidTopicName, "Invalid topic name: " + 
topicStr);
+            return;
+        }
+
+        var scalableTopicService = service.getScalableTopicService();
+        if (scalableTopicService == null) {
+            getCommandSender().sendScalableTopicSubscribeError(requestId,
+                    ServerError.ServiceNotReady, "Scalable topic service not 
available");
+            return;
+        }
+
+        scalableTopicService.registerConsumer(topicName, subscription, 
consumerName, consumerId, this)
+                .whenCompleteAsync((assignment, ex) -> {
+                    if (ex != null) {
+                        Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex;
+                        log.warn().attr("topic", 
topicName).attr("subscription", subscription)
+                                .attr("consumerName", 
consumerName).exception(cause)
+                                .log("ScalableTopicSubscribe failed");
+                        
getCommandSender().sendScalableTopicSubscribeError(requestId,
+                                ServerError.UnknownError, cause.getMessage());
+                        return;
+                    }
+                    // Record the registration so we can call 
onConsumerDisconnect on channelInactive.
+                    scalableConsumerRegistrations.put(consumerId,
+                            new ScalableConsumerRegistrationRef(topicName, 
subscription, consumerName));
+                    
getCommandSender().sendScalableTopicSubscribeResponse(requestId,
+                            
org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+                }, ctx.executor());
+    }
+
     @Override
     protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata 
partitionMetadataParam) {
         checkArgument(state == State.Connected);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
index 3e798ca418b..b5e1f5c00fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
@@ -167,12 +167,42 @@ public class ConsumerSession {
     }
 
     /**
-     * Push an assignment update to this consumer, if currently connected. The 
actual
-     * wire push is wired up in the protocol-commands commit; at this stage 
the method
-     * exists so {@link SubscriptionCoordinator} can call it.
+     * Push an assignment update to this consumer, if currently connected. 
Builds a
+     * {@code ScalableConsumerAssignment} proto and writes a
+     * {@code CommandScalableTopicAssignmentUpdate} to the connection.
      */
     public void sendAssignmentUpdate(ConsumerAssignment assignment) {
-        // no-op until the protocol-commands commit wires in the wire protocol
+        TransportCnx localCnx = this.cnx;
+        if (localCnx == null || !connected) {
+            // Consumer is disconnected — no-op. The assignment will be 
delivered when it
+            // reconnects (the coordinator re-pushes on attach).
+            return;
+        }
+        var sender = localCnx.getCommandSender();
+        if (sender == null) {
+            // Connection is in the middle of being torn down; skip silently.
+            return;
+        }
+        sender.sendScalableTopicAssignmentUpdate(consumerId, 
toProto(assignment));
+    }
+
+    /**
+     * Convert the broker-side {@link ConsumerAssignment} record to its 
protocol wire form.
+     * Shared by {@link #sendAssignmentUpdate(ConsumerAssignment)} and the
+     * {@code handleCommandScalableTopicSubscribe} path in {@code ServerCnx}.
+     */
+    public static 
org.apache.pulsar.common.api.proto.ScalableConsumerAssignment toProto(
+            ConsumerAssignment assignment) {
+        var proto = new 
org.apache.pulsar.common.api.proto.ScalableConsumerAssignment()
+                .setLayoutEpoch(assignment.layoutEpoch());
+        for (ConsumerAssignment.AssignedSegment seg : 
assignment.assignedSegments()) {
+            proto.addSegment()
+                    .setSegmentId(seg.segmentId())
+                    .setHashStart(seg.hashRange().start())
+                    .setHashEnd(seg.hashRange().end())
+                    .setSegmentTopic(seg.underlyingTopicName());
+        }
+        return proto;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
new file mode 100644
index 00000000000..87dee98b71b
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.SegmentBrokerAddress;
+import org.apache.pulsar.common.api.proto.SegmentInfoProto;
+import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Broker-side handler for a client's DAG watch session.
+ *
+ * <p>Any broker can serve this role since metadata is in the metadata store.
+ * The session watches for metadata changes (via Oxia watch) and pushes updated
+ * {@link ScalableTopicLayoutResponse} to the client.
+ *
+ * <p>The session is tied to a connection. When the connection breaks, the 
session dies.
+ * The client must reinitiate a new session (possibly with another broker).
+ */
+public class DagWatchSession {
+
+    private static final Logger LOG = Logger.get(DagWatchSession.class);
+    private final Logger log;
+
+    @Getter
+    private final long sessionId;
+    private final TopicName topicName;
+    private final ServerCnx cnx;
+    private final ScalableTopicResources resources;
+    private final BrokerService brokerService;
+
+    private final String metadataPath;
+    private final java.util.function.Consumer<Notification> 
notificationListener;
+    private volatile boolean closed = false;
+
+    public DagWatchSession(long sessionId,
+                           TopicName topicName,
+                           ServerCnx cnx,
+                           ScalableTopicResources resources,
+                           BrokerService brokerService) {
+        this.sessionId = sessionId;
+        this.topicName = topicName;
+        this.cnx = cnx;
+        this.resources = resources;
+        this.brokerService = brokerService;
+        this.metadataPath = resources.topicPath(topicName);
+        this.notificationListener = this::onNotification;
+        this.log = LOG.with().attr("topic", topicName).attr("sessionId", 
sessionId).build();
+    }
+
+    /**
+     * Start the session: load current metadata, set up watch, and return
+     * the initial layout response.
+     */
+    public CompletableFuture<ScalableTopicLayoutResponse> start() {
+        // Register metadata store listener for changes to this topic's 
metadata
+        resources.getStore().registerListener(notificationListener);
+
+        return resources.getScalableTopicMetadataAsync(topicName, true)
+                .thenCompose(optMd -> {
+                    if (optMd.isEmpty()) {
+                        return CompletableFuture.failedFuture(
+                                new IllegalStateException("Scalable topic not 
found: " + topicName));
+                    }
+                    ScalableTopicMetadata metadata = optMd.get();
+                    return buildResponse(metadata);
+                });
+    }
+
+    // Visible for testing — invoked by the metadata-store listener registered 
in start().
+    void onNotification(Notification notification) {
+        if (closed) {
+            return;
+        }
+        if (!metadataPath.equals(notification.getPath())) {
+            return;
+        }
+        if (notification.getType() == NotificationType.Deleted) {
+            return;
+        }
+        // Metadata changed — reload and push update
+        resources.getScalableTopicMetadataAsync(topicName, true)
+                .thenAccept(optMd -> optMd.ifPresent(this::onMetadataChanged));
+    }
+
+    /**
+     * Called when the metadata store watch fires (metadata changed).
+     */
+    public void onMetadataChanged(ScalableTopicMetadata newMetadata) {
+        if (closed) {
+            return;
+        }
+        buildResponse(newMetadata).thenAccept(this::pushUpdate);
+    }
+
+    /**
+     * Push an update to the connected client.
+     */
+    public void pushUpdate(ScalableTopicLayoutResponse response) {
+        if (closed) {
+            return;
+        }
+        ScalableTopicDAG dag = buildDagProto(response);
+        log.info().attr("epoch", response.epoch()).log("Pushing DAG update");
+        cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId, 
dag));
+    }
+
+    private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse 
response) {
+        ScalableTopicDAG dag = new ScalableTopicDAG();
+        dag.setEpoch(response.epoch());
+
+        for (var entry : response.segments().entrySet()) {
+            SegmentInfo seg = entry.getValue();
+            SegmentInfoProto segProto = dag.addSegment();
+            segProto.setSegmentId(seg.segmentId());
+            segProto.setHashStart(seg.hashRange().start());
+            segProto.setHashEnd(seg.hashRange().end());
+            segProto.setState(seg.isActive() ? SegmentState.ACTIVE : 
SegmentState.SEALED);
+            for (int i = 0; i < seg.parentIds().size(); i++) {
+                segProto.addParentId(seg.parentIds().get(i));
+            }
+            for (int i = 0; i < seg.childIds().size(); i++) {
+                segProto.addChildId(seg.childIds().get(i));
+            }
+            segProto.setCreatedAtEpoch(seg.createdAtEpoch());
+            if (seg.sealedAtEpoch() >= 0) {
+                segProto.setSealedAtEpoch(seg.sealedAtEpoch());
+            }
+        }
+
+        // Add broker addresses for active segments
+        Map<Long, String> brokerAddresses = response.segmentBrokerAddresses();
+        if (brokerAddresses != null) {
+            for (var entry : brokerAddresses.entrySet()) {
+                SegmentBrokerAddress addr = dag.addSegmentBroker();
+                addr.setSegmentId(entry.getKey());
+                addr.setBrokerUrl(entry.getValue());
+            }
+        }
+
+        return dag;
+    }
+
+    public void close() {
+        closed = true;
+        // Listener is guarded by the closed flag; MetadataStore does not 
support unregister.
+    }
+
+    /**
+     * Build a full layout response with broker addresses resolved.
+     */
+    private CompletableFuture<ScalableTopicLayoutResponse> 
buildResponse(ScalableTopicMetadata metadata) {
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        // Resolve broker addresses for all active segments
+        CompletableFuture<Map<Long, String>> brokersFuture = 
resolveSegmentBrokers(layout);
+
+        // Resolve controller broker address
+        CompletableFuture<Optional<String>> controllerFuture =
+                readControllerBrokerUrl();
+
+        return brokersFuture.thenCombine(controllerFuture, (segmentBrokers, 
controllerUrl) ->
+                new ScalableTopicLayoutResponse(
+                        layout.getEpoch(),
+                        layout.getAllSegments(),
+                        segmentBrokers,
+                        null,
+                        controllerUrl.orElse(null),
+                        null));
+    }
+
+    private CompletableFuture<Map<Long, String>> 
resolveSegmentBrokers(SegmentLayout layout) {
+        Map<Long, String> result = new LinkedHashMap<>();
+        CompletableFuture<?>[] futures = 
layout.getActiveSegments().values().stream()
+                .map(segment -> {
+                    // Resolve which broker owns this segment's underlying 
segment:// topic
+                    TopicName segTn = 
org.apache.pulsar.common.scalable.SegmentTopicName.fromParent(
+                            topicName, segment.hashRange(), 
segment.segmentId());
+                    var lookupOptions = 
org.apache.pulsar.broker.namespace.LookupOptions.builder()
+                            .readOnly(false).authoritative(false).build();
+                    return brokerService.getPulsar().getNamespaceService()
+                            .getBrokerServiceUrlAsync(segTn, lookupOptions)
+                            .thenAccept(optUrl -> 
optUrl.ifPresent(lookupResult -> {
+                                synchronized (result) {
+                                    result.put(segment.segmentId(),
+                                            
lookupResult.getLookupData().getBrokerUrl());
+                                }
+                            }));
+                })
+                .toArray(CompletableFuture[]::new);
+
+        return CompletableFuture.allOf(futures).thenApply(__ -> result);
+    }
+
+    private CompletableFuture<Optional<String>> readControllerBrokerUrl() {
+        String lockPath = resources.controllerLockPath(topicName);
+        return resources.getStore().get(lockPath)
+                .thenCompose(optValue -> {
+                    if (optValue.isEmpty()) {
+                        return 
CompletableFuture.completedFuture(Optional.<String>empty());
+                    }
+                    // The leader-election value is the brokerId of the 
controller leader.
+                    // Resolve it to a pulsar:// service URL via 
NamespaceService so clients
+                    // can connect to the controller broker for scalable-topic 
subscribe.
+                    String brokerId = new String(optValue.get().getValue());
+                    return brokerService.getPulsar().getNamespaceService()
+                            .createLookupResult(brokerId, false, null)
+                            .thenApply(lookupResult ->
+                                    
Optional.ofNullable(lookupResult.getLookupData().getBrokerUrl()))
+                            .exceptionally(ex -> {
+                                log.warn().attr("brokerId", 
brokerId).exceptionMessage(ex)
+                                        .log("Failed to resolve controller 
broker");
+                                return Optional.<String>empty();
+                            });
+                });
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index c151df6e81b..dbb6b32aec7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -172,6 +172,29 @@ public class ScalableTopicService {
                 .thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
     }
 
+    /**
+     * Register a scalable consumer with the controller leader for {@code 
topic}.
+     * Persists a durable session and returns the consumer's segment 
assignment.
+     */
+    public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName 
topic, String subscription,
+                                                                   String 
consumerName, long consumerId,
+                                                                   
org.apache.pulsar.broker.service.TransportCnx cnx) {
+        return getOrCreateController(topic)
+                .thenCompose(controller -> 
controller.registerConsumer(subscription, consumerName, consumerId, cnx));
+    }
+
+    /**
+     * Called when a scalable consumer's transport connection drops. Forwards 
to the
+     * controller which marks the session disconnected and starts its grace 
timer.
+     * No-op if the controller is not held locally.
+     */
+    public void onConsumerDisconnect(TopicName topic, String subscription, 
String consumerName) {
+        ScalableTopicController controller = controllers.get(topic.toString());
+        if (controller != null) {
+            controller.onConsumerDisconnect(subscription, consumerName);
+        }
+    }
+
     // --- Internal helpers ---
 
     private void onLeaderStateChange(TopicName topic, LeaderElectionState 
state) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
new file mode 100644
index 00000000000..15e2b10ba30
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.PulsarCommandSender;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+public class ConsumerSessionTest {
+
+    private static final Duration GRACE = Duration.ofMillis(100);
+    private static final Logger PARENT_LOG = 
Logger.get(ConsumerSessionTest.class);
+
+    /** Convenience wrapper: fresh mock scheduler that returns a mock 
ScheduledFuture. */
+    private static final class TestContext {
+        final ScheduledExecutorService scheduler = 
mock(ScheduledExecutorService.class);
+        final ScheduledFuture<?> future = mock(ScheduledFuture.class);
+        final Runnable onGraceExpiry = mock(Runnable.class);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TestContext() {
+            // Every schedule(...) call returns the same mock future so we can 
verify cancel.
+            when(scheduler.schedule(any(Runnable.class), anyLong(), 
any(TimeUnit.class)))
+                    .thenReturn((ScheduledFuture) future);
+        }
+
+        ConsumerSession session(String name, long consumerId, TransportCnx 
cnx) {
+            return new ConsumerSession(name, consumerId, cnx, GRACE, 
scheduler, onGraceExpiry, PARENT_LOG);
+        }
+
+        ConsumerSession restored(String name) {
+            return ConsumerSession.restored(name, GRACE, scheduler, 
onGraceExpiry, PARENT_LOG);
+        }
+    }
+
+    private static ConsumerAssignment buildAssignment(long epoch, long... 
segmentIds) {
+        List<ConsumerAssignment.AssignedSegment> segments = new 
java.util.ArrayList<>();
+        for (int i = 0; i < segmentIds.length; i++) {
+            long id = segmentIds[i];
+            int start = i * 0x4000;
+            int end = start + 0x3FFF;
+            segments.add(new ConsumerAssignment.AssignedSegment(
+                    id, HashRange.of(start, end),
+                    "persistent://tenant/ns/my-scalable-seg-" + id));
+        }
+        return new ConsumerAssignment(epoch, segments);
+    }
+
+    // --- Construction / identity ---
+
+    @Test
+    public void testConstructionWithConnectionMarksConnected() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx = mock(TransportCnx.class);
+
+        ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+        assertEquals(session.getConsumerName(), "c1");
+        assertEquals(session.getConsumerId(), 10L);
+        assertSame(session.getCnx(), cnx);
+        assertTrue(session.isConnected());
+        assertNull(session.getGraceTimer(), "no timer is armed until 
markDisconnected");
+        verify(ctx.scheduler, never())
+                .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testRestoredStartsDisconnectedWithGraceTimerArmed() {
+        TestContext ctx = new TestContext();
+
+        ConsumerSession session = ctx.restored("c-restored");
+
+        assertEquals(session.getConsumerName(), "c-restored");
+        assertEquals(session.getConsumerId(), -1L);
+        assertNull(session.getCnx());
+        assertFalse(session.isConnected());
+        // restored() arms the grace timer internally so callers can't forget 
to schedule it.
+        assertNotNull(session.getGraceTimer(), "restored() must arm the grace 
timer");
+        verify(ctx.scheduler, times(1))
+                .schedule(eq(ctx.onGraceExpiry), eq(GRACE.toMillis()), 
eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testEqualsAndHashCodeOnConsumerNameOnly() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx1 = mock(TransportCnx.class);
+        TransportCnx cnx2 = mock(TransportCnx.class);
+        ConsumerSession a = ctx.session("same", 1L, cnx1);
+        ConsumerSession b = ctx.session("same", 2L, cnx2);
+        ConsumerSession c = ctx.session("other", 1L, cnx1);
+
+        assertEquals(a, b, "sessions with the same consumerName must be 
equal");
+        assertEquals(a.hashCode(), b.hashCode());
+        assertFalse(a.equals(c));
+    }
+
+    // --- attach / markDisconnected ---
+
+    @Test
+    public void testAttachUpdatesIdAndCnxAndMarksConnected() {
+        TestContext ctx = new TestContext();
+        ConsumerSession session = ctx.restored("c1");
+        TransportCnx fresh = mock(TransportCnx.class);
+
+        session.attach(77L, fresh);
+
+        assertEquals(session.getConsumerId(), 77L);
+        assertSame(session.getCnx(), fresh);
+        assertTrue(session.isConnected());
+    }
+
+    @Test
+    public void testAttachCancelsPendingGraceTimer() {
+        TestContext ctx = new TestContext();
+        // restored() arms the timer via the scheduler mock; attach should 
cancel it.
+        ConsumerSession session = ctx.restored("c1");
+        assertSame(session.getGraceTimer(), ctx.future);
+
+        session.attach(1L, mock(TransportCnx.class));
+
+        verify(ctx.future).cancel(false);
+        assertNull(session.getGraceTimer());
+    }
+
+    @Test
+    public void testMarkDisconnectedClearsCnxKeepsConsumerIdAndArmsTimer() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx = mock(TransportCnx.class);
+        ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+        session.markDisconnected();
+
+        assertFalse(session.isConnected());
+        assertNull(session.getCnx());
+        assertEquals(session.getConsumerId(), 10L, "consumerId is preserved 
until reattach");
+        assertSame(session.getGraceTimer(), ctx.future, "markDisconnected arms 
the grace timer");
+        verify(ctx.scheduler, times(1))
+                .schedule(eq(ctx.onGraceExpiry), eq(GRACE.toMillis()), 
eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testMarkDisconnectedTwiceCancelsPreviousTimer() {
+        // startGraceTimer cancels any prior timer before scheduling a new 
one; a double
+        // markDisconnected (edge case) should leave only the latest timer 
armed.
+        TestContext ctx = new TestContext();
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        ScheduledFuture<?> second = mock(ScheduledFuture.class);
+        when(ctx.scheduler.schedule(any(Runnable.class), anyLong(), 
any(TimeUnit.class)))
+                .thenReturn((ScheduledFuture) ctx.future, (ScheduledFuture) 
second);
+
+        ConsumerSession session = ctx.session("c1", 10L, 
mock(TransportCnx.class));
+
+        session.markDisconnected();
+        session.markDisconnected();
+
+        verify(ctx.future).cancel(false);
+        assertSame(session.getGraceTimer(), second);
+    }
+
+    @Test
+    public void testCancelGraceTimerIsIdempotent() {
+        TestContext ctx = new TestContext();
+        ConsumerSession session = ctx.restored("c1");
+        assertSame(session.getGraceTimer(), ctx.future);
+
+        session.cancelGraceTimer();
+        session.cancelGraceTimer(); // second call is a no-op
+
+        verify(ctx.future, times(1)).cancel(false);
+        assertNull(session.getGraceTimer());
+    }
+
+    // --- sendAssignmentUpdate ---
+
+    @Test
+    public void testSendAssignmentUpdateWritesViaCommandSender() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx = mock(TransportCnx.class);
+        PulsarCommandSender sender = mock(PulsarCommandSender.class);
+        when(cnx.getCommandSender()).thenReturn(sender);
+        ConsumerSession session = ctx.session("c1", 42L, cnx);
+
+        session.sendAssignmentUpdate(buildAssignment(5L, 0L, 1L));
+
+        ArgumentCaptor<ScalableConsumerAssignment> captor =
+                ArgumentCaptor.forClass(ScalableConsumerAssignment.class);
+        verify(sender).sendScalableTopicAssignmentUpdate(eq(42L), 
captor.capture());
+        ScalableConsumerAssignment proto = captor.getValue();
+        assertEquals(proto.getLayoutEpoch(), 5L);
+        assertEquals(proto.getSegmentsCount(), 2);
+        assertEquals(proto.getSegmentAt(0).getSegmentId(), 0L);
+        assertEquals(proto.getSegmentAt(1).getSegmentId(), 1L);
+    }
+
+    @Test
+    public void testSendAssignmentUpdateIsNoopWhenDisconnected() {
+        TestContext ctx = new TestContext();
+        ConsumerSession session = ctx.restored("c1");
+        // restored() leaves the session disconnected with no cnx — should not 
NPE, should not call anything.
+        session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+        // no mocks to verify — just ensure no exception
+    }
+
+    @Test
+    public void testSendAssignmentUpdateIsNoopAfterMarkDisconnected() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx = mock(TransportCnx.class);
+        PulsarCommandSender sender = mock(PulsarCommandSender.class);
+        when(cnx.getCommandSender()).thenReturn(sender);
+        ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+        session.markDisconnected();
+        session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+
+        verify(sender, never()).sendScalableTopicAssignmentUpdate(anyLong(), 
any());
+    }
+
+    @Test
+    public void testSendAssignmentUpdateIsNoopWhenCommandSenderNull() {
+        TestContext ctx = new TestContext();
+        TransportCnx cnx = mock(TransportCnx.class);
+        when(cnx.getCommandSender()).thenReturn(null); // connection tearing 
down
+        ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+        // no exception, no interaction
+        session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+    }
+
+    // --- toProto ---
+
+    @Test
+    public void testToProtoConvertsAllFields() {
+        ConsumerAssignment assignment = buildAssignment(9L, 3L, 4L, 5L);
+
+        ScalableConsumerAssignment proto = ConsumerSession.toProto(assignment);
+
+        assertEquals(proto.getLayoutEpoch(), 9L);
+        assertEquals(proto.getSegmentsCount(), 3);
+        // hash ranges are derived in buildAssignment as 0x4000-wide 
contiguous ranges
+        assertEquals(proto.getSegmentAt(0).getSegmentId(), 3L);
+        assertEquals(proto.getSegmentAt(0).getHashStart(), 0x0000);
+        assertEquals(proto.getSegmentAt(0).getHashEnd(), 0x3FFF);
+        assertEquals(proto.getSegmentAt(0).getSegmentTopic(),
+                "persistent://tenant/ns/my-scalable-seg-3");
+        assertEquals(proto.getSegmentAt(1).getSegmentId(), 4L);
+        assertEquals(proto.getSegmentAt(1).getHashStart(), 0x4000);
+        assertEquals(proto.getSegmentAt(2).getSegmentId(), 5L);
+        assertEquals(proto.getSegmentAt(2).getHashStart(), 0x8000);
+    }
+
+    @Test
+    public void testToProtoHandlesEmptyAssignment() {
+        ConsumerAssignment empty = new ConsumerAssignment(0L, List.of());
+
+        ScalableConsumerAssignment proto = ConsumerSession.toProto(empty);
+
+        assertEquals(proto.getLayoutEpoch(), 0L);
+        assertEquals(proto.getSegmentsCount(), 0);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
new file mode 100644
index 00000000000..9f423413003
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentState;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Focused unit tests for {@link DagWatchSession}.
+ *
+ * <p>The deep namespace-lookup codepath inside {@code start()} (resolving 
per-segment
+ * brokers and the controller broker) is covered by integration tests; these 
unit tests
+ * stay in the parts that don't need a live {@code NamespaceService}: session 
lifecycle,
+ * notification filtering, and the DAG proto built by {@link 
DagWatchSession#pushUpdate}.
+ */
+public class DagWatchSessionTest {
+
+    private static final String TOPIC_PATH = 
"/admin/scalable-topics/tenant/ns/my-scalable";
+    private static final TopicName TOPIC = 
TopicName.get("topic://tenant/ns/my-scalable");
+    private static final long SESSION_ID = 42L;
+
+    private ScalableTopicResources resources;
+    private ServerCnx cnx;
+    private ChannelHandlerContext ctx;
+    private BrokerService brokerService;
+    private DagWatchSession session;
+
+    @BeforeMethod
+    public void setup() {
+        resources = mock(ScalableTopicResources.class);
+        cnx = mock(ServerCnx.class);
+        ctx = mock(ChannelHandlerContext.class);
+        brokerService = mock(BrokerService.class);
+
+        when(resources.topicPath(TOPIC)).thenReturn(TOPIC_PATH);
+        when(cnx.ctx()).thenReturn(ctx);
+        // Default: metadata store has a valid registerListener hook.
+        var store = mock(org.apache.pulsar.metadata.api.MetadataStore.class);
+        when(resources.getStore()).thenReturn(store);
+
+        session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, 
brokerService);
+    }
+
+    // --- identity / lifecycle ---
+
+    @Test
+    public void testSessionIdIsPreserved() {
+        assertEquals(session.getSessionId(), SESSION_ID);
+    }
+
+    @Test
+    public void testCloseIsIdempotent() {
+        session.close();
+        session.close(); // must not throw
+    }
+
+    // --- start() ---
+
+    @Test
+    public void testStartFailsWhenTopicMetadataMissing() {
+        when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        CompletableFuture<ScalableTopicLayoutResponse> future = 
session.start();
+
+        assertTrue(future.isDone());
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.get();
+            fail("expected failure");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            fail("interrupted");
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue(cause instanceof IllegalStateException, "got: " + 
cause);
+            assertTrue(cause.getMessage().contains("not found"), 
cause.getMessage());
+        }
+    }
+
+    @Test
+    public void testStartRegistersMetadataStoreListener() {
+        // Regardless of outcome, start() should wire up a notification 
listener so that
+        // subsequent metadata changes flow into the session.
+        when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        session.start();
+
+        verify(resources.getStore()).registerListener(any());
+    }
+
+    // --- onNotification filtering ---
+
+    @Test
+    public void testNotificationForUnrelatedPathIsIgnored() {
+        session.onNotification(new Notification(NotificationType.Modified, 
"/some/other/path"));
+
+        verify(resources, never()).getScalableTopicMetadataAsync(any(), 
anyBoolean());
+    }
+
+    @Test
+    public void testDeletedNotificationIsIgnored() {
+        session.onNotification(new Notification(NotificationType.Deleted, 
TOPIC_PATH));
+
+        verify(resources, never()).getScalableTopicMetadataAsync(any(), 
anyBoolean());
+    }
+
+    @Test
+    public void testNotificationAfterCloseIsIgnored() {
+        session.close();
+        session.onNotification(new Notification(NotificationType.Modified, 
TOPIC_PATH));
+
+        verify(resources, never()).getScalableTopicMetadataAsync(any(), 
anyBoolean());
+    }
+
+    @Test
+    public void testNotificationOnMatchingPathTriggersReload() {
+        // Return an empty optional so we stop before the NamespaceService 
calls inside
+        // buildResponse — we only care that the reload was kicked off.
+        when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        session.onNotification(new Notification(NotificationType.Modified, 
TOPIC_PATH));
+
+        verify(resources, times(1)).getScalableTopicMetadataAsync(TOPIC, true);
+    }
+
+    // --- pushUpdate ---
+
+    @Test
+    public void testPushUpdateWritesDagToConnection() {
+        ScalableTopicLayoutResponse response = buildSampleResponse(
+                7L,
+                Map.of(
+                        0L, seg(0, 0x0000, 0x7FFF, SegmentState.SEALED, new 
long[]{}, new long[]{2L, 3L}, 0L, 5L),
+                        2L, seg(2, 0x0000, 0x3FFF, SegmentState.ACTIVE, new 
long[]{0L}, new long[]{}, 7L, -1L),
+                        3L, seg(3, 0x4000, 0x7FFF, SegmentState.ACTIVE, new 
long[]{0L}, new long[]{}, 7L, -1L)),
+                Map.of(
+                        2L, "pulsar://broker-a:6650",
+                        3L, "pulsar://broker-b:6650"));
+
+        session.pushUpdate(response);
+
+        ArgumentCaptor<ByteBuf> captor = 
ArgumentCaptor.forClass(ByteBuf.class);
+        verify(ctx).writeAndFlush(captor.capture());
+
+        BaseCommand cmd = parseFrame(captor.getValue());
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+        assertEquals(cmd.getScalableTopicUpdate().getSessionId(), SESSION_ID);
+
+        var dag = cmd.getScalableTopicUpdate().getDag();
+        assertEquals(dag.getEpoch(), 7L);
+        assertEquals(dag.getSegmentsCount(), 3);
+
+        // sealed parent should not have parentIds; its childIds should be 2, 3
+        var parent = findSegment(dag, 0L);
+        assertEquals(parent.getState(), 
org.apache.pulsar.common.api.proto.SegmentState.SEALED);
+        assertEquals(parent.getChildIdsCount(), 2);
+        assertEquals(parent.getChildIdAt(0), 2L);
+        assertEquals(parent.getChildIdAt(1), 3L);
+        assertEquals(parent.getCreatedAtEpoch(), 0L);
+        assertEquals(parent.getSealedAtEpoch(), 5L);
+
+        // active children should reference parent 0
+        var childA = findSegment(dag, 2L);
+        assertEquals(childA.getState(), 
org.apache.pulsar.common.api.proto.SegmentState.ACTIVE);
+        assertEquals(childA.getParentIdsCount(), 1);
+        assertEquals(childA.getParentIdAt(0), 0L);
+        assertEquals(childA.getCreatedAtEpoch(), 7L);
+        // sealedAtEpoch is only written when non-negative
+        assertTrue(!childA.hasSealedAtEpoch() || childA.getSealedAtEpoch() == 
0,
+                "active segment should not have sealedAtEpoch set");
+
+        // broker addresses only for the 2 active segments
+        assertEquals(dag.getSegmentBrokersCount(), 2);
+        Map<Long, String> brokerAddrs = new LinkedHashMap<>();
+        for (int i = 0; i < dag.getSegmentBrokersCount(); i++) {
+            brokerAddrs.put(dag.getSegmentBrokerAt(i).getSegmentId(),
+                    dag.getSegmentBrokerAt(i).getBrokerUrl());
+        }
+        assertEquals(brokerAddrs.get(2L), "pulsar://broker-a:6650");
+        assertEquals(brokerAddrs.get(3L), "pulsar://broker-b:6650");
+    }
+
+    @Test
+    public void testPushUpdateAfterCloseIsNoop() {
+        ScalableTopicLayoutResponse response = buildSampleResponse(
+                0L,
+                Map.of(0L, seg(0, 0x0000, 0xFFFF, SegmentState.ACTIVE, new 
long[]{}, new long[]{}, 0L, -1L)),
+                Map.of());
+
+        session.close();
+        session.pushUpdate(response);
+
+        verify(ctx, never()).writeAndFlush(any());
+    }
+
+    @Test
+    public void testPushUpdateWithNullBrokerAddressMapOmitsBrokerSection() {
+        // buildDagProto guards against a null address map (e.g., when 
upstream namespace
+        // lookup short-circuits) and should not throw.
+        ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse(
+                1L,
+                Map.of(0L, seg(0, 0x0000, 0xFFFF, SegmentState.ACTIVE, new 
long[]{}, new long[]{}, 0L, -1L)),
+                null, null, null, null);
+
+        session.pushUpdate(response);
+
+        ArgumentCaptor<ByteBuf> captor = 
ArgumentCaptor.forClass(ByteBuf.class);
+        verify(ctx).writeAndFlush(captor.capture());
+        BaseCommand cmd = parseFrame(captor.getValue());
+        
assertEquals(cmd.getScalableTopicUpdate().getDag().getSegmentBrokersCount(), 0);
+    }
+
+    // --- onMetadataChanged ---
+
+    @Test
+    public void testOnMetadataChangedAfterCloseIsNoop() {
+        session.close();
+        // Build a minimal metadata object; close should short-circuit before 
any work runs.
+        ScalableTopicMetadata md = 
ScalableTopicController.createInitialMetadata(1, Map.of());
+        session.onMetadataChanged(md);
+
+        verify(ctx, never()).writeAndFlush(any());
+    }
+
+    // ==== helpers ====
+
+    private static SegmentInfo seg(long id, int start, int end, SegmentState 
state,
+                                   long[] parents, long[] children,
+                                   long createdAt, long sealedAt) {
+        return new SegmentInfo(
+                id,
+                HashRange.of(start, end),
+                state,
+                toList(parents),
+                toList(children),
+                createdAt,
+                sealedAt);
+    }
+
+    private static java.util.List<Long> toList(long[] arr) {
+        java.util.List<Long> out = new java.util.ArrayList<>(arr.length);
+        for (long v : arr) {
+            out.add(v);
+        }
+        return out;
+    }
+
+    private static ScalableTopicLayoutResponse buildSampleResponse(
+            long epoch,
+            Map<Long, SegmentInfo> segments,
+            Map<Long, String> brokerAddrs) {
+        return new ScalableTopicLayoutResponse(epoch, segments, brokerAddrs, 
null, null, null);
+    }
+
+    private static org.apache.pulsar.common.api.proto.SegmentInfoProto 
findSegment(
+            org.apache.pulsar.common.api.proto.ScalableTopicDAG dag, long id) {
+        for (int i = 0; i < dag.getSegmentsCount(); i++) {
+            if (dag.getSegmentAt(i).getSegmentId() == id) {
+                return dag.getSegmentAt(i);
+            }
+        }
+        fail("segment " + id + " not found");
+        return null;
+    }
+
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        assertNotNull(frame);
+        try {
+            frame.skipBytes(4); // total size
+            int cmdSize = (int) frame.readUnsignedInt();
+            BaseCommand cmd = new BaseCommand();
+            cmd.parseFrom(frame, cmdSize);
+            // materialize() copies fields out of the backing buffer so it's 
safe to
+            // release the frame before the caller reads fields back.
+            cmd.materialize();
+            return cmd;
+        } finally {
+            frame.release();
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 0c8bf66ae25..7ab0b4f0239 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -160,6 +160,12 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
+    @Getter(AccessLevel.PACKAGE)
+    private final ConcurrentLongHashMap<DagWatchSession> dagWatchSessions =
+            ConcurrentLongHashMap.<DagWatchSession>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(1)
+                    .build();
 
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
@@ -354,12 +360,14 @@ public class ClientCnx extends PulsarHandler {
         consumers.forEach((id, consumer) -> consumer.connectionClosed(this, 
Optional.empty(), Optional.empty()));
         transactionMetaStoreHandlers.forEach((id, handler) -> 
handler.connectionClosed(this));
         topicListWatchers.forEach((__, watcher) -> 
watcher.connectionClosed(this));
+        dagWatchSessions.forEach((__, session) -> session.connectionClosed());
 
         waitingLookupRequests.clear();
 
         producers.clear();
         consumers.clear();
         topicListWatchers.clear();
+        dagWatchSessions.clear();
 
         timeoutTask.cancel(true);
     }
@@ -1306,6 +1314,43 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
+    @Override
+    protected void handleCommandScalableTopicUpdate(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate cmd) 
{
+        checkArgument(state == State.Ready);
+
+        long sessionId = cmd.getSessionId();
+        log.debug().attr("sessionId", sessionId).log("Received 
scalableTopicUpdate");
+
+        if (cmd.hasError()) {
+            // Error response for the initial lookup
+            DagWatchSession session = dagWatchSessions.remove(sessionId);
+            if (session != null) {
+                session.onError(cmd.getError(), cmd.hasMessage() ? 
cmd.getMessage() : null);
+            } else {
+                log.warn().attr("sessionId", sessionId)
+                        .log("Received scalable topic error for unknown 
session");
+            }
+            return;
+        }
+
+        DagWatchSession session = dagWatchSessions.get(sessionId);
+        if (session != null) {
+            session.onUpdate(cmd.getDag());
+        } else {
+            log.warn().attr("sessionId", sessionId)
+                    .log("Received scalable topic update for unknown session");
+        }
+    }
+
+    public void registerDagWatchSession(long sessionId, DagWatchSession 
session) {
+        dagWatchSessions.put(sessionId, session);
+    }
+
+    public void removeDagWatchSession(long sessionId) {
+        dagWatchSessions.remove(sessionId);
+    }
+
     /**
      * check serverError and take appropriate action.
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
new file mode 100644
index 00000000000..f48681ff6d0
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.ServerError;
+
+/**
+ * Callback interface for receiving DAG watch session updates from the broker.
+ * Implemented by the v5 client's DagWatchClient.
+ */
+public interface DagWatchSession {
+
+    /**
+     * Called when the broker sends a DAG update for this session.
+     */
+    void onUpdate(ScalableTopicDAG dag);
+
+    /**
+     * Called when the broker sends an error for this session.
+     */
+    void onError(ServerError error, String message);
+
+    /**
+     * Called when the connection to the broker is closed.
+     * Implementations should fail any pending operations and optionally 
reconnect.
+     */
+    void connectionClosed();
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 8c6289b944d..c805164ef92 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -86,6 +86,9 @@ import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataRespons
 import org.apache.pulsar.common.api.proto.CommandProducer;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import 
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import 
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
 import org.apache.pulsar.common.api.proto.CommandSeek;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -102,6 +105,8 @@ import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
@@ -1670,6 +1675,77 @@ public class Commands {
         return cmd;
     }
 
+    // --- Scalable topic commands ---
+
+    public static ByteBuf newScalableTopicLookup(long sessionId, String topic) 
{
+        BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_LOOKUP);
+        cmd.setScalableTopicLookup()
+                .setSessionId(sessionId)
+                .setTopic(topic);
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newScalableTopicClose(long sessionId) {
+        BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_CLOSE);
+        cmd.setScalableTopicClose()
+                .setSessionId(sessionId);
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newScalableTopicUpdate(long sessionId, 
ScalableTopicDAG dag) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
+        CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
+                .setSessionId(sessionId);
+        update.setDag().copyFrom(dag);
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newScalableTopicError(long sessionId, ServerError 
error, String message) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
+        cmd.setScalableTopicUpdate()
+                .setSessionId(sessionId)
+                .setError(error)
+                .setMessage(message);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: response to a scalable-topic subscribe request. On 
success the
+     * caller must populate the nested {@link ScalableConsumerAssignment} via
+     * {@code response.setAssignment()} before serializing; on failure the 
error and
+     * message should be set instead.
+     */
+    public static ByteBuf newScalableTopicSubscribeResponse(long requestId,
+                                                             
ScalableConsumerAssignment assignment) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        CommandScalableTopicSubscribeResponse response = 
cmd.setScalableTopicSubscribeResponse()
+                .setRequestId(requestId);
+        response.setAssignment().copyFrom(assignment);
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newScalableTopicSubscribeError(long requestId, 
ServerError error, String message) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        cmd.setScalableTopicSubscribeResponse()
+                .setRequestId(requestId)
+                .setError(error)
+                .setMessage(message);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: push a new segment assignment to a 
previously-subscribed scalable
+     * consumer after a rebalance.
+     */
+    public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId,
+                                                            
ScalableConsumerAssignment assignment) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.SCALABLE_TOPIC_ASSIGNMENT_UPDATE);
+        CommandScalableTopicAssignmentUpdate update = 
cmd.setScalableTopicAssignmentUpdate()
+                .setConsumerId(consumerId);
+        update.setAssignment().copyFrom(assignment);
+        return serializeWithSize(cmd);
+    }
+
     public static ByteBuf serializeWithSize(BaseCommand cmd) {
         return serializeWithPrecalculatedSerializedSize(cmd, 
cmd.getSerializedSize());
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 595d75e8801..0eb2b1b9670 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -72,6 +72,12 @@ import org.apache.pulsar.common.api.proto.CommandProducer;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic;
 import 
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe;
+import 
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
 import org.apache.pulsar.common.api.proto.CommandSeek;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSendError;
@@ -478,6 +484,36 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 handleCommandWatchTopicListClose(cmd.getWatchTopicListClose());
                 break;
 
+            case SCALABLE_TOPIC_LOOKUP:
+                checkArgument(cmd.hasScalableTopicLookup());
+                handleCommandScalableTopicLookup(cmd.getScalableTopicLookup());
+                break;
+
+            case SCALABLE_TOPIC_UPDATE:
+                checkArgument(cmd.hasScalableTopicUpdate());
+                handleCommandScalableTopicUpdate(cmd.getScalableTopicUpdate());
+                break;
+
+            case SCALABLE_TOPIC_CLOSE:
+                checkArgument(cmd.hasScalableTopicClose());
+                handleCommandScalableTopicClose(cmd.getScalableTopicClose());
+                break;
+
+            case SCALABLE_TOPIC_SUBSCRIBE:
+                checkArgument(cmd.hasScalableTopicSubscribe());
+                
handleCommandScalableTopicSubscribe(cmd.getScalableTopicSubscribe());
+                break;
+
+            case SCALABLE_TOPIC_SUBSCRIBE_RESPONSE:
+                checkArgument(cmd.hasScalableTopicSubscribeResponse());
+                
handleCommandScalableTopicSubscribeResponse(cmd.getScalableTopicSubscribeResponse());
+                break;
+
+            case SCALABLE_TOPIC_ASSIGNMENT_UPDATE:
+                checkArgument(cmd.hasScalableTopicAssignmentUpdate());
+                
handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate());
+                break;
+
             default:
                 break;
             }
@@ -744,6 +780,33 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleCommandScalableTopicLookup(CommandScalableTopicLookup 
commandScalableTopicLookup) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandScalableTopicUpdate(CommandScalableTopicUpdate 
commandScalableTopicUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandScalableTopicClose(CommandScalableTopicClose 
commandScalableTopicClose) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandScalableTopicSubscribe(
+            CommandScalableTopicSubscribe commandScalableTopicSubscribe) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandScalableTopicSubscribeResponse(
+            CommandScalableTopicSubscribeResponse 
commandScalableTopicSubscribeResponse) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandScalableTopicAssignmentUpdate(
+            CommandScalableTopicAssignmentUpdate 
commandScalableTopicAssignmentUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
     private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
         NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index ed49f7bc19b..114d083e12f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -834,6 +834,111 @@ message CommandWatchTopicListClose {
     required uint64 watcher_id     = 2;
 }
 
+/// --- Scalable topic commands ---
+
+enum SegmentState {
+    ACTIVE = 0;
+    SEALED = 1;
+}
+
+message SegmentInfoProto {
+    required uint64 segment_id       = 1;
+    required uint32 hash_start       = 2;
+    required uint32 hash_end         = 3;
+    required SegmentState state      = 4;
+    repeated uint64 parent_ids       = 5;
+    repeated uint64 child_ids        = 6;
+    required uint64 created_at_epoch = 7;
+    optional uint64 sealed_at_epoch  = 8;
+}
+
+message SegmentBrokerAddress {
+    required uint64 segment_id     = 1;
+    required string broker_url     = 2;
+    optional string broker_url_tls = 3;
+}
+
+message ScalableTopicDAG {
+    required uint64 epoch                           = 1;
+    repeated SegmentInfoProto segments              = 2;
+    repeated SegmentBrokerAddress segment_brokers   = 3;
+    optional string controller_broker_url           = 4;
+    optional string controller_broker_url_tls       = 5;
+}
+
+// Client -> Broker: Request scalable topic metadata and initiate watch session
+message CommandScalableTopicLookup {
+    required uint64 session_id = 1;   // Client-assigned session ID
+    required string topic      = 2;   // e.g. "topic://tenant/ns/my-topic"
+}
+
+// Broker -> Client: Used for BOTH initial response and subsequent pushed 
updates
+message CommandScalableTopicUpdate {
+    required uint64 session_id         = 1;
+    optional ScalableTopicDAG dag      = 2;
+
+    optional ServerError error         = 3;
+    optional string message            = 4;
+}
+
+// Client -> Broker: Close the DAG watch session
+message CommandScalableTopicClose {
+    required uint64 session_id = 1;
+}
+
+// Kind of scalable consumer registering with the controller leader.
+// QueueConsumer never registers — it attaches directly to all active and 
sealed
+// segment topics — so it does not appear here.
+enum ScalableConsumerType {
+    STREAM     = 0;
+    CHECKPOINT = 1;
+}
+
+// A single segment assigned to a scalable consumer.
+message ScalableAssignedSegment {
+    required uint64 segment_id             = 1;
+    required uint32 hash_start             = 2;
+    required uint32 hash_end               = 3;
+    // Fully-qualified segment:// topic name the consumer should attach to.
+    required string segment_topic          = 4;
+}
+
+// An assignment of active segments to a single consumer. Carries the layout 
epoch
+// it was computed from so the client can reject stale updates.
+message ScalableConsumerAssignment {
+    required uint64 layout_epoch                    = 1;
+    repeated ScalableAssignedSegment segments       = 2;
+}
+
+// Client -> Broker: register as an ordered (Stream) or external (Checkpoint) 
consumer
+// on a scalable topic and request the initial segment assignment. The broker 
leader
+// persists the consumer registration and returns the current assignment.
+message CommandScalableTopicSubscribe {
+    required uint64 request_id                  = 1;
+    required string topic                       = 2;  // e.g. 
"topic://tenant/ns/my-topic"
+    required string subscription                = 3;
+    required string consumer_name               = 4;
+    required uint64 consumer_id                 = 5;
+    required ScalableConsumerType consumer_type = 6;
+}
+
+// Broker -> Client: response to CommandScalableTopicSubscribe. On success, 
carries
+// the initial ScalableConsumerAssignment. On failure, error + message are 
populated
+// and the assignment is absent.
+message CommandScalableTopicSubscribeResponse {
+    required uint64 request_id                 = 1;
+    optional ServerError error                 = 2;
+    optional string message                    = 3;
+    optional ScalableConsumerAssignment assignment = 4;
+}
+
+// Broker -> Client: push a new assignment to a subscribed consumer after a 
rebalance
+// (triggered by a peer joining/leaving the subscription or by a segment 
split/merge).
+message CommandScalableTopicAssignmentUpdate {
+    required uint64 consumer_id                = 1;
+    required ScalableConsumerAssignment assignment = 2;
+}
+
 message CommandGetSchema {
     required uint64 request_id = 1;
     required string topic      = 2;
@@ -1070,6 +1175,14 @@ message BaseCommand {
         WATCH_TOPIC_LIST_CLOSE = 67;
 
         TOPIC_MIGRATED = 68;
+
+        SCALABLE_TOPIC_LOOKUP  = 70;
+        SCALABLE_TOPIC_UPDATE  = 71;
+        SCALABLE_TOPIC_CLOSE   = 72;
+
+        SCALABLE_TOPIC_SUBSCRIBE              = 73;
+        SCALABLE_TOPIC_SUBSCRIBE_RESPONSE     = 74;
+        SCALABLE_TOPIC_ASSIGNMENT_UPDATE      = 75;
     }
 
 
@@ -1153,4 +1266,12 @@ message BaseCommand {
     optional CommandWatchTopicListClose watchTopicListClose = 67;
 
     optional CommandTopicMigrated topicMigrated = 68;
+
+    optional CommandScalableTopicLookup scalableTopicLookup   = 70;
+    optional CommandScalableTopicUpdate scalableTopicUpdate    = 71;
+    optional CommandScalableTopicClose scalableTopicClose      = 72;
+
+    optional CommandScalableTopicSubscribe scalableTopicSubscribe              
       = 73;
+    optional CommandScalableTopicSubscribeResponse 
scalableTopicSubscribeResponse     = 74;
+    optional CommandScalableTopicAssignmentUpdate 
scalableTopicAssignmentUpdate       = 75;
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
new file mode 100644
index 00000000000..9afd76a98e0
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.SegmentInfoProto;
+import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip tests for the {@code Commands.newScalableTopic*} factory methods:
+ * encode a command, reparse the serialized wire frame, and verify the fields
+ * survive the trip.
+ */
+public class CommandsScalableTopicTest {
+
+    /**
+     * Wire format emitted by {@link 
Commands#serializeWithPrecalculatedSerializedSize}:
+     * {@code [4-byte TOTAL_SIZE][4-byte CMD_SIZE][CMD bytes]}. Parse it back 
into a
+     * {@link BaseCommand} for assertions.
+     */
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        try {
+            frame.skipBytes(4); // total size
+            int cmdSize = (int) frame.readUnsignedInt();
+            BaseCommand cmd = new BaseCommand();
+            cmd.parseFrom(frame, cmdSize);
+            // Materialize copies every field out of the backing buffer so 
it's safe to
+            // release {@code frame} before the caller reads fields back.
+            cmd.materialize();
+            return cmd;
+        } finally {
+            frame.release();
+        }
+    }
+
+    @Test
+    public void testNewScalableTopicLookup() {
+        ByteBuf frame = Commands.newScalableTopicLookup(42L, 
"topic://tenant/ns/my-scalable");
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_LOOKUP);
+        assertTrue(cmd.hasScalableTopicLookup());
+        assertEquals(cmd.getScalableTopicLookup().getSessionId(), 42L);
+        assertEquals(cmd.getScalableTopicLookup().getTopic(), 
"topic://tenant/ns/my-scalable");
+    }
+
+    @Test
+    public void testNewScalableTopicClose() {
+        ByteBuf frame = Commands.newScalableTopicClose(99L);
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_CLOSE);
+        assertTrue(cmd.hasScalableTopicClose());
+        assertEquals(cmd.getScalableTopicClose().getSessionId(), 99L);
+    }
+
+    @Test
+    public void testNewScalableTopicUpdate() {
+        ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(7L);
+        SegmentInfoProto active = dag.addSegment()
+                .setSegmentId(0L)
+                .setHashStart(0x0000)
+                .setHashEnd(0x7FFF)
+                .setState(SegmentState.ACTIVE)
+                .setCreatedAtEpoch(0L);
+        active.addChildId(2L);
+        active.addChildId(3L);
+        dag.addSegment()
+                .setSegmentId(2L)
+                .setHashStart(0x0000)
+                .setHashEnd(0x3FFF)
+                .setState(SegmentState.ACTIVE)
+                .setCreatedAtEpoch(7L)
+                .addParentId(0L);
+        
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");
+
+        ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag);
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+        assertTrue(cmd.hasScalableTopicUpdate());
+        assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L);
+        assertFalse(cmd.getScalableTopicUpdate().hasError(),
+                "successful update must not carry an error field");
+
+        ScalableTopicDAG got = cmd.getScalableTopicUpdate().getDag();
+        assertEquals(got.getEpoch(), 7L);
+        assertEquals(got.getSegmentsCount(), 2);
+        SegmentInfoProto parent = got.getSegmentAt(0);
+        assertEquals(parent.getSegmentId(), 0L);
+        assertEquals(parent.getState(), SegmentState.ACTIVE);
+        assertEquals(parent.getChildIdsCount(), 2);
+        assertEquals(parent.getChildIdAt(0), 2L);
+        assertEquals(parent.getChildIdAt(1), 3L);
+
+        SegmentInfoProto child = got.getSegmentAt(1);
+        assertEquals(child.getSegmentId(), 2L);
+        assertEquals(child.getHashStart(), 0x0000);
+        assertEquals(child.getHashEnd(), 0x3FFF);
+        assertEquals(child.getParentIdsCount(), 1);
+        assertEquals(child.getParentIdAt(0), 0L);
+
+        assertEquals(got.getSegmentBrokersCount(), 1);
+        assertEquals(got.getSegmentBrokerAt(0).getSegmentId(), 2L);
+        assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(), 
"pulsar://broker-a:6650");
+    }
+
+    @Test
+    public void testNewScalableTopicError() {
+        ByteBuf frame = Commands.newScalableTopicError(15L, 
ServerError.TopicNotFound,
+                "Scalable topic not found: topic://t/n/x");
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+        assertTrue(cmd.hasScalableTopicUpdate());
+        assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 15L);
+        assertTrue(cmd.getScalableTopicUpdate().hasError());
+        assertEquals(cmd.getScalableTopicUpdate().getError(), 
ServerError.TopicNotFound);
+        assertEquals(cmd.getScalableTopicUpdate().getMessage(),
+                "Scalable topic not found: topic://t/n/x");
+    }
+
+    @Test
+    public void testNewScalableTopicSubscribeResponseSuccess() {
+        ScalableConsumerAssignment assignment = new 
ScalableConsumerAssignment().setLayoutEpoch(3L);
+        assignment.addSegment()
+                .setSegmentId(2L)
+                .setHashStart(0x0000)
+                .setHashEnd(0x3FFF)
+                
.setSegmentTopic("persistent://tenant/ns/my-scalable-0000-3fff-0000000000000002");
+        assignment.addSegment()
+                .setSegmentId(3L)
+                .setHashStart(0x4000)
+                .setHashEnd(0x7FFF)
+                
.setSegmentTopic("persistent://tenant/ns/my-scalable-4000-7fff-0000000000000003");
+
+        ByteBuf frame = Commands.newScalableTopicSubscribeResponse(123L, 
assignment);
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), 
BaseCommand.Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        assertTrue(cmd.hasScalableTopicSubscribeResponse());
+        assertEquals(cmd.getScalableTopicSubscribeResponse().getRequestId(), 
123L);
+        assertFalse(cmd.getScalableTopicSubscribeResponse().hasError());
+
+        ScalableConsumerAssignment got = 
cmd.getScalableTopicSubscribeResponse().getAssignment();
+        assertEquals(got.getLayoutEpoch(), 3L);
+        assertEquals(got.getSegmentsCount(), 2);
+        assertEquals(got.getSegmentAt(0).getSegmentId(), 2L);
+        assertEquals(got.getSegmentAt(0).getHashStart(), 0x0000);
+        assertEquals(got.getSegmentAt(0).getHashEnd(), 0x3FFF);
+        assertEquals(got.getSegmentAt(0).getSegmentTopic(),
+                
"persistent://tenant/ns/my-scalable-0000-3fff-0000000000000002");
+        assertEquals(got.getSegmentAt(1).getSegmentId(), 3L);
+    }
+
+    @Test
+    public void testNewScalableTopicSubscribeError() {
+        ByteBuf frame = Commands.newScalableTopicSubscribeError(456L,
+                ServerError.AuthorizationError, "not authorized");
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), 
BaseCommand.Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        assertTrue(cmd.hasScalableTopicSubscribeResponse());
+        assertEquals(cmd.getScalableTopicSubscribeResponse().getRequestId(), 
456L);
+        assertTrue(cmd.getScalableTopicSubscribeResponse().hasError());
+        assertEquals(cmd.getScalableTopicSubscribeResponse().getError(), 
ServerError.AuthorizationError);
+        assertEquals(cmd.getScalableTopicSubscribeResponse().getMessage(), 
"not authorized");
+    }
+
+    @Test
+    public void testNewScalableTopicAssignmentUpdate() {
+        ScalableConsumerAssignment assignment = new 
ScalableConsumerAssignment().setLayoutEpoch(11L);
+        assignment.addSegment()
+                .setSegmentId(5L)
+                .setHashStart(0x8000)
+                .setHashEnd(0xBFFF)
+                .setSegmentTopic("persistent://t/n/seg-5");
+
+        ByteBuf frame = Commands.newScalableTopicAssignmentUpdate(789L, 
assignment);
+        BaseCommand cmd = parseFrame(frame);
+
+        assertEquals(cmd.getType(), 
BaseCommand.Type.SCALABLE_TOPIC_ASSIGNMENT_UPDATE);
+        assertTrue(cmd.hasScalableTopicAssignmentUpdate());
+        assertEquals(cmd.getScalableTopicAssignmentUpdate().getConsumerId(), 
789L);
+
+        ScalableConsumerAssignment got = 
cmd.getScalableTopicAssignmentUpdate().getAssignment();
+        assertNotNull(got);
+        assertEquals(got.getLayoutEpoch(), 11L);
+        assertEquals(got.getSegmentsCount(), 1);
+        assertEquals(got.getSegmentAt(0).getSegmentId(), 5L);
+        assertEquals(got.getSegmentAt(0).getHashStart(), 0x8000);
+        assertEquals(got.getSegmentAt(0).getHashEnd(), 0xBFFF);
+        assertEquals(got.getSegmentAt(0).getSegmentTopic(), 
"persistent://t/n/seg-5");
+    }
+
+    /**
+     * Different session IDs must not collide — the wire layout must preserve 
them
+     * independently across successive calls.
+     */
+    @Test
+    public void testSessionIdIsolation() {
+        BaseCommand a = parseFrame(Commands.newScalableTopicLookup(1L, "a"));
+        BaseCommand b = 
parseFrame(Commands.newScalableTopicLookup(Long.MAX_VALUE, "b"));
+        assertEquals(a.getScalableTopicLookup().getSessionId(), 1L);
+        assertEquals(b.getScalableTopicLookup().getSessionId(), 
Long.MAX_VALUE);
+        assertEquals(a.getScalableTopicLookup().getTopic(), "a");
+        assertEquals(b.getScalableTopicLookup().getTopic(), "b");
+    }
+}


Reply via email to