This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 527efb6e47c [feat] PIP-468: Add ScalableTopicController and broker infrastructure (#25559)
527efb6e47c is described below

commit 527efb6e47cb3afecf2b4aad28c70100782c4edc
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 12:49:51 2026 -0700

    [feat] PIP-468: Add ScalableTopicController and broker infrastructure (#25559)
---
 .../broker/resources/ConsumerRegistration.java     |  38 +++
 .../pulsar/broker/resources/PulsarResources.java   |   4 +
 .../broker/resources/ScalableTopicMetadata.java    |  54 ++++
 .../broker/resources/ScalableTopicResources.java   | 222 +++++++++++++
 .../broker/resources/SubscriptionMetadata.java     |  34 ++
 .../pulsar/broker/resources/SubscriptionType.java  |  41 +++
 .../pulsar/broker/service/BrokerService.java       |  14 +-
 .../service/scalable/ConsumerAssignment.java       |  46 +++
 .../broker/service/scalable/ConsumerSession.java   | 199 ++++++++++++
 .../service/scalable/ScalableTopicController.java  | 274 +++++++++++++++++
 .../scalable/ScalableTopicLayoutResponse.java      |  45 +++
 .../service/scalable/ScalableTopicService.java     | 218 +++++++++++++
 .../broker/service/scalable/SegmentLayout.java     | 269 ++++++++++++++++
 .../service/scalable/SubscriptionCoordinator.java  | 342 +++++++++++++++++++++
 .../broker/service/scalable/package-info.java      |  19 ++
 .../broker/service/scalable/SegmentLayoutTest.java | 250 +++++++++++++++
 .../scalable/SubscriptionCoordinatorTest.java      | 266 ++++++++++++++++
 17 files changed, 2333 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
new file mode 100644
index 00000000000..665efd90c97
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * Persisted marker of a consumer registered against a subscription on a 
scalable topic.
+ *
+ * <p>The znode <em>existence</em> is the durable session — it survives TCP 
disconnects,
+ * client restarts, and controller leader failovers. The consumer name lives 
in the znode
+ * path
+ * ({@code 
/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}})
+ * so no fields are stored in the value.
+ *
+ * <p>Keep-alive state (connected/disconnected, grace-period timer) is 
<em>not</em>
+ * persisted — it is tracked in-memory by the controller leader only. A new 
leader taking
+ * over reads these entries and starts a fresh grace-period timer for each 
consumer.
+ *
+ * <p>The current segment assignment is <em>not</em> persisted either: since 
assignment is
+ * a pure function of the sorted consumer names and the current segment 
layout, the new
+ * leader recomputes it deterministically after loading the registrations.
+ */
+public record ConsumerRegistration() {}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index 7b89fe69194..db5a8db549c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -52,6 +52,8 @@ public class PulsarResources {
     @Getter
     private final LoadBalanceResources loadBalanceResources;
     @Getter
+    private final ScalableTopicResources scalableTopicResources;
+    @Getter
     private final Optional<MetadataStoreExtended> localMetadataStore;
     @Getter
     private final Optional<MetadataStore> configurationMetadataStore;
@@ -87,6 +89,7 @@ public class PulsarResources {
             bookieResources = new BookieResources(localMetadataStore, 
operationTimeoutSec);
             topicResources = new TopicResources(localMetadataStore);
             loadBalanceResources = new 
LoadBalanceResources(localMetadataStore, operationTimeoutSec);
+            scalableTopicResources = new 
ScalableTopicResources(localMetadataStore, operationTimeoutSec);
         } else {
             dynamicConfigResources = null;
             localPolicies = null;
@@ -94,6 +97,7 @@ public class PulsarResources {
             bookieResources = null;
             topicResources = null;
             loadBalanceResources = null;
+            scalableTopicResources = null;
         }
 
         this.localMetadataStore = Optional.ofNullable(localMetadataStore);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
new file mode 100644
index 00000000000..8af85231651
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Metadata for a scalable topic, stored in the metadata store at
+ * {@code /topics/{tenant}/{namespace}/{topic}}.
+ *
+ * <p>The segments map represents the full DAG of the topic's segment history.
+ * The epoch is incremented on every layout change (split/merge/prune).
+ *
+ * <p>Both map fields default to a fresh, mutable {@link LinkedHashMap}. An instance created
+ * through the builder or the no-args constructor can therefore be modified in place, for
+ * example inside an update function, without special-casing how it was obtained.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ScalableTopicMetadata {
+    /** Incremented on every layout change (split/merge/prune). */
+    private long epoch;
+
+    /** Next available segment ID (monotonically increasing). */
+    private long nextSegmentId;
+
+    /** All segments: active + historical (keyed by segmentId). */
+    @Builder.Default
+    private Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+
+    /**
+     * User-defined topic properties. Defaults to an empty mutable map, consistent with
+     * {@link #segments}, so in-place updates do not fail with
+     * {@link UnsupportedOperationException} on builder-created instances.
+     */
+    @Builder.Default
+    private Map<String, String> properties = new LinkedHashMap<>();
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
new file mode 100644
index 00000000000..235f17e2d31
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.CustomLog;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+/**
+ * Metadata store access for scalable topic metadata.
+ *
+ * <p>Paths:
+ * <ul>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}} — {@link ScalableTopicMetadata}
+ *       (segment DAG and global topic state)</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/controller} — controller leader
+ *       lock (ephemeral, broker URL as value)</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/subscriptions/{subscription}} —
+ *       {@link SubscriptionMetadata}</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/subscriptions/{subscription}
+ *       /consumers/{consumerName}} — {@link ConsumerRegistration} (durable session entry)</li>
+ * </ul>
+ */
+@CustomLog
+public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata> {
+
+    private static final String SCALABLE_TOPIC_PATH = "/topics";
+    private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
+    private static final String CONSUMERS_SEGMENT = "consumers";
+
+    private final MetadataCache<SubscriptionMetadata> subscriptionCache;
+    private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
+
+    /**
+     * @param store               metadata store holding all scalable-topic nodes
+     * @param operationTimeoutSec operation timeout forwarded to {@link BaseResources}
+     */
+    public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
+        super(store, ScalableTopicMetadata.class, operationTimeoutSec);
+        this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
+        this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
+    }
+
+    /**
+     * Persist the initial metadata of a new scalable topic at {@link #topicPath(TopicName)}.
+     */
+    public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) {
+        return createAsync(topicPath(tn), metadata);
+    }
+
+    /**
+     * Read the scalable topic metadata through {@code getAsync}; empty if the topic node is absent.
+     */
+    public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetadataAsync(TopicName tn) {
+        return getAsync(topicPath(tn));
+    }
+
+    /**
+     * Read the scalable topic metadata.
+     *
+     * @param tn      the scalable topic
+     * @param refresh when {@code true} the value is obtained through {@code refreshAndGetAsync}
+     *                instead of {@code getAsync}
+     */
+    public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetadataAsync(TopicName tn,
+                                                                                             boolean refresh) {
+        if (refresh) {
+            return refreshAndGetAsync(topicPath(tn));
+        }
+        return getAsync(topicPath(tn));
+    }
+
+    /**
+     * Apply {@code updateFunction} to the stored metadata and write the result back (via {@code setAsync}).
+     */
+    public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
+            Function<ScalableTopicMetadata, ScalableTopicMetadata> updateFunction) {
+        return setAsync(topicPath(tn), updateFunction);
+    }
+
+    /**
+     * Delete the topic metadata node only. Child nodes (subscriptions, controller lock) are not
+     * removed by this method.
+     */
+    public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
+        return deleteAsync(topicPath(tn));
+    }
+
+    /**
+     * Check whether the topic metadata node exists.
+     */
+    public CompletableFuture<Boolean> scalableTopicExistsAsync(TopicName tn) {
+        return existsAsync(topicPath(tn));
+    }
+
+    /**
+     * List the scalable topics of a namespace. Each child node name is decoded and turned into a
+     * full topic name in the {@code topic} domain.
+     */
+    public CompletableFuture<List<String>> listScalableTopicsAsync(NamespaceName ns) {
+        return getChildrenAsync(joinPath(SCALABLE_TOPIC_PATH, ns.toString()))
+                .thenApply(list -> list.stream()
+                        .map(encoded -> TopicName.get("topic", ns, Codec.decode(encoded)).toString())
+                        .collect(Collectors.toList()));
+    }
+
+    // --- Subscriptions ---
+
+    /**
+     * Create a subscription record. Fails if it already exists.
+     */
+    public CompletableFuture<Void> createSubscriptionAsync(TopicName tn, String subscription,
+                                                            SubscriptionType type) {
+        return subscriptionCache.create(subscriptionPath(tn, subscription),
+                new SubscriptionMetadata(type));
+    }
+
+    /**
+     * Read a subscription record; empty if it does not exist.
+     */
+    public CompletableFuture<Optional<SubscriptionMetadata>> getSubscriptionAsync(TopicName tn, String subscription) {
+        return subscriptionCache.get(subscriptionPath(tn, subscription));
+    }
+
+    /**
+     * Check whether a subscription record exists.
+     */
+    public CompletableFuture<Boolean> subscriptionExistsAsync(TopicName tn, String subscription) {
+        return subscriptionCache.exists(subscriptionPath(tn, subscription));
+    }
+
+    /**
+     * Delete a subscription together with all of its consumer registrations.
+     *
+     * <p>Deletion order: every consumer registration child, then the {@code consumers} node
+     * itself, then the subscription node. Every step ignores
+     * {@link MetadataStoreException.NotFoundException}, so already-missing nodes are tolerated.
+     *
+     * <p>The {@code consumers} node has to be removed explicitly. Stores such as ZooKeeper refuse
+     * to delete a node that still has children, so leaving it in place would make the final
+     * subscription delete fail.
+     */
+    public CompletableFuture<Void> deleteSubscriptionAsync(TopicName tn, String subscription) {
+        String subPath = subscriptionPath(tn, subscription);
+        String consumersPath = joinPath(subPath, CONSUMERS_SEGMENT);
+        return subscriptionCache.getChildren(consumersPath)
+                .thenCompose(children -> {
+                    if (children == null || children.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    CompletableFuture<?>[] futs = children.stream()
+                            .map(c -> consumerRegistrationCache
+                                    .delete(joinPath(consumersPath, c))
+                                    .exceptionally(ignoreMissing()))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(futs);
+                })
+                // Nothing in this class creates the "consumers" node explicitly; it exists only as
+                // the parent of registration nodes, so it must be removed before its parent.
+                .thenCompose(__ -> consumerRegistrationCache.delete(consumersPath)
+                        .exceptionally(ignoreMissing()))
+                .thenCompose(__ -> subscriptionCache.delete(subPath)
+                        .exceptionally(ignoreMissing()));
+    }
+
+    /**
+     * List the subscription names of a topic; never {@code null}.
+     */
+    public CompletableFuture<List<String>> listSubscriptionsAsync(TopicName tn) {
+        return subscriptionCache.getChildren(joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT))
+                .thenApply(list -> list == null ? List.of() : list);
+    }
+
+    // --- Consumer registrations ---
+
+    /**
+     * Persist a consumer registration under a subscription. This is the durable session
+     * entry — once written, the consumer's segment assignment survives controller leader
+     * failover, client restarts, and TCP disconnects within the session grace period.
+     *
+     * <p>Idempotent: if the registration already exists, this completes successfully without
+     * overwriting it. Used by the controller leader on consumer register.
+     */
+    public CompletableFuture<Void> registerConsumerAsync(TopicName tn, String subscription, String consumerName) {
+        String path = consumerPath(tn, subscription, consumerName);
+        return consumerRegistrationCache.create(path, new ConsumerRegistration())
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof MetadataStoreException.AlreadyExistsException) {
+                        // Already registered — treat as success.
+                        return null;
+                    }
+                    throw FutureUtil.wrapToCompletionException(cause);
+                });
+    }
+
+    /**
+     * Remove a persisted consumer registration. Missing entries are ignored.
+     */
+    public CompletableFuture<Void> unregisterConsumerAsync(TopicName tn, String subscription, String consumerName) {
+        return consumerRegistrationCache.delete(consumerPath(tn, subscription, consumerName))
+                .exceptionally(ignoreMissing());
+    }
+
+    /**
+     * List all persisted consumer names for a subscription; never {@code null}. Used by the
+     * controller leader on initialize() / failover to restore the in-memory session state.
+     */
+    public CompletableFuture<List<String>> listConsumersAsync(TopicName tn, String subscription) {
+        return consumerRegistrationCache
+                .getChildren(joinPath(subscriptionPath(tn, subscription), CONSUMERS_SEGMENT))
+                .thenApply(list -> list == null ? List.of() : list);
+    }
+
+    // --- Paths ---
+
+    /**
+     * Get the metadata store path for the controller leader lock.
+     */
+    public String controllerLockPath(TopicName tn) {
+        return joinPath(topicPath(tn), "controller");
+    }
+
+    /**
+     * Path of the topic metadata node; the local topic name is used in its encoded form.
+     */
+    public String topicPath(TopicName tn) {
+        return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), tn.getEncodedLocalName());
+    }
+
+    /**
+     * Path of a subscription node.
+     *
+     * <p>NOTE(review): unlike the topic name, the subscription name is used verbatim as a path
+     * component; a name containing {@code '/'} would produce a nested path. Consider encoding
+     * it — TODO confirm the allowed character set for subscription names.
+     */
+    public String subscriptionPath(TopicName tn, String subscription) {
+        return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
+    }
+
+    /**
+     * Path of a consumer registration node; the consumer name is used verbatim as a path component.
+     */
+    public String consumerPath(TopicName tn, String subscription, String consumerName) {
+        return joinPath(subscriptionPath(tn, subscription), CONSUMERS_SEGMENT, consumerName);
+    }
+
+    /**
+     * Build an {@code exceptionally} handler that turns a
+     * {@link MetadataStoreException.NotFoundException} into a {@code null} result and rethrows
+     * any other failure wrapped in a {@link java.util.concurrent.CompletionException}.
+     */
+    private static <T> Function<Throwable, T> ignoreMissing() {
+        return ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof MetadataStoreException.NotFoundException) {
+                return null;
+            }
+            throw FutureUtil.wrapToCompletionException(cause);
+        };
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
new file mode 100644
index 00000000000..56ec27381cd
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * Persisted subscription-level metadata for a scalable topic.
+ *
+ * <p>Stored at: {@code 
/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}}
+ *
+ * <p>The subscription name is encoded in the znode path, so only 
subscription-kind info
+ * is carried in the value. The {@link SubscriptionType} tells the controller 
whether to
+ * coordinate segment-to-consumer assignment for this subscription.
+ *
+ * <p>The set of consumer registrations for this subscription is persisted as 
children
+ * of this node under {@code .../consumers/{consumerName}} — see
+ * {@link ConsumerRegistration}.
+ */
+public record SubscriptionMetadata(SubscriptionType type) {}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
new file mode 100644
index 00000000000..6cc39d12d7c
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * How a subscription on a scalable topic is served.
+ *
+ * <p>The type decides whether the scalable-topic controller hands out segments to consumers
+ * ({@link #STREAM}), or whether consumers attach to every segment on their own without the
+ * controller being involved ({@link #QUEUE}).
+ */
+public enum SubscriptionType {
+    /**
+     * Controller-managed. At any time every active segment belongs to exactly one consumer, which
+     * preserves per-segment ordering. Used for both {@code StreamConsumer} (ordered, cumulative
+     * ack) and {@code CheckpointConsumer} (unmanaged, for connectors).
+     */
+    STREAM,
+    /**
+     * Controller-bypassing. Each consumer attaches directly to every segment, active and sealed,
+     * and the broker owning a segment round-robins its messages over the attached consumers.
+     * Ordering is not guaranteed.
+     */
+    QUEUE
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 366e19dd7a5..7ecb6f2bdfd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -165,7 +165,6 @@ import 
org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
@@ -321,6 +320,9 @@ public class BrokerService implements Closeable {
     @Getter
     private final BundlesQuotas bundlesQuotas;
 
+    @Getter
+    private volatile 
org.apache.pulsar.broker.service.scalable.ScalableTopicService 
scalableTopicService;
+
     private PulsarChannelInitializer.Factory pulsarChannelInitFactory = 
PulsarChannelInitializer.DEFAULT_FACTORY;
 
     private final List<Channel> listenChannels = new ArrayList<>(2);
@@ -615,6 +617,14 @@ public class BrokerService implements Closeable {
         this.producerNameGenerator = new 
DistributedIdGenerator(pulsar.getCoordinationService(),
                 PRODUCER_NAME_GENERATOR_PATH, 
pulsar.getConfiguration().getClusterName());
 
+        // Initialize scalable topic service
+        var scalableTopicResources = 
pulsar.getPulsarResources().getScalableTopicResources();
+        if (scalableTopicResources != null) {
+            this.scalableTopicService = new 
org.apache.pulsar.broker.service.scalable.ScalableTopicService(
+                    this, scalableTopicResources, 
pulsar.getCoordinationService());
+            this.scalableTopicService.start();
+        }
+
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
         List<BindAddress> bindAddresses = 
BindAddressValidator.validateBindAddresses(serviceConfig,
                 Arrays.asList("pulsar", "pulsar+ssl"));
@@ -1215,7 +1225,7 @@ public class BrokerService implements Closeable {
             if (tp != null) {
                 return tp;
             }
-            final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
+            final boolean isPersistentTopic = topicName.isPersistent();
             if (isPersistentTopic) {
                 if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
                     log.debug().attr("topic", topicName).log("Broker is unable 
to load persistent topic");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
new file mode 100644
index 00000000000..7615f64db98
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.List;
+import org.apache.pulsar.common.scalable.HashRange;
+
+/**
+ * Represents the set of segments assigned to a consumer by the controller.
+ *
+ * @param layoutEpoch      the layout epoch at the time of this assignment
+ * @param assignedSegments the segments assigned to this consumer
+ */
+public record ConsumerAssignment(
+        long layoutEpoch,
+        List<AssignedSegment> assignedSegments
+) {
+    public ConsumerAssignment {
+        assignedSegments = List.copyOf(assignedSegments);
+    }
+
+    /**
+     * A single segment assignment for a consumer.
+     */
+    public record AssignedSegment(
+            long segmentId,
+            HashRange hashRange,
+            String underlyingTopicName
+    ) {}
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
new file mode 100644
index 00000000000..3e798ca418b
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.pulsar.broker.service.TransportCnx;
+
+/**
+ * Leader-local, in-memory handle for one consumer registered with the controller.
+ *
+ * <p>A session is identified by the stable {@code consumerName} the client picks. The
+ * <em>durable</em> side of the consumer is persisted through
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}; this class layers the
+ * <em>transient</em> keep-alive state on top of it: the transport connection in use, whether
+ * the consumer is currently connected and, while it is not, a grace-period timer that evicts
+ * the session unless the consumer comes back in time.
+ *
+ * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code cnx},
+ * {@code graceTimer}) are never persisted. They exist only on the controller leader; a newly
+ * elected leader rebuilds sessions in the "just disconnected" state via {@link #restored}.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} consider {@code consumerName} only, so a
+ * reconnect that arrives with a fresh protocol-level {@code consumerId} resolves to the same
+ * session.
+ *
+ * <p>The grace timer is managed entirely inside this class: {@link #markDisconnected()} arms it
+ * and {@link #attach(long, TransportCnx)} disarms it. What eviction actually does is injected as
+ * a {@link Runnable} at construction time, so the session needs no reference back to its
+ * coordinator.
+ */
+public class ConsumerSession {
+
+    /** Stable, client-chosen identity of this session. */
+    @Getter
+    private final String consumerName;
+
+    /** Protocol-level consumer ID of the current connection; replaced on every reconnect. */
+    @Getter
+    private volatile long consumerId;
+
+    /** Transport connection in use, or {@code null} while disconnected. */
+    @Getter
+    private volatile TransportCnx cnx;
+
+    /** {@code true} while a transport connection is attached. */
+    @Getter
+    private volatile boolean connected;
+
+    /**
+     * Pending eviction task. Armed while the consumer is disconnected and cleared by
+     * {@link #attach(long, TransportCnx)} or {@link #cancelGraceTimer()}. It is not reset
+     * when the task itself fires, so it may reference an already-completed future.
+     */
+    @Getter
+    private volatile ScheduledFuture<?> graceTimer;
+
+    private final Duration gracePeriod;
+    private final ScheduledExecutorService scheduler;
+    private final Runnable onGraceExpiry;
+    private final Logger log;
+
+    public ConsumerSession(String consumerName,
+                           long consumerId,
+                           TransportCnx cnx,
+                           Duration gracePeriod,
+                           ScheduledExecutorService scheduler,
+                           Runnable onGraceExpiry,
+                           Logger parentLogger) {
+        this.gracePeriod = gracePeriod;
+        this.scheduler = scheduler;
+        this.onGraceExpiry = onGraceExpiry;
+        this.consumerName = consumerName;
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        // A session created without a connection starts out disconnected.
+        this.connected = (cnx != null);
+        // consumerId and connected are mutable, so they are attached as lambdas (presumably
+        // evaluated per log call) rather than as a snapshot of the constructor-time value.
+        this.log = Logger.get(ConsumerSession.class)
+                .with()
+                .ctx(parentLogger)
+                .attr("consumerName", consumerName)
+                .attr("consumerId", () -> this.consumerId)
+                .attr("connected", () -> this.connected)
+                .build();
+    }
+
+    /**
+     * Rebuild a session from a registration found in the metadata store after a controller
+     * leader failover. The session starts disconnected (no connection, consumer ID {@code -1})
+     * and its grace timer is armed immediately, so {@code onGraceExpiry} runs unless the
+     * consumer reconnects within {@code gracePeriod}.
+     */
+    public static ConsumerSession restored(String consumerName,
+                                           Duration gracePeriod,
+                                           ScheduledExecutorService scheduler,
+                                           Runnable onGraceExpiry,
+                                           Logger parentLogger) {
+        ConsumerSession orphan = new ConsumerSession(consumerName, -1L, null,
+                gracePeriod, scheduler, onGraceExpiry, parentLogger);
+        orphan.startGraceTimer();
+        return orphan;
+    }
+
+    /**
+     * Bind a new transport connection to this session (the reconnect path): records the new
+     * consumer ID and connection, flags the session connected and disarms any pending grace
+     * timer.
+     */
+    public synchronized void attach(long consumerId, TransportCnx cnx) {
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        this.connected = true;
+        cancelGraceTimer();
+    }
+
+    /**
+     * Flag the session disconnected, drop its connection and (re)arm the grace timer. Unless
+     * {@link #attach(long, TransportCnx)} is called first, {@code onGraceExpiry} then runs on
+     * the scheduler once the grace period elapses. Calling this on a session that is already
+     * disconnected replaces the pending timer, i.e. restarts the grace period.
+     */
+    public synchronized void markDisconnected() {
+        this.connected = false;
+        this.cnx = null;
+        long graceSeconds = gracePeriod.toSeconds();
+        log.info().attr("gracePeriodSeconds", graceSeconds)
+                .log("Consumer disconnected; starting grace period");
+        startGraceTimer();
+    }
+
+    /**
+     * (Re)arm the grace-period timer with the configured {@code onGraceExpiry} callback,
+     * replacing any timer that is already pending.
+     *
+     * <p>Used by {@link #markDisconnected()} and {@link #restored}.
+     */
+    private synchronized void startGraceTimer() {
+        ScheduledFuture<?> next =
+                scheduler.schedule(onGraceExpiry, gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
+        // Schedule first, then drop the previous timer, then publish the new one.
+        cancelGraceTimer();
+        graceTimer = next;
+    }
+
+    /**
+     * Disarm the pending grace timer, if any, without interrupting a callback that is already
+     * running. Package-private so the coordinator can call it when a consumer unregisters
+     * explicitly: the session is then removed from the coordinator and no eviction should run.
+     */
+    synchronized void cancelGraceTimer() {
+        ScheduledFuture<?> pending = graceTimer;
+        if (pending == null) {
+            return;
+        }
+        pending.cancel(false);
+        graceTimer = null;
+    }
+
+    /**
+     * Push an assignment update to this consumer. Currently a placeholder: the wire-protocol
+     * push is added by a later commit, and the method exists so
+     * {@link SubscriptionCoordinator} can already call it.
+     */
+    public void sendAssignmentUpdate(ConsumerAssignment assignment) {
+        // Intentionally empty until the protocol commands are wired in.
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return this == o
+                || (o != null
+                    && getClass() == o.getClass()
+                    && Objects.equals(consumerName, ((ConsumerSession) o).consumerName));
+    }
+
+    @Override
+    public int hashCode() {
+        return consumerName == null ? 0 : consumerName.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumerSession{name=" + consumerName + ", connected=" + connected + "}";
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
new file mode 100644
index 00000000000..3ef0e6b9598
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Per-topic coordinator that owns the segment layout and the consumer assignments of a
+ * single scalable topic.
+ *
+ * <p>Leader election in the metadata store ensures that only one instance actively controls
+ * a given topic across the cluster. The value the leader publishes is its brokerId (not a raw
+ * service URL); callers resolve it to the broker's advertised addresses when they need to
+ * reach the controller.
+ */
+@Slf4j
+public class ScalableTopicController {
+
+    @Getter
+    private final TopicName topicName;
+    private final ScalableTopicResources resources;
+    private final BrokerService brokerService;
+    private final LeaderElection<String> leaderElection;
+
+    /** Segment layout most recently loaded from the metadata store by {@link #initialize()}. */
+    private volatile SegmentLayout currentLayout;
+
+    /** Per-subscription consumer tracking, keyed by subscription name. */
+    private final ConcurrentHashMap<String, SubscriptionCoordinator> subscriptions = new ConcurrentHashMap<>();
+
+    /** Result of the most recent leader-election attempt made by this instance. */
+    @Getter
+    private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader;
+
+    ScalableTopicController(TopicName topicName,
+                            ScalableTopicResources resources,
+                            BrokerService brokerService,
+                            LeaderElection<String> leaderElection) {
+        this.topicName = topicName;
+        this.resources = resources;
+        this.brokerService = brokerService;
+        this.leaderElection = leaderElection;
+    }
+
+    /**
+     * Initialize: load current layout from metadata store and attempt to become leader.
+     *
+     * <p>On successful election, also loads all persisted subscriptions and consumer
+     * registrations from the metadata store. Each restored consumer is installed in a
+     * "just disconnected" state with a fresh grace-period timer, so consumers that were
+     * registered under a previous leader will have the full grace window to reconnect to
+     * this new leader without losing their segment assignment.
+     *
+     * @return a future that fails with {@link IllegalStateException} if no metadata exists for
+     *         the topic
+     */
+    public CompletableFuture<Void> initialize() {
+        return resources.getScalableTopicMetadataAsync(topicName, true)
+                .thenCompose(optMd -> {
+                    if (optMd.isEmpty()) {
+                        return CompletableFuture.failedFuture(
+                                new IllegalStateException("Scalable topic not found: " + topicName));
+                    }
+                    this.currentLayout = SegmentLayout.fromMetadata(optMd.get());
+                    return electLeader();
+                })
+                .thenCompose(__ -> {
+                    if (isLeader()) {
+                        return restoreSessionsFromStore();
+                    }
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Load persisted subscriptions and consumer registrations from the metadata store and
+     * install them into per-subscription {@link SubscriptionCoordinator} instances. Called on
+     * successful leader election so the newly-elected leader can resume servicing consumers
+     * that were registered under a previous leader.
+     */
+    private CompletableFuture<Void> restoreSessionsFromStore() {
+        return resources.listSubscriptionsAsync(topicName)
+                .thenCompose(subNames -> {
+                    if (subNames.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    CompletableFuture<?>[] futures = subNames.stream()
+                            .map(this::restoreSubscription)
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(futures);
+                });
+    }
+
+    /** Restore the persisted consumers of one subscription into its coordinator. */
+    private CompletableFuture<Void> restoreSubscription(String subscription) {
+        return resources.listConsumersAsync(topicName, subscription)
+                .thenAccept(consumerNames -> {
+                    SubscriptionCoordinator coordinator = subscriptions.computeIfAbsent(
+                            subscription, this::createCoordinator);
+                    coordinator.restoreConsumers(consumerNames);
+                    log.info("[{}] restored subscription {} with {} consumer(s)",
+                            topicName, subscription, consumerNames.size());
+                });
+    }
+
+    /** Build a coordinator for {@code subscription} bound to the current layout. */
+    private SubscriptionCoordinator createCoordinator(String subscription) {
+        return new SubscriptionCoordinator(
+                subscription,
+                topicName,
+                currentLayout,
+                resources,
+                brokerService.getPulsar().getExecutor());
+    }
+
+    private CompletableFuture<Void> electLeader() {
+        // Store the brokerId as the leader-election value — not the raw pulsar:// URL.
+        // Callers that need a service URL (DagWatchSession for clients, the REST layer for
+        // HTTP redirection) look up the broker's advertised addresses via
+        // NamespaceService.createLookupResult(brokerId, ...), matching the pattern used by
+        // the cluster-leader redirection in NamespacesBase.
+        String brokerId = brokerService.getPulsar().getBrokerId();
+        return leaderElection.elect(brokerId)
+                .thenAccept(state -> {
+                    this.leaderState = state;
+                    log.info("Leader election for scalable topic {}: state={}", topicName, state);
+                });
+    }
+
+    public boolean isLeader() {
+        return leaderState == LeaderElectionState.Leading;
+    }
+
+    /**
+     * Get the current leader's brokerId (as stored in leader election). Callers resolve
+     * it to a service URL via
+     * {@link org.apache.pulsar.broker.namespace.NamespaceService#createLookupResult(String,
+     * boolean, String)}.
+     */
+    public CompletableFuture<Optional<String>> getLeaderBrokerId() {
+        return leaderElection.getLeaderValue();
+    }
+
+    // --- Layout operations (only valid on leader) ---
+
+    public CompletableFuture<SegmentLayout> getLayout() {
+        return CompletableFuture.completedFuture(currentLayout);
+    }
+
+    // --- Consumer management ---
+
+    /**
+     * Register a consumer for a subscription. The controller persists a durable session
+     * entry and returns the consumer's segment assignment.
+     *
+     * <p>If a session with the same {@code consumerName} already exists (for example
+     * because the consumer is reconnecting within the grace period), the existing
+     * assignment is reused and no rebalance occurs.
+     *
+     * @return the assignment for {@code consumerName}, or {@code null} if the coordinator's
+     *         result contains no entry for that name
+     * @throws IllegalStateException (synchronously) if this broker is not the leader
+     */
+    public CompletableFuture<ConsumerAssignment> registerConsumer(String subscription,
+                                                                   String consumerName,
+                                                                   long consumerId,
+                                                                   TransportCnx cnx) {
+        checkLeader();
+        SubscriptionCoordinator coordinator = subscriptions.computeIfAbsent(
+                subscription, this::createCoordinator);
+        return coordinator.registerConsumer(consumerName, consumerId, cnx)
+                .thenApply(assignments -> {
+                    // Look up by name since the key may have been an existing session
+                    return assignments.entrySet().stream()
+                            .filter(e -> consumerName.equals(e.getKey().getConsumerName()))
+                            .map(Map.Entry::getValue)
+                            .findFirst()
+                            .orElse(null);
+                });
+    }
+
+    /**
+     * Explicit unregister: the consumer is leaving the subscription for good. Deletes the
+     * persisted session entry and rebalances remaining consumers. When the subscription is
+     * left without consumers its coordinator is dropped.
+     *
+     * @throws IllegalStateException (synchronously) if this broker is not the leader
+     */
+    public CompletableFuture<Void> unregisterConsumer(String subscription, String consumerName) {
+        checkLeader();
+        SubscriptionCoordinator coordinator = subscriptions.get(subscription);
+        if (coordinator == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return coordinator.unregisterConsumer(consumerName)
+                .thenAccept(__ -> {
+                    // Check-and-remove as a single atomic map operation, and only if the mapped
+                    // coordinator is still the one we unregistered from — a plain remove(key)
+                    // could drop a coordinator that was installed for this subscription later.
+                    // NOTE(review): a registerConsumer call that obtained this coordinator just
+                    // before removal can still register into it afterwards.
+                    subscriptions.computeIfPresent(subscription, (key, current) ->
+                            (current == coordinator && current.getConsumers().isEmpty()) ? null : current);
+                });
+    }
+
+    /**
+     * Called when a consumer's transport connection drops. Does <em>not</em> delete the
+     * persisted session — the coordinator marks the consumer disconnected and starts the
+     * grace-period timer. The consumer can reconnect within the grace period and resume
+     * with the same segment assignment.
+     */
+    public void onConsumerDisconnect(String subscription, String consumerName) {
+        SubscriptionCoordinator coordinator = subscriptions.get(subscription);
+        if (coordinator != null) {
+            coordinator.onConsumerDisconnect(consumerName);
+        }
+    }
+
+    // --- Lifecycle ---
+
+    /**
+     * Drop all in-memory subscription state and close the leader election.
+     *
+     * <p>NOTE(review): this only clears the map; whether grace-period timers held by the
+     * dropped coordinators' sessions get cancelled is up to {@link SubscriptionCoordinator} —
+     * confirm.
+     */
+    public CompletableFuture<Void> close() {
+        subscriptions.clear();
+        return leaderElection.asyncClose();
+    }
+
+    // --- Internal helpers ---
+
+    private void checkLeader() {
+        if (!isLeader()) {
+            throw new IllegalStateException("This broker is not the leader for topic: " + topicName);
+        }
+    }
+
+    /**
+     * Create initial segment layout for a new scalable topic: the hash space
+     * {@code [0, HashRange.MAX_HASH]} is split into {@code numInitialSegments} contiguous,
+     * equally sized active segments (the last one absorbs the remainder), with segment IDs
+     * {@code 0..numInitialSegments-1} and epoch 0.
+     *
+     * @param numInitialSegments number of segments; between 1 and {@code HashRange.MAX_HASH + 1}
+     * @param properties         topic properties, or {@code null} for none
+     * @throws IllegalArgumentException if {@code numInitialSegments} is out of range
+     */
+    public static ScalableTopicMetadata createInitialMetadata(int numInitialSegments,
+                                                        Map<String, String> properties) {
+        if (numInitialSegments < 1) {
+            throw new IllegalArgumentException("Must have at least 1 segment");
+        }
+        int hashSpace = HashRange.MAX_HASH + 1;
+        if (numInitialSegments > hashSpace) {
+            // Otherwise rangeSize would be 0 and every segment but the last would get the
+            // inverted range [0, -1].
+            throw new IllegalArgumentException("Cannot create " + numInitialSegments
+                    + " segments: the hash space only has " + hashSpace + " values");
+        }
+
+        int rangeSize = hashSpace / numInitialSegments;
+        Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+
+        for (int i = 0; i < numInitialSegments; i++) {
+            int start = i * rangeSize;
+            int end = (i == numInitialSegments - 1) ? HashRange.MAX_HASH : (start + rangeSize - 1);
+            HashRange range = HashRange.of(start, end);
+            SegmentInfo segment = SegmentInfo.active(i, range, 0);
+            segments.put((long) i, segment);
+        }
+
+        return ScalableTopicMetadata.builder()
+                .epoch(0)
+                .nextSegmentId(numInitialSegments)
+                .segments(segments)
+                .properties(properties != null ? properties : Map.of())
+                .build();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java
new file mode 100644
index 00000000000..326a304b277
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Layout snapshot delivered to clients: the whole segment DAG, where each active segment is
+ * served, and which broker runs the controller.
+ *
+ * <p>The same shape carries both the answer to the initial lookup and every later update
+ * pushed over the DAG watch session.
+ *
+ * @param epoch                     layout epoch this snapshot reflects
+ * @param segments                  every segment of the DAG, active and sealed
+ * @param segmentBrokerAddresses    segment ID to broker service URL, for active segments
+ * @param segmentBrokerAddressesTls segment ID to TLS broker service URL, for active segments
+ *                                  (optional)
+ * @param controllerBrokerUrl       URL of the broker running the {@link ScalableTopicController}
+ * @param controllerBrokerUrlTls    TLS URL of the controller broker (optional)
+ */
+public record ScalableTopicLayoutResponse(long epoch,
+                                          Map<Long, SegmentInfo> segments,
+                                          Map<Long, String> segmentBrokerAddresses,
+                                          Map<Long, String> segmentBrokerAddressesTls,
+                                          String controllerBrokerUrl,
+                                          String controllerBrokerUrlTls) {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
new file mode 100644
index 00000000000..c151df6e81b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Central service managing all scalable topics on this broker.
+ *
+ * <p>Lifecycle is tied to {@link BrokerService}. This service handles:
+ * <ul>
+ *   <li>Creating and deleting scalable topics</li>
+ *   <li>Managing {@link ScalableTopicController} instances for topics this broker coordinates</li>
+ *   <li>Admin operations: split/merge (not implemented in this class yet)</li>
+ * </ul>
+ */
+@Slf4j
+public class ScalableTopicService {
+
+    private final BrokerService brokerService;
+    private final ScalableTopicResources resources;
+    private final CoordinationService coordinationService;
+
+    /** Active controllers for topics this broker coordinates, keyed by {@code TopicName#toString()}. */
+    private final ConcurrentHashMap<String, ScalableTopicController> controllers = new ConcurrentHashMap<>();
+
+    public ScalableTopicService(BrokerService brokerService,
+                                ScalableTopicResources resources,
+                                CoordinationService coordinationService) {
+        this.brokerService = brokerService;
+        this.resources = resources;
+        this.coordinationService = coordinationService;
+    }
+
+    // --- Lifecycle ---
+
+    public void start() {
+        log.info("ScalableTopicService started");
+    }
+
+    /**
+     * Close every controller held by this service, waiting for each close to complete.
+     * A failure is logged and does not prevent the remaining controllers from closing.
+     */
+    public void close() {
+        log.info("Closing ScalableTopicService, releasing {} controllers", controllers.size());
+        controllers.values().forEach(controller -> {
+            try {
+                controller.close().join();
+            } catch (Exception e) {
+                log.warn("Error closing controller for topic {}", controller.getTopicName(), e);
+            }
+        });
+        controllers.clear();
+    }
+
+    // --- Controller management ---
+
+    /**
+     * Get or create a controller for a scalable topic. The controller will attempt
+     * leader election; only the leader actively coordinates consumers.
+     *
+     * <p>Concurrent callers for the same topic share a single controller (and a single leader
+     * election). Only the caller that creates it runs {@link ScalableTopicController#initialize()};
+     * other callers get the existing instance right away, which may still be initializing.
+     * If initialization fails, the controller is removed and closed (releasing its leader
+     * election) and the returned future fails.
+     */
+    public CompletableFuture<ScalableTopicController> getOrCreateController(TopicName topic) {
+        String key = topic.toString();
+        // computeIfAbsent makes "look up or create" atomic per key; the flag records whether
+        // this call created the controller and is therefore responsible for initializing it.
+        boolean[] created = {false};
+        ScalableTopicController controller = controllers.computeIfAbsent(key, k -> {
+            created[0] = true;
+            return newController(topic);
+        });
+        if (!created[0]) {
+            return CompletableFuture.completedFuture(controller);
+        }
+
+        return controller.initialize()
+                .thenApply(__ -> controller)
+                .exceptionally(ex -> {
+                    // Remove only our own entry, and do it before closing so that any leader
+                    // state change triggered by the close does not re-initialize it.
+                    controllers.remove(key, controller);
+                    // Release the leader election; if it had already been won, keeping it open
+                    // would block every other broker from becoming the controller.
+                    controller.close().exceptionally(closeEx -> {
+                        log.warn("Failed to close controller for {} after initialization failure",
+                                topic, closeEx);
+                        return null;
+                    });
+                    throw new RuntimeException("Failed to initialize controller for " + topic, ex);
+                });
+    }
+
+    /** Build a controller for {@code topic} backed by a fresh leader election on its lock path. */
+    private ScalableTopicController newController(TopicName topic) {
+        String lockPath = resources.controllerLockPath(topic);
+        LeaderElection<String> election = coordinationService.getLeaderElection(
+                String.class, lockPath, state -> onLeaderStateChange(topic, state));
+        return new ScalableTopicController(topic, resources, brokerService, election);
+    }
+
+    /**
+     * Release the controller for a topic (e.g., on topic unload).
+     */
+    public CompletableFuture<Void> releaseController(TopicName topic) {
+        ScalableTopicController controller = controllers.remove(topic.toString());
+        if (controller != null) {
+            return controller.close();
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // --- Admin operations ---
+
+    /**
+     * Create a new scalable topic with the given number of initial segments.
+     */
+    public CompletableFuture<Void> createScalableTopic(TopicName topic, int numInitialSegments) {
+        return createScalableTopic(topic, numInitialSegments, Map.of());
+    }
+
+    /**
+     * Create a new scalable topic: persist its initial metadata, then create one underlying
+     * persistent topic per initial segment. Invalid arguments are reported through the
+     * returned future rather than thrown.
+     */
+    public CompletableFuture<Void> createScalableTopic(TopicName topic, int numInitialSegments,
+                                                        Map<String, String> properties) {
+        if (topic.getDomain() != TopicDomain.topic) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("Expected topic domain, got: " + topic.getDomain()));
+        }
+        if (numInitialSegments < 1) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("numInitialSegments must be >= 1"));
+        }
+
+        final ScalableTopicMetadata metadata;
+        try {
+            metadata = ScalableTopicController.createInitialMetadata(numInitialSegments, properties);
+        } catch (IllegalArgumentException e) {
+            // Keep the async contract: report through the future like the checks above.
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return resources.createScalableTopicAsync(topic, metadata)
+                .thenCompose(__ -> {
+                    // Create underlying persistent topics for each initial segment
+                    CompletableFuture<?>[] segmentFutures = metadata.getSegments().values().stream()
+                            .map(segment -> createUnderlyingSegmentTopic(topic, segment))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(segmentFutures);
+                });
+    }
+
+    /**
+     * Delete a scalable topic and all its segment topics. Failures to delete individual
+     * segment topics are logged and do not prevent the metadata from being deleted.
+     */
+    public CompletableFuture<Void> deleteScalableTopic(TopicName topic) {
+        return releaseController(topic)
+                .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topic))
+                .thenCompose(optMd -> {
+                    if (optMd.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    ScalableTopicMetadata metadata = optMd.get();
+                    // Delete all underlying segment topics
+                    CompletableFuture<?>[] deleteFutures = metadata.getSegments().values().stream()
+                            .map(segment -> deleteUnderlyingSegmentTopic(topic, segment))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(deleteFutures);
+                })
+                .thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
+    }
+
+    // --- Internal helpers ---
+
+    /** Leader-election listener: on {@code NoLeader}, re-run initialization to try to re-elect. */
+    private void onLeaderStateChange(TopicName topic, LeaderElectionState state) {
+        log.info("Leader state change for scalable topic {}: {}", topic, state);
+        if (state == LeaderElectionState.NoLeader) {
+            // Try to re-elect
+            ScalableTopicController controller = controllers.get(topic.toString());
+            if (controller != null) {
+                controller.initialize().exceptionally(ex -> {
+                    log.warn("Failed to re-elect for topic {}", topic, ex);
+                    return null;
+                });
+            }
+        }
+    }
+
+    private CompletableFuture<Void> createUnderlyingSegmentTopic(TopicName parentTopic, SegmentInfo segment) {
+        TopicName segmentTopic = SegmentTopicName.fromParent(
+                parentTopic, segment.hashRange(), segment.segmentId());
+        String persistentName = toPersistentName(segmentTopic);
+        return brokerService.getOrCreateTopic(persistentName)
+                .thenAccept(t -> log.info("Created segment topic: {}", persistentName));
+    }
+
+    /** Best-effort delete of one segment topic: a failure is logged and swallowed. */
+    private CompletableFuture<Void> deleteUnderlyingSegmentTopic(TopicName parentTopic, SegmentInfo segment) {
+        TopicName segmentTopic = SegmentTopicName.fromParent(
+                parentTopic, segment.hashRange(), segment.segmentId());
+        String persistentName = toPersistentName(segmentTopic);
+        return brokerService.deleteTopic(persistentName, true)
+                .exceptionally(ex -> {
+                    log.warn("Failed to delete segment topic {}: {}", persistentName, ex.getMessage());
+                    return null;
+                });
+    }
+
+    /**
+     * Convert a segment:// topic name to persistent:// for the underlying managed ledger topic.
+     */
+    private String toPersistentName(TopicName segmentTopic) {
+        return "persistent://" + segmentTopic.getTenant() + "/"
+                + segmentTopic.getNamespacePortion() + "/"
+                + segmentTopic.getLocalName();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
new file mode 100644
index 00000000000..5da17726b88
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * In-memory versioned view of a scalable topic's segment DAG.
+ *
+ * <p>This is an immutable snapshot. Mutation methods (split, merge, prune) 
return new instances
+ * with the updated state.
+ */
+public class SegmentLayout {
+    @Getter
+    private final long epoch;
+    @Getter
+    private final long nextSegmentId;
+    @Getter
+    private final Map<Long, SegmentInfo> allSegments;
+    @Getter
+    private final Map<Long, SegmentInfo> activeSegments;
+
+    public SegmentLayout(long epoch, long nextSegmentId, Map<Long, 
SegmentInfo> allSegments) {
+        this.epoch = epoch;
+        this.nextSegmentId = nextSegmentId;
+        this.allSegments = Collections.unmodifiableMap(new 
LinkedHashMap<>(allSegments));
+        this.activeSegments = allSegments.values().stream()
+                .filter(SegmentInfo::isActive)
+                .collect(Collectors.toUnmodifiableMap(SegmentInfo::segmentId, 
s -> s));
+    }
+
+    public static SegmentLayout fromMetadata(ScalableTopicMetadata metadata) {
+        return new SegmentLayout(metadata.getEpoch(), 
metadata.getNextSegmentId(), metadata.getSegments());
+    }
+
+    /**
+     * Find the active segment whose hash range contains the given hash value.
+     */
+    public SegmentInfo findActiveSegment(int hash) {
+        for (SegmentInfo segment : activeSegments.values()) {
+            if (segment.hashRange().contains(hash)) {
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No active segment covers hash: " + 
hash);
+    }
+
+    /**
+     * Get the full lineage chain for a segment (ancestors + descendants).
+     */
+    public List<SegmentInfo> getLineage(long segmentId) {
+        List<SegmentInfo> lineage = new ArrayList<>();
+        collectAncestors(segmentId, lineage);
+        SegmentInfo self = allSegments.get(segmentId);
+        if (self != null) {
+            lineage.add(self);
+        }
+        collectDescendants(segmentId, lineage);
+        return lineage;
+    }
+
+    /**
+     * Get direct children of a segment.
+     */
+    public List<SegmentInfo> getChildren(long segmentId) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            return List.of();
+        }
+        return segment.childIds().stream()
+                .map(allSegments::get)
+                .filter(s -> s != null)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Get direct parents of a segment.
+     */
+    public List<SegmentInfo> getParents(long segmentId) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            return List.of();
+        }
+        return segment.parentIds().stream()
+                .map(allSegments::get)
+                .filter(s -> s != null)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Produce a new layout by splitting a segment at its midpoint.
+     *
+     * @param segmentId the active segment to split
+     * @return a new SegmentLayout with the split applied
+     */
+    public SegmentLayout splitSegment(long segmentId) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            throw new IllegalArgumentException("Segment not found: " + 
segmentId);
+        }
+        if (!segment.isActive()) {
+            throw new IllegalArgumentException("Cannot split non-active 
segment: " + segmentId);
+        }
+
+        HashRange[] splitRanges = segment.hashRange().split();
+        long newEpoch = epoch + 1;
+        long childId1 = nextSegmentId;
+        long childId2 = nextSegmentId + 1;
+
+        SegmentInfo sealedParent = segment.sealed(newEpoch, List.of(childId1, 
childId2));
+        SegmentInfo child1 = SegmentInfo.active(childId1, splitRanges[0], 
List.of(segmentId), newEpoch);
+        SegmentInfo child2 = SegmentInfo.active(childId2, splitRanges[1], 
List.of(segmentId), newEpoch);
+
+        Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
+        newSegments.put(segmentId, sealedParent);
+        newSegments.put(childId1, child1);
+        newSegments.put(childId2, child2);
+
+        return new SegmentLayout(newEpoch, nextSegmentId + 2, newSegments);
+    }
+
+    /**
+     * Produce a new layout by merging two adjacent active segments.
+     *
+     * @param segmentId1 the first segment (must be active and adjacent to 
segmentId2)
+     * @param segmentId2 the second segment (must be active and adjacent to 
segmentId1)
+     * @return a new SegmentLayout with the merge applied
+     */
+    public SegmentLayout mergeSegments(long segmentId1, long segmentId2) {
+        SegmentInfo seg1 = allSegments.get(segmentId1);
+        SegmentInfo seg2 = allSegments.get(segmentId2);
+        if (seg1 == null || seg2 == null) {
+            throw new IllegalArgumentException("Segment not found");
+        }
+        if (!seg1.isActive() || !seg2.isActive()) {
+            throw new IllegalArgumentException("Both segments must be active");
+        }
+        if (!seg1.hashRange().isAdjacentTo(seg2.hashRange())) {
+            throw new IllegalArgumentException("Segments are not adjacent: "
+                    + seg1.hashRange() + " and " + seg2.hashRange());
+        }
+
+        long newEpoch = epoch + 1;
+        long mergedId = nextSegmentId;
+        HashRange mergedRange = seg1.hashRange().merge(seg2.hashRange());
+
+        SegmentInfo sealed1 = seg1.sealed(newEpoch, List.of(mergedId));
+        SegmentInfo sealed2 = seg2.sealed(newEpoch, List.of(mergedId));
+        SegmentInfo merged = SegmentInfo.active(mergedId, mergedRange,
+                List.of(segmentId1, segmentId2), newEpoch);
+
+        Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
+        newSegments.put(segmentId1, sealed1);
+        newSegments.put(segmentId2, sealed2);
+        newSegments.put(mergedId, merged);
+
+        return new SegmentLayout(newEpoch, nextSegmentId + 1, newSegments);
+    }
+
+    /**
+     * Prune an expired segment from the DAG. The segment must be sealed and 
have no
+     * children that are still in the DAG (i.e., children have already been 
pruned or
+     * the segment is a leaf that was sealed).
+     *
+     * @param segmentId the segment to prune
+     * @return a new SegmentLayout with the segment removed
+     */
+    public SegmentLayout pruneSegment(long segmentId) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            throw new IllegalArgumentException("Segment not found: " + 
segmentId);
+        }
+        if (segment.isActive()) {
+            throw new IllegalArgumentException("Cannot prune an active 
segment: " + segmentId);
+        }
+
+        Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
+        newSegments.remove(segmentId);
+
+        // Remove this segment from its children's parent lists
+        for (long childId : segment.childIds()) {
+            SegmentInfo child = newSegments.get(childId);
+            if (child != null) {
+                List<Long> newParentIds = child.parentIds().stream()
+                        .filter(id -> id != segmentId)
+                        .collect(Collectors.toList());
+                newSegments.put(childId, child.withParentIds(newParentIds));
+            }
+        }
+
+        // Remove this segment from its parents' child lists
+        for (long parentId : segment.parentIds()) {
+            SegmentInfo parent = newSegments.get(parentId);
+            if (parent != null) {
+                List<Long> newChildIds = parent.childIds().stream()
+                        .filter(id -> id != segmentId)
+                        .collect(Collectors.toList());
+                newSegments.put(parentId, parent.withChildIds(newChildIds));
+            }
+        }
+
+        return new SegmentLayout(epoch + 1, nextSegmentId, newSegments);
+    }
+
+    /**
+     * Convert back to metadata for persistence.
+     */
+    public ScalableTopicMetadata toMetadata(Map<String, String> properties) {
+        return ScalableTopicMetadata.builder()
+                .epoch(epoch)
+                .nextSegmentId(nextSegmentId)
+                .segments(new LinkedHashMap<>(allSegments))
+                .properties(properties)
+                .build();
+    }
+
+    private void collectAncestors(long segmentId, List<SegmentInfo> result) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            return;
+        }
+        for (long parentId : segment.parentIds()) {
+            collectAncestors(parentId, result);
+            SegmentInfo parent = allSegments.get(parentId);
+            if (parent != null) {
+                result.add(parent);
+            }
+        }
+    }
+
+    private void collectDescendants(long segmentId, List<SegmentInfo> result) {
+        SegmentInfo segment = allSegments.get(segmentId);
+        if (segment == null) {
+            return;
+        }
+        for (long childId : segment.childIds()) {
+            SegmentInfo child = allSegments.get(childId);
+            if (child != null) {
+                result.add(child);
+            }
+            collectDescendants(childId, result);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
new file mode 100644
index 00000000000..ff55b7854ed
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+
+/**
+ * Manages segment-to-consumer assignments within a single subscription of a 
scalable topic.
+ *
+ * <p>Consumer sessions are persisted in the metadata store (as
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) and 
tracked in-memory
+ * as {@link ConsumerSession} objects. The distinction is important: the 
session <em>itself</em>
+ * is durable (survives TCP disconnects, client restarts, and controller 
leader failovers),
+ * but the keep-alive tracking (connected / grace-period timer) is in-memory 
only.
+ *
+ * <p>When a consumer's connection drops, the coordinator does <em>not</em> 
immediately evict
+ * it. Instead it marks the session disconnected and starts a grace-period 
timer. If the
+ * consumer reconnects (with the same {@code consumerName}) before the timer 
fires, its
+ * existing assignment is restored with no rebalance. If the timer fires, the 
persisted
+ * registration is deleted and a rebalance is triggered.
+ *
+ * <p>On controller leader failover, the new leader reloads persisted 
registrations via
+ * {@link #restoreConsumers(Collection)}, which installs them in a "just 
disconnected" state
+ * with fresh grace-period timers — giving every consumer the full window to 
reconnect to the
+ * new leader regardless of how long they had been disconnected under the old 
one.
+ */
+public class SubscriptionCoordinator {
+
+    private static final Logger LOG = 
Logger.get(SubscriptionCoordinator.class);
+    private final Logger log;
+
+    // TODO: make configurable via broker config (e.g. 
scalableTopicConsumerSessionTimeoutSeconds)
+    private static final Duration DEFAULT_GRACE_PERIOD = 
Duration.ofSeconds(60);
+
+    @Getter
+    private final String subscriptionName;
+    private final TopicName topicName;
+    private final ScalableTopicResources resources;
+    private final ScheduledExecutorService scheduler;
+    private final Duration gracePeriod;
+
+    /** Keyed by consumerName — the stable session identity. */
+    private final Map<String, ConsumerSession> sessions = new 
ConcurrentHashMap<>();
+    private Map<Long, ConsumerSession> segmentAssignments = new 
LinkedHashMap<>();
+    private SegmentLayout currentLayout;
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler) {
+        this(subscriptionName, topicName, initialLayout, resources, scheduler, 
DEFAULT_GRACE_PERIOD);
+    }
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler,
+                                   Duration gracePeriod) {
+        this.subscriptionName = subscriptionName;
+        this.topicName = topicName;
+        this.currentLayout = initialLayout;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.gracePeriod = gracePeriod;
+        this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
+    }
+
+    // --- Register / unregister / reconnect ---
+
+    /**
+     * Register a consumer — either a fresh registration or a reconnect of an 
existing
+     * session. If the {@code consumerName} already has a persisted session, 
its assignment
+     * is preserved and the new connection is attached; otherwise the 
registration is
+     * persisted and a rebalance is triggered.
+     *
+     * @return assignment map for all consumers (unchanged on reconnect, 
recomputed on fresh register)
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> registerConsumer(
+            String consumerName, long consumerId, TransportCnx cnx) {
+        ConsumerSession existing = sessions.get(consumerName);
+        if (existing != null) {
+            // Reconnect: attach the new connection, cancel any grace timer, 
and push the
+            // current assignment without rebalancing other consumers.
+            existing.attach(consumerId, cnx);
+            Map<ConsumerSession, ConsumerAssignment> current =
+                    computeAssignment(currentLayout, sessions.values());
+            ConsumerAssignment assignment = current.get(existing);
+            if (assignment != null) {
+                existing.sendAssignmentUpdate(assignment);
+            }
+            return CompletableFuture.completedFuture(current);
+        }
+
+        // Fresh registration — persist first, then install in-memory and 
rebalance.
+        ConsumerSession session = newSession(consumerName, consumerId, cnx);
+        return resources.registerConsumerAsync(topicName, subscriptionName, 
consumerName)
+                .thenApply(__ -> {
+                    synchronized (this) {
+                        sessions.put(consumerName, session);
+                        return rebalanceAndNotify();
+                    }
+                });
+    }
+
+    /**
+     * Explicit unregister (consumer asked to leave the subscription). Cancels 
any pending
+     * grace timer, deletes the persisted registration, and rebalances.
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> unregisterConsumer(
+            String consumerName) {
+        ConsumerSession removed = sessions.remove(consumerName);
+        if (removed == null) {
+            return CompletableFuture.completedFuture(snapshotAssignments());
+        }
+        removed.cancelGraceTimer();
+        return resources.unregisterConsumerAsync(topicName, subscriptionName, 
consumerName)
+                .thenApply(__ -> {
+                    synchronized (this) {
+                        if (sessions.isEmpty()) {
+                            segmentAssignments.clear();
+                            return Map.of();
+                        }
+                        return rebalanceAndNotify();
+                    }
+                });
+    }
+
+    /**
+     * Called when a consumer's transport connection drops (not an explicit 
unregister).
+     * Marks the session disconnected and schedules an eviction task after the 
grace period.
+     * If the consumer reconnects with the same name before the timer fires, 
the timer is
+     * cancelled and no rebalance happens.
+     */
+    public synchronized void onConsumerDisconnect(String consumerName) {
+        ConsumerSession session = sessions.get(consumerName);
+        if (session == null || !session.isConnected()) {
+            return;
+        }
+        session.markDisconnected();
+    }
+
+    /**
+     * Restore consumer sessions loaded from the metadata store on controller 
leader failover.
+     * All restored sessions start in the "just disconnected" state with a 
fresh grace-period
+     * timer — consumers reconnecting within that window resume with the same 
assignment.
+     */
+    public synchronized Map<ConsumerSession, ConsumerAssignment> 
restoreConsumers(
+            Collection<String> persistedConsumerNames) {
+        for (String name : persistedConsumerNames) {
+            if (sessions.containsKey(name)) {
+                continue;
+            }
+            // restored() arms the grace timer internally. The eviction 
callback takes the
+            // coordinator's monitor, which we hold here, so ordering against 
the upcoming
+            // sessions.put is guaranteed.
+            ConsumerSession session = ConsumerSession.restored(name, 
gracePeriod, scheduler,
+                    () -> evictExpiredConsumer(name), log);
+            sessions.put(name, session);
+        }
+        // Compute the deterministic assignment against the current layout. No 
sends: the
+        // consumers aren't connected yet. They will receive their assignment 
on reconnect.
+        Map<ConsumerSession, ConsumerAssignment> result =
+                computeAssignment(currentLayout, sessions.values());
+        updateSegmentAssignmentIndex(result);
+        return result;
+    }
+
+    /**
+     * Handle a layout change (split/merge). Recompute and push assignments to 
connected
+     * consumers.
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> onLayoutChange(
+            SegmentLayout newLayout) {
+        this.currentLayout = newLayout;
+        if (sessions.isEmpty()) {
+            segmentAssignments.clear();
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        return CompletableFuture.completedFuture(rebalanceAndNotify());
+    }
+
+    // --- Accessors ---
+
+    public synchronized Set<ConsumerSession> getConsumers() {
+        return Set.copyOf(sessions.values());
+    }
+
+    // --- Internals ---
+
+    /**
+     * Build a new {@link ConsumerSession} wired with this coordinator's grace 
period,
+     * scheduler, logger context, and eviction callback.
+     */
+    private ConsumerSession newSession(String consumerName, long consumerId, 
TransportCnx cnx) {
+        return new ConsumerSession(consumerName, consumerId, cnx, gracePeriod, 
scheduler,
+                () -> evictExpiredConsumer(consumerName), log);
+    }
+
+    /**
+     * Evict a consumer whose grace-period timer has fired. Runs on the 
scheduler thread.
+     */
+    private void evictExpiredConsumer(String consumerName) {
+        synchronized (this) {
+            ConsumerSession session = sessions.get(consumerName);
+            if (session == null) {
+                return;
+            }
+            if (session.isConnected()) {
+                // Raced with a reconnect — abort the eviction.
+                return;
+            }
+            sessions.remove(consumerName);
+            log.info().attr("consumer", consumerName)
+                    .log("Consumer evicted after grace period");
+        }
+        // Delete persisted registration outside the lock (async) and then 
rebalance.
+        resources.unregisterConsumerAsync(topicName, subscriptionName, 
consumerName)
+                .exceptionally(ex -> {
+                    log.warn().attr("consumer", consumerName).exception(ex)
+                            .log("Failed to delete persisted registration");
+                    return null;
+                })
+                .thenRun(() -> {
+                    synchronized (this) {
+                        if (!sessions.isEmpty()) {
+                            rebalanceAndNotify();
+                        } else {
+                            segmentAssignments.clear();
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Compute a balanced assignment of active segments to consumers.
+     *
+     * <p>Strategy: sort segments by hash range start, sort consumers by name, 
then
+     * round-robin. Deterministic: the same inputs always produce the same 
output, so a new
+     * leader recomputing assignments after failover gets the same result as 
the old leader.
+     */
+    Map<ConsumerSession, ConsumerAssignment> computeAssignment(
+            SegmentLayout layout, Collection<ConsumerSession> consumers) {
+
+        if (consumers.isEmpty()) {
+            return Map.of();
+        }
+
+        List<SegmentInfo> sortedSegments = 
layout.getActiveSegments().values().stream()
+                .sorted(Comparator.comparing(SegmentInfo::hashRange))
+                .toList();
+
+        List<ConsumerSession> sortedConsumers = consumers.stream()
+                .sorted(Comparator.comparing(ConsumerSession::getConsumerName))
+                .toList();
+
+        Map<ConsumerSession, List<ConsumerAssignment.AssignedSegment>> 
assignmentLists =
+                new LinkedHashMap<>();
+        for (ConsumerSession consumer : sortedConsumers) {
+            assignmentLists.put(consumer, new ArrayList<>());
+        }
+
+        int consumerIndex = 0;
+        for (SegmentInfo segment : sortedSegments) {
+            ConsumerSession consumer = sortedConsumers.get(consumerIndex % 
sortedConsumers.size());
+            TopicName segmentTopic = SegmentTopicName.fromParent(topicName, 
segment.hashRange(),
+                    segment.segmentId());
+            assignmentLists.get(consumer).add(new 
ConsumerAssignment.AssignedSegment(
+                    segment.segmentId(),
+                    segment.hashRange(),
+                    segmentTopic.toString()
+            ));
+            consumerIndex++;
+        }
+
+        Map<ConsumerSession, ConsumerAssignment> result = new 
LinkedHashMap<>();
+        for (var entry : assignmentLists.entrySet()) {
+            result.put(entry.getKey(), new ConsumerAssignment(
+                    layout.getEpoch(), entry.getValue()));
+        }
+        return result;
+    }
+
+    private Map<ConsumerSession, ConsumerAssignment> rebalanceAndNotify() {
+        Map<ConsumerSession, ConsumerAssignment> assignments =
+                computeAssignment(currentLayout, sessions.values());
+        updateSegmentAssignmentIndex(assignments);
+
+        for (var entry : assignments.entrySet()) {
+            entry.getKey().sendAssignmentUpdate(entry.getValue());
+        }
+
+        return assignments;
+    }
+
+    private void updateSegmentAssignmentIndex(Map<ConsumerSession, 
ConsumerAssignment> assignments) {
+        segmentAssignments.clear();
+        for (var entry : assignments.entrySet()) {
+            for (ConsumerAssignment.AssignedSegment seg : 
entry.getValue().assignedSegments()) {
+                segmentAssignments.put(seg.segmentId(), entry.getKey());
+            }
+        }
+    }
+
+    private Map<ConsumerSession, ConsumerAssignment> snapshotAssignments() {
+        return computeAssignment(currentLayout, sessions.values());
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
new file mode 100644
index 00000000000..56c0050adc8
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
new file mode 100644
index 00000000000..e25066e4f1d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link SegmentLayout}: initial layouts produced by
+ * {@link ScalableTopicController#createInitialMetadata}, and the split, merge and prune
+ * transitions together with the parent/child lineage they record.
+ */
+public class SegmentLayoutTest {
+
+    @Test
+    public void testInitialLayout() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        assertEquals(layout.getEpoch(), 0);
+        assertEquals(layout.getActiveSegments().size(), 2);
+        assertEquals(layout.getAllSegments().size(), 2);
+
+        SegmentInfo seg0 = layout.getAllSegments().get(0L);
+        SegmentInfo seg1 = layout.getAllSegments().get(1L);
+        assertNotNull(seg0);
+        assertNotNull(seg1);
+
+        assertEquals(seg0.hashRange(), HashRange.of(0x0000, 0x7FFF));
+        assertEquals(seg1.hashRange(), HashRange.of(0x8000, 0xFFFF));
+        assertTrue(seg0.isActive());
+        assertTrue(seg1.isActive());
+    }
+
+    @Test
+    public void testSingleSegmentInitialLayout() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(1, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        assertEquals(layout.getActiveSegments().size(), 1);
+        SegmentInfo seg = layout.getAllSegments().get(0L);
+        assertNotNull(seg, "missing initial segment 0");
+        assertEquals(seg.hashRange(), HashRange.full());
+    }
+
+    @Test
+    public void testFourSegmentInitialLayout() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(4, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        assertEquals(layout.getActiveSegments().size(), 4);
+        // Verify ranges cover the full space without gaps
+        int expectedStart = 0;
+        for (int i = 0; i < 4; i++) {
+            SegmentInfo seg = layout.getAllSegments().get((long) i);
+            // Fail with a clear message instead of an NPE if a segment id is missing
+            assertNotNull(seg, "missing initial segment " + i);
+            assertTrue(seg.isActive(), "initial segment " + i + " should be active");
+            assertEquals(seg.hashRange().start(), expectedStart);
+            expectedStart = seg.hashRange().end() + 1;
+        }
+        // Last segment should end at 0xFFFF
+        assertEquals(layout.getAllSegments().get(3L).hashRange().end(), 0xFFFF);
+    }
+
+    @Test
+    public void testFindActiveSegment() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        SegmentInfo found = layout.findActiveSegment(0x1000);
+        assertNotNull(found);
+        assertEquals(found.segmentId(), 0);
+
+        found = layout.findActiveSegment(0x9000);
+        assertNotNull(found);
+        assertEquals(found.segmentId(), 1);
+    }
+
+    @Test
+    public void testSplitSegment() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        assertEquals(afterSplit.getEpoch(), 1);
+        assertEquals(afterSplit.getActiveSegments().size(), 3); // 1 original + 2 new
+        assertEquals(afterSplit.getAllSegments().size(), 4); // 2 original + 2 new
+
+        // Original segment should be sealed
+        SegmentInfo sealed = afterSplit.getAllSegments().get(0L);
+        assertTrue(sealed.isSealed());
+        assertEquals(sealed.childIds(), List.of(2L, 3L));
+
+        // New segments should be active
+        SegmentInfo child1 = afterSplit.getAllSegments().get(2L);
+        SegmentInfo child2 = afterSplit.getAllSegments().get(3L);
+        assertTrue(child1.isActive());
+        assertTrue(child2.isActive());
+        assertEquals(child1.parentIds(), List.of(0L));
+        assertEquals(child2.parentIds(), List.of(0L));
+
+        // Ranges should cover the original range
+        assertEquals(child1.hashRange(), HashRange.of(0x0000, 0x3FFF));
+        assertEquals(child2.hashRange(), HashRange.of(0x4000, 0x7FFF));
+    }
+
+    @Test
+    public void testSplitNonActiveSegment() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        // Segment 0 is now sealed, cannot split again
+        assertThrows(IllegalArgumentException.class, () -> afterSplit.splitSegment(0));
+    }
+
+    @Test
+    public void testMergeSegments() {
+        // Start with 2 segments, split seg-0, then merge the children back
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0); // seg-2 [0000-3fff], seg-3 [4000-7fff]
+
+        SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3);
+
+        assertEquals(afterMerge.getEpoch(), 2);
+        assertEquals(afterMerge.getActiveSegments().size(), 2); // merged + seg-1
+
+        // Children should be sealed
+        assertTrue(afterMerge.getAllSegments().get(2L).isSealed());
+        assertTrue(afterMerge.getAllSegments().get(3L).isSealed());
+
+        // Merged segment should be active with original range
+        SegmentInfo merged = afterMerge.getAllSegments().get(4L);
+        assertNotNull(merged, "merged segment 4 missing");
+        assertTrue(merged.isActive());
+        assertEquals(merged.hashRange(), HashRange.of(0x0000, 0x7FFF));
+        assertEquals(merged.parentIds(), List.of(2L, 3L));
+    }
+
+    @Test
+    public void testMergeNonAdjacentSegments() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(4, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        // Segments 0 and 2 are not adjacent
+        assertThrows(IllegalArgumentException.class, () -> layout.mergeSegments(0, 2));
+    }
+
+    @Test
+    public void testPruneSegment() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        // Prune sealed segment 0
+        SegmentLayout afterPrune = afterSplit.pruneSegment(0);
+
+        assertFalse(afterPrune.getAllSegments().containsKey(0L));
+        // Children should have empty parent lists now
+        assertEquals(afterPrune.getAllSegments().get(2L).parentIds(), List.of());
+        assertEquals(afterPrune.getAllSegments().get(3L).parentIds(), List.of());
+    }
+
+    @Test
+    public void testCannotPruneActiveSegment() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        assertThrows(IllegalArgumentException.class, () -> layout.pruneSegment(0));
+    }
+
+    @Test
+    public void testGetChildren() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(1, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        List<SegmentInfo> children = afterSplit.getChildren(0);
+        assertEquals(children.size(), 2);
+        assertEquals(children.get(0).segmentId(), 1);
+        assertEquals(children.get(1).segmentId(), 2);
+    }
+
+    @Test
+    public void testGetParents() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(1, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        List<SegmentInfo> parents = afterSplit.getParents(1);
+        assertEquals(parents.size(), 1);
+        assertEquals(parents.get(0).segmentId(), 0);
+
+        // Root has no parents
+        assertEquals(afterSplit.getParents(0).size(), 0);
+    }
+
+    @Test
+    public void testGetLineage() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(1, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        List<SegmentInfo> lineage = afterSplit.getLineage(0);
+        // Lineage of seg-0: itself + its two children (seg-1 and seg-2, see testGetChildren)
+        assertEquals(lineage.size(), 3);
+        // Pin the exact members, not just the count; sort so the check is order-independent
+        List<Long> lineageIds = lineage.stream()
+                .mapToLong(SegmentInfo::segmentId)
+                .sorted()
+                .boxed()
+                .toList();
+        assertEquals(lineageIds, List.of(0L, 1L, 2L));
+    }
+
+    @Test
+    public void testToMetadata() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        SegmentLayout afterSplit = layout.splitSegment(0);
+
+        ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", "value"));
+        assertEquals(restored.getEpoch(), 1);
+        assertEquals(restored.getNextSegmentId(), 4);
+        assertEquals(restored.getSegments().size(), 4);
+        assertEquals(restored.getProperties().get("key"), "value");
+    }
+
+    @Test
+    public void testNextSegmentIdAdvances() {
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+        assertEquals(layout.getNextSegmentId(), 2);
+
+        SegmentLayout split1 = layout.splitSegment(0);
+        assertEquals(split1.getNextSegmentId(), 4);
+
+        SegmentLayout split2 = split1.splitSegment(1);
+        assertEquals(split2.getNextSegmentId(), 6);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
new file mode 100644
index 00000000000..4de8ddb0a43
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link SubscriptionCoordinator}: segment assignment across consumers,
+ * rebalancing on membership and layout changes, and the consumer-session grace-period
+ * lifecycle (disconnect, reconnect, eviction, restore).
+ */
+public class SubscriptionCoordinatorTest {
+
+    /**
+     * Grace period of the default coordinator. Deliberately long so that tests which act
+     * "within the grace period" cannot race the eviction timer on a slow CI machine.
+     */
+    private static final Duration GRACE_PERIOD = Duration.ofSeconds(30);
+
+    /** Short grace period, used only by tests that wait for a session to be evicted. */
+    private static final Duration SHORT_GRACE_PERIOD = Duration.ofMillis(200);
+
+    private TopicName topicName;
+    private SegmentLayout initialLayout;
+    private ScalableTopicResources resources;
+    private ScheduledExecutorService scheduler;
+    private SubscriptionCoordinator coordinator;
+
+    @BeforeMethod
+    public void setup() {
+        topicName = TopicName.get("topic://tenant/ns/my-topic");
+        ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata(4, Map.of());
+        initialLayout = SegmentLayout.fromMetadata(metadata);
+        resources = mock(ScalableTopicResources.class);
+        // All persistence ops succeed
+        when(resources.registerConsumerAsync(any(), anyString(), anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(resources.unregisterConsumerAsync(any(), anyString(), anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        coordinator = newCoordinator(GRACE_PERIOD);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        if (scheduler != null) {
+            // Also cancels any pending grace-period timers
+            scheduler.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testSingleConsumerGetsAllSegments() throws Exception {
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+
+        assertEquals(result.size(), 1);
+        ConsumerAssignment assignment = findByName(result, "consumer-1");
+        assertNotNull(assignment);
+        assertEquals(assignment.assignedSegments().size(), 4);
+        assertEquals(assignment.layoutEpoch(), 0);
+    }
+
+    @Test
+    public void testTwoConsumersBalanced() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+
+        assertEquals(result.size(), 2);
+        assertEquals(findByName(result, "consumer-1").assignedSegments().size(), 2);
+        assertEquals(findByName(result, "consumer-2").assignedSegments().size(), 2);
+    }
+
+    @Test
+    public void testThreeConsumersWithFourSegments() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.registerConsumer("consumer-3", 3L, mock(TransportCnx.class)).get();
+
+        assertEquals(result.size(), 3);
+        int total = result.values().stream()
+                .mapToInt(a -> a.assignedSegments().size())
+                .sum();
+        assertEquals(total, 4);
+
+        Set<Long> assignedIds = new HashSet<>();
+        for (ConsumerAssignment assignment : result.values()) {
+            for (ConsumerAssignment.AssignedSegment seg : assignment.assignedSegments()) {
+                assertTrue(assignedIds.add(seg.segmentId()),
+                        "Segment " + seg.segmentId() + " assigned to multiple consumers");
+            }
+        }
+        assertEquals(assignedIds.size(), 4);
+    }
+
+    @Test
+    public void testUnregisterConsumerRebalances() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.unregisterConsumer("consumer-2").get();
+
+        assertEquals(result.size(), 1);
+        assertEquals(findByName(result, "consumer-1").assignedSegments().size(), 4);
+    }
+
+    @Test
+    public void testLayoutChangeRebalances() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+
+        SegmentLayout newLayout = initialLayout.splitSegment(0);
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.onLayoutChange(newLayout).get();
+
+        assertEquals(result.size(), 1);
+        assertEquals(findByName(result, "consumer-1").assignedSegments().size(), 5);
+    }
+
+    @Test
+    public void testEmptyAfterAllConsumersRemoved() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.unregisterConsumer("consumer-1").get();
+
+        assertTrue(result.isEmpty());
+        assertTrue(coordinator.getConsumers().isEmpty());
+    }
+
+    @Test
+    public void testAssignmentDeterminism() throws Exception {
+        coordinator.registerConsumer("consumer-a", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> result1 =
+                coordinator.registerConsumer("consumer-b", 2L, mock(TransportCnx.class)).get();
+
+        SubscriptionCoordinator coordinator2 = newCoordinator(GRACE_PERIOD);
+        coordinator2.registerConsumer("consumer-a", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> result2 =
+                coordinator2.registerConsumer("consumer-b", 2L, mock(TransportCnx.class)).get();
+
+        assertEquals(segmentIds(findByName(result1, "consumer-a")),
+                segmentIds(findByName(result2, "consumer-a")));
+        assertEquals(segmentIds(findByName(result1, "consumer-b")),
+                segmentIds(findByName(result2, "consumer-b")));
+    }
+
+    // --- Session lifecycle tests ---
+
+    @Test
+    public void testReconnectWithinGracePeriodPreservesAssignment() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+
+        // consumer-1 drops
+        coordinator.onConsumerDisconnect("consumer-1");
+        assertEquals(coordinator.getConsumers().size(), 2,
+                "session should still be tracked during grace period");
+
+        // Reconnect within the grace period with a new connection
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.registerConsumer("consumer-1", 99L, mock(TransportCnx.class)).get();
+
+        assertEquals(result.size(), 2);
+        assertEquals(findByName(result, "consumer-1").assignedSegments().size(), 2);
+        assertEquals(findByName(result, "consumer-2").assignedSegments().size(), 2);
+
+        // The session was not recreated — still 2 consumers, and consumer-1 is reconnected
+        ConsumerSession reconnected = coordinator.getConsumers().stream()
+                .filter(s -> s.getConsumerName().equals("consumer-1"))
+                .findFirst().orElseThrow();
+        assertTrue(reconnected.isConnected());
+        assertEquals(reconnected.getConsumerId(), 99L);
+    }
+
+    @Test
+    public void testExpiredSessionIsEvictedAfterGracePeriod() {
+        // Dedicated coordinator with a short grace period so eviction happens quickly
+        SubscriptionCoordinator shortGrace = newCoordinator(SHORT_GRACE_PERIOD);
+        shortGrace.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).join();
+        shortGrace.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).join();
+        assertEquals(shortGrace.getConsumers().size(), 2);
+
+        shortGrace.onConsumerDisconnect("consumer-1");
+
+        // Grace period is 200ms for this coordinator — wait for eviction
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            Set<ConsumerSession> active = shortGrace.getConsumers();
+            assertEquals(active.size(), 1);
+            assertEquals(active.iterator().next().getConsumerName(), "consumer-2");
+        });
+    }
+
+    @Test
+    public void testRestoreConsumersInstallsDisconnectedSessions() {
+        Map<ConsumerSession, ConsumerAssignment> assignments =
+                coordinator.restoreConsumers(List.of("consumer-1", "consumer-2"));
+
+        assertEquals(assignments.size(), 2);
+        assertEquals(coordinator.getConsumers().size(), 2);
+        // Restored sessions start in the disconnected state
+        for (ConsumerSession session : coordinator.getConsumers()) {
+            assertFalse(session.isConnected());
+            assertNotNull(session.getGraceTimer());
+        }
+    }
+
+    @Test
+    public void testRestoredConsumerResumesAssignmentOnReconnect() throws Exception {
+        coordinator.restoreConsumers(List.of("consumer-1", "consumer-2"));
+
+        // Reconnect consumer-1 — it should find its existing session and reuse the assignment
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.registerConsumer("consumer-1", 42L, mock(TransportCnx.class)).get();
+
+        assertEquals(result.size(), 2);
+        assertEquals(findByName(result, "consumer-1").assignedSegments().size(), 2);
+
+        ConsumerSession reconnected = coordinator.getConsumers().stream()
+                .filter(s -> s.getConsumerName().equals("consumer-1"))
+                .findFirst().orElseThrow();
+        assertTrue(reconnected.isConnected());
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Creates a coordinator for subscription {@code test-sub} that shares this test's topic,
+     * initial layout, mocked resources and scheduler.
+     *
+     * @param gracePeriod how long a disconnected session is kept before eviction
+     */
+    private SubscriptionCoordinator newCoordinator(Duration gracePeriod) {
+        return new SubscriptionCoordinator("test-sub", topicName, initialLayout,
+                resources, scheduler, gracePeriod);
+    }
+
+    /**
+     * Returns the assignment of the consumer named {@code name}.
+     *
+     * @throws AssertionError if {@code m} has no entry for that consumer, so a failing
+     *         test reports the missing consumer instead of a bare NullPointerException
+     */
+    private static ConsumerAssignment findByName(Map<ConsumerSession, ConsumerAssignment> m, String name) {
+        return m.entrySet().stream()
+                .filter(e -> name.equals(e.getKey().getConsumerName()))
+                .map(Map.Entry::getValue)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("No assignment found for consumer " + name));
+    }
+
+    /** Lists the segment ids of {@code assignment} in the order the assignment reports them. */
+    private static List<Long> segmentIds(ConsumerAssignment assignment) {
+        return assignment.assignedSegments().stream()
+                .map(ConsumerAssignment.AssignedSegment::segmentId)
+                .toList();
+    }
+}

Reply via email to