This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 597eb0daa2c [feat] PIP-468: Add scalable topic protocol commands and
connection handling (#25564)
597eb0daa2c is described below
commit 597eb0daa2ca155cdf3d1bb0c2ff63c637e5ab41
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 14:46:54 2026 -0700
[feat] PIP-468: Add scalable topic protocol commands and connection
handling (#25564)
---
.../pulsar/broker/service/PulsarCommandSender.java | 18 ++
.../broker/service/PulsarCommandSenderImpl.java | 17 ++
.../apache/pulsar/broker/service/ServerCnx.java | 176 +++++++++++
.../broker/service/scalable/ConsumerSession.java | 38 ++-
.../broker/service/scalable/DagWatchSession.java | 248 ++++++++++++++++
.../service/scalable/ScalableTopicService.java | 23 ++
.../service/scalable/ConsumerSessionTest.java | 299 +++++++++++++++++++
.../service/scalable/DagWatchSessionTest.java | 330 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 45 +++
.../apache/pulsar/client/impl/DagWatchSession.java | 45 +++
.../apache/pulsar/common/protocol/Commands.java | 76 +++++
.../pulsar/common/protocol/PulsarDecoder.java | 63 ++++
pulsar-common/src/main/proto/PulsarApi.proto | 121 ++++++++
.../common/protocol/CommandsScalableTopicTest.java | 234 +++++++++++++++
14 files changed, 1729 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index ca80ca49d76..3a8ad10f8c6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -107,4 +108,21 @@ public interface PulsarCommandSender {
List<String> newTopics, List<String>
deletedTopics, String topicsHash,
Function<Throwable, CompletableFuture<Void>>
permitAcquireErrorHandler);
+
+ /**
+ * Send the response to a scalable-topic subscribe request on success. The
caller is
+ * responsible for building the {@link ScalableConsumerAssignment}.
+ *
+ * @param requestId id of the subscribe request being answered
+ * @param assignment assignment payload to include in the response
+ */
+ void sendScalableTopicSubscribeResponse(long requestId,
ScalableConsumerAssignment assignment);
+
+ /**
+ * Send an error response to a scalable-topic subscribe request.
+ *
+ * @param requestId id of the subscribe request being answered
+ * @param error error code to report to the client
+ * @param message human-readable description of the failure
+ */
+ void sendScalableTopicSubscribeError(long requestId, ServerError error,
String message);
+
+ /**
+ * Push a segment-assignment update to a previously-subscribed scalable
consumer after
+ * a rebalance (peer added/removed, split, merge).
+ *
+ * @param consumerId protocol-level id of the consumer the update is addressed to
+ * @param assignment the new assignment to deliver
+ */
+ void sendScalableTopicAssignmentUpdate(long consumerId,
ScalableConsumerAssignment assignment);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 41b182f8bda..5e65b8a5571 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -405,6 +405,23 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
command, permitAcquireErrorHandler);
}
+ @Override
+ public void sendScalableTopicSubscribeResponse(long requestId,
+ org.apache.pulsar.common.api.proto.ScalableConsumerAssignment
assignment) {
+ writeAndFlush(Commands.newScalableTopicSubscribeResponse(requestId,
assignment)); // build the success response and write it to this connection
+ }
+
+ @Override
+ public void sendScalableTopicSubscribeError(long requestId, ServerError
error, String message) {
+ writeAndFlush(Commands.newScalableTopicSubscribeError(requestId,
error, message)); // build the error response and write it to this connection
+ }
+
+ @Override
+ public void sendScalableTopicAssignmentUpdate(long consumerId,
+ org.apache.pulsar.common.api.proto.ScalableConsumerAssignment
assignment) {
+ writeAndFlush(Commands.newScalableTopicAssignmentUpdate(consumerId,
assignment)); // build the assignment-update push and write it to this connection
+ }
+
private void writeAndFlush(ByteBuf outBuf) {
NettyChannelUtil.writeAndFlushWithVoidPromise(cnx.ctx(), outBuf);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 569cbab1e07..cc09a3109e6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -92,6 +92,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -134,6 +135,8 @@ import org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -470,6 +473,36 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
});
this.topicListService.inactivate();
+
+ // Close any outstanding scalable-topic DAG watch sessions held by
this connection.
+ dagWatchSessions.values().forEach(session -> {
+ try {
+ session.close();
+ } catch (Exception e) {
+ log.warn().exceptionMessage(e).log("Error closing DAG watch
session on connection close");
+ }
+ });
+ dagWatchSessions.clear();
+
+ // Notify the scalable-topic controller that this connection's
scalable consumers
+ // have dropped. The controller marks them disconnected and starts the
grace-period
+ // timer; if they reconnect in time, their assignment is preserved.
+ if (!scalableConsumerRegistrations.isEmpty()) {
+ var scalableTopicService = service.getScalableTopicService();
+ if (scalableTopicService != null) {
+ scalableConsumerRegistrations.values().forEach(ref -> {
+ try {
+ scalableTopicService.onConsumerDisconnect(
+ ref.topicName(), ref.subscription(),
ref.consumerName());
+ } catch (Exception e) {
+ log.warn().attr("consumerName",
ref.consumerName()).exceptionMessage(e)
+ .log("Error notifying scalable controller of
consumer disconnect");
+ }
+ });
+ }
+ scalableConsumerRegistrations.clear();
+ }
+
this.service.getPulsarStats().recordConnectionClose();
// complete possible pending connection check future
@@ -709,6 +742,149 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
+ // --- Scalable topic lookup ---
+
+ private final java.util.concurrent.ConcurrentHashMap<Long,
+ org.apache.pulsar.broker.service.scalable.DagWatchSession>
dagWatchSessions =
+ new java.util.concurrent.ConcurrentHashMap<>();
+
+    /**
+     * Handles a {@code CommandScalableTopicLookup}: opens a DAG watch session for the requested
+     * topic on this connection and pushes the initial layout to the client.
+     *
+     * <p>The connection is closed when the topic name cannot be parsed, when the broker is not
+     * running, or when the scalable-topic resources are unavailable. If starting the session
+     * fails, the session is discarded and a {@code ScalableTopicError} is written back.
+     *
+     * <p>If the client re-uses a session id that is still registered on this connection, the
+     * previous session is closed before it is replaced. Otherwise it would keep pushing updates
+     * under the same id and would no longer be reachable from the session map at teardown.
+     *
+     * @param commandScalableTopicLookup the lookup request carrying the session id and topic
+     */
+    @Override
+    protected void handleCommandScalableTopicLookup(
+            CommandScalableTopicLookup commandScalableTopicLookup) {
+        checkArgument(state == State.Connected);
+
+        final long sessionId = commandScalableTopicLookup.getSessionId();
+        final String topicStr = commandScalableTopicLookup.getTopic();
+
+        log.debug().attr("topic", topicStr).attr("sessionId", sessionId)
+                .log("Received ScalableTopicLookup");
+
+        final TopicName topicName;
+        try {
+            topicName = TopicName.get(topicStr);
+        } catch (Exception e) {
+            log.warn().attr("topic", topicStr).log("Invalid topic name in ScalableTopicLookup");
+            ctx.close();
+            return;
+        }
+
+        if (!this.service.getPulsar().isRunning()) {
+            log.warn("ScalableTopicLookup rejected: broker not ready");
+            ctx.close();
+            return;
+        }
+
+        ScalableTopicResources resources = service.getPulsar().getPulsarResources()
+                .getScalableTopicResources();
+        if (resources == null) {
+            log.warn("ScalableTopicLookup rejected: scalable topic resources not available");
+            ctx.close();
+            return;
+        }
+
+        // Create a DagWatchSession that will send the initial layout and watch for changes
+        var session = new org.apache.pulsar.broker.service.scalable.DagWatchSession(
+                sessionId, topicName, this, resources, service);
+        var previous = dagWatchSessions.put(sessionId, session);
+        if (previous != null) {
+            // Same session id registered twice: stop the older session so it does not keep
+            // pushing updates after it has been dropped from the map.
+            previous.close();
+        }
+
+        session.start()
+                .thenAcceptAsync(session::pushUpdate, ctx.executor())
+                .exceptionally(ex -> {
+                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                    log.warn().attr("topic", topicName).exception(cause)
+                            .log("ScalableTopicLookup failed");
+                    // Conditional remove: only evict the mapping if it still points at this
+                    // session, so a newer session registered under the same id is untouched.
+                    dagWatchSessions.remove(sessionId, session);
+                    session.close();
+                    ctx.executor().execute(() ->
+                            ctx.writeAndFlush(Commands.newScalableTopicError(sessionId,
+                                    ServerError.TopicNotFound, cause.getMessage()))
+                    );
+                    return null;
+                });
+    }
+
+    /**
+     * Handles a {@code CommandScalableTopicClose}: drops the DAG watch session registered under
+     * the given session id on this connection and closes it. Unknown session ids are ignored.
+     *
+     * @param commandScalableTopicClose the close request carrying the session id
+     */
+    @Override
+    protected void handleCommandScalableTopicClose(
+            CommandScalableTopicClose commandScalableTopicClose) {
+        checkArgument(state == State.Connected);
+
+        final long closingSessionId = commandScalableTopicClose.getSessionId();
+        log.debug().attr("sessionId", closingSessionId).log("Received ScalableTopicClose");
+
+        final var removed = dagWatchSessions.remove(closingSessionId);
+        if (removed == null) {
+            // Nothing registered under this id on this connection.
+            return;
+        }
+        removed.close();
+    }
+
+ // --- Scalable consumer registrations ---
+
+ /**
+ * Tracks the scalable-topic consumer registrations held by this
connection, so that
+ * when the TCP connection drops we can notify the owning {@link
+ * org.apache.pulsar.broker.service.scalable.ScalableTopicController} of
every
+ * disconnected consumer in bulk. Keyed by the protocol-level {@code
consumerId}.
+ */
+ private final java.util.concurrent.ConcurrentHashMap<Long,
ScalableConsumerRegistrationRef>
+ scalableConsumerRegistrations = new
java.util.concurrent.ConcurrentHashMap<>();
+
+ private record ScalableConsumerRegistrationRef(
+ org.apache.pulsar.common.naming.TopicName topicName, // topic the consumer registered on
+ String subscription, // subscription name taken from the subscribe command
+ String consumerName) {} // consumer name taken from the subscribe command
+
+    /**
+     * Handles a {@code CommandScalableTopicSubscribe}: registers the consumer with the
+     * scalable-topic service and answers with its segment assignment.
+     *
+     * <p>An error response is sent when the topic name cannot be parsed, when the scalable-topic
+     * service is unavailable, or when registration fails. On success the registration is
+     * remembered on this connection so the controller can be notified when the connection drops.
+     *
+     * <p>Registration is asynchronous, so the connection may already be gone when it completes.
+     * In that case the registration is not recorded, because the connection-close cleanup has
+     * already run and would never see it. The controller is told about the disconnect right away.
+     *
+     * @param commandScalableTopicSubscribe the subscribe request
+     */
+    @Override
+    protected void handleCommandScalableTopicSubscribe(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe
+                    commandScalableTopicSubscribe) {
+        checkArgument(state == State.Connected);
+
+        final long requestId = commandScalableTopicSubscribe.getRequestId();
+        final String topicStr = commandScalableTopicSubscribe.getTopic();
+        final String subscription = commandScalableTopicSubscribe.getSubscription();
+        final String consumerName = commandScalableTopicSubscribe.getConsumerName();
+        final long consumerId = commandScalableTopicSubscribe.getConsumerId();
+
+        log.debug().attr("topic", topicStr).attr("subscription", subscription)
+                .attr("consumerName", consumerName).attr("requestId", requestId)
+                .log("Received ScalableTopicSubscribe");
+
+        final TopicName topicName;
+        try {
+            topicName = TopicName.get(topicStr);
+        } catch (Exception e) {
+            getCommandSender().sendScalableTopicSubscribeError(requestId,
+                    ServerError.InvalidTopicName, "Invalid topic name: " + topicStr);
+            return;
+        }
+
+        var scalableTopicService = service.getScalableTopicService();
+        if (scalableTopicService == null) {
+            getCommandSender().sendScalableTopicSubscribeError(requestId,
+                    ServerError.ServiceNotReady, "Scalable topic service not available");
+            return;
+        }
+
+        scalableTopicService.registerConsumer(topicName, subscription, consumerName, consumerId, this)
+                .whenCompleteAsync((assignment, ex) -> {
+                    if (ex != null) {
+                        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                        log.warn().attr("topic", topicName).attr("subscription", subscription)
+                                .attr("consumerName", consumerName).exception(cause)
+                                .log("ScalableTopicSubscribe failed");
+                        getCommandSender().sendScalableTopicSubscribeError(requestId,
+                                ServerError.UnknownError, cause.getMessage());
+                        return;
+                    }
+                    if (!ctx.channel().isActive()) {
+                        // The connection went away while registration was in flight. Recording
+                        // the ref now would leave it stranded, so report the disconnect instead.
+                        try {
+                            scalableTopicService.onConsumerDisconnect(
+                                    topicName, subscription, consumerName);
+                        } catch (Exception e) {
+                            log.warn().attr("consumerName", consumerName).exceptionMessage(e)
+                                    .log("Error notifying scalable controller of consumer disconnect");
+                        }
+                        return;
+                    }
+                    // Record the registration so we can call onConsumerDisconnect on channelInactive.
+                    scalableConsumerRegistrations.put(consumerId,
+                            new ScalableConsumerRegistrationRef(topicName, subscription, consumerName));
+                    getCommandSender().sendScalableTopicSubscribeResponse(requestId,
+                            org.apache.pulsar.broker.service.scalable.ConsumerSession.toProto(assignment));
+                }, ctx.executor());
+    }
+
@Override
protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata
partitionMetadataParam) {
checkArgument(state == State.Connected);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
index 3e798ca418b..b5e1f5c00fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
@@ -167,12 +167,42 @@ public class ConsumerSession {
}
/**
- * Push an assignment update to this consumer, if currently connected. The
actual
- * wire push is wired up in the protocol-commands commit; at this stage
the method
- * exists so {@link SubscriptionCoordinator} can call it.
+ * Push an assignment update to this consumer, if currently connected.
Builds a
+ * {@code ScalableConsumerAssignment} proto and writes a
+ * {@code CommandScalableTopicAssignmentUpdate} to the connection.
*/
public void sendAssignmentUpdate(ConsumerAssignment assignment) {
- // no-op until the protocol-commands commit wires in the wire protocol
+ TransportCnx localCnx = this.cnx; // copy the field to a local so the null check and the use below see the same reference
+ if (localCnx == null || !connected) {
+ // Consumer is disconnected — no-op. The assignment will be
delivered when it
+ // reconnects (the coordinator re-pushes on attach).
+ return;
+ }
+ var sender = localCnx.getCommandSender();
+ if (sender == null) {
+ // Connection is in the middle of being torn down; skip silently.
+ return;
+ }
+ sender.sendScalableTopicAssignmentUpdate(consumerId,
toProto(assignment)); // convert to the wire form and push it to the client
+ }
+
+    /**
+     * Builds the wire representation of a broker-side {@link ConsumerAssignment}: the layout
+     * epoch plus one entry per assigned segment (id, hash range bounds, underlying topic name).
+     * Used by {@link #sendAssignmentUpdate(ConsumerAssignment)} and by the
+     * {@code handleCommandScalableTopicSubscribe} path in {@code ServerCnx}.
+     *
+     * @param assignment the assignment to convert
+     * @return a newly built {@code ScalableConsumerAssignment}
+     */
+    public static org.apache.pulsar.common.api.proto.ScalableConsumerAssignment toProto(
+            ConsumerAssignment assignment) {
+        final var wire = new org.apache.pulsar.common.api.proto.ScalableConsumerAssignment();
+        wire.setLayoutEpoch(assignment.layoutEpoch());
+        for (var assigned : assignment.assignedSegments()) {
+            final var wireSegment = wire.addSegment();
+            wireSegment.setSegmentId(assigned.segmentId());
+            wireSegment.setHashStart(assigned.hashRange().start());
+            wireSegment.setHashEnd(assigned.hashRange().end());
+            wireSegment.setSegmentTopic(assigned.underlyingTopicName());
+        }
+        return wire;
     }
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
new file mode 100644
index 00000000000..87dee98b71b
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.SegmentBrokerAddress;
+import org.apache.pulsar.common.api.proto.SegmentInfoProto;
+import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Broker-side handler for a client's DAG watch session.
+ *
+ * <p>Any broker can serve this role since metadata is in the metadata store.
+ * The session watches for metadata changes (via Oxia watch) and pushes updated
+ * {@link ScalableTopicLayoutResponse} to the client.
+ *
+ * <p>The session is tied to a connection. When the connection breaks, the
session dies.
+ * The client must reinitiate a new session (possibly with another broker).
+ */
+public class DagWatchSession {
+
+ private static final Logger LOG = Logger.get(DagWatchSession.class);
+ private final Logger log;
+
+ @Getter
+ private final long sessionId;
+ private final TopicName topicName;
+ private final ServerCnx cnx;
+ private final ScalableTopicResources resources;
+ private final BrokerService brokerService;
+
+ private final String metadataPath;
+ private final java.util.function.Consumer<Notification>
notificationListener;
+ private volatile boolean closed = false;
+
+ public DagWatchSession(long sessionId,
+ TopicName topicName,
+ ServerCnx cnx,
+ ScalableTopicResources resources,
+ BrokerService brokerService) {
+ this.sessionId = sessionId;
+ this.topicName = topicName;
+ this.cnx = cnx;
+ this.resources = resources;
+ this.brokerService = brokerService;
+ this.metadataPath = resources.topicPath(topicName); // compared against notification paths in onNotification
+ this.notificationListener = this::onNotification; // the listener instance registered with the store in start()
+ this.log = LOG.with().attr("topic", topicName).attr("sessionId",
sessionId).build(); // per-session logger carrying the topic and sessionId attributes
+ }
+
+    /**
+     * Starts the session. Registers the notification listener with the metadata store, then
+     * loads the topic's current metadata and resolves it into the initial layout response.
+     *
+     * @return a future holding the initial layout, or failed with an
+     *         {@link IllegalStateException} when no metadata exists for the topic
+     */
+    public CompletableFuture<ScalableTopicLayoutResponse> start() {
+        // Hook the listener up first, before the initial read is issued.
+        resources.getStore().registerListener(notificationListener);
+
+        return resources.getScalableTopicMetadataAsync(topicName, true)
+                .thenCompose(maybeMetadata -> {
+                    if (maybeMetadata.isPresent()) {
+                        return buildResponse(maybeMetadata.get());
+                    }
+                    return CompletableFuture.failedFuture(
+                            new IllegalStateException("Scalable topic not found: " + topicName));
+                });
+    }
+
+    /**
+     * Metadata-store callback, registered in {@link #start()}. Visible for testing.
+     *
+     * <p>Ignores notifications once the session is closed, notifications for other paths, and
+     * deletions. For anything else the topic metadata is reloaded and, when present, handed to
+     * {@link #onMetadataChanged(ScalableTopicMetadata)}. A failed reload is logged rather than
+     * silently dropped, so a missed update leaves a trace.
+     *
+     * @param notification the metadata-store notification
+     */
+    void onNotification(Notification notification) {
+        if (closed) {
+            return;
+        }
+        if (!metadataPath.equals(notification.getPath())) {
+            return;
+        }
+        if (notification.getType() == NotificationType.Deleted) {
+            return;
+        }
+        // Metadata changed — reload and push update
+        resources.getScalableTopicMetadataAsync(topicName, true)
+                .thenAccept(optMd -> optMd.ifPresent(this::onMetadataChanged))
+                .exceptionally(ex -> {
+                    log.warn().exceptionMessage(ex)
+                            .log("Failed to reload scalable topic metadata after notification");
+                    return null;
+                });
+    }
+
+    /**
+     * Called when the metadata store watch fires (metadata changed). Rebuilds the layout
+     * response from the new metadata and pushes it to the client. Does nothing once the session
+     * is closed. A failure while building or pushing the update is logged instead of being lost
+     * in the discarded future.
+     *
+     * @param newMetadata the freshly loaded topic metadata
+     */
+    public void onMetadataChanged(ScalableTopicMetadata newMetadata) {
+        if (closed) {
+            return;
+        }
+        buildResponse(newMetadata)
+                .thenAccept(this::pushUpdate)
+                .exceptionally(ex -> {
+                    log.warn().exceptionMessage(ex).log("Failed to push DAG update");
+                    return null;
+                });
+    }
+
+ /**
+ * Push an update to the connected client.
+ *
+ * <p>Does nothing once the session has been closed. Otherwise the response is converted to a
+ * {@code ScalableTopicDAG} and written to the connection together with this session's id.
+ *
+ * @param response layout snapshot to send
+ */
+ public void pushUpdate(ScalableTopicLayoutResponse response) {
+ if (closed) {
+ return;
+ }
+ ScalableTopicDAG dag = buildDagProto(response);
+ log.info().attr("epoch", response.epoch()).log("Pushing DAG update");
+ cnx.ctx().writeAndFlush(Commands.newScalableTopicUpdate(sessionId,
dag));
+ }
+
+    /**
+     * Converts a layout response into the {@link ScalableTopicDAG} wire message: the epoch, one
+     * segment entry per segment in the response, then one broker-address entry per resolved
+     * segment broker (when the response carries any).
+     */
+    private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
+        final ScalableTopicDAG wireDag = new ScalableTopicDAG();
+        wireDag.setEpoch(response.epoch());
+
+        for (SegmentInfo info : response.segments().values()) {
+            final SegmentInfoProto wireSegment = wireDag.addSegment();
+            wireSegment.setSegmentId(info.segmentId());
+            wireSegment.setHashStart(info.hashRange().start());
+            wireSegment.setHashEnd(info.hashRange().end());
+            if (info.isActive()) {
+                wireSegment.setState(SegmentState.ACTIVE);
+            } else {
+                wireSegment.setState(SegmentState.SEALED);
+            }
+            for (var parentId : info.parentIds()) {
+                wireSegment.addParentId(parentId);
+            }
+            for (var childId : info.childIds()) {
+                wireSegment.addChildId(childId);
+            }
+            wireSegment.setCreatedAtEpoch(info.createdAtEpoch());
+            // A negative sealed-at epoch is left unset on the wire.
+            if (info.sealedAtEpoch() >= 0) {
+                wireSegment.setSealedAtEpoch(info.sealedAtEpoch());
+            }
+        }
+
+        // Add broker addresses for active segments
+        final Map<Long, String> addressesBySegment = response.segmentBrokerAddresses();
+        if (addressesBySegment == null) {
+            return wireDag;
+        }
+        addressesBySegment.forEach((segmentId, brokerUrl) -> {
+            final SegmentBrokerAddress wireAddress = wireDag.addSegmentBroker();
+            wireAddress.setSegmentId(segmentId);
+            wireAddress.setBrokerUrl(brokerUrl);
+        });
+        return wireDag;
+    }
+
+ public void close() {
+ closed = true; // onNotification, onMetadataChanged and pushUpdate all return early once this is set
+ // Listener is guarded by the closed flag; MetadataStore does not
support unregister.
+ }
+
+    /**
+     * Assembles a complete layout response for the given metadata. The segment layout is derived
+     * from the metadata, then the per-segment broker lookup and the controller-broker lookup are
+     * both started and their results combined into one {@link ScalableTopicLayoutResponse}.
+     */
+    private CompletableFuture<ScalableTopicLayoutResponse> buildResponse(ScalableTopicMetadata metadata) {
+        final SegmentLayout segmentLayout = SegmentLayout.fromMetadata(metadata);
+
+        // Segment brokers first, controller second.
+        return resolveSegmentBrokers(segmentLayout)
+                .thenCombine(readControllerBrokerUrl(), (brokersBySegment, maybeControllerUrl) -> {
+                    final String controllerAddress = maybeControllerUrl.orElse(null);
+                    return new ScalableTopicLayoutResponse(
+                            segmentLayout.getEpoch(),
+                            segmentLayout.getAllSegments(),
+                            brokersBySegment,
+                            null,
+                            controllerAddress,
+                            null);
+                });
+    }
+
+ private CompletableFuture<Map<Long, String>>
resolveSegmentBrokers(SegmentLayout layout) {
+ Map<Long, String> result = new LinkedHashMap<>(); // filled by the lookup callbacks below; every write happens under synchronized (result)
+ CompletableFuture<?>[] futures =
layout.getActiveSegments().values().stream()
+ .map(segment -> {
+ // Resolve which broker owns this segment's underlying
segment:// topic
+ TopicName segTn =
org.apache.pulsar.common.scalable.SegmentTopicName.fromParent(
+ topicName, segment.hashRange(),
segment.segmentId());
+ var lookupOptions =
org.apache.pulsar.broker.namespace.LookupOptions.builder()
+ .readOnly(false).authoritative(false).build();
+ return brokerService.getPulsar().getNamespaceService()
+ .getBrokerServiceUrlAsync(segTn, lookupOptions)
+ .thenAccept(optUrl ->
optUrl.ifPresent(lookupResult -> { // an empty lookup result leaves the segment out of the map
+ synchronized (result) { // NOTE(review): presumably because lookups can complete on different threads; confirm
+ result.put(segment.segmentId(),
+
lookupResult.getLookupData().getBrokerUrl());
+ }
+ }));
+ })
+ .toArray(CompletableFuture[]::new);
+
+ return CompletableFuture.allOf(futures).thenApply(__ -> result); // the map is handed out only after every lookup has completed
+ }
+
+    /**
+     * Reads the controller lock entry for this topic and resolves the broker id stored there to
+     * a broker service URL.
+     *
+     * @return a future holding the controller broker URL, or an empty {@link Optional} when no
+     *         lock entry exists or the broker id cannot be resolved
+     */
+    private CompletableFuture<Optional<String>> readControllerBrokerUrl() {
+        String lockPath = resources.controllerLockPath(topicName);
+        return resources.getStore().get(lockPath)
+                .thenCompose(optValue -> {
+                    if (optValue.isEmpty()) {
+                        return CompletableFuture.completedFuture(Optional.<String>empty());
+                    }
+                    // The leader-election value is the brokerId of the controller leader.
+                    // Resolve it to a pulsar:// service URL via NamespaceService so clients
+                    // can connect to the controller broker for scalable-topic subscribe.
+                    // Decode with an explicit charset so the result does not depend on the
+                    // JVM's platform default encoding.
+                    String brokerId = new String(optValue.get().getValue(),
+                            java.nio.charset.StandardCharsets.UTF_8);
+                    return brokerService.getPulsar().getNamespaceService()
+                            .createLookupResult(brokerId, false, null)
+                            .thenApply(lookupResult ->
+                                    Optional.ofNullable(lookupResult.getLookupData().getBrokerUrl()))
+                            .exceptionally(ex -> {
+                                log.warn().attr("brokerId", brokerId).exceptionMessage(ex)
+                                        .log("Failed to resolve controller broker");
+                                return Optional.<String>empty();
+                            });
+                });
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index c151df6e81b..dbb6b32aec7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -172,6 +172,29 @@ public class ScalableTopicService {
.thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
}
+ /**
+ * Register a scalable consumer with the controller leader for {@code
topic}.
+ * Persists a durable session and returns the consumer's segment
assignment.
+ */
+ public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName
topic, String subscription,
+ String
consumerName, long consumerId,
+
org.apache.pulsar.broker.service.TransportCnx cnx) {
+ return getOrCreateController(topic)
+ .thenCompose(controller ->
controller.registerConsumer(subscription, consumerName, consumerId, cnx));
+ }
+
+ /**
+ * Called when a scalable consumer's transport connection drops. Forwards
to the
+ * controller which marks the session disconnected and starts its grace
timer.
+ * No-op if the controller is not held locally.
+ */
+ public void onConsumerDisconnect(TopicName topic, String subscription,
String consumerName) {
+ ScalableTopicController controller = controllers.get(topic.toString());
+ if (controller != null) {
+ controller.onConsumerDisconnect(subscription, consumerName);
+ }
+ }
+
// --- Internal helpers ---
private void onLeaderStateChange(TopicName topic, LeaderElectionState
state) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
new file mode 100644
index 00000000000..15e2b10ba30
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ConsumerSessionTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.PulsarCommandSender;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+public class ConsumerSessionTest {
+
+ private static final Duration GRACE = Duration.ofMillis(100);
+ private static final Logger PARENT_LOG =
Logger.get(ConsumerSessionTest.class);
+
+    /**
+     * Convenience wrapper: fresh mock scheduler that returns a mock ScheduledFuture, plus a
+     * mock grace-expiry callback. Each test builds its own instance so stubbing and
+     * verification never leak between tests.
+     */
+    private static final class TestContext {
+        final ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class);
+        final ScheduledFuture<?> future = mock(ScheduledFuture.class);
+        final Runnable onGraceExpiry = mock(Runnable.class);
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        TestContext() {
+            // Every schedule(...) call returns the same mock future so we can verify cancel.
+            when(scheduler.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+                    .thenReturn((ScheduledFuture) future);
+        }
+
+        /** Creates a session via the constructor, bound to {@code cnx} and this context's mocks. */
+        ConsumerSession session(String name, long consumerId, TransportCnx cnx) {
+            return new ConsumerSession(name, consumerId, cnx, GRACE, scheduler, onGraceExpiry, PARENT_LOG);
+        }
+
+        /** Creates a session via {@code ConsumerSession.restored}, i.e. without any connection. */
+        ConsumerSession restored(String name) {
+            return ConsumerSession.restored(name, GRACE, scheduler, onGraceExpiry, PARENT_LOG);
+        }
+    }
+
+    /**
+     * Builds an assignment at {@code epoch} with one segment per id. The i-th segment covers the
+     * hash range {@code [i * 0x4000, i * 0x4000 + 0x3FFF]} and points at a topic named after its id.
+     */
+    private static ConsumerAssignment buildAssignment(long epoch, long... segmentIds) {
+        List<ConsumerAssignment.AssignedSegment> assigned = new java.util.ArrayList<>();
+        int rangeStart = 0;
+        for (long segmentId : segmentIds) {
+            HashRange range = HashRange.of(rangeStart, rangeStart + 0x3FFF);
+            String segmentTopic = "persistent://tenant/ns/my-scalable-seg-" + segmentId;
+            assigned.add(new ConsumerAssignment.AssignedSegment(segmentId, range, segmentTopic));
+            // Ranges are contiguous and 0x4000 wide.
+            rangeStart += 0x4000;
+        }
+        return new ConsumerAssignment(epoch, assigned);
+    }
+
+ // --- Construction / identity ---
+
+ @Test
+ public void testConstructionWithConnectionMarksConnected() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx = mock(TransportCnx.class);
+
+ ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+ assertEquals(session.getConsumerName(), "c1");
+ assertEquals(session.getConsumerId(), 10L);
+ assertSame(session.getCnx(), cnx);
+ assertTrue(session.isConnected());
+ assertNull(session.getGraceTimer(), "no timer is armed until
markDisconnected");
+ verify(ctx.scheduler, never())
+ .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testRestoredStartsDisconnectedWithGraceTimerArmed() {
+ TestContext ctx = new TestContext();
+
+ ConsumerSession session = ctx.restored("c-restored");
+
+ assertEquals(session.getConsumerName(), "c-restored");
+ assertEquals(session.getConsumerId(), -1L);
+ assertNull(session.getCnx());
+ assertFalse(session.isConnected());
+ // restored() arms the grace timer internally so callers can't forget
to schedule it.
+ assertNotNull(session.getGraceTimer(), "restored() must arm the grace
timer");
+ verify(ctx.scheduler, times(1))
+ .schedule(eq(ctx.onGraceExpiry), eq(GRACE.toMillis()),
eq(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testEqualsAndHashCodeOnConsumerNameOnly() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx1 = mock(TransportCnx.class);
+ TransportCnx cnx2 = mock(TransportCnx.class);
+ ConsumerSession a = ctx.session("same", 1L, cnx1);
+ ConsumerSession b = ctx.session("same", 2L, cnx2);
+ ConsumerSession c = ctx.session("other", 1L, cnx1);
+
+ assertEquals(a, b, "sessions with the same consumerName must be
equal");
+ assertEquals(a.hashCode(), b.hashCode());
+ assertFalse(a.equals(c));
+ }
+
+ // --- attach / markDisconnected ---
+
+ @Test
+ public void testAttachUpdatesIdAndCnxAndMarksConnected() {
+ TestContext ctx = new TestContext();
+ ConsumerSession session = ctx.restored("c1");
+ TransportCnx fresh = mock(TransportCnx.class);
+
+ session.attach(77L, fresh);
+
+ assertEquals(session.getConsumerId(), 77L);
+ assertSame(session.getCnx(), fresh);
+ assertTrue(session.isConnected());
+ }
+
+ @Test
+ public void testAttachCancelsPendingGraceTimer() {
+ TestContext ctx = new TestContext();
+ // restored() arms the timer via the scheduler mock; attach should
cancel it.
+ ConsumerSession session = ctx.restored("c1");
+ assertSame(session.getGraceTimer(), ctx.future);
+
+ session.attach(1L, mock(TransportCnx.class));
+
+ verify(ctx.future).cancel(false);
+ assertNull(session.getGraceTimer());
+ }
+
+ @Test
+ public void testMarkDisconnectedClearsCnxKeepsConsumerIdAndArmsTimer() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx = mock(TransportCnx.class);
+ ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+ session.markDisconnected();
+
+ assertFalse(session.isConnected());
+ assertNull(session.getCnx());
+ assertEquals(session.getConsumerId(), 10L, "consumerId is preserved
until reattach");
+ assertSame(session.getGraceTimer(), ctx.future, "markDisconnected arms
the grace timer");
+ verify(ctx.scheduler, times(1))
+ .schedule(eq(ctx.onGraceExpiry), eq(GRACE.toMillis()),
eq(TimeUnit.MILLISECONDS));
+ }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testMarkDisconnectedTwiceCancelsPreviousTimer() {
+        // startGraceTimer cancels any prior timer before scheduling a new one; a double
+        // markDisconnected (edge case) should leave only the latest timer armed.
+        TestContext ctx = new TestContext();
+        ScheduledFuture<?> second = mock(ScheduledFuture.class);
+        // The raw casts in this stubbing are what need the suppression, so the annotation is
+        // placed on the method instead of on the 'second' declaration where it had no effect.
+        // Re-stub the scheduler: first schedule(...) yields ctx.future, the next one 'second'.
+        when(ctx.scheduler.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
+                .thenReturn((ScheduledFuture) ctx.future, (ScheduledFuture) second);
+
+        ConsumerSession session = ctx.session("c1", 10L, mock(TransportCnx.class));
+
+        session.markDisconnected();
+        session.markDisconnected();
+
+        // The first timer must have been cancelled and replaced by the second one.
+        verify(ctx.future).cancel(false);
+        assertSame(session.getGraceTimer(), second);
+    }
+
+ @Test
+ public void testCancelGraceTimerIsIdempotent() {
+ TestContext ctx = new TestContext();
+ ConsumerSession session = ctx.restored("c1");
+ assertSame(session.getGraceTimer(), ctx.future);
+
+ session.cancelGraceTimer();
+ session.cancelGraceTimer(); // second call is a no-op
+
+ verify(ctx.future, times(1)).cancel(false);
+ assertNull(session.getGraceTimer());
+ }
+
+ // --- sendAssignmentUpdate ---
+
+ @Test
+ public void testSendAssignmentUpdateWritesViaCommandSender() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx = mock(TransportCnx.class);
+ PulsarCommandSender sender = mock(PulsarCommandSender.class);
+ when(cnx.getCommandSender()).thenReturn(sender);
+ ConsumerSession session = ctx.session("c1", 42L, cnx);
+
+ session.sendAssignmentUpdate(buildAssignment(5L, 0L, 1L));
+
+ ArgumentCaptor<ScalableConsumerAssignment> captor =
+ ArgumentCaptor.forClass(ScalableConsumerAssignment.class);
+ verify(sender).sendScalableTopicAssignmentUpdate(eq(42L),
captor.capture());
+ ScalableConsumerAssignment proto = captor.getValue();
+ assertEquals(proto.getLayoutEpoch(), 5L);
+ assertEquals(proto.getSegmentsCount(), 2);
+ assertEquals(proto.getSegmentAt(0).getSegmentId(), 0L);
+ assertEquals(proto.getSegmentAt(1).getSegmentId(), 1L);
+ }
+
+ @Test
+ public void testSendAssignmentUpdateIsNoopWhenDisconnected() {
+ TestContext ctx = new TestContext();
+ ConsumerSession session = ctx.restored("c1");
+ // restored() leaves the session disconnected with no cnx — should not
NPE, should not call anything.
+ session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+ // no mocks to verify — just ensure no exception
+ }
+
+ @Test
+ public void testSendAssignmentUpdateIsNoopAfterMarkDisconnected() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx = mock(TransportCnx.class);
+ PulsarCommandSender sender = mock(PulsarCommandSender.class);
+ when(cnx.getCommandSender()).thenReturn(sender);
+ ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+ session.markDisconnected();
+ session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+
+ verify(sender, never()).sendScalableTopicAssignmentUpdate(anyLong(),
any());
+ }
+
+ @Test
+ public void testSendAssignmentUpdateIsNoopWhenCommandSenderNull() {
+ TestContext ctx = new TestContext();
+ TransportCnx cnx = mock(TransportCnx.class);
+ when(cnx.getCommandSender()).thenReturn(null); // connection tearing
down
+ ConsumerSession session = ctx.session("c1", 10L, cnx);
+
+ // no exception, no interaction
+ session.sendAssignmentUpdate(buildAssignment(0L, 0L));
+ }
+
+ // --- toProto ---
+
+ @Test
+ public void testToProtoConvertsAllFields() {
+ ConsumerAssignment assignment = buildAssignment(9L, 3L, 4L, 5L);
+
+ ScalableConsumerAssignment proto = ConsumerSession.toProto(assignment);
+
+ assertEquals(proto.getLayoutEpoch(), 9L);
+ assertEquals(proto.getSegmentsCount(), 3);
+ // hash ranges are derived in buildAssignment as 0x4000-wide
contiguous ranges
+ assertEquals(proto.getSegmentAt(0).getSegmentId(), 3L);
+ assertEquals(proto.getSegmentAt(0).getHashStart(), 0x0000);
+ assertEquals(proto.getSegmentAt(0).getHashEnd(), 0x3FFF);
+ assertEquals(proto.getSegmentAt(0).getSegmentTopic(),
+ "persistent://tenant/ns/my-scalable-seg-3");
+ assertEquals(proto.getSegmentAt(1).getSegmentId(), 4L);
+ assertEquals(proto.getSegmentAt(1).getHashStart(), 0x4000);
+ assertEquals(proto.getSegmentAt(2).getSegmentId(), 5L);
+ assertEquals(proto.getSegmentAt(2).getHashStart(), 0x8000);
+ }
+
+ @Test
+ public void testToProtoHandlesEmptyAssignment() {
+ ConsumerAssignment empty = new ConsumerAssignment(0L, List.of());
+
+ ScalableConsumerAssignment proto = ConsumerSession.toProto(empty);
+
+ assertEquals(proto.getLayoutEpoch(), 0L);
+ assertEquals(proto.getSegmentsCount(), 0);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
new file mode 100644
index 00000000000..9f423413003
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentState;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Focused unit tests for {@link DagWatchSession}.
+ *
+ * <p>The deep namespace-lookup codepath inside {@code start()} (resolving
per-segment
+ * brokers and the controller broker) is covered by integration tests; these
unit tests
+ * stay in the parts that don't need a live {@code NamespaceService}: session
lifecycle,
+ * notification filtering, and the DAG proto built by {@link
DagWatchSession#pushUpdate}.
+ */
+public class DagWatchSessionTest {
+
+ private static final String TOPIC_PATH =
"/admin/scalable-topics/tenant/ns/my-scalable";
+ private static final TopicName TOPIC =
TopicName.get("topic://tenant/ns/my-scalable");
+ private static final long SESSION_ID = 42L;
+
+ private ScalableTopicResources resources;
+ private ServerCnx cnx;
+ private ChannelHandlerContext ctx;
+ private BrokerService brokerService;
+ private DagWatchSession session;
+
+ @BeforeMethod
+ public void setup() {
+ resources = mock(ScalableTopicResources.class);
+ cnx = mock(ServerCnx.class);
+ ctx = mock(ChannelHandlerContext.class);
+ brokerService = mock(BrokerService.class);
+
+ when(resources.topicPath(TOPIC)).thenReturn(TOPIC_PATH);
+ when(cnx.ctx()).thenReturn(ctx);
+ // Default: metadata store has a valid registerListener hook.
+ var store = mock(org.apache.pulsar.metadata.api.MetadataStore.class);
+ when(resources.getStore()).thenReturn(store);
+
+ session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources,
brokerService);
+ }
+
+ // --- identity / lifecycle ---
+
+ @Test
+ public void testSessionIdIsPreserved() {
+ assertEquals(session.getSessionId(), SESSION_ID);
+ }
+
+ @Test
+ public void testCloseIsIdempotent() {
+ session.close();
+ session.close(); // must not throw
+ }
+
+ // --- start() ---
+
+ @Test
+ public void testStartFailsWhenTopicMetadataMissing() {
+ when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ CompletableFuture<ScalableTopicLayoutResponse> future =
session.start();
+
+ assertTrue(future.isDone());
+ assertTrue(future.isCompletedExceptionally());
+ try {
+ future.get();
+ fail("expected failure");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue(cause instanceof IllegalStateException, "got: " +
cause);
+ assertTrue(cause.getMessage().contains("not found"),
cause.getMessage());
+ }
+ }
+
+ @Test
+ public void testStartRegistersMetadataStoreListener() {
+ // Regardless of outcome, start() should wire up a notification
listener so that
+ // subsequent metadata changes flow into the session.
+ when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ session.start();
+
+ verify(resources.getStore()).registerListener(any());
+ }
+
+ // --- onNotification filtering ---
+
+ @Test
+ public void testNotificationForUnrelatedPathIsIgnored() {
+ session.onNotification(new Notification(NotificationType.Modified,
"/some/other/path"));
+
+ verify(resources, never()).getScalableTopicMetadataAsync(any(),
anyBoolean());
+ }
+
+ @Test
+ public void testDeletedNotificationIsIgnored() {
+ session.onNotification(new Notification(NotificationType.Deleted,
TOPIC_PATH));
+
+ verify(resources, never()).getScalableTopicMetadataAsync(any(),
anyBoolean());
+ }
+
+ @Test
+ public void testNotificationAfterCloseIsIgnored() {
+ session.close();
+ session.onNotification(new Notification(NotificationType.Modified,
TOPIC_PATH));
+
+ verify(resources, never()).getScalableTopicMetadataAsync(any(),
anyBoolean());
+ }
+
+ @Test
+ public void testNotificationOnMatchingPathTriggersReload() {
+ // Return an empty optional so we stop before the NamespaceService
calls inside
+ // buildResponse — we only care that the reload was kicked off.
+ when(resources.getScalableTopicMetadataAsync(TOPIC, true))
+
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ session.onNotification(new Notification(NotificationType.Modified,
TOPIC_PATH));
+
+ verify(resources, times(1)).getScalableTopicMetadataAsync(TOPIC, true);
+ }
+
+ // --- pushUpdate ---
+
+    /**
+     * Pushes a three-segment layout (one sealed parent split into two active children) and checks
+     * the SCALABLE_TOPIC_UPDATE frame written to the channel: session id, epoch, per-segment
+     * state/lineage/epochs, and the per-segment broker URLs.
+     */
+    @Test
+    public void testPushUpdateWritesDagToConnection() {
+        ScalableTopicLayoutResponse response = buildSampleResponse(
+                7L,
+                Map.of(
+                        0L, seg(0, 0x0000, 0x7FFF, SegmentState.SEALED, new long[]{}, new long[]{2L, 3L}, 0L, 5L),
+                        2L, seg(2, 0x0000, 0x3FFF, SegmentState.ACTIVE, new long[]{0L}, new long[]{}, 7L, -1L),
+                        3L, seg(3, 0x4000, 0x7FFF, SegmentState.ACTIVE, new long[]{0L}, new long[]{}, 7L, -1L)),
+                Map.of(
+                        2L, "pulsar://broker-a:6650",
+                        3L, "pulsar://broker-b:6650"));
+
+        session.pushUpdate(response);
+
+        ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
+        verify(ctx).writeAndFlush(captor.capture());
+
+        BaseCommand cmd = parseFrame(captor.getValue());
+        assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+        assertEquals(cmd.getScalableTopicUpdate().getSessionId(), SESSION_ID);
+
+        var dag = cmd.getScalableTopicUpdate().getDag();
+        assertEquals(dag.getEpoch(), 7L);
+        assertEquals(dag.getSegmentsCount(), 3);
+
+        // sealed parent should not have parentIds; its childIds should be 2, 3
+        var parent = findSegment(dag, 0L);
+        assertEquals(parent.getState(), org.apache.pulsar.common.api.proto.SegmentState.SEALED);
+        assertEquals(parent.getParentIdsCount(), 0);
+        assertEquals(parent.getChildIdsCount(), 2);
+        assertEquals(parent.getChildIdAt(0), 2L);
+        assertEquals(parent.getChildIdAt(1), 3L);
+        assertEquals(parent.getCreatedAtEpoch(), 0L);
+        assertEquals(parent.getSealedAtEpoch(), 5L);
+
+        // active children should reference parent 0
+        var childA = findSegment(dag, 2L);
+        assertEquals(childA.getState(), org.apache.pulsar.common.api.proto.SegmentState.ACTIVE);
+        assertEquals(childA.getParentIdsCount(), 1);
+        assertEquals(childA.getParentIdAt(0), 0L);
+        assertEquals(childA.getCreatedAtEpoch(), 7L);
+        // sealedAtEpoch is only written when non-negative
+        assertTrue(!childA.hasSealedAtEpoch() || childA.getSealedAtEpoch() == 0,
+                "active segment should not have sealedAtEpoch set");
+
+        var childB = findSegment(dag, 3L);
+        assertEquals(childB.getState(), org.apache.pulsar.common.api.proto.SegmentState.ACTIVE);
+        assertEquals(childB.getParentIdsCount(), 1);
+        assertEquals(childB.getParentIdAt(0), 0L);
+        assertEquals(childB.getCreatedAtEpoch(), 7L);
+
+        // broker addresses only for the 2 active segments
+        assertEquals(dag.getSegmentBrokersCount(), 2);
+        Map<Long, String> brokerAddrs = new LinkedHashMap<>();
+        for (int i = 0; i < dag.getSegmentBrokersCount(); i++) {
+            brokerAddrs.put(dag.getSegmentBrokerAt(i).getSegmentId(),
+                    dag.getSegmentBrokerAt(i).getBrokerUrl());
+        }
+        assertEquals(brokerAddrs.get(2L), "pulsar://broker-a:6650");
+        assertEquals(brokerAddrs.get(3L), "pulsar://broker-b:6650");
+    }
+
+ @Test
+ public void testPushUpdateAfterCloseIsNoop() {
+ ScalableTopicLayoutResponse response = buildSampleResponse(
+ 0L,
+ Map.of(0L, seg(0, 0x0000, 0xFFFF, SegmentState.ACTIVE, new
long[]{}, new long[]{}, 0L, -1L)),
+ Map.of());
+
+ session.close();
+ session.pushUpdate(response);
+
+ verify(ctx, never()).writeAndFlush(any());
+ }
+
+ @Test
+ public void testPushUpdateWithNullBrokerAddressMapOmitsBrokerSection() {
+ // buildDagProto guards against a null address map (e.g., when
upstream namespace
+ // lookup short-circuits) and should not throw.
+ ScalableTopicLayoutResponse response = new ScalableTopicLayoutResponse(
+ 1L,
+ Map.of(0L, seg(0, 0x0000, 0xFFFF, SegmentState.ACTIVE, new
long[]{}, new long[]{}, 0L, -1L)),
+ null, null, null, null);
+
+ session.pushUpdate(response);
+
+ ArgumentCaptor<ByteBuf> captor =
ArgumentCaptor.forClass(ByteBuf.class);
+ verify(ctx).writeAndFlush(captor.capture());
+ BaseCommand cmd = parseFrame(captor.getValue());
+
assertEquals(cmd.getScalableTopicUpdate().getDag().getSegmentBrokersCount(), 0);
+ }
+
+ // --- onMetadataChanged ---
+
+ @Test
+ public void testOnMetadataChangedAfterCloseIsNoop() {
+ session.close();
+ // Build a minimal metadata object; close should short-circuit before
any work runs.
+ ScalableTopicMetadata md =
ScalableTopicController.createInitialMetadata(1, Map.of());
+ session.onMetadataChanged(md);
+
+ verify(ctx, never()).writeAndFlush(any());
+ }
+
+ // ==== helpers ====
+
+ private static SegmentInfo seg(long id, int start, int end, SegmentState
state,
+ long[] parents, long[] children,
+ long createdAt, long sealedAt) {
+ return new SegmentInfo(
+ id,
+ HashRange.of(start, end),
+ state,
+ toList(parents),
+ toList(children),
+ createdAt,
+ sealedAt);
+ }
+
+    /** Boxes a {@code long[]} into a new mutable list, preserving element order. */
+    private static java.util.List<Long> toList(long[] arr) {
+        return java.util.Arrays.stream(arr)
+                .boxed()
+                .collect(java.util.stream.Collectors.toCollection(
+                        () -> new java.util.ArrayList<>(arr.length)));
+    }
+
+ private static ScalableTopicLayoutResponse buildSampleResponse(
+ long epoch,
+ Map<Long, SegmentInfo> segments,
+ Map<Long, String> brokerAddrs) {
+ return new ScalableTopicLayoutResponse(epoch, segments, brokerAddrs,
null, null, null);
+ }
+
+    /**
+     * Returns the first segment of {@code dag} whose segment id equals {@code id}; fails the
+     * test when there is none.
+     */
+    private static org.apache.pulsar.common.api.proto.SegmentInfoProto findSegment(
+            org.apache.pulsar.common.api.proto.ScalableTopicDAG dag, long id) {
+        int total = dag.getSegmentsCount();
+        for (int idx = 0; idx < total; idx++) {
+            var candidate = dag.getSegmentAt(idx);
+            if (candidate.getSegmentId() == id) {
+                return candidate;
+            }
+        }
+        fail("segment " + id + " not found");
+        return null; // unreachable: fail(...) throws, but the compiler needs a return
+    }
+
+    /**
+     * Decodes a captured command frame into a {@link BaseCommand} and releases the buffer.
+     *
+     * <p>The first 4 bytes (total size) are skipped, the following unsigned int is read as the
+     * command size, and that many bytes are parsed as the command. The frame is released in all
+     * cases, including when parsing throws.
+     */
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        assertNotNull(frame);
+        try {
+            frame.skipBytes(4); // total size
+            int cmdSize = (int) frame.readUnsignedInt();
+            BaseCommand cmd = new BaseCommand();
+            cmd.parseFrom(frame, cmdSize);
+            // materialize() copies fields out of the backing buffer so it's safe to
+            // release the frame before the caller reads fields back.
+            cmd.materialize();
+            return cmd;
+        } finally {
+            frame.release();
+        }
+    }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 0c8bf66ae25..7ab0b4f0239 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -160,6 +160,12 @@ public class ClientCnx extends PulsarHandler {
.expectedItems(16)
.concurrencyLevel(1)
.build();
+ @Getter(AccessLevel.PACKAGE)
+ private final ConcurrentLongHashMap<DagWatchSession> dagWatchSessions =
+ ConcurrentLongHashMap.<DagWatchSession>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(1)
+ .build();
private final CompletableFuture<Void> connectionFuture = new
CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new
ConcurrentLinkedQueue<>();
@@ -354,12 +360,14 @@ public class ClientCnx extends PulsarHandler {
consumers.forEach((id, consumer) -> consumer.connectionClosed(this,
Optional.empty(), Optional.empty()));
transactionMetaStoreHandlers.forEach((id, handler) ->
handler.connectionClosed(this));
topicListWatchers.forEach((__, watcher) ->
watcher.connectionClosed(this));
+ dagWatchSessions.forEach((__, session) -> session.connectionClosed());
waitingLookupRequests.clear();
producers.clear();
consumers.clear();
topicListWatchers.clear();
+ dagWatchSessions.clear();
timeoutTask.cancel(true);
}
@@ -1306,6 +1314,43 @@ public class ClientCnx extends PulsarHandler {
}
}
+    /**
+     * Dispatches a scalable-topic update to the DAG watch session registered under its session id.
+     *
+     * <p>A command carrying an error removes the session from the registry and reports the error
+     * to it; otherwise the DAG is delivered to the still-registered session. Commands for an
+     * unknown session id are logged and dropped.
+     */
+    @Override
+    protected void handleCommandScalableTopicUpdate(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate cmd) {
+        checkArgument(state == State.Ready);
+
+        final long sessionId = cmd.getSessionId();
+        log.debug().attr("sessionId", sessionId).log("Received scalableTopicUpdate");
+
+        if (!cmd.hasError()) {
+            // Regular DAG push: look the session up without removing it.
+            DagWatchSession watcher = dagWatchSessions.get(sessionId);
+            if (watcher == null) {
+                log.warn().attr("sessionId", sessionId)
+                        .log("Received scalable topic update for unknown session");
+                return;
+            }
+            watcher.onUpdate(cmd.getDag());
+            return;
+        }
+
+        // Error response for the initial lookup
+        DagWatchSession failed = dagWatchSessions.remove(sessionId);
+        if (failed == null) {
+            log.warn().attr("sessionId", sessionId)
+                    .log("Received scalable topic error for unknown session");
+            return;
+        }
+        // The message is optional on the wire; pass null when it is absent.
+        String detail = cmd.hasMessage() ? cmd.getMessage() : null;
+        failed.onError(cmd.getError(), detail);
+    }
+
+    /**
+     * Registers {@code session} under {@code sessionId} so that scalable-topic updates received
+     * on this connection for that id are routed to it. Stored with a plain {@code put}.
+     */
+    public void registerDagWatchSession(long sessionId, DagWatchSession session) {
+        dagWatchSessions.put(sessionId, session);
+    }
+
+    /** Drops the DAG watch session registered under {@code sessionId}, if any. */
+    public void removeDagWatchSession(long sessionId) {
+        dagWatchSessions.remove(sessionId);
+    }
+
/**
* check serverError and take appropriate action.
* <ul>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
new file mode 100644
index 00000000000..f48681ff6d0
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DagWatchSession.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.ServerError;
+
+/**
+ * Callback interface for receiving DAG watch session updates from the broker.
+ * Implemented by the v5 client's DagWatchClient.
+ *
+ * <p>Instances are registered on a {@code ClientCnx} under a session id and are invoked when
+ * that connection receives a scalable-topic update carrying the same id.
+ */
+public interface DagWatchSession {
+
+    /**
+     * Called when the broker sends a DAG update for this session.
+     *
+     * @param dag the DAG carried by the update command
+     */
+    void onUpdate(ScalableTopicDAG dag);
+
+    /**
+     * Called when the broker sends an error for this session.
+     *
+     * <p>As invoked by {@code ClientCnx}, the session has already been removed from the
+     * connection's registry when this method runs.
+     *
+     * @param error   the error code reported by the broker
+     * @param message the broker's error message, or {@code null} when the command carried none
+     */
+    void onError(ServerError error, String message);
+
+    /**
+     * Called when the connection to the broker is closed.
+     * Implementations should fail any pending operations and optionally reconnect.
+     */
+    void connectionClosed();
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 8c6289b944d..c805164ef92 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -86,6 +86,9 @@ import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataRespons
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -102,6 +105,8 @@ import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
@@ -1670,6 +1675,77 @@ public class Commands {
return cmd;
}
+ // --- Scalable topic commands ---
+
+    /**
+     * Builds a size-prefixed {@code SCALABLE_TOPIC_LOOKUP} command carrying {@code sessionId}
+     * and {@code topic}.
+     *
+     * <p>NOTE(review): presumably Client -> Broker, opening a DAG watch session; confirm against
+     * the PulsarApi.proto comments.
+     */
+    public static ByteBuf newScalableTopicLookup(long sessionId, String topic) {
+        BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_LOOKUP);
+        cmd.setScalableTopicLookup()
+                .setSessionId(sessionId)
+                .setTopic(topic);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Builds a size-prefixed {@code SCALABLE_TOPIC_CLOSE} command for {@code sessionId}.
+     *
+     * <p>NOTE(review): presumably Client -> Broker, ending a DAG watch session; confirm against
+     * the PulsarApi.proto comments.
+     */
+    public static ByteBuf newScalableTopicClose(long sessionId) {
+        BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_CLOSE);
+        cmd.setScalableTopicClose()
+                .setSessionId(sessionId);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Builds a size-prefixed {@code SCALABLE_TOPIC_UPDATE} command for {@code sessionId} whose
+     * payload is a copy of {@code dag}. No error or message field is set.
+     */
+    public static ByteBuf newScalableTopicUpdate(long sessionId, ScalableTopicDAG dag) {
+        BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
+        CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
+                .setSessionId(sessionId);
+        // copyFrom duplicates the caller's DAG into the command being serialized.
+        update.setDag().copyFrom(dag);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Builds a size-prefixed {@code SCALABLE_TOPIC_UPDATE} command that reports a failure for
+     * {@code sessionId} instead of carrying a DAG.
+     *
+     * @param sessionId the DAG watch session the error refers to
+     * @param error     the error code to report
+     * @param message   optional human-readable detail; when {@code null} the message field is
+     *                  left unset (receivers already check {@code hasMessage()})
+     */
+    public static ByteBuf newScalableTopicError(long sessionId, ServerError error, String message) {
+        BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_UPDATE);
+        CommandScalableTopicUpdate update = cmd.setScalableTopicUpdate()
+                .setSessionId(sessionId)
+                .setError(error);
+        // Exception messages are frequently null; don't hand a null string to the proto setter.
+        if (message != null) {
+            update.setMessage(message);
+        }
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: successful response to a scalable-topic subscribe request.
+     *
+     * <p>{@code assignment} is copied into the response's nested
+     * {@link ScalableConsumerAssignment}, so the caller does not need to populate anything
+     * further. For the failure case use
+     * {@link #newScalableTopicSubscribeError(long, ServerError, String)} instead.
+     *
+     * @param requestId  id of the subscribe request being answered
+     * @param assignment the consumer's segment assignment
+     */
+    public static ByteBuf newScalableTopicSubscribeResponse(long requestId,
+                                                            ScalableConsumerAssignment assignment) {
+        BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        CommandScalableTopicSubscribeResponse response = cmd.setScalableTopicSubscribeResponse()
+                .setRequestId(requestId);
+        response.setAssignment().copyFrom(assignment);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: failed response to a scalable-topic subscribe request. No assignment is
+     * attached.
+     *
+     * @param requestId id of the subscribe request being answered
+     * @param error     the error code to report
+     * @param message   optional human-readable detail; when {@code null} the message field is
+     *                  left unset
+     */
+    public static ByteBuf newScalableTopicSubscribeError(long requestId, ServerError error, String message) {
+        BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+        CommandScalableTopicSubscribeResponse response = cmd.setScalableTopicSubscribeResponse()
+                .setRequestId(requestId)
+                .setError(error);
+        // Exception messages are frequently null; don't hand a null string to the proto setter.
+        if (message != null) {
+            response.setMessage(message);
+        }
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: push a new segment assignment to a previously-subscribed scalable
+     * consumer after a rebalance.
+     *
+     * @param consumerId id of the consumer the assignment is addressed to
+     * @param assignment the new assignment; it is copied into the command before serialization
+     * @return the size-prefixed serialized command
+     */
+    public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId,
+                                                           ScalableConsumerAssignment assignment) {
+        BaseCommand cmd = new BaseCommand().setType(Type.SCALABLE_TOPIC_ASSIGNMENT_UPDATE);
+        CommandScalableTopicAssignmentUpdate update = cmd.setScalableTopicAssignmentUpdate()
+                .setConsumerId(consumerId);
+        update.setAssignment().copyFrom(assignment);
+        return serializeWithSize(cmd);
+    }
+
public static ByteBuf serializeWithSize(BaseCommand cmd) {
return serializeWithPrecalculatedSerializedSize(cmd,
cmd.getSerializedSize());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 595d75e8801..0eb2b1b9670 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -72,6 +72,12 @@ import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic;
import
org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicClose;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicLookup;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribe;
+import
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse;
+import org.apache.pulsar.common.api.proto.CommandScalableTopicUpdate;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSendError;
@@ -478,6 +484,36 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
handleCommandWatchTopicListClose(cmd.getWatchTopicListClose());
break;
+ case SCALABLE_TOPIC_LOOKUP:
+ checkArgument(cmd.hasScalableTopicLookup());
+ handleCommandScalableTopicLookup(cmd.getScalableTopicLookup());
+ break;
+
+ case SCALABLE_TOPIC_UPDATE:
+ checkArgument(cmd.hasScalableTopicUpdate());
+ handleCommandScalableTopicUpdate(cmd.getScalableTopicUpdate());
+ break;
+
+ case SCALABLE_TOPIC_CLOSE:
+ checkArgument(cmd.hasScalableTopicClose());
+ handleCommandScalableTopicClose(cmd.getScalableTopicClose());
+ break;
+
+ case SCALABLE_TOPIC_SUBSCRIBE:
+ checkArgument(cmd.hasScalableTopicSubscribe());
+
handleCommandScalableTopicSubscribe(cmd.getScalableTopicSubscribe());
+ break;
+
+ case SCALABLE_TOPIC_SUBSCRIBE_RESPONSE:
+ checkArgument(cmd.hasScalableTopicSubscribeResponse());
+
handleCommandScalableTopicSubscribeResponse(cmd.getScalableTopicSubscribeResponse());
+ break;
+
+ case SCALABLE_TOPIC_ASSIGNMENT_UPDATE:
+ checkArgument(cmd.hasScalableTopicAssignmentUpdate());
+
handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate());
+ break;
+
default:
break;
}
@@ -744,6 +780,33 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ protected void handleCommandScalableTopicLookup(CommandScalableTopicLookup
commandScalableTopicLookup) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleCommandScalableTopicUpdate(CommandScalableTopicUpdate
commandScalableTopicUpdate) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleCommandScalableTopicClose(CommandScalableTopicClose
commandScalableTopicClose) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleCommandScalableTopicSubscribe(
+ CommandScalableTopicSubscribe commandScalableTopicSubscribe) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleCommandScalableTopicSubscribeResponse(
+ CommandScalableTopicSubscribeResponse
commandScalableTopicSubscribeResponse) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected void handleCommandScalableTopicAssignmentUpdate(
+ CommandScalableTopicAssignmentUpdate
commandScalableTopicAssignmentUpdate) {
+ throw new UnsupportedOperationException();
+ }
+
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index ed49f7bc19b..114d083e12f 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -834,6 +834,111 @@ message CommandWatchTopicListClose {
required uint64 watcher_id = 2;
}
+/// --- Scalable topic commands ---
+
+enum SegmentState {
+ ACTIVE = 0;
+ SEALED = 1;
+}
+
+message SegmentInfoProto {
+ required uint64 segment_id = 1;
+ required uint32 hash_start = 2;
+ required uint32 hash_end = 3;
+ required SegmentState state = 4;
+ repeated uint64 parent_ids = 5;
+ repeated uint64 child_ids = 6;
+ required uint64 created_at_epoch = 7;
+ optional uint64 sealed_at_epoch = 8;
+}
+
+message SegmentBrokerAddress {
+ required uint64 segment_id = 1;
+ required string broker_url = 2;
+ optional string broker_url_tls = 3;
+}
+
+message ScalableTopicDAG {
+ required uint64 epoch = 1;
+ repeated SegmentInfoProto segments = 2;
+ repeated SegmentBrokerAddress segment_brokers = 3;
+ optional string controller_broker_url = 4;
+ optional string controller_broker_url_tls = 5;
+}
+
+// Client -> Broker: Request scalable topic metadata and initiate watch session
+message CommandScalableTopicLookup {
+ required uint64 session_id = 1; // Client-assigned session ID
+ required string topic = 2; // e.g. "topic://tenant/ns/my-topic"
+}
+
+// Broker -> Client: used for BOTH the initial response and subsequent pushed updates.
+// A successful update carries `dag`; a failure carries `error` and `message` instead.
+message CommandScalableTopicUpdate {
+ required uint64 session_id = 1;
+ optional ScalableTopicDAG dag = 2;
+
+ optional ServerError error = 3;
+ optional string message = 4;
+}
+
+// Client -> Broker: Close the DAG watch session
+message CommandScalableTopicClose {
+ required uint64 session_id = 1;
+}
+
+// Kind of scalable consumer registering with the controller leader.
+// QueueConsumer never registers — it attaches directly to all active and sealed
+// segment topics — so it does not appear here.
+enum ScalableConsumerType {
+ STREAM = 0;
+ CHECKPOINT = 1;
+}
+
+// A single segment assigned to a scalable consumer.
+message ScalableAssignedSegment {
+ required uint64 segment_id = 1;
+ required uint32 hash_start = 2;
+ required uint32 hash_end = 3;
+ // Fully-qualified segment:// topic name the consumer should attach to.
+ required string segment_topic = 4;
+}
+
+// An assignment of active segments to a single consumer. Carries the layout epoch
+// it was computed from so the client can reject stale updates.
+message ScalableConsumerAssignment {
+ required uint64 layout_epoch = 1;
+ repeated ScalableAssignedSegment segments = 2;
+}
+
+// Client -> Broker: register as an ordered (Stream) or external (Checkpoint) consumer
+// on a scalable topic and request the initial segment assignment. The broker leader
+// persists the consumer registration and returns the current assignment.
+message CommandScalableTopicSubscribe {
+ required uint64 request_id = 1;
+ required string topic = 2; // e.g.
"topic://tenant/ns/my-topic"
+ required string subscription = 3;
+ required string consumer_name = 4;
+ required uint64 consumer_id = 5;
+ required ScalableConsumerType consumer_type = 6;
+}
+
+// Broker -> Client: response to CommandScalableTopicSubscribe. On success, carries
+// the initial ScalableConsumerAssignment. On failure, error + message are populated
+// and the assignment is absent.
+message CommandScalableTopicSubscribeResponse {
+ required uint64 request_id = 1;
+ optional ServerError error = 2;
+ optional string message = 3;
+ optional ScalableConsumerAssignment assignment = 4;
+}
+
+// Broker -> Client: push a new assignment to a subscribed consumer after a rebalance
+// (triggered by a peer joining/leaving the subscription or by a segment split/merge).
+message CommandScalableTopicAssignmentUpdate {
+ required uint64 consumer_id = 1;
+ required ScalableConsumerAssignment assignment = 2;
+}
+
message CommandGetSchema {
required uint64 request_id = 1;
required string topic = 2;
@@ -1070,6 +1175,14 @@ message BaseCommand {
WATCH_TOPIC_LIST_CLOSE = 67;
TOPIC_MIGRATED = 68;
+
+ SCALABLE_TOPIC_LOOKUP = 70;
+ SCALABLE_TOPIC_UPDATE = 71;
+ SCALABLE_TOPIC_CLOSE = 72;
+
+ SCALABLE_TOPIC_SUBSCRIBE = 73;
+ SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74;
+ SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75;
}
@@ -1153,4 +1266,12 @@ message BaseCommand {
optional CommandWatchTopicListClose watchTopicListClose = 67;
optional CommandTopicMigrated topicMigrated = 68;
+
+ optional CommandScalableTopicLookup scalableTopicLookup = 70;
+ optional CommandScalableTopicUpdate scalableTopicUpdate = 71;
+ optional CommandScalableTopicClose scalableTopicClose = 72;
+
+ optional CommandScalableTopicSubscribe scalableTopicSubscribe
= 73;
+ optional CommandScalableTopicSubscribeResponse
scalableTopicSubscribeResponse = 74;
+ optional CommandScalableTopicAssignmentUpdate
scalableTopicAssignmentUpdate = 75;
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
new file mode 100644
index 00000000000..9afd76a98e0
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
+import org.apache.pulsar.common.api.proto.SegmentInfoProto;
+import org.apache.pulsar.common.api.proto.SegmentState;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip tests for the {@code Commands.newScalableTopic*} factory methods:
+ * encode a command, reparse the serialized wire frame, and verify the fields
+ * survive the trip.
+ */
+public class CommandsScalableTopicTest {
+
+ /**
+ * Parses a frame produced by {@link Commands#serializeWithPrecalculatedSerializedSize}
+ * back into a {@link BaseCommand} for assertions.
+ *
+ * <p>Wire format: {@code [4-byte TOTAL_SIZE][4-byte CMD_SIZE][CMD bytes]}. The total
+ * size is skipped and the command size bounds the parse.
+ *
+ * <p>The frame is always released before this method returns, so callers must not
+ * use it afterwards. The command is materialized before the release so that its
+ * fields remain readable.
+ *
+ * @param frame serialized command frame; consumed and released by this method
+ * @return the parsed and materialized command
+ */
+ private static BaseCommand parseFrame(ByteBuf frame) {
+ try {
+ frame.skipBytes(4); // total size
+ int cmdSize = (int) frame.readUnsignedInt();
+ BaseCommand cmd = new BaseCommand();
+ cmd.parseFrom(frame, cmdSize);
+ // Materialize copies every field out of the backing buffer so
it's safe to
+ // release {@code frame} before the caller reads fields back.
+ cmd.materialize();
+ return cmd;
+ } finally {
+ frame.release();
+ }
+ }
+
+ @Test
+ public void testNewScalableTopicLookup() {
+ ByteBuf frame = Commands.newScalableTopicLookup(42L,
"topic://tenant/ns/my-scalable");
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_LOOKUP);
+ assertTrue(cmd.hasScalableTopicLookup());
+ assertEquals(cmd.getScalableTopicLookup().getSessionId(), 42L);
+ assertEquals(cmd.getScalableTopicLookup().getTopic(),
"topic://tenant/ns/my-scalable");
+ }
+
+ @Test
+ public void testNewScalableTopicClose() {
+ ByteBuf frame = Commands.newScalableTopicClose(99L);
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_CLOSE);
+ assertTrue(cmd.hasScalableTopicClose());
+ assertEquals(cmd.getScalableTopicClose().getSessionId(), 99L);
+ }
+
+ @Test
+ public void testNewScalableTopicUpdate() {
+ ScalableTopicDAG dag = new ScalableTopicDAG().setEpoch(7L);
+ SegmentInfoProto active = dag.addSegment()
+ .setSegmentId(0L)
+ .setHashStart(0x0000)
+ .setHashEnd(0x7FFF)
+ .setState(SegmentState.ACTIVE)
+ .setCreatedAtEpoch(0L);
+ active.addChildId(2L);
+ active.addChildId(3L);
+ dag.addSegment()
+ .setSegmentId(2L)
+ .setHashStart(0x0000)
+ .setHashEnd(0x3FFF)
+ .setState(SegmentState.ACTIVE)
+ .setCreatedAtEpoch(7L)
+ .addParentId(0L);
+
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");
+
+ ByteBuf frame = Commands.newScalableTopicUpdate(77L, dag);
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+ assertTrue(cmd.hasScalableTopicUpdate());
+ assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 77L);
+ assertFalse(cmd.getScalableTopicUpdate().hasError(),
+ "successful update must not carry an error field");
+
+ ScalableTopicDAG got = cmd.getScalableTopicUpdate().getDag();
+ assertEquals(got.getEpoch(), 7L);
+ assertEquals(got.getSegmentsCount(), 2);
+ SegmentInfoProto parent = got.getSegmentAt(0);
+ assertEquals(parent.getSegmentId(), 0L);
+ assertEquals(parent.getState(), SegmentState.ACTIVE);
+ assertEquals(parent.getChildIdsCount(), 2);
+ assertEquals(parent.getChildIdAt(0), 2L);
+ assertEquals(parent.getChildIdAt(1), 3L);
+
+ SegmentInfoProto child = got.getSegmentAt(1);
+ assertEquals(child.getSegmentId(), 2L);
+ assertEquals(child.getHashStart(), 0x0000);
+ assertEquals(child.getHashEnd(), 0x3FFF);
+ assertEquals(child.getParentIdsCount(), 1);
+ assertEquals(child.getParentIdAt(0), 0L);
+
+ assertEquals(got.getSegmentBrokersCount(), 1);
+ assertEquals(got.getSegmentBrokerAt(0).getSegmentId(), 2L);
+ assertEquals(got.getSegmentBrokerAt(0).getBrokerUrl(),
"pulsar://broker-a:6650");
+ }
+
+ @Test
+ public void testNewScalableTopicError() {
+ ByteBuf frame = Commands.newScalableTopicError(15L,
ServerError.TopicNotFound,
+ "Scalable topic not found: topic://t/n/x");
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(), BaseCommand.Type.SCALABLE_TOPIC_UPDATE);
+ assertTrue(cmd.hasScalableTopicUpdate());
+ assertEquals(cmd.getScalableTopicUpdate().getSessionId(), 15L);
+ assertTrue(cmd.getScalableTopicUpdate().hasError());
+ assertEquals(cmd.getScalableTopicUpdate().getError(),
ServerError.TopicNotFound);
+ assertEquals(cmd.getScalableTopicUpdate().getMessage(),
+ "Scalable topic not found: topic://t/n/x");
+ }
+
+ @Test
+ public void testNewScalableTopicSubscribeResponseSuccess() {
+ ScalableConsumerAssignment assignment = new
ScalableConsumerAssignment().setLayoutEpoch(3L);
+ assignment.addSegment()
+ .setSegmentId(2L)
+ .setHashStart(0x0000)
+ .setHashEnd(0x3FFF)
+
.setSegmentTopic("persistent://tenant/ns/my-scalable-0000-3fff-0000000000000002");
+ assignment.addSegment()
+ .setSegmentId(3L)
+ .setHashStart(0x4000)
+ .setHashEnd(0x7FFF)
+
.setSegmentTopic("persistent://tenant/ns/my-scalable-4000-7fff-0000000000000003");
+
+ ByteBuf frame = Commands.newScalableTopicSubscribeResponse(123L,
assignment);
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(),
BaseCommand.Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+ assertTrue(cmd.hasScalableTopicSubscribeResponse());
+ assertEquals(cmd.getScalableTopicSubscribeResponse().getRequestId(),
123L);
+ assertFalse(cmd.getScalableTopicSubscribeResponse().hasError());
+
+ ScalableConsumerAssignment got =
cmd.getScalableTopicSubscribeResponse().getAssignment();
+ assertEquals(got.getLayoutEpoch(), 3L);
+ assertEquals(got.getSegmentsCount(), 2);
+ assertEquals(got.getSegmentAt(0).getSegmentId(), 2L);
+ assertEquals(got.getSegmentAt(0).getHashStart(), 0x0000);
+ assertEquals(got.getSegmentAt(0).getHashEnd(), 0x3FFF);
+ assertEquals(got.getSegmentAt(0).getSegmentTopic(),
+
"persistent://tenant/ns/my-scalable-0000-3fff-0000000000000002");
+ assertEquals(got.getSegmentAt(1).getSegmentId(), 3L);
+ }
+
+ @Test
+ public void testNewScalableTopicSubscribeError() {
+ ByteBuf frame = Commands.newScalableTopicSubscribeError(456L,
+ ServerError.AuthorizationError, "not authorized");
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(),
BaseCommand.Type.SCALABLE_TOPIC_SUBSCRIBE_RESPONSE);
+ assertTrue(cmd.hasScalableTopicSubscribeResponse());
+ assertEquals(cmd.getScalableTopicSubscribeResponse().getRequestId(),
456L);
+ assertTrue(cmd.getScalableTopicSubscribeResponse().hasError());
+ assertEquals(cmd.getScalableTopicSubscribeResponse().getError(),
ServerError.AuthorizationError);
+ assertEquals(cmd.getScalableTopicSubscribeResponse().getMessage(),
"not authorized");
+ }
+
+ @Test
+ public void testNewScalableTopicAssignmentUpdate() {
+ ScalableConsumerAssignment assignment = new
ScalableConsumerAssignment().setLayoutEpoch(11L);
+ assignment.addSegment()
+ .setSegmentId(5L)
+ .setHashStart(0x8000)
+ .setHashEnd(0xBFFF)
+ .setSegmentTopic("persistent://t/n/seg-5");
+
+ ByteBuf frame = Commands.newScalableTopicAssignmentUpdate(789L,
assignment);
+ BaseCommand cmd = parseFrame(frame);
+
+ assertEquals(cmd.getType(),
BaseCommand.Type.SCALABLE_TOPIC_ASSIGNMENT_UPDATE);
+ assertTrue(cmd.hasScalableTopicAssignmentUpdate());
+ assertEquals(cmd.getScalableTopicAssignmentUpdate().getConsumerId(),
789L);
+
+ ScalableConsumerAssignment got =
cmd.getScalableTopicAssignmentUpdate().getAssignment();
+ assertNotNull(got);
+ assertEquals(got.getLayoutEpoch(), 11L);
+ assertEquals(got.getSegmentsCount(), 1);
+ assertEquals(got.getSegmentAt(0).getSegmentId(), 5L);
+ assertEquals(got.getSegmentAt(0).getHashStart(), 0x8000);
+ assertEquals(got.getSegmentAt(0).getHashEnd(), 0xBFFF);
+ assertEquals(got.getSegmentAt(0).getSegmentTopic(),
"persistent://t/n/seg-5");
+ }
+
+ /**
+ * Different session IDs must not collide — the wire layout must preserve them
+ * independently across successive calls. Two lookups (session IDs {@code 1} and
+ * {@code Long.MAX_VALUE}) are encoded and parsed in turn, and each parsed command
+ * must report its own session ID and topic.
+ */
+ @Test
+ public void testSessionIdIsolation() {
+ BaseCommand a = parseFrame(Commands.newScalableTopicLookup(1L, "a"));
+ BaseCommand b =
parseFrame(Commands.newScalableTopicLookup(Long.MAX_VALUE, "b"));
+ assertEquals(a.getScalableTopicLookup().getSessionId(), 1L);
+ assertEquals(b.getScalableTopicLookup().getSessionId(),
Long.MAX_VALUE);
+ assertEquals(a.getScalableTopicLookup().getTopic(), "a");
+ assertEquals(b.getScalableTopicLookup().getTopic(), "b");
+ }
+}