This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 527efb6e47c [feat] PIP-468: Add ScalableTopicController and broker
infrastructure (#25559)
527efb6e47c is described below
commit 527efb6e47cb3afecf2b4aad28c70100782c4edc
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 24 12:49:51 2026 -0700
[feat] PIP-468: Add ScalableTopicController and broker infrastructure
(#25559)
---
.../broker/resources/ConsumerRegistration.java | 38 +++
.../pulsar/broker/resources/PulsarResources.java | 4 +
.../broker/resources/ScalableTopicMetadata.java | 54 ++++
.../broker/resources/ScalableTopicResources.java | 222 +++++++++++++
.../broker/resources/SubscriptionMetadata.java | 34 ++
.../pulsar/broker/resources/SubscriptionType.java | 41 +++
.../pulsar/broker/service/BrokerService.java | 14 +-
.../service/scalable/ConsumerAssignment.java | 46 +++
.../broker/service/scalable/ConsumerSession.java | 199 ++++++++++++
.../service/scalable/ScalableTopicController.java | 274 +++++++++++++++++
.../scalable/ScalableTopicLayoutResponse.java | 45 +++
.../service/scalable/ScalableTopicService.java | 218 +++++++++++++
.../broker/service/scalable/SegmentLayout.java | 269 ++++++++++++++++
.../service/scalable/SubscriptionCoordinator.java | 342 +++++++++++++++++++++
.../broker/service/scalable/package-info.java | 19 ++
.../broker/service/scalable/SegmentLayoutTest.java | 250 +++++++++++++++
.../scalable/SubscriptionCoordinatorTest.java | 266 ++++++++++++++++
17 files changed, 2333 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
new file mode 100644
index 00000000000..665efd90c97
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ConsumerRegistration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * Persisted marker of a consumer registered against a subscription on a scalable topic.
+ *
+ * <p>The record intentionally has no components. The <em>existence</em> of the metadata node
+ * (znode) is the durable session — it survives TCP disconnects, client restarts, and
+ * controller leader failovers. The consumer name lives in the node path
+ * ({@code /topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}}),
+ * so no fields are stored in the value.
+ *
+ * <p>Keep-alive state (connected/disconnected, grace-period timer) is <em>not</em>
+ * persisted — it is tracked in-memory by the controller leader only. A new leader taking
+ * over reads these entries and starts a fresh grace-period timer for each consumer.
+ *
+ * <p>The current segment assignment is <em>not</em> persisted either: since assignment is
+ * a pure function of the sorted consumer names and the current segment layout, the new
+ * leader recomputes it deterministically after loading the registrations.
+ */
+public record ConsumerRegistration() {}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index 7b89fe69194..db5a8db549c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -52,6 +52,8 @@ public class PulsarResources {
@Getter
private final LoadBalanceResources loadBalanceResources;
@Getter
+ private final ScalableTopicResources scalableTopicResources;
+ @Getter
private final Optional<MetadataStoreExtended> localMetadataStore;
@Getter
private final Optional<MetadataStore> configurationMetadataStore;
@@ -87,6 +89,7 @@ public class PulsarResources {
bookieResources = new BookieResources(localMetadataStore,
operationTimeoutSec);
topicResources = new TopicResources(localMetadataStore);
loadBalanceResources = new
LoadBalanceResources(localMetadataStore, operationTimeoutSec);
+ scalableTopicResources = new
ScalableTopicResources(localMetadataStore, operationTimeoutSec);
} else {
dynamicConfigResources = null;
localPolicies = null;
@@ -94,6 +97,7 @@ public class PulsarResources {
bookieResources = null;
topicResources = null;
loadBalanceResources = null;
+ scalableTopicResources = null;
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
new file mode 100644
index 00000000000..8af85231651
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Metadata for a scalable topic, stored in the metadata store at
+ * {@code /topics/{tenant}/{namespace}/{topic}}.
+ *
+ * <p>The segments map represents the full DAG of the topic's segment history.
+ * The epoch is incremented on every layout change (split/merge/prune).
+ *
+ * <p>Lombok generates the accessors, the builder and both the no-args and all-args
+ * constructors. The two map fields carry {@code @Builder.Default} initializers, so a
+ * builder that does not set them falls back to the defaults declared below.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ScalableTopicMetadata {
+ /** Layout epoch; incremented on every layout change (split/merge/prune). */
+ private long epoch;
+
+ /** Next available segment ID (monotonically increasing). */
+ private long nextSegmentId;
+
+ /**
+ * All segments: active + historical (keyed by segmentId).
+ * Defaults to an empty, mutable {@link LinkedHashMap}, which iterates in insertion order.
+ */
+ @Builder.Default
+ private Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+
+ /**
+ * User-defined topic properties.
+ * Defaults to {@code Map.of()}, which is unmodifiable: code that wants to add a property
+ * to a default-constructed instance must install a new map rather than mutate this one.
+ */
+ @Builder.Default
+ private Map<String, String> properties = Map.of();
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
new file mode 100644
index 00000000000..235f17e2d31
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.CustomLog;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+/**
+ * Metadata store access for scalable topic metadata.
+ *
+ * <p>Paths:
+ * <ul>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}} — {@link ScalableTopicMetadata}
+ *       (segment DAG and global topic state)</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/controller} — controller leader
+ *       lock (ephemeral, broker URL as value)</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/subscriptions/{subscription}} —
+ *       {@link SubscriptionMetadata}</li>
+ *   <li>{@code /topics/{tenant}/{namespace}/{encodedTopicName}/subscriptions/{subscription}
+ *       /consumers/{consumerName}} — {@link ConsumerRegistration} (durable session entry)</li>
+ * </ul>
+ *
+ * <p>Topic-level records go through the {@link BaseResources} helpers; subscription and
+ * consumer records use two dedicated {@link MetadataCache} instances obtained from the same store.
+ */
+@CustomLog
+public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata> {
+
+    private static final String SCALABLE_TOPIC_PATH = "/topics";
+    private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
+    private static final String CONSUMERS_SEGMENT = "consumers";
+
+    private final MetadataCache<SubscriptionMetadata> subscriptionCache;
+    private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
+
+    /**
+     * @param store metadata store backing all scalable-topic records
+     * @param operationTimeoutSec timeout handed to {@link BaseResources}
+     */
+    public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
+        super(store, ScalableTopicMetadata.class, operationTimeoutSec);
+        this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
+        this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
+    }
+
+    // --- Topic metadata ---
+
+    /** Create the topic record at {@link #topicPath(TopicName)}. */
+    public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) {
+        return createAsync(topicPath(tn), metadata);
+    }
+
+    /** Read the topic record at {@link #topicPath(TopicName)}. */
+    public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetadataAsync(TopicName tn) {
+        return getAsync(topicPath(tn));
+    }
+
+    /**
+     * Read the topic record, optionally refreshing first.
+     *
+     * @param refresh when {@code true} the lookup goes through {@code refreshAndGetAsync};
+     *                otherwise it behaves like {@link #getScalableTopicMetadataAsync(TopicName)}
+     */
+    public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetadataAsync(TopicName tn,
+                                                                                          boolean refresh) {
+        if (refresh) {
+            return refreshAndGetAsync(topicPath(tn));
+        }
+        return getAsync(topicPath(tn));
+    }
+
+    /** Apply {@code updateFunction} to the topic record via {@code setAsync}. */
+    public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
+            Function<ScalableTopicMetadata, ScalableTopicMetadata> updateFunction) {
+        return setAsync(topicPath(tn), updateFunction);
+    }
+
+    /** Delete the topic record at {@link #topicPath(TopicName)}. */
+    public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
+        return deleteAsync(topicPath(tn));
+    }
+
+    /** Check whether a topic record exists at {@link #topicPath(TopicName)}. */
+    public CompletableFuture<Boolean> scalableTopicExistsAsync(TopicName tn) {
+        return existsAsync(topicPath(tn));
+    }
+
+    /**
+     * List the scalable topics of a namespace. Each child name under the namespace path is
+     * decoded with {@link Codec#decode} and turned into a full topic name in the
+     * {@code topic} domain.
+     */
+    public CompletableFuture<List<String>> listScalableTopicsAsync(NamespaceName ns) {
+        return getChildrenAsync(joinPath(SCALABLE_TOPIC_PATH, ns.toString()))
+                .thenApply(list -> list.stream()
+                        .map(encoded -> TopicName.get("topic", ns, Codec.decode(encoded)).toString())
+                        .collect(Collectors.toList()));
+    }
+
+    // --- Subscriptions ---
+
+    /**
+     * Create a subscription record. Fails if it already exists.
+     */
+    public CompletableFuture<Void> createSubscriptionAsync(TopicName tn, String subscription,
+                                                           SubscriptionType type) {
+        return subscriptionCache.create(subscriptionPath(tn, subscription),
+                new SubscriptionMetadata(type));
+    }
+
+    /** Read the subscription record at {@link #subscriptionPath(TopicName, String)}. */
+    public CompletableFuture<Optional<SubscriptionMetadata>> getSubscriptionAsync(TopicName tn,
+                                                                                  String subscription) {
+        return subscriptionCache.get(subscriptionPath(tn, subscription));
+    }
+
+    /** Check whether a subscription record exists. */
+    public CompletableFuture<Boolean> subscriptionExistsAsync(TopicName tn, String subscription) {
+        return subscriptionCache.exists(subscriptionPath(tn, subscription));
+    }
+
+    /**
+     * Delete a subscription and everything stored beneath it: each consumer registration,
+     * then the {@code consumers} container node, then the subscription node itself.
+     * Best-effort with respect to missing nodes — a node that is already gone is ignored
+     * at every step; any other failure propagates.
+     */
+    public CompletableFuture<Void> deleteSubscriptionAsync(TopicName tn, String subscription) {
+        String subPath = subscriptionPath(tn, subscription);
+        String consumersPath = joinPath(subPath, CONSUMERS_SEGMENT);
+        return subscriptionCache.getChildren(consumersPath)
+                .thenCompose(children -> {
+                    if (children == null || children.isEmpty()) {
+                        return CompletableFuture.<Void>completedFuture(null);
+                    }
+                    CompletableFuture<?>[] futs = children.stream()
+                            .map(c -> consumerRegistrationCache
+                                    .delete(joinPath(consumersPath, c))
+                                    .exceptionally(ignoreMissing()))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(futs);
+                })
+                // The "consumers" container is a node of its own on hierarchical stores and is
+                // not removed when its last child goes away. It has to be deleted explicitly,
+                // otherwise the subscription node below still has a child and cannot be removed.
+                .thenCompose(__ -> consumerRegistrationCache.delete(consumersPath)
+                        .exceptionally(ignoreMissing()))
+                .thenCompose(__ -> subscriptionCache.delete(subPath)
+                        .exceptionally(ignoreMissing()));
+    }
+
+    /** List subscription names of a topic; a {@code null} child list is mapped to an empty list. */
+    public CompletableFuture<List<String>> listSubscriptionsAsync(TopicName tn) {
+        return subscriptionCache.getChildren(joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT))
+                .thenApply(list -> list == null ? List.of() : list);
+    }
+
+    // --- Consumer registrations ---
+
+    /**
+     * Persist a consumer registration under a subscription. This is the durable session
+     * entry — once written, the consumer's segment assignment survives controller leader
+     * failover, client restarts, and TCP disconnects within the session grace period.
+     *
+     * <p>Idempotent: if the registration already exists, this completes successfully without
+     * overwriting it. Used by the controller leader on consumer register.
+     */
+    public CompletableFuture<Void> registerConsumerAsync(TopicName tn, String subscription,
+                                                         String consumerName) {
+        String path = consumerPath(tn, subscription, consumerName);
+        return consumerRegistrationCache.create(path, new ConsumerRegistration())
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof MetadataStoreException.AlreadyExistsException) {
+                        // Already registered — treat as success.
+                        return null;
+                    }
+                    throw FutureUtil.wrapToCompletionException(cause);
+                });
+    }
+
+    /**
+     * Remove a persisted consumer registration. Missing entries are ignored.
+     */
+    public CompletableFuture<Void> unregisterConsumerAsync(TopicName tn, String subscription,
+                                                           String consumerName) {
+        return consumerRegistrationCache.delete(consumerPath(tn, subscription, consumerName))
+                .exceptionally(ignoreMissing());
+    }
+
+    /**
+     * List all persisted consumer names for a subscription. Used by the controller leader
+     * on initialize() / failover to restore the in-memory session state.
+     */
+    public CompletableFuture<List<String>> listConsumersAsync(TopicName tn, String subscription) {
+        return consumerRegistrationCache
+                .getChildren(joinPath(subscriptionPath(tn, subscription), CONSUMERS_SEGMENT))
+                .thenApply(list -> list == null ? List.of() : list);
+    }
+
+    // --- Paths ---
+
+    /**
+     * Get the metadata store path for the controller leader lock.
+     */
+    public String controllerLockPath(TopicName tn) {
+        return joinPath(topicPath(tn), "controller");
+    }
+
+    /** Path of the topic record: {@code /topics/<namespace>/<encoded local name>}. */
+    public String topicPath(TopicName tn) {
+        return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), tn.getEncodedLocalName());
+    }
+
+    /** Path of a subscription record beneath the topic path. */
+    public String subscriptionPath(TopicName tn, String subscription) {
+        return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
+    }
+
+    /** Path of a consumer registration beneath the subscription path. */
+    public String consumerPath(TopicName tn, String subscription, String consumerName) {
+        return joinPath(subscriptionPath(tn, subscription), CONSUMERS_SEGMENT, consumerName);
+    }
+
+    /**
+     * Recovery function for {@code exceptionally}: maps a
+     * {@link MetadataStoreException.NotFoundException} to a normal {@code null} completion and
+     * rethrows every other failure wrapped as a completion exception.
+     */
+    private static <T> Function<Throwable, T> ignoreMissing() {
+        return ex -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof MetadataStoreException.NotFoundException) {
+                return null;
+            }
+            throw FutureUtil.wrapToCompletionException(cause);
+        };
+    }
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
new file mode 100644
index 00000000000..56ec27381cd
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionMetadata.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * Persisted subscription-level metadata for a scalable topic.
+ *
+ * <p>Stored at: {@code /topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}}
+ *
+ * <p>The subscription name is encoded in the node path, so only subscription-kind info
+ * is carried in the value. The {@link SubscriptionType} tells the controller whether to
+ * coordinate segment-to-consumer assignment for this subscription.
+ *
+ * <p>The set of consumer registrations for this subscription is persisted as children
+ * of this node under {@code .../consumers/{consumerName}} — see
+ * {@link ConsumerRegistration}.
+ *
+ * @param type the kind of subscription (controller-managed or controller-bypassing)
+ */
+public record SubscriptionMetadata(SubscriptionType type) {}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
new file mode 100644
index 00000000000..6cc39d12d7c
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/SubscriptionType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+/**
+ * Kind of subscription on a scalable topic. Controls whether the scalable-topic
+ * controller coordinates segment-to-consumer assignment for the subscription, or whether
+ * consumers attach to every segment directly without controller involvement.
+ *
+ * <p>The value is persisted as the single component of {@link SubscriptionMetadata}.
+ */
+public enum SubscriptionType {
+
+ /**
+ * Controller-managed: each active segment is assigned to exactly one consumer at a
+ * time to preserve per-segment ordering. Covers both {@code StreamConsumer} (ordered,
+ * cumulative ack) and {@code CheckpointConsumer} (unmanaged, for connectors).
+ */
+ STREAM,
+
+ /**
+ * Controller-bypassing: every consumer attaches directly to every segment (active
+ * and sealed); each segment-owning broker round-robins messages across the attached
+ * consumers. No ordering guarantee.
+ */
+ QUEUE
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 366e19dd7a5..7ecb6f2bdfd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -165,7 +165,6 @@ import
org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
@@ -321,6 +320,9 @@ public class BrokerService implements Closeable {
@Getter
private final BundlesQuotas bundlesQuotas;
+ @Getter
+ private volatile
org.apache.pulsar.broker.service.scalable.ScalableTopicService
scalableTopicService;
+
private PulsarChannelInitializer.Factory pulsarChannelInitFactory =
PulsarChannelInitializer.DEFAULT_FACTORY;
private final List<Channel> listenChannels = new ArrayList<>(2);
@@ -615,6 +617,14 @@ public class BrokerService implements Closeable {
this.producerNameGenerator = new
DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_NAME_GENERATOR_PATH,
pulsar.getConfiguration().getClusterName());
+ // Initialize scalable topic service
+ var scalableTopicResources =
pulsar.getPulsarResources().getScalableTopicResources();
+ if (scalableTopicResources != null) {
+ this.scalableTopicService = new
org.apache.pulsar.broker.service.scalable.ScalableTopicService(
+ this, scalableTopicResources,
pulsar.getCoordinationService());
+ this.scalableTopicService.start();
+ }
+
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
List<BindAddress> bindAddresses =
BindAddressValidator.validateBindAddresses(serviceConfig,
Arrays.asList("pulsar", "pulsar+ssl"));
@@ -1215,7 +1225,7 @@ public class BrokerService implements Closeable {
if (tp != null) {
return tp;
}
- final boolean isPersistentTopic =
topicName.getDomain().equals(TopicDomain.persistent);
+ final boolean isPersistentTopic = topicName.isPersistent();
if (isPersistentTopic) {
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
log.debug().attr("topic", topicName).log("Broker is unable
to load persistent topic");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
new file mode 100644
index 00000000000..7615f64db98
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerAssignment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.List;
+import org.apache.pulsar.common.scalable.HashRange;
+
+/**
+ * Represents the set of segments assigned to a consumer by the controller.
+ *
+ * <p>The compact constructor replaces the incoming list with {@code List.copyOf(...)}, so
+ * the stored list is an unmodifiable snapshot that later changes to the caller's list do
+ * not affect. As a consequence of {@code List.copyOf}, a {@code null} list or a list
+ * containing {@code null} elements is rejected with a {@link NullPointerException}.
+ *
+ * @param layoutEpoch the layout epoch at the time of this assignment
+ * @param assignedSegments the segments assigned to this consumer
+ */
+public record ConsumerAssignment(
+ long layoutEpoch,
+ List<AssignedSegment> assignedSegments
+) {
+ public ConsumerAssignment {
+ assignedSegments = List.copyOf(assignedSegments);
+ }
+
+ /**
+ * A single segment assignment for a consumer.
+ *
+ * @param segmentId identifier of the assigned segment
+ * @param hashRange hash range carried with the segment (presumably the key-hash range the
+ * segment covers — confirm against {@code SegmentInfo})
+ * @param underlyingTopicName name of the topic backing the segment, as suggested by the
+ * component name — confirm against the code that builds assignments
+ */
+ public record AssignedSegment(
+ long segmentId,
+ HashRange hashRange,
+ String underlyingTopicName
+ ) {}
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
new file mode 100644
index 00000000000..3e798ca418b
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.pulsar.broker.service.TransportCnx;
+
+/**
+ * In-memory handle for a consumer registered with the controller leader.
+ *
+ * <p>Session identity is the stable {@code consumerName} chosen by the client.
+ * This class wraps the <em>durable</em> portion (persisted via
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) with the
+ * <em>transient</em> keep-alive state: the current transport connection,
whether the
+ * consumer is currently connected, and — when disconnected — a grace-period
timer that
+ * evicts the session if the consumer does not reconnect in time.
+ *
+ * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code
cnx},
+ * {@code graceTimer}) are <em>not</em> persisted; they live on the controller
leader only
+ * and are reset to a "just disconnected" state when a new leader takes over.
+ *
+ * <p>Equality and hash are based on {@code consumerName} alone so that a
reconnection
+ * with a new protocol-level {@code consumerId} resolves to the same session.
+ *
+ * <p>The grace-period timer is fully encapsulated: {@link
#markDisconnected()} schedules
+ * it, {@link #attach(long, TransportCnx)} cancels it. The eviction action is
supplied by
+ * the caller as a {@link Runnable} at construction, so the session doesn't
need a
+ * back-reference to the coordinator.
+ */
+public class ConsumerSession {
+
+ @Getter
+ private final String consumerName;
+
+ /** Current protocol-level consumer ID. Changes on each reconnect. */
+ @Getter
+ private volatile long consumerId;
+
+ /** Current transport connection. Null when disconnected. */
+ @Getter
+ private volatile TransportCnx cnx;
+
+ /** Whether the consumer is currently connected. */
+ @Getter
+ private volatile boolean connected;
+
+ /** Scheduled eviction task, non-null only while disconnected and within
grace period. */
+ @Getter
+ private volatile ScheduledFuture<?> graceTimer;
+
+ private final Duration gracePeriod;
+ private final ScheduledExecutorService scheduler;
+ private final Runnable onGraceExpiry;
+ private final Logger log;
+
+ public ConsumerSession(String consumerName,
+ long consumerId,
+ TransportCnx cnx,
+ Duration gracePeriod,
+ ScheduledExecutorService scheduler,
+ Runnable onGraceExpiry,
+ Logger parentLogger) {
+ this.consumerName = consumerName;
+ this.consumerId = consumerId;
+ this.cnx = cnx;
+ this.connected = cnx != null;
+ this.gracePeriod = gracePeriod;
+ this.scheduler = scheduler;
+ this.onGraceExpiry = onGraceExpiry;
+ this.log = Logger.get(ConsumerSession.class).with()
+ .ctx(parentLogger)
+ .attr("consumerName", consumerName)
+ .attr("consumerId", () -> this.consumerId)
+ .attr("connected", () -> this.connected)
+ .build();
+ }
+
+ /**
+ * Create a session for a consumer whose registration was loaded from the
metadata store
+ * on controller leader failover. The returned session is in the "just
disconnected"
+ * state with its grace-period timer already armed — if the consumer does
not reconnect
+ * within the grace period the {@code onGraceExpiry} callback fires.
+ */
+ public static ConsumerSession restored(String consumerName,
+ Duration gracePeriod,
+ ScheduledExecutorService scheduler,
+ Runnable onGraceExpiry,
+ Logger parentLogger) {
+ ConsumerSession session = new ConsumerSession(consumerName, -1L, null,
+ gracePeriod, scheduler, onGraceExpiry, parentLogger);
+ session.startGraceTimer();
+ return session;
+ }
+
+ /**
+ * Attach a new transport connection to this session (reconnect path).
Cancels any
+ * active grace timer and marks the session connected.
+ */
+ public synchronized void attach(long consumerId, TransportCnx cnx) {
+ this.consumerId = consumerId;
+ this.cnx = cnx;
+ this.connected = true;
+ cancelGraceTimer();
+ }
+
+ /**
+ * Mark the session as disconnected and start the grace-period timer. The
eviction task
+ * (supplied at construction as {@code onGraceExpiry}) runs on the
scheduler when the
+ * timer fires unless a reconnect arrives first via {@link #attach(long,
TransportCnx)}.
+ */
+ public synchronized void markDisconnected() {
+ this.connected = false;
+ this.cnx = null;
+ log.info().attr("gracePeriodSeconds", gracePeriod.toSeconds())
+ .log("Consumer disconnected; starting grace period");
+ startGraceTimer();
+ }
+
+ /**
+ * Start (or restart) the grace-period eviction timer using the configured
+ * {@code onGraceExpiry} callback. Any previously-running timer is
cancelled first.
+ *
+ * <p>Called from {@link #markDisconnected()} and from {@link #restored}.
+ */
+ private synchronized void startGraceTimer() {
+ setGraceTimer(scheduler.schedule(onGraceExpiry,
+ gracePeriod.toMillis(), TimeUnit.MILLISECONDS));
+ }
+
+ private synchronized void setGraceTimer(ScheduledFuture<?> timer) {
+ cancelGraceTimer();
+ this.graceTimer = timer;
+ }
+
+ /**
+ * Cancel any pending grace timer. Package-private so that the coordinator
can cancel
+ * the timer when the consumer explicitly unregisters (in which case the
session is
+ * removed from the coordinator's map and no eviction callback should
fire).
+ */
+ synchronized void cancelGraceTimer() {
+ if (graceTimer != null) {
+ graceTimer.cancel(false);
+ graceTimer = null;
+ }
+ }
+
+ /**
+ * Push an assignment update to this consumer, if currently connected. The
actual
+ * wire push is wired up in the protocol-commands commit; at this stage
the method
+ * exists so {@link SubscriptionCoordinator} can call it.
+ */
+ public void sendAssignmentUpdate(ConsumerAssignment assignment) {
+ // no-op until the protocol-commands commit wires in the wire protocol
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConsumerSession that = (ConsumerSession) o;
+ return Objects.equals(consumerName, that.consumerName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(consumerName);
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerSession{name=" + consumerName + ", connected=" +
connected + "}";
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
new file mode 100644
index 00000000000..3ef0e6b9598
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Per-topic coordinator that manages the segment layout and consumer assignments
+ * for a single scalable topic.
+ *
+ * <p>Only one instance of this controller runs across the cluster for a given topic,
+ * ensured by leader election via the metadata store. The leader stores its broker ID as
+ * the election value so that callers can resolve it to the broker's advertised service
+ * URL and connect to it.
+ */
+@Slf4j
+public class ScalableTopicController {
+
+ @Getter
+ private final TopicName topicName;
+ private final ScalableTopicResources resources;
+ private final BrokerService brokerService;
+ private final LeaderElection<String> leaderElection;
+
+ private volatile SegmentLayout currentLayout;
+
+ /** Per-subscription consumer tracking. */
+ private final ConcurrentHashMap<String, SubscriptionCoordinator>
subscriptions = new ConcurrentHashMap<>();
+
+ @Getter
+ private volatile LeaderElectionState leaderState =
LeaderElectionState.NoLeader;
+
+ /**
+  * Creates a controller for a single scalable topic. The constructor only stores its
+  * collaborators; loading the layout and running the election happens in
+  * {@link #initialize()}.
+  *
+  * @param topicName      the scalable topic this controller is responsible for
+  * @param resources      metadata-store accessor for scalable topic state
+  * @param brokerService  owning broker service
+  * @param leaderElection election handle used to pick the single active controller
+  */
+ ScalableTopicController(TopicName topicName,
+                         ScalableTopicResources resources,
+                         BrokerService brokerService,
+                         LeaderElection<String> leaderElection) {
+     this.leaderElection = leaderElection;
+     this.brokerService = brokerService;
+     this.resources = resources;
+     this.topicName = topicName;
+ }
+
+ /**
+  * Loads the current layout from the metadata store and then takes part in the leader
+  * election for this topic.
+  *
+  * <p>If the election result is {@code Leading}, all persisted subscriptions and consumer
+  * registrations are loaded as well. Each restored consumer is installed in a "just
+  * disconnected" state with a fresh grace-period timer, so consumers that were registered
+  * under a previous leader get the full grace window to reconnect to this new leader
+  * without losing their segment assignment.
+  *
+  * @return a future that completes when initialization is done; it fails with
+  *         {@link IllegalStateException} when no metadata exists for the topic
+  */
+ public CompletableFuture<Void> initialize() {
+     CompletableFuture<Void> elected = resources.getScalableTopicMetadataAsync(topicName, true)
+             .thenCompose(metadata -> {
+                 if (metadata.isPresent()) {
+                     currentLayout = SegmentLayout.fromMetadata(metadata.get());
+                     return electLeader();
+                 }
+                 return CompletableFuture.failedFuture(
+                         new IllegalStateException("Scalable topic not found: " + topicName));
+             });
+     return elected.thenCompose(ignored -> {
+         if (!isLeader()) {
+             return CompletableFuture.<Void>completedFuture(null);
+         }
+         return restoreSessionsFromStore();
+     });
+ }
+
+ /**
+  * Reads the persisted subscription names for this topic and restores each of them into a
+  * per-subscription {@link SubscriptionCoordinator}. Called after a successful leader
+  * election so the newly elected leader can resume servicing consumers that were
+  * registered under a previous leader.
+  *
+  * @return a future that completes once every subscription has been restored
+  */
+ private CompletableFuture<Void> restoreSessionsFromStore() {
+     return resources.listSubscriptionsAsync(topicName)
+             .thenCompose(names -> {
+                 if (!names.isEmpty()) {
+                     CompletableFuture<?>[] pending = names.stream()
+                             .map(name -> restoreSubscription(name))
+                             .toArray(CompletableFuture[]::new);
+                     return CompletableFuture.allOf(pending);
+                 }
+                 return CompletableFuture.completedFuture(null);
+             });
+ }
+
+ /**
+  * Restores one subscription: lists its persisted consumers and hands them to the
+  * subscription's coordinator, creating the coordinator if it does not exist yet.
+  */
+ private CompletableFuture<Void> restoreSubscription(String subscription) {
+     return resources.listConsumersAsync(topicName, subscription)
+             .thenAccept(persistedConsumers -> {
+                 subscriptions.computeIfAbsent(subscription, this::createCoordinator)
+                         .restoreConsumers(persistedConsumers);
+                 log.info("[{}] restored subscription {} with {} consumer(s)",
+                         topicName, subscription, persistedConsumers.size());
+             });
+ }
+
+ /**
+  * Factory used with {@code computeIfAbsent}: builds the coordinator for a subscription,
+  * seeded with the layout held by this controller at the time of the call.
+  */
+ private SubscriptionCoordinator createCoordinator(String subscription) {
+     SegmentLayout layoutSnapshot = currentLayout;
+     return new SubscriptionCoordinator(subscription, topicName, layoutSnapshot, resources,
+             brokerService.getPulsar().getExecutor());
+ }
+
+ private CompletableFuture<Void> electLeader() {
+ // Store the brokerId as the leader-election value — not the raw
pulsar:// URL.
+ // Callers that need a service URL (DagWatchSession for clients, the
REST layer for
+ // HTTP redirection) look up the broker's advertised addresses via
+ // NamespaceService.createLookupResult(brokerId, ...), matching the
pattern used by
+ // the cluster-leader redirection in NamespacesBase.
+ String brokerId = brokerService.getPulsar().getBrokerId();
+ return leaderElection.elect(brokerId)
+ .thenAccept(state -> {
+ this.leaderState = state;
+ log.info("Leader election for scalable topic {}:
state={}", topicName, state);
+ });
+ }
+
+ /** Whether the most recently recorded election state of this controller is {@code Leading}. */
+ public boolean isLeader() {
+     return LeaderElectionState.Leading == leaderState;
+ }
+
+ /**
+ * Get the current leader's brokerId (as stored in leader election). Callers resolve it to a
+ * service URL via
+ * {@link org.apache.pulsar.broker.namespace.NamespaceService#createLookupResult(String, boolean, String)}.
+ *
+ * @return a future holding the value currently stored in the leader election, if any
+ */
+ public CompletableFuture<Optional<String>> getLeaderBrokerId() {
+ return leaderElection.getLeaderValue();
+ }
+
+ // --- Layout operations (only valid on leader) ---
+
+ public CompletableFuture<SegmentLayout> getLayout() {
+ return CompletableFuture.completedFuture(currentLayout);
+ }
+
+ // --- Consumer management ---
+
+ /**
+  * Registers a consumer for a subscription. The controller persists a durable session entry
+  * and returns the consumer's segment assignment.
+  *
+  * <p>If a session with the same {@code consumerName} already exists (for example because
+  * the consumer is reconnecting within the grace period), the existing assignment is reused
+  * and no rebalance occurs.
+  *
+  * @return a future with the assignment of {@code consumerName}, or {@code null} when the
+  *         coordinator's result contains no entry for that name
+  * @throws IllegalStateException synchronously, if this broker is not the leader
+  */
+ public CompletableFuture<ConsumerAssignment> registerConsumer(String subscription,
+                                                                String consumerName,
+                                                                long consumerId,
+                                                                TransportCnx cnx) {
+     checkLeader();
+     return subscriptions.computeIfAbsent(subscription, this::createCoordinator)
+             .registerConsumer(consumerName, consumerId, cnx)
+             .thenApply(allAssignments -> allAssignments.entrySet().stream()
+                     // Match on the name: the key may be a pre-existing session object.
+                     .filter(entry -> consumerName.equals(entry.getKey().getConsumerName()))
+                     .map(entry -> entry.getValue())
+                     .findFirst()
+                     .orElse(null));
+ }
+
+ /**
+  * Explicit unregister: the consumer is leaving the subscription for good. Deletes the
+  * persisted session entry and rebalances remaining consumers. When the coordinator has no
+  * consumers left afterwards, it is dropped from the in-memory subscription map.
+  *
+  * @param subscription the subscription the consumer belongs to
+  * @param consumerName the consumer to remove
+  * @return a future that completes when the unregistration is done; completes immediately
+  *         when no coordinator exists for {@code subscription}
+  * @throws IllegalStateException synchronously, if this broker is not the leader
+  */
+ public CompletableFuture<Void> unregisterConsumer(String subscription, String consumerName) {
+     checkLeader();
+     SubscriptionCoordinator coordinator = subscriptions.get(subscription);
+     if (coordinator == null) {
+         return CompletableFuture.completedFuture(null);
+     }
+     return coordinator.unregisterConsumer(consumerName)
+             .thenAccept(__ -> {
+                 if (coordinator.getConsumers().isEmpty()) {
+                     // This callback runs asynchronously: by now the map may hold a
+                     // different coordinator for the same subscription (e.g. after close()
+                     // followed by a restore). Only remove the mapping if it still points
+                     // to the coordinator that was just emptied.
+                     subscriptions.remove(subscription, coordinator);
+                 }
+             });
+ }
+
+ /**
+  * Called when a consumer's transport connection drops. Does <em>not</em> delete the
+  * persisted session: the coordinator marks the consumer disconnected and starts the
+  * grace-period timer. The consumer can reconnect within the grace period and resume with
+  * the same segment assignment. Unknown subscriptions are ignored.
+  */
+ public void onConsumerDisconnect(String subscription, String consumerName) {
+     SubscriptionCoordinator coordinator = subscriptions.get(subscription);
+     if (coordinator == null) {
+         return;
+     }
+     coordinator.onConsumerDisconnect(consumerName);
+ }
+
+ // --- Lifecycle ---
+
+ public CompletableFuture<Void> close() {
+ subscriptions.clear(); // drops the in-memory coordinators only; NOTE(review): grace timers of their sessions are not cancelled here - confirm this is intended
+ return leaderElection.asyncClose(); // the returned future is the one produced by the election handle's asyncClose()
+ }
+
+ // --- Internal helpers ---
+
+ /**
+  * Guard for leader-only operations.
+  *
+  * @throws IllegalStateException if this controller is not currently the leader
+  */
+ private void checkLeader() {
+     if (isLeader()) {
+         return;
+     }
+     throw new IllegalStateException("This broker is not the leader for topic: " + topicName);
+ }
+
+ /**
+  * Create the initial segment layout for a new scalable topic.
+  *
+  * <p>The hash space {@code [0, HashRange.MAX_HASH]} is cut into {@code numInitialSegments}
+  * contiguous ranges of equal size; the last range additionally absorbs the remainder of the
+  * integer division. Segments are numbered {@code 0 .. numInitialSegments - 1} and created
+  * as active.
+  *
+  * @param numInitialSegments number of segments to create; must be at least 1 and no larger
+  *                           than the number of values in the hash space
+  * @param properties         topic properties; {@code null} is treated as an empty map
+  * @return metadata at epoch 0 whose {@code nextSegmentId} equals {@code numInitialSegments}
+  * @throws IllegalArgumentException if {@code numInitialSegments} is out of range
+  */
+ public static ScalableTopicMetadata createInitialMetadata(int numInitialSegments,
+                                                           Map<String, String> properties) {
+     if (numInitialSegments < 1) {
+         throw new IllegalArgumentException("Must have at least 1 segment");
+     }
+
+     int rangeSize = (HashRange.MAX_HASH + 1) / numInitialSegments;
+     if (rangeSize < 1) {
+         // More segments than hash values: the division yields 0 and every range except
+         // the last one would be the invalid [0, -1]. Fail fast instead.
+         throw new IllegalArgumentException("Cannot create " + numInitialSegments
+                 + " segments: at most " + (HashRange.MAX_HASH + 1) + " are supported");
+     }
+
+     Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+     for (int i = 0; i < numInitialSegments; i++) {
+         int start = i * rangeSize;
+         // The last segment always ends at MAX_HASH so that the whole hash space is covered.
+         int end = (i == numInitialSegments - 1) ? HashRange.MAX_HASH : (start + rangeSize - 1);
+         HashRange range = HashRange.of(start, end);
+         // Third argument is 0 for all initial segments (presumably the creation epoch).
+         SegmentInfo segment = SegmentInfo.active(i, range, 0);
+         segments.put((long) i, segment);
+     }
+
+     return ScalableTopicMetadata.builder()
+             .epoch(0)
+             .nextSegmentId(numInitialSegments)
+             .segments(segments)
+             .properties(properties != null ? properties : Map.of())
+             .build();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java
new file mode 100644
index 00000000000..326a304b277
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicLayoutResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Response sent to clients containing the full DAG state along with broker addresses
+ * for each active segment and the controller broker address.
+ *
+ * <p>This is used for both the initial lookup response and subsequent pushed updates
+ * via the DAG watch session.
+ *
+ * @param epoch the current layout epoch
+ * @param segments full DAG of segments (active + sealed), keyed by segment id
+ * @param segmentBrokerAddresses broker service URL for each active segment (segmentId to URL)
+ * @param segmentBrokerAddressesTls optional TLS broker URL for each active segment
+ * @param controllerBrokerUrl broker running the ScalableTopicController
+ * @param controllerBrokerUrlTls optional TLS URL for the controller broker
+ */
+public record ScalableTopicLayoutResponse(
+ long epoch,
+ Map<Long, SegmentInfo> segments,
+ Map<Long, String> segmentBrokerAddresses,
+ Map<Long, String> segmentBrokerAddressesTls,
+ String controllerBrokerUrl,
+ String controllerBrokerUrlTls
+) {}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
new file mode 100644
index 00000000000..c151df6e81b
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Central service managing all scalable topics on this broker.
+ *
+ * <p>Lifecycle is tied to {@link BrokerService}. This service handles:
+ * <ul>
+ * <li>Creating and deleting scalable topics</li>
+ * <li>Managing {@link ScalableTopicController} instances for topics this
broker coordinates</li>
+ * <li>Admin operations: split/merge</li>
+ * </ul>
+ */
+@Slf4j
+public class ScalableTopicService {
+
+ private final BrokerService brokerService;
+ private final ScalableTopicResources resources;
+ private final CoordinationService coordinationService;
+
+ /** Active controllers for topics this broker coordinates. */
+ private final ConcurrentHashMap<String, ScalableTopicController>
controllers = new ConcurrentHashMap<>();
+
+ /**
+  * Creates the service. Only stores its collaborators; no controller is created here.
+  *
+  * @param brokerService       owning broker service
+  * @param resources           metadata-store accessor for scalable topic state
+  * @param coordinationService source of the per-topic leader-election handles
+  */
+ public ScalableTopicService(BrokerService brokerService,
+                             ScalableTopicResources resources,
+                             CoordinationService coordinationService) {
+     this.coordinationService = coordinationService;
+     this.resources = resources;
+     this.brokerService = brokerService;
+ }
+
+ // --- Lifecycle ---
+
+ public void start() {
+ log.info("ScalableTopicService started"); // only logs; controllers are created on demand by getOrCreateController()
+ }
+
+ public void close() {
+ log.info("Closing ScalableTopicService, releasing {} controllers",
controllers.size());
+ controllers.values().forEach(controller -> {
+ try {
+ controller.close().join();
+ } catch (Exception e) {
+ log.warn("Error closing controller for topic {}",
controller.getTopicName(), e);
+ }
+ });
+ controllers.clear();
+ }
+
+ // --- Controller management ---
+
+ /**
+ * Get or create a controller for a scalable topic. The controller will
attempt
+ * leader election; only the leader actively coordinates consumers.
+ */
+ public CompletableFuture<ScalableTopicController>
getOrCreateController(TopicName topic) {
+ String key = topic.toString();
+ ScalableTopicController existing = controllers.get(key);
+ if (existing != null) {
+ return CompletableFuture.completedFuture(existing);
+ }
+
+ String lockPath = resources.controllerLockPath(topic);
+ LeaderElection<String> election =
coordinationService.getLeaderElection(
+ String.class, lockPath, state -> onLeaderStateChange(topic,
state));
+
+ ScalableTopicController controller = new ScalableTopicController(
+ topic, resources, brokerService, election);
+ controllers.put(key, controller);
+
+ return controller.initialize()
+ .thenApply(__ -> controller)
+ .exceptionally(ex -> {
+ controllers.remove(key);
+ throw new RuntimeException("Failed to initialize
controller for " + topic, ex);
+ });
+ }
+
+ /**
+  * Release the controller for a topic (e.g., on topic unload): removes it from the map and
+  * closes it.
+  *
+  * @return the controller's close future, or an already-completed future when this service
+  *         holds no controller for {@code topic}
+  */
+ public CompletableFuture<Void> releaseController(TopicName topic) {
+     ScalableTopicController removed = controllers.remove(topic.toString());
+     if (removed == null) {
+         return CompletableFuture.completedFuture(null);
+     }
+     return removed.close();
+ }
+
+ // --- Admin operations ---
+
+ /**
+ * Create a new scalable topic with the given number of initial segments.
+ */
+ public CompletableFuture<Void> createScalableTopic(TopicName topic, int
numInitialSegments) {
+ return createScalableTopic(topic, numInitialSegments, Map.of());
+ }
+
+ /**
+  * Create a new scalable topic: persists the initial metadata and then creates the
+  * underlying persistent topic of every initial segment.
+  *
+  * @param topic              name of the scalable topic; its domain must be {@code topic}
+  * @param numInitialSegments number of initial segments, at least 1
+  * @param properties         topic properties stored in the metadata
+  * @return a future that completes when metadata and all segment topics are created; it
+  *         fails with {@link IllegalArgumentException} when an argument is invalid
+  */
+ public CompletableFuture<Void> createScalableTopic(TopicName topic, int numInitialSegments,
+                                                    Map<String, String> properties) {
+     if (topic.getDomain() != TopicDomain.topic) {
+         return CompletableFuture.failedFuture(
+                 new IllegalArgumentException("Expected topic domain, got: " + topic.getDomain()));
+     }
+     if (numInitialSegments < 1) {
+         return CompletableFuture.failedFuture(
+                 new IllegalArgumentException("numInitialSegments must be >= 1"));
+     }
+
+     ScalableTopicMetadata initial =
+             ScalableTopicController.createInitialMetadata(numInitialSegments, properties);
+
+     return resources.createScalableTopicAsync(topic, initial)
+             .thenCompose(created -> {
+                 // Create underlying persistent topics for each initial segment.
+                 Map<Long, SegmentInfo> segments = initial.getSegments();
+                 CompletableFuture<?>[] creations = new CompletableFuture<?>[segments.size()];
+                 int next = 0;
+                 for (SegmentInfo segment : segments.values()) {
+                     creations[next++] = createUnderlyingSegmentTopic(topic, segment);
+                 }
+                 return CompletableFuture.allOf(creations);
+             });
+ }
+
+ /**
+ * Delete a scalable topic and all its segment topics.
+ */
+ public CompletableFuture<Void> deleteScalableTopic(TopicName topic) {
+ return releaseController(topic)
+ .thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topic))
+ .thenCompose(optMd -> {
+ if (optMd.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ ScalableTopicMetadata metadata = optMd.get();
+ // Delete all underlying segment topics
+ CompletableFuture<?>[] deleteFutures =
metadata.getSegments().values().stream()
+ .map(segment ->
deleteUnderlyingSegmentTopic(topic, segment))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(deleteFutures);
+ })
+ .thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
+ }
+
+ // --- Internal helpers ---
+
+ /**
+  * Leader-election listener. Logs every state change; when the state becomes
+  * {@code NoLeader} and this service still holds a controller for the topic, the controller
+  * is re-initialized so it takes part in a new election. Failures are logged only.
+  */
+ private void onLeaderStateChange(TopicName topic, LeaderElectionState state) {
+     log.info("Leader state change for scalable topic {}: {}", topic, state);
+     if (state != LeaderElectionState.NoLeader) {
+         return;
+     }
+     // Try to re-elect
+     ScalableTopicController controller = controllers.get(topic.toString());
+     if (controller == null) {
+         return;
+     }
+     controller.initialize().exceptionally(failure -> {
+         log.warn("Failed to re-elect for topic {}", topic, failure);
+         return null;
+     });
+ }
+
+ /**
+  * Creates (or loads) the persistent topic that backs one segment of {@code parentTopic}
+  * and logs its name once available.
+  */
+ private CompletableFuture<Void> createUnderlyingSegmentTopic(TopicName parentTopic, SegmentInfo segment) {
+     String persistentName = toPersistentName(
+             SegmentTopicName.fromParent(parentTopic, segment.hashRange(), segment.segmentId()));
+     return brokerService.getOrCreateTopic(persistentName)
+             .thenAccept(created -> log.info("Created segment topic: {}", persistentName));
+ }
+
+ /**
+  * Deletes the persistent topic that backs one segment of {@code parentTopic}. Best effort:
+  * a failure is logged and the returned future still completes normally.
+  */
+ private CompletableFuture<Void> deleteUnderlyingSegmentTopic(TopicName parentTopic, SegmentInfo segment) {
+     String persistentName = toPersistentName(
+             SegmentTopicName.fromParent(parentTopic, segment.hashRange(), segment.segmentId()));
+     // NOTE(review): the second argument is presumably the "force delete" flag - confirm.
+     return brokerService.deleteTopic(persistentName, true)
+             .exceptionally(failure -> {
+                 log.warn("Failed to delete segment topic {}: {}", persistentName, failure.getMessage());
+                 return null;
+             });
+ }
+
+ /**
+  * Convert a segment:// topic name to persistent:// for the underlying managed ledger
+  * topic. Tenant, namespace portion and local name are carried over unchanged.
+  */
+ private String toPersistentName(TopicName segmentTopic) {
+     StringBuilder name = new StringBuilder("persistent://");
+     name.append(segmentTopic.getTenant()).append("/");
+     name.append(segmentTopic.getNamespacePortion()).append("/");
+     name.append(segmentTopic.getLocalName());
+     return name.toString();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
new file mode 100644
index 00000000000..5da17726b88
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * In-memory versioned view of a scalable topic's segment DAG.
+ *
+ * <p>This is an immutable snapshot. Mutation methods (split, merge, prune)
return new instances
+ * with the updated state.
+ */
+public class SegmentLayout {
+ @Getter
+ private final long epoch;
+ @Getter
+ private final long nextSegmentId;
+ @Getter
+ private final Map<Long, SegmentInfo> allSegments;
+ @Getter
+ private final Map<Long, SegmentInfo> activeSegments;
+
+ /**
+  * Builds an immutable snapshot. The given map is copied (iteration order preserved) and
+  * exposed read-only; the active-segment view is derived from it, keyed by each segment's
+  * own id.
+  *
+  * @param epoch         layout epoch
+  * @param nextSegmentId id to assign to the next segment that gets created
+  * @param allSegments   every segment of the DAG, keyed by segment id
+  */
+ public SegmentLayout(long epoch, long nextSegmentId, Map<Long, SegmentInfo> allSegments) {
+     this.epoch = epoch;
+     this.nextSegmentId = nextSegmentId;
+     this.allSegments = Collections.unmodifiableMap(new LinkedHashMap<>(allSegments));
+     this.activeSegments = allSegments.values().stream()
+             .filter(segment -> segment.isActive())
+             .collect(Collectors.toUnmodifiableMap(segment -> segment.segmentId(), segment -> segment));
+ }
+
+ public static SegmentLayout fromMetadata(ScalableTopicMetadata metadata) {
+ return new SegmentLayout(metadata.getEpoch(),
metadata.getNextSegmentId(), metadata.getSegments());
+ }
+
+ /**
+  * Find the active segment whose hash range contains the given hash value.
+  *
+  * @param hash the hash value to look up
+  * @return the first active segment found whose range contains {@code hash}
+  * @throws IllegalStateException if no active segment covers {@code hash}
+  */
+ public SegmentInfo findActiveSegment(int hash) {
+     return activeSegments.values().stream()
+             .filter(candidate -> candidate.hashRange().contains(hash))
+             .findFirst()
+             .orElseThrow(() -> new IllegalStateException("No active segment covers hash: " + hash));
+ }
+
+ /**
+  * Get the full lineage chain for a segment (ancestors + descendants).
+  *
+  * <p>The result lists the ancestors first, then the segment itself (when present in the
+  * layout), then the descendants. Each segment appears at most once: in a DAG that
+  * contains merges, the same ancestor or descendant is reachable over several paths, and
+  * only its first occurrence is kept.
+  *
+  * @param segmentId the segment whose lineage is requested
+  * @return a new mutable list; empty when the segment is unknown
+  */
+ public List<SegmentInfo> getLineage(long segmentId) {
+     List<SegmentInfo> visited = new ArrayList<>();
+     collectAncestors(segmentId, visited);
+     SegmentInfo self = allSegments.get(segmentId);
+     if (self != null) {
+         visited.add(self);
+     }
+     collectDescendants(segmentId, visited);
+
+     // The traversals above walk every path of the DAG, so a segment shared by two paths
+     // (e.g. the common parent of two segments that were later merged) shows up repeatedly.
+     // De-duplicate by segment id while keeping first-occurrence order.
+     Map<Long, SegmentInfo> unique = new LinkedHashMap<>();
+     for (SegmentInfo segment : visited) {
+         unique.putIfAbsent(segment.segmentId(), segment);
+     }
+     return new ArrayList<>(unique.values());
+ }
+
+ /**
+  * Get direct children of a segment.
+  *
+  * @return the children still present in the layout, in the order of the segment's child
+  *         ids; an empty list when the segment itself is unknown
+  */
+ public List<SegmentInfo> getChildren(long segmentId) {
+     SegmentInfo segment = allSegments.get(segmentId);
+     if (segment == null) {
+         return List.of();
+     }
+     List<SegmentInfo> children = new ArrayList<>();
+     for (Long childId : segment.childIds()) {
+         SegmentInfo child = allSegments.get(childId);
+         if (child != null) {
+             children.add(child);
+         }
+     }
+     return children;
+ }
+
+ /**
+  * Get direct parents of a segment.
+  *
+  * @return the parents still present in the layout, in the order of the segment's parent
+  *         ids; an empty list when the segment itself is unknown
+  */
+ public List<SegmentInfo> getParents(long segmentId) {
+     SegmentInfo segment = allSegments.get(segmentId);
+     if (segment == null) {
+         return List.of();
+     }
+     List<SegmentInfo> parents = new ArrayList<>();
+     for (Long parentId : segment.parentIds()) {
+         SegmentInfo parent = allSegments.get(parentId);
+         if (parent != null) {
+             parents.add(parent);
+         }
+     }
+     return parents;
+ }
+
+ /**
+  * Produce a new layout by splitting a segment at its midpoint.
+  *
+  * <p>The split segment is sealed and gets two new active children that take the next two
+  * free segment ids; the epoch advances by one. This layout is left untouched.
+  *
+  * @param segmentId the active segment to split
+  * @return a new SegmentLayout with the split applied
+  * @throws IllegalArgumentException if the segment does not exist or is not active
+  */
+ public SegmentLayout splitSegment(long segmentId) {
+     SegmentInfo parent = allSegments.get(segmentId);
+     if (parent == null) {
+         throw new IllegalArgumentException("Segment not found: " + segmentId);
+     }
+     if (!parent.isActive()) {
+         throw new IllegalArgumentException("Cannot split non-active segment: " + segmentId);
+     }
+
+     HashRange[] halves = parent.hashRange().split();
+     long splitEpoch = epoch + 1;
+     long firstChildId = nextSegmentId;
+     long secondChildId = nextSegmentId + 1;
+     List<Long> parentRef = List.of(segmentId);
+
+     Map<Long, SegmentInfo> updated = new LinkedHashMap<>(allSegments);
+     updated.put(segmentId, parent.sealed(splitEpoch, List.of(firstChildId, secondChildId)));
+     updated.put(firstChildId, SegmentInfo.active(firstChildId, halves[0], parentRef, splitEpoch));
+     updated.put(secondChildId, SegmentInfo.active(secondChildId, halves[1], parentRef, splitEpoch));
+
+     return new SegmentLayout(splitEpoch, nextSegmentId + 2, updated);
+ }
+
+ /**
+  * Produce a new layout by merging two adjacent active segments.
+  *
+  * <p>Both inputs are sealed and a single new active segment covering their combined range
+  * is added under the next free segment id; the epoch advances by one. This layout is left
+  * untouched.
+  *
+  * @param segmentId1 the first segment (must be active and adjacent to segmentId2)
+  * @param segmentId2 the second segment (must be active and adjacent to segmentId1)
+  * @return a new SegmentLayout with the merge applied
+  * @throws IllegalArgumentException if a segment is missing, not active, or the two ranges
+  *                                  are not adjacent
+  */
+ public SegmentLayout mergeSegments(long segmentId1, long segmentId2) {
+     SegmentInfo first = allSegments.get(segmentId1);
+     SegmentInfo second = allSegments.get(segmentId2);
+     if (first == null || second == null) {
+         throw new IllegalArgumentException("Segment not found");
+     }
+     if (!(first.isActive() && second.isActive())) {
+         throw new IllegalArgumentException("Both segments must be active");
+     }
+     HashRange firstRange = first.hashRange();
+     HashRange secondRange = second.hashRange();
+     if (!firstRange.isAdjacentTo(secondRange)) {
+         throw new IllegalArgumentException("Segments are not adjacent: "
+                 + firstRange + " and " + secondRange);
+     }
+
+     long mergeEpoch = epoch + 1;
+     long mergedId = nextSegmentId;
+     List<Long> successor = List.of(mergedId);
+     SegmentInfo merged = SegmentInfo.active(mergedId, firstRange.merge(secondRange),
+             List.of(segmentId1, segmentId2), mergeEpoch);
+
+     Map<Long, SegmentInfo> updated = new LinkedHashMap<>(allSegments);
+     updated.put(segmentId1, first.sealed(mergeEpoch, successor));
+     updated.put(segmentId2, second.sealed(mergeEpoch, successor));
+     updated.put(mergedId, merged);
+
+     return new SegmentLayout(mergeEpoch, nextSegmentId + 1, updated);
+ }
+
+ /**
+  * Prune a non-active segment from the DAG.
+  *
+  * <p>The segment is removed, and every child and parent of it that is still present in
+  * the layout has the pruned id dropped from its parent or child list respectively. The
+  * epoch advances by one; {@code nextSegmentId} is unchanged. The only check performed is
+  * that the segment exists and is not active; whether its children are still in the DAG is
+  * not verified here.
+  *
+  * @param segmentId the segment to prune
+  * @return a new SegmentLayout with the segment removed
+  * @throws IllegalArgumentException if the segment does not exist or is active
+  */
+ public SegmentLayout pruneSegment(long segmentId) {
+     SegmentInfo pruned = allSegments.get(segmentId);
+     if (pruned == null) {
+         throw new IllegalArgumentException("Segment not found: " + segmentId);
+     }
+     if (pruned.isActive()) {
+         throw new IllegalArgumentException("Cannot prune an active segment: " + segmentId);
+     }
+
+     Map<Long, SegmentInfo> remaining = new LinkedHashMap<>(allSegments);
+     remaining.remove(segmentId);
+
+     // Detach the pruned segment from the parent lists of its children.
+     for (long childId : pruned.childIds()) {
+         SegmentInfo child = remaining.get(childId);
+         if (child == null) {
+             continue;
+         }
+         remaining.put(childId, child.withParentIds(withoutId(child.parentIds(), segmentId)));
+     }
+
+     // Detach the pruned segment from the child lists of its parents.
+     for (long parentId : pruned.parentIds()) {
+         SegmentInfo parent = remaining.get(parentId);
+         if (parent == null) {
+             continue;
+         }
+         remaining.put(parentId, parent.withChildIds(withoutId(parent.childIds(), segmentId)));
+     }
+
+     return new SegmentLayout(epoch + 1, nextSegmentId, remaining);
+ }
+
+ /** Returns a new mutable list holding the elements of {@code ids} that differ from {@code removedId}. */
+ private static List<Long> withoutId(List<Long> ids, long removedId) {
+     List<Long> kept = new ArrayList<>();
+     for (Long id : ids) {
+         if (id != removedId) {
+             kept.add(id);
+         }
+     }
+     return kept;
+ }
+
+ /**
+ * Convert back to metadata for persistence.
+ */
+ public ScalableTopicMetadata toMetadata(Map<String, String> properties) {
+ return ScalableTopicMetadata.builder()
+ .epoch(epoch)
+ .nextSegmentId(nextSegmentId)
+ .segments(new LinkedHashMap<>(allSegments))
+ .properties(properties)
+ .build();
+ }
+
+ /**
+  * Appends the ancestors of {@code segmentId} to {@code result}, walking up through the
+  * parent ids. For each parent, that parent's own ancestors are appended before the parent
+  * itself. Ids that are not present in the layout are skipped.
+  */
+ private void collectAncestors(long segmentId, List<SegmentInfo> result) {
+     SegmentInfo current = allSegments.get(segmentId);
+     if (current != null) {
+         for (long parentId : current.parentIds()) {
+             collectAncestors(parentId, result);
+             SegmentInfo parent = allSegments.get(parentId);
+             if (parent == null) {
+                 continue;
+             }
+             result.add(parent);
+         }
+     }
+ }
+
+ /**
+  * Appends the descendants of {@code segmentId} to {@code result}, walking down through
+  * the child ids. Each child is appended before its own descendants. Ids that are not
+  * present in the layout are skipped.
+  */
+ private void collectDescendants(long segmentId, List<SegmentInfo> result) {
+     SegmentInfo current = allSegments.get(segmentId);
+     if (current != null) {
+         for (long childId : current.childIds()) {
+             SegmentInfo child = allSegments.get(childId);
+             if (child != null) {
+                 result.add(child);
+             }
+             collectDescendants(childId, result);
+         }
+     }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
new file mode 100644
index 00000000000..ff55b7854ed
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+
+/**
+ * Manages segment-to-consumer assignments within a single subscription of a
scalable topic.
+ *
+ * <p>Consumer sessions are persisted in the metadata store (as
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) and
tracked in-memory
+ * as {@link ConsumerSession} objects. The distinction is important: the
session <em>itself</em>
+ * is durable (survives TCP disconnects, client restarts, and controller
leader failovers),
+ * but the keep-alive tracking (connected / grace-period timer) is in-memory
only.
+ *
+ * <p>When a consumer's connection drops, the coordinator does <em>not</em>
immediately evict
+ * it. Instead it marks the session disconnected and starts a grace-period
timer. If the
+ * consumer reconnects (with the same {@code consumerName}) before the timer
fires, its
+ * existing assignment is restored with no rebalance. If the timer fires, the
persisted
+ * registration is deleted and a rebalance is triggered.
+ *
+ * <p>On controller leader failover, the new leader reloads persisted
registrations via
+ * {@link #restoreConsumers(Collection)}, which installs them in a "just
disconnected" state
+ * with fresh grace-period timers — giving every consumer the full window to
reconnect to the
+ * new leader regardless of how long they had been disconnected under the old
one.
+ */
+public class SubscriptionCoordinator {
+
+ private static final Logger LOG =
Logger.get(SubscriptionCoordinator.class);
+ private final Logger log;
+
+ // TODO: make configurable via broker config (e.g.
scalableTopicConsumerSessionTimeoutSeconds)
+ private static final Duration DEFAULT_GRACE_PERIOD =
Duration.ofSeconds(60);
+
+ @Getter
+ private final String subscriptionName;
+ private final TopicName topicName;
+ private final ScalableTopicResources resources;
+ private final ScheduledExecutorService scheduler;
+ private final Duration gracePeriod;
+
+ /** Keyed by consumerName — the stable session identity. */
+ private final Map<String, ConsumerSession> sessions = new
ConcurrentHashMap<>();
+ private Map<Long, ConsumerSession> segmentAssignments = new
LinkedHashMap<>();
+ private SegmentLayout currentLayout;
+
+ public SubscriptionCoordinator(String subscriptionName,
+ TopicName topicName,
+ SegmentLayout initialLayout,
+ ScalableTopicResources resources,
+ ScheduledExecutorService scheduler) {
+ this(subscriptionName, topicName, initialLayout, resources, scheduler,
DEFAULT_GRACE_PERIOD);
+ }
+
+ public SubscriptionCoordinator(String subscriptionName,
+ TopicName topicName,
+ SegmentLayout initialLayout,
+ ScalableTopicResources resources,
+ ScheduledExecutorService scheduler,
+ Duration gracePeriod) {
+ this.subscriptionName = subscriptionName;
+ this.topicName = topicName;
+ this.currentLayout = initialLayout;
+ this.resources = resources;
+ this.scheduler = scheduler;
+ this.gracePeriod = gracePeriod;
+ this.log = LOG.with().attr("topic", topicName).attr("subscription",
subscriptionName).build();
+ }
+
+ // --- Register / unregister / reconnect ---
+
+ /**
+ * Register a consumer — either a fresh registration or a reconnect of an
existing
+ * session. If the {@code consumerName} already has a persisted session,
its assignment
+ * is preserved and the new connection is attached; otherwise the
registration is
+ * persisted and a rebalance is triggered.
+ *
+ * @return assignment map for all consumers (unchanged on reconnect,
recomputed on fresh register)
+ */
+ public synchronized CompletableFuture<Map<ConsumerSession,
ConsumerAssignment>> registerConsumer(
+ String consumerName, long consumerId, TransportCnx cnx) {
+ ConsumerSession existing = sessions.get(consumerName);
+ if (existing != null) {
+ // Reconnect: attach the new connection, cancel any grace timer,
and push the
+ // current assignment without rebalancing other consumers.
+ existing.attach(consumerId, cnx);
+ Map<ConsumerSession, ConsumerAssignment> current =
+ computeAssignment(currentLayout, sessions.values());
+ ConsumerAssignment assignment = current.get(existing);
+ if (assignment != null) {
+ existing.sendAssignmentUpdate(assignment);
+ }
+ return CompletableFuture.completedFuture(current);
+ }
+
+ // Fresh registration — persist first, then install in-memory and
rebalance.
+ ConsumerSession session = newSession(consumerName, consumerId, cnx);
+ return resources.registerConsumerAsync(topicName, subscriptionName,
consumerName)
+ .thenApply(__ -> {
+ synchronized (this) {
+ sessions.put(consumerName, session);
+ return rebalanceAndNotify();
+ }
+ });
+ }
+
+ /**
+ * Explicit unregister (consumer asked to leave the subscription). Cancels
any pending
+ * grace timer, deletes the persisted registration, and rebalances.
+ */
+ public synchronized CompletableFuture<Map<ConsumerSession,
ConsumerAssignment>> unregisterConsumer(
+ String consumerName) {
+ ConsumerSession removed = sessions.remove(consumerName);
+ if (removed == null) {
+ return CompletableFuture.completedFuture(snapshotAssignments());
+ }
+ removed.cancelGraceTimer();
+ return resources.unregisterConsumerAsync(topicName, subscriptionName,
consumerName)
+ .thenApply(__ -> {
+ synchronized (this) {
+ if (sessions.isEmpty()) {
+ segmentAssignments.clear();
+ return Map.of();
+ }
+ return rebalanceAndNotify();
+ }
+ });
+ }
+
+ /**
+ * Called when a consumer's transport connection drops (not an explicit
unregister).
+ * Marks the session disconnected and schedules an eviction task after the
grace period.
+ * If the consumer reconnects with the same name before the timer fires,
the timer is
+ * cancelled and no rebalance happens.
+ */
+ public synchronized void onConsumerDisconnect(String consumerName) {
+ ConsumerSession session = sessions.get(consumerName);
+ if (session == null || !session.isConnected()) {
+ return;
+ }
+ session.markDisconnected();
+ }
+
+ /**
+ * Restore consumer sessions loaded from the metadata store on controller
leader failover.
+ * All restored sessions start in the "just disconnected" state with a
fresh grace-period
+ * timer — consumers reconnecting within that window resume with the same
assignment.
+ */
+ public synchronized Map<ConsumerSession, ConsumerAssignment>
restoreConsumers(
+ Collection<String> persistedConsumerNames) {
+ for (String name : persistedConsumerNames) {
+ if (sessions.containsKey(name)) {
+ continue;
+ }
+ // restored() arms the grace timer internally. The eviction
callback takes the
+ // coordinator's monitor, which we hold here, so ordering against
the upcoming
+ // sessions.put is guaranteed.
+ ConsumerSession session = ConsumerSession.restored(name,
gracePeriod, scheduler,
+ () -> evictExpiredConsumer(name), log);
+ sessions.put(name, session);
+ }
+ // Compute the deterministic assignment against the current layout. No
sends: the
+ // consumers aren't connected yet. They will receive their assignment
on reconnect.
+ Map<ConsumerSession, ConsumerAssignment> result =
+ computeAssignment(currentLayout, sessions.values());
+ updateSegmentAssignmentIndex(result);
+ return result;
+ }
+
+ /**
+ * Handle a layout change (split/merge). Recompute and push assignments to
connected
+ * consumers.
+ */
+ public synchronized CompletableFuture<Map<ConsumerSession,
ConsumerAssignment>> onLayoutChange(
+ SegmentLayout newLayout) {
+ this.currentLayout = newLayout;
+ if (sessions.isEmpty()) {
+ segmentAssignments.clear();
+ return CompletableFuture.completedFuture(Map.of());
+ }
+ return CompletableFuture.completedFuture(rebalanceAndNotify());
+ }
+
+ // --- Accessors ---
+
+ public synchronized Set<ConsumerSession> getConsumers() {
+ return Set.copyOf(sessions.values());
+ }
+
+ // --- Internals ---
+
+ /**
+ * Build a new {@link ConsumerSession} wired with this coordinator's grace
period,
+ * scheduler, logger context, and eviction callback.
+ */
+ private ConsumerSession newSession(String consumerName, long consumerId,
TransportCnx cnx) {
+ return new ConsumerSession(consumerName, consumerId, cnx, gracePeriod,
scheduler,
+ () -> evictExpiredConsumer(consumerName), log);
+ }
+
+ /**
+ * Evict a consumer whose grace-period timer has fired. Runs on the
scheduler thread.
+ */
+ private void evictExpiredConsumer(String consumerName) {
+ synchronized (this) {
+ ConsumerSession session = sessions.get(consumerName);
+ if (session == null) {
+ return;
+ }
+ if (session.isConnected()) {
+ // Raced with a reconnect — abort the eviction.
+ return;
+ }
+ sessions.remove(consumerName);
+ log.info().attr("consumer", consumerName)
+ .log("Consumer evicted after grace period");
+ }
+ // Delete persisted registration outside the lock (async) and then
rebalance.
+ resources.unregisterConsumerAsync(topicName, subscriptionName,
consumerName)
+ .exceptionally(ex -> {
+ log.warn().attr("consumer", consumerName).exception(ex)
+ .log("Failed to delete persisted registration");
+ return null;
+ })
+ .thenRun(() -> {
+ synchronized (this) {
+ if (!sessions.isEmpty()) {
+ rebalanceAndNotify();
+ } else {
+ segmentAssignments.clear();
+ }
+ }
+ });
+ }
+
+ /**
+ * Compute a balanced assignment of active segments to consumers.
+ *
+ * <p>Strategy: sort segments by hash range start, sort consumers by name,
then
+ * round-robin. Deterministic: the same inputs always produce the same
output, so a new
+ * leader recomputing assignments after failover gets the same result as
the old leader.
+ */
+ Map<ConsumerSession, ConsumerAssignment> computeAssignment(
+ SegmentLayout layout, Collection<ConsumerSession> consumers) {
+
+ if (consumers.isEmpty()) {
+ return Map.of();
+ }
+
+ List<SegmentInfo> sortedSegments =
layout.getActiveSegments().values().stream()
+ .sorted(Comparator.comparing(SegmentInfo::hashRange))
+ .toList();
+
+ List<ConsumerSession> sortedConsumers = consumers.stream()
+ .sorted(Comparator.comparing(ConsumerSession::getConsumerName))
+ .toList();
+
+ Map<ConsumerSession, List<ConsumerAssignment.AssignedSegment>>
assignmentLists =
+ new LinkedHashMap<>();
+ for (ConsumerSession consumer : sortedConsumers) {
+ assignmentLists.put(consumer, new ArrayList<>());
+ }
+
+ int consumerIndex = 0;
+ for (SegmentInfo segment : sortedSegments) {
+ ConsumerSession consumer = sortedConsumers.get(consumerIndex %
sortedConsumers.size());
+ TopicName segmentTopic = SegmentTopicName.fromParent(topicName,
segment.hashRange(),
+ segment.segmentId());
+ assignmentLists.get(consumer).add(new
ConsumerAssignment.AssignedSegment(
+ segment.segmentId(),
+ segment.hashRange(),
+ segmentTopic.toString()
+ ));
+ consumerIndex++;
+ }
+
+ Map<ConsumerSession, ConsumerAssignment> result = new
LinkedHashMap<>();
+ for (var entry : assignmentLists.entrySet()) {
+ result.put(entry.getKey(), new ConsumerAssignment(
+ layout.getEpoch(), entry.getValue()));
+ }
+ return result;
+ }
+
+ private Map<ConsumerSession, ConsumerAssignment> rebalanceAndNotify() {
+ Map<ConsumerSession, ConsumerAssignment> assignments =
+ computeAssignment(currentLayout, sessions.values());
+ updateSegmentAssignmentIndex(assignments);
+
+ for (var entry : assignments.entrySet()) {
+ entry.getKey().sendAssignmentUpdate(entry.getValue());
+ }
+
+ return assignments;
+ }
+
+ private void updateSegmentAssignmentIndex(Map<ConsumerSession,
ConsumerAssignment> assignments) {
+ segmentAssignments.clear();
+ for (var entry : assignments.entrySet()) {
+ for (ConsumerAssignment.AssignedSegment seg :
entry.getValue().assignedSegments()) {
+ segmentAssignments.put(seg.segmentId(), entry.getKey());
+ }
+ }
+ }
+
+ private Map<ConsumerSession, ConsumerAssignment> snapshotAssignments() {
+ return computeAssignment(currentLayout, sessions.values());
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
new file mode 100644
index 00000000000..56c0050adc8
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
new file mode 100644
index 00000000000..e25066e4f1d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link SegmentLayout}: initial layouts, split / merge / prune transitions,
+ * lineage navigation and the round trip back to {@link ScalableTopicMetadata}.
+ */
+public class SegmentLayoutTest {
+
+    /** Builds the layout of a freshly created topic with the given number of segments. */
+    private static SegmentLayout initialLayout(int numSegments) {
+        ScalableTopicMetadata metadata =
+                ScalableTopicController.createInitialMetadata(numSegments, Map.of());
+        return SegmentLayout.fromMetadata(metadata);
+    }
+
+    @Test
+    public void testInitialLayout() {
+        SegmentLayout layout = initialLayout(2);
+
+        assertEquals(layout.getEpoch(), 0);
+        assertEquals(layout.getActiveSegments().size(), 2);
+        assertEquals(layout.getAllSegments().size(), 2);
+
+        SegmentInfo lower = layout.getAllSegments().get(0L);
+        SegmentInfo upper = layout.getAllSegments().get(1L);
+        assertNotNull(lower);
+        assertNotNull(upper);
+
+        // The hash space is cut in two equal halves.
+        assertEquals(lower.hashRange(), HashRange.of(0x0000, 0x7FFF));
+        assertEquals(upper.hashRange(), HashRange.of(0x8000, 0xFFFF));
+        assertTrue(lower.isActive());
+        assertTrue(upper.isActive());
+    }
+
+    @Test
+    public void testSingleSegmentInitialLayout() {
+        SegmentLayout layout = initialLayout(1);
+
+        assertEquals(layout.getActiveSegments().size(), 1);
+        SegmentInfo only = layout.getAllSegments().get(0L);
+        assertEquals(only.hashRange(), HashRange.full());
+    }
+
+    @Test
+    public void testFourSegmentInitialLayout() {
+        SegmentLayout layout = initialLayout(4);
+
+        assertEquals(layout.getActiveSegments().size(), 4);
+        // Walk the segments in id order: each must start right after the previous one ends.
+        int nextStart = 0;
+        for (long id = 0; id < 4; id++) {
+            SegmentInfo segment = layout.getAllSegments().get(id);
+            assertEquals(segment.hashRange().start(), nextStart);
+            nextStart = segment.hashRange().end() + 1;
+        }
+        // The last segment closes the hash space at 0xFFFF.
+        assertEquals(layout.getAllSegments().get(3L).hashRange().end(), 0xFFFF);
+    }
+
+    @Test
+    public void testFindActiveSegment() {
+        SegmentLayout layout = initialLayout(2);
+
+        SegmentInfo found = layout.findActiveSegment(0x1000);
+        assertEquals(found.segmentId(), 0);
+
+        found = layout.findActiveSegment(0x9000);
+        assertEquals(found.segmentId(), 1);
+    }
+
+    @Test
+    public void testSplitSegment() {
+        SegmentLayout afterSplit = initialLayout(2).splitSegment(0);
+
+        assertEquals(afterSplit.getEpoch(), 1);
+        assertEquals(afterSplit.getActiveSegments().size(), 3); // 1 original + 2 new
+        assertEquals(afterSplit.getAllSegments().size(), 4); // 2 original + 2 new
+
+        // The split segment is sealed and points at its two children.
+        SegmentInfo sealed = afterSplit.getAllSegments().get(0L);
+        assertTrue(sealed.isSealed());
+        assertEquals(sealed.childIds(), List.of(2L, 3L));
+
+        // Both children are active and point back at the split segment.
+        SegmentInfo lowerChild = afterSplit.getAllSegments().get(2L);
+        SegmentInfo upperChild = afterSplit.getAllSegments().get(3L);
+        assertTrue(lowerChild.isActive());
+        assertTrue(upperChild.isActive());
+        assertEquals(lowerChild.parentIds(), List.of(0L));
+        assertEquals(upperChild.parentIds(), List.of(0L));
+
+        // Together the children cover the parent's range.
+        assertEquals(lowerChild.hashRange(), HashRange.of(0x0000, 0x3FFF));
+        assertEquals(upperChild.hashRange(), HashRange.of(0x4000, 0x7FFF));
+    }
+
+    @Test
+    public void testSplitNonActiveSegment() {
+        SegmentLayout afterSplit = initialLayout(2).splitSegment(0);
+
+        // Segment 0 is sealed by the first split, so a second split must be rejected.
+        assertThrows(IllegalArgumentException.class, () -> afterSplit.splitSegment(0));
+    }
+
+    @Test
+    public void testMergeSegments() {
+        // Two segments, split seg-0 into seg-2 [0000-3fff] and seg-3 [4000-7fff],
+        // then merge those two children back together.
+        SegmentLayout afterSplit = initialLayout(2).splitSegment(0);
+
+        SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3);
+
+        assertEquals(afterMerge.getEpoch(), 2);
+        assertEquals(afterMerge.getActiveSegments().size(), 2); // merged + seg-1
+
+        // The merged inputs are sealed.
+        assertTrue(afterMerge.getAllSegments().get(2L).isSealed());
+        assertTrue(afterMerge.getAllSegments().get(3L).isSealed());
+
+        // The merge result is active and spans the original range of seg-0.
+        SegmentInfo merged = afterMerge.getAllSegments().get(4L);
+        assertTrue(merged.isActive());
+        assertEquals(merged.hashRange(), HashRange.of(0x0000, 0x7FFF));
+        assertEquals(merged.parentIds(), List.of(2L, 3L));
+    }
+
+    @Test
+    public void testMergeNonAdjacentSegments() {
+        SegmentLayout layout = initialLayout(4);
+
+        // Segments 0 and 2 have segment 1 between them.
+        assertThrows(IllegalArgumentException.class, () -> layout.mergeSegments(0, 2));
+    }
+
+    @Test
+    public void testPruneSegment() {
+        SegmentLayout afterSplit = initialLayout(2).splitSegment(0);
+
+        // Segment 0 is sealed after the split and can be pruned.
+        SegmentLayout afterPrune = afterSplit.pruneSegment(0);
+
+        assertFalse(afterPrune.getAllSegments().containsKey(0L));
+        // The children no longer reference the pruned parent.
+        assertEquals(afterPrune.getAllSegments().get(2L).parentIds(), List.of());
+        assertEquals(afterPrune.getAllSegments().get(3L).parentIds(), List.of());
+    }
+
+    @Test
+    public void testCannotPruneActiveSegment() {
+        SegmentLayout layout = initialLayout(2);
+
+        assertThrows(IllegalArgumentException.class, () -> layout.pruneSegment(0));
+    }
+
+    @Test
+    public void testGetChildren() {
+        SegmentLayout afterSplit = initialLayout(1).splitSegment(0);
+
+        List<SegmentInfo> children = afterSplit.getChildren(0);
+        assertEquals(children.size(), 2);
+        assertEquals(children.get(0).segmentId(), 1);
+        assertEquals(children.get(1).segmentId(), 2);
+    }
+
+    @Test
+    public void testGetParents() {
+        SegmentLayout afterSplit = initialLayout(1).splitSegment(0);
+
+        List<SegmentInfo> parents = afterSplit.getParents(1);
+        assertEquals(parents.size(), 1);
+        assertEquals(parents.get(0).segmentId(), 0);
+
+        // The root segment has no parents.
+        assertEquals(afterSplit.getParents(0).size(), 0);
+    }
+
+    @Test
+    public void testGetLineage() {
+        SegmentLayout afterSplit = initialLayout(1).splitSegment(0);
+
+        List<SegmentInfo> lineage = afterSplit.getLineage(0);
+        // seg-0 itself plus its two children.
+        assertEquals(lineage.size(), 3);
+    }
+
+    @Test
+    public void testToMetadata() {
+        ScalableTopicMetadata original =
+                ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
+        SegmentLayout afterSplit = SegmentLayout.fromMetadata(original).splitSegment(0);
+
+        ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", "value"));
+        assertEquals(restored.getEpoch(), 1);
+        assertEquals(restored.getNextSegmentId(), 4);
+        assertEquals(restored.getSegments().size(), 4);
+        assertEquals(restored.getProperties().get("key"), "value");
+    }
+
+    @Test
+    public void testNextSegmentIdAdvances() {
+        SegmentLayout layout = initialLayout(2);
+        assertEquals(layout.getNextSegmentId(), 2);
+
+        // Every split allocates two fresh ids.
+        SegmentLayout firstSplit = layout.splitSegment(0);
+        assertEquals(firstSplit.getNextSegmentId(), 4);
+
+        SegmentLayout secondSplit = firstSplit.splitSegment(1);
+        assertEquals(secondSplit.getNextSegmentId(), 6);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
new file mode 100644
index 00000000000..4de8ddb0a43
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SubscriptionCoordinatorTest {
+
+    // Fixture state, rebuilt before every test method.
+    private TopicName topicName;
+    private SegmentLayout initialLayout;
+    private ScalableTopicResources resources;
+    private ScheduledExecutorService scheduler;
+    private SubscriptionCoordinator coordinator;
+
+    /**
+     * Builds a coordinator for subscription {@code test-sub} over a four-segment layout, backed
+     * by a mocked metadata store whose register / unregister calls complete immediately, a
+     * single-threaded scheduler and a 200 ms grace period.
+     */
+    @BeforeMethod
+    public void setup() {
+        topicName = TopicName.get("topic://tenant/ns/my-topic");
+        initialLayout = SegmentLayout.fromMetadata(
+                ScalableTopicController.createInitialMetadata(4, Map.of()));
+
+        // All persistence ops succeed
+        resources = mock(ScalableTopicResources.class);
+        when(resources.registerConsumerAsync(any(), anyString(), anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(resources.unregisterConsumerAsync(any(), anyString(), anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        Duration shortGracePeriod = Duration.ofMillis(200);
+        coordinator = new SubscriptionCoordinator("test-sub", topicName, initialLayout,
+                resources, scheduler, shortGracePeriod);
+    }
+
+    /**
+     * Stops the scheduler created by the fixture, if there is one.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        if (scheduler == null) {
+            return;
+        }
+        scheduler.shutdownNow();
+    }
+
+    /** A lone consumer is handed all four segments of the initial layout. */
+    @Test
+    public void testSingleConsumerGetsAllSegments() throws Exception {
+        TransportCnx cnx = mock(TransportCnx.class);
+        Map<ConsumerSession, ConsumerAssignment> assignments =
+                coordinator.registerConsumer("consumer-1", 1L, cnx).get();
+
+        assertEquals(assignments.size(), 1);
+        ConsumerAssignment only = findByName(assignments, "consumer-1");
+        assertNotNull(only);
+        assertEquals(only.assignedSegments().size(), 4);
+        assertEquals(only.layoutEpoch(), 0);
+    }
+
+    /** Two consumers split the four segments evenly. */
+    @Test
+    public void testTwoConsumersBalanced() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> assignments =
+                coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+
+        assertEquals(assignments.size(), 2);
+        ConsumerAssignment first = findByName(assignments, "consumer-1");
+        assertEquals(first.assignedSegments().size(), 2);
+        ConsumerAssignment second = findByName(assignments, "consumer-2");
+        assertEquals(second.assignedSegments().size(), 2);
+    }
+
+ @Test
+ public void testThreeConsumersWithFourSegments() throws Exception {
+ coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
+ coordinator.registerConsumer("consumer-2", 2L,
mock(TransportCnx.class)).get();
+ Map<ConsumerSession, ConsumerAssignment> result =
+ coordinator.registerConsumer("consumer-3", 3L,
mock(TransportCnx.class)).get();
+
+ assertEquals(result.size(), 3);
+ int total = result.values().stream()
+ .mapToInt(a -> a.assignedSegments().size())
+ .sum();
+ assertEquals(total, 4);
+
+ Set<Long> assignedIds = new HashSet<>();
+ for (ConsumerAssignment assignment : result.values()) {
+ for (ConsumerAssignment.AssignedSegment seg :
assignment.assignedSegments()) {
+ assertTrue(assignedIds.add(seg.segmentId()),
+ "Segment " + seg.segmentId() + " assigned to multiple
consumers");
+ }
+ }
+ assertEquals(assignedIds.size(), 4);
+ }
+
+    /** When one of two consumers leaves, the remaining one takes over all four segments. */
+    @Test
+    public void testUnregisterConsumerRebalances() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        coordinator.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+
+        Map<ConsumerSession, ConsumerAssignment> afterLeave =
+                coordinator.unregisterConsumer("consumer-2").get();
+
+        assertEquals(afterLeave.size(), 1);
+        ConsumerAssignment survivor = findByName(afterLeave, "consumer-1");
+        assertEquals(survivor.assignedSegments().size(), 4);
+    }
+
+    /** Splitting one of four segments leaves five active segments for the single consumer. */
+    @Test
+    public void testLayoutChangeRebalances() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+
+        SegmentLayout splitLayout = initialLayout.splitSegment(0);
+        Map<ConsumerSession, ConsumerAssignment> afterSplit =
+                coordinator.onLayoutChange(splitLayout).get();
+
+        assertEquals(afterSplit.size(), 1);
+        ConsumerAssignment assignment = findByName(afterSplit, "consumer-1");
+        assertEquals(assignment.assignedSegments().size(), 5);
+    }
+
+    /** Unregistering the only consumer leaves no assignments and no tracked sessions. */
+    @Test
+    public void testEmptyAfterAllConsumersRemoved() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+
+        Map<ConsumerSession, ConsumerAssignment> afterLeave =
+                coordinator.unregisterConsumer("consumer-1").get();
+
+        assertTrue(afterLeave.isEmpty());
+        Set<ConsumerSession> remaining = coordinator.getConsumers();
+        assertTrue(remaining.isEmpty());
+    }
+
+    /**
+     * Two independent coordinators fed the same layout and the same consumer names hand out
+     * the same segments to each consumer.
+     */
+    @Test
+    public void testAssignmentDeterminism() throws Exception {
+        coordinator.registerConsumer("consumer-a", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> fromFirst =
+                coordinator.registerConsumer("consumer-b", 2L, mock(TransportCnx.class)).get();
+
+        // Second coordinator: same subscription, topic, layout, store and grace period.
+        SubscriptionCoordinator other = new SubscriptionCoordinator("test-sub", topicName,
+                initialLayout, resources, scheduler, Duration.ofMillis(200));
+        other.registerConsumer("consumer-a", 1L, mock(TransportCnx.class)).get();
+        Map<ConsumerSession, ConsumerAssignment> fromSecond =
+                other.registerConsumer("consumer-b", 2L, mock(TransportCnx.class)).get();
+
+        assertEquals(segmentIds(findByName(fromFirst, "consumer-a")),
+                segmentIds(findByName(fromSecond, "consumer-a")));
+        assertEquals(segmentIds(findByName(fromFirst, "consumer-b")),
+                segmentIds(findByName(fromSecond, "consumer-b")));
+    }
+
+ // --- Session lifecycle tests ---
+
+ @Test
+ public void testReconnectWithinGracePeriodPreservesAssignment() throws
Exception {
+ coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
+ coordinator.registerConsumer("consumer-2", 2L,
mock(TransportCnx.class)).get();
+
+ // consumer-1 drops
+ coordinator.onConsumerDisconnect("consumer-1");
+ assertEquals(coordinator.getConsumers().size(), 2,
+ "session should still be tracked during grace period");
+
+ // Reconnect within the grace period with a new connection
+ Map<ConsumerSession, ConsumerAssignment> result =
+ coordinator.registerConsumer("consumer-1", 99L,
mock(TransportCnx.class)).get();
+
+ assertEquals(result.size(), 2);
+ assertEquals(findByName(result,
"consumer-1").assignedSegments().size(), 2);
+ assertEquals(findByName(result,
"consumer-2").assignedSegments().size(), 2);
+
+ // The session was not recreated — still 2 consumers, and consumer-1
is reconnected
+ ConsumerSession reconnected = coordinator.getConsumers().stream()
+ .filter(s -> s.getConsumerName().equals("consumer-1"))
+ .findFirst().orElseThrow();
+ assertTrue(reconnected.isConnected());
+ assertEquals(reconnected.getConsumerId(), 99L);
+ }
+
+ @Test
+ public void testExpiredSessionIsEvictedAfterGracePeriod() {
+ coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).join();
+ coordinator.registerConsumer("consumer-2", 2L,
mock(TransportCnx.class)).join();
+ assertEquals(coordinator.getConsumers().size(), 2);
+
+ coordinator.onConsumerDisconnect("consumer-1");
+
+ // Grace period is 200ms in tests — wait for eviction
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ Set<ConsumerSession> active = coordinator.getConsumers();
+ assertEquals(active.size(), 1);
+ assertEquals(active.iterator().next().getConsumerName(),
"consumer-2");
+ });
+ }
+
+    /**
+     * Restores two consumers by name and checks that two assignments are returned, two
+     * sessions are tracked, and every tracked session is disconnected with a non-null
+     * grace timer.
+     */
+    @Test
+    public void testRestoreConsumersInstallsDisconnectedSessions() {
+        List<String> restoredNames = List.of("consumer-1", "consumer-2");
+        Map<ConsumerSession, ConsumerAssignment> assignments =
+                coordinator.restoreConsumers(restoredNames);
+
+        assertEquals(assignments.size(), 2);
+        assertEquals(coordinator.getConsumers().size(), 2);
+
+        // No connection was attached, so each session should be waiting on its grace timer.
+        coordinator.getConsumers().forEach(session -> {
+            assertFalse(session.isConnected());
+            assertNotNull(session.getGraceTimer());
+        });
+    }
+
+    /**
+     * Restores two consumers by name, registers consumer-1 on a connection, and checks that
+     * the result still covers both consumers, that consumer-1 holds 2 segments, and that its
+     * tracked session is now connected.
+     */
+    @Test
+    public void testRestoredConsumerResumesAssignmentOnReconnect() throws Exception {
+        coordinator.restoreConsumers(List.of("consumer-1", "consumer-2"));
+
+        // Registering a restored name is expected to pick up the existing session.
+        TransportCnx cnx = mock(TransportCnx.class);
+        Map<ConsumerSession, ConsumerAssignment> afterReconnect =
+                coordinator.registerConsumer("consumer-1", 42L, cnx).get();
+
+        assertEquals(afterReconnect.size(), 2);
+        ConsumerAssignment resumed = findByName(afterReconnect, "consumer-1");
+        assertEquals(resumed.assignedSegments().size(), 2);
+
+        Set<ConsumerSession> sessions = coordinator.getConsumers();
+        ConsumerSession reconnected = sessions.stream()
+                .filter(session -> session.getConsumerName().equals("consumer-1"))
+                .findFirst()
+                .orElseThrow();
+        assertTrue(reconnected.isConnected());
+    }
+
+ // --- Helpers ---
+
+    /**
+     * Looks up the assignment belonging to the session whose consumer name equals
+     * {@code name}.
+     *
+     * @param m    assignments keyed by consumer session
+     * @param name consumer name to search for
+     * @return the assignment of the first matching session in the map's iteration order,
+     *         or {@code null} when no session carries that name; callers that dereference
+     *         the result directly will then fail with a {@code NullPointerException}
+     */
+    private static ConsumerAssignment findByName(
+            Map<ConsumerSession, ConsumerAssignment> m, String name) {
+        return m.entrySet()
+                .stream()
+                .filter(entry -> name.equals(entry.getKey().getConsumerName()))
+                .map(entry -> entry.getValue())
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Extracts the segment ids of an assignment, in the order the assignment lists its
+     * segments.
+     *
+     * @param assignment the assignment to read
+     * @return an unmodifiable list of the assigned segment ids
+     */
+    private static List<Long> segmentIds(ConsumerAssignment assignment) {
+        return assignment.assignedSegments()
+                .stream()
+                .map(segment -> segment.segmentId())
+                .toList();
+    }
+}