This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c6c9032bd6f [feat] PIP-468: Scalable Topic Controller (#25516)
c6c9032bd6f is described below
commit c6c9032bd6f7e240eae44c6666c93226a43ef88c
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 21 09:55:37 2026 -0700
[feat] PIP-468: Scalable Topic Controller (#25516)
---
pip/pip-468.md | 615 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 615 insertions(+)
diff --git a/pip/pip-468.md b/pip/pip-468.md
new file mode 100644
index 00000000000..0ffc1eaba15
--- /dev/null
+++ b/pip/pip-468.md
@@ -0,0 +1,615 @@
+# PIP-468: Scalable Topic Controller
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Motivation
+
+[PIP-460](pip-460.md) introduces scalable topics as a new topic type in
Pulsar, built on a DAG of range segments that can be split and merged to scale
independently of the initial configuration. PIP-460 defines the overall vision
but defers the detailed design of each component to dedicated sub-PIPs.
+
+This PIP specifies the **Scalable Topic Controller** — the broker-side
component responsible for:
+
+1. **Segment lifecycle management** — creating, terminating, and deleting the
underlying segment topics that back a scalable topic.
+2. **Segment layout coordination** — executing split and merge operations with
correct ordering guarantees so that no messages are lost and all subscription
cursors exist before producers are redirected.
+3. **Consumer assignment** — coordinating which consumers receive messages
from which segments for a given subscription.
+4. **Leader election** — ensuring exactly one broker acts as the controller
for each scalable topic, with automatic failover.
+5. **DAG watch sessions** — pushing topology updates to connected clients
(producers and consumers) when the segment layout changes.
+
+Without a well-defined controller, split and merge operations would be unsafe:
producers could write to segments that have no subscription cursors, consumers
could miss messages during topology changes, and concurrent layout
modifications could corrupt the segment DAG.
+
+---
+
+## Design
+
+### Architecture Overview
+
+The controller subsystem consists of four layers:
+
+```
+┌──────────────────────────────────────────────────────────┐
+│ Admin API Layer │
+│ ScalableTopics REST + Segments REST │
+└──────────────┬───────────────────────┬───────────────────┘
+ │ │
+┌──────────────▼───────────────────────▼───────────────────┐
+│ ScalableTopicService │
+│ Per-broker singleton; manages controllers │
+└──────────────────────────┬───────────────────────────────┘
+ │
+┌──────────────────────────▼───────────────────────────────┐
+│ ScalableTopicController (per topic) │
+│ Leader-elected; owns layout, split/merge, consumers │
+│ │
+│ ┌─────────────────┐ ┌───────────────────────────────┐ │
+│ │ SegmentLayout │ │ SubscriptionCoordinator (×N) │ │
+│ │ (immutable DAG) │ │ per-subscription assignments │ │
+│ └─────────────────┘ └───────────────────────────────┘ │
+└──────────────────────────┬───────────────────────────────┘
+ │
+┌──────────────────────────▼───────────────────────────────┐
+│ ScalableTopicResources │
+│ Metadata store access (read/write/watch) │
+└──────────────────────────────────────────────────────────┘
+```
+
+### Broker Roles and Request Routing
+
+A scalable topic involves three distinct broker roles. Understanding which
broker handles which operation is critical to the design.
+
+#### Any broker — DAG watch sessions
+
+A **DAG watch session** (scalable topic lookup) can be served by **any
broker** in the cluster. The session reads segment metadata from the shared
metadata store and registers a watch for changes. No state is held on the
broker beyond the watch registration. This means producers and consumers can
connect to any broker for topic discovery — the same way regular topic lookups
work today.
+
+When the metadata changes (due to a split or merge performed by the controller
leader), the metadata store notification fires on whichever broker is serving
the watch session, and that broker pushes the updated DAG to the client.
+
+#### Controller leader — layout mutations and consumer assignment
+
+Each scalable topic has exactly one **controller leader** across the cluster,
elected via the metadata store. The controller leader is the only broker that
can:
+
+- Execute **split** and **merge** operations (the multi-step protocols
described below).
+- Accept **consumer registrations** and compute segment-to-consumer
assignments.
+- **Notify consumers** of assignment changes after topology updates.
+
+Clients discover the controller leader's broker URL through the DAG watch
session response (`controller_broker_url` field). StreamConsumer and
CheckpointConsumer clients then connect directly to the controller leader to
register and receive assignments.
+
+Admin API requests for split and merge are routed to the
`ScalableTopicService` on any broker, which obtains (or creates) a local
`ScalableTopicController` that participates in leader election. If the local
controller is the leader, it executes the operation directly. If not, the
request must be redirected to the leader (or the controller must first win the
election). The split and merge admin endpoints do not require the request to
land on the leader broker — the `ScalableTopicService` [...]
+
+#### Segment-owning brokers — produce and consume
+
+Individual **segment topics** (`segment://`) are regular persistent topics
from the broker's perspective. They are distributed across the cluster by the
standard Pulsar **load manager** and namespace bundle assignment — the same
mechanism used for `persistent://` topics. Each segment topic is owned by
exactly one broker at any time, determined by its namespace bundle hash.
+
+The controller leader does not need to own any of the segment topics. When the
controller needs to operate on a segment (create, terminate, delete, discover
subscriptions), it uses the **Segments admin API**, which routes the request to
whichever broker currently owns that segment's namespace bundle. This
decoupling means the controller can coordinate segments spread across many
brokers without requiring co-location.
+
+```
+┌─────────────┐ ┌─────────────┐ ┌─────────────┐
+│ Broker A │ │ Broker B │ │ Broker C │
+│ │ │ (controller │ │ │
+│ segment-0 │ │ leader) │ │ segment-2 │
+│ segment-1 │ │ │ │ │
+│ │ │ DAG watch │ │ DAG watch │
+│ DAG watch │ │ sessions │ │ sessions │
+│ sessions │ │ │ │ │
+└─────────────┘ └─────────────┘ └─────────────┘
+ │
+ Split/merge ops
+ use admin API to
+ reach segments on
+ Broker A and C
+```
+
+In this example, Broker B is the controller leader but owns no segments. When
it executes a split of segment-0, it calls the Segments admin API which routes
to Broker A (the owner of segment-0).
+
+### Components
+
+#### ScalableTopicService
+
+A per-broker singleton, created and owned by `BrokerService`. It manages the
lifecycle of all `ScalableTopicController` instances on the broker.
+
+**Responsibilities:**
+
+- **Controller management**: maintains a map of active controllers keyed by
topic name. Creates controllers on demand via `getOrCreateController()` and
releases them on topic unload or broker shutdown.
+- **Topic lifecycle**: handles `createScalableTopic()` and
`deleteScalableTopic()` admin operations, including creating/deleting the
underlying segment topics via the Segments admin API.
+- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to
the appropriate controller.
+- **Leader state monitoring**: listens for leader election state changes and
re-attempts election when the current leader is lost.
+
+#### ScalableTopicController
+
+A per-topic coordinator. Only one instance across the cluster is the
**leader** for a given scalable topic, ensured by leader election through the
metadata store. The leader is the only instance that modifies the segment
layout or assigns consumers.
+
+**State:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
+| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
+| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
+| `leaderState` | `LeaderElectionState` | Current leader election state |
+| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
+
+**Key operations:**
+
+- `initialize()` — loads the current metadata from the store, then attempts
leader election. The elected leader stores its broker service URL so that
clients can discover and connect to it.
+- `splitSegment(segmentId)` — splits an active segment at its midpoint (see
[Split Protocol](#split-protocol)).
+- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
segments (see [Merge Protocol](#merge-protocol)).
+- `registerConsumer(subscription, consumer)` — registers a consumer and
returns its initial segment assignment.
+- `unregisterConsumer(subscription, consumer)` — removes a consumer and
triggers rebalancing.
+- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by
clients to connect to the controller.
+
+#### SegmentLayout
+
+An immutable, versioned snapshot of the segment DAG. All mutation methods
(`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new**
`SegmentLayout` instance — the original is never modified. This ensures safe
concurrent reads without locking.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
+| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
+| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
+| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
+
+**Key operations:**
+
+- `findActiveSegment(hash)` — returns the active segment whose hash range
contains the given hash value. Used by producers for message routing.
+- `splitSegment(segmentId)` — validates the segment is active, computes the
midpoint of its hash range, creates two child `SegmentInfo` records covering
`[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout
with incremented epoch.
+- `mergeSegments(id1, id2)` — validates both segments are active and adjacent
(the end of one equals `start - 1` of the other), creates a merged
`SegmentInfo` covering the combined range, seals both parents, and returns a
new layout.
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a
segment, used for DAG traversal during catch-up reads.
+- `toMetadata()` / `fromMetadata()` — converts to/from the persisted
`ScalableTopicMetadata` format.
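+
As a rough illustration of the copy-on-write behavior described above, the following sketch models `findActiveSegment` and `splitSegment` on an immutable layout. The type names echo the PIP, but the class shape, the `initial()` helper, and the error handling are assumptions made for the example, not the actual implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; not the actual Pulsar classes. */
final class SegmentLayoutSketch {
    enum State { ACTIVE, SEALED }

    record HashRange(int start, int end) {
        boolean contains(int hash) { return hash >= start && hash <= end; }
    }

    record Segment(long id, HashRange range, State state,
                   List<Long> parentIds, List<Long> childIds) {}

    record Layout(long epoch, long nextSegmentId, Map<Long, Segment> allSegments) {
        Layout {
            allSegments = Map.copyOf(allSegments); // unmodifiable defensive copy
        }

        /** Returns the ACTIVE segment whose inclusive range contains the hash. */
        Segment findActiveSegment(int hash) {
            return allSegments.values().stream()
                    .filter(s -> s.state() == State.ACTIVE && s.range().contains(hash))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("no active segment for " + hash));
        }

        /** Splits an ACTIVE segment at its midpoint and returns a NEW layout. */
        Layout splitSegment(long segmentId) {
            Segment parent = allSegments.get(segmentId);
            if (parent == null || parent.state() != State.ACTIVE) {
                throw new IllegalArgumentException("segment is not active: " + segmentId);
            }
            HashRange r = parent.range();
            if (r.start() >= r.end()) {
                throw new IllegalArgumentException("range too small to split");
            }
            int mid = r.start() + (r.end() - r.start()) / 2;
            long left = nextSegmentId;
            long right = nextSegmentId + 1;

            Map<Long, Segment> next = new HashMap<>(allSegments);
            // Seal the parent and record its children.
            next.put(segmentId, new Segment(segmentId, r, State.SEALED,
                    parent.parentIds(), List.of(left, right)));
            // Children cover [start, mid] and [mid + 1, end].
            next.put(left, new Segment(left, new HashRange(r.start(), mid),
                    State.ACTIVE, List.of(segmentId), List.of()));
            next.put(right, new Segment(right, new HashRange(mid + 1, r.end()),
                    State.ACTIVE, List.of(segmentId), List.of()));
            return new Layout(epoch + 1, nextSegmentId + 2, next);
        }
    }

    /** Hypothetical helper: a layout with one root segment covering [0x0000, 0xFFFF]. */
    static Layout initial() {
        Segment root = new Segment(0, new HashRange(0, 0xFFFF), State.ACTIVE, List.of(), List.of());
        return new Layout(0, 1, Map.of(0L, root));
    }
}
```

Because `splitSegment` builds a fresh map and a fresh `Layout`, a reader holding the previous layout keeps seeing the parent as `ACTIVE` until it picks up the new snapshot.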
+
+#### SegmentInfo
+
+A record representing a single segment in the DAG:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `segmentId` | `long` | Unique, monotonically increasing identifier |
+| `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
+| `state` | `SegmentState` | `ACTIVE` or `SEALED` |
+| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
+| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
+| `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
+| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch `0`. |
+
+Factory methods: `SegmentInfo.active(id, range, epoch)` and
`SegmentInfo.sealed(...)`.
+
+#### SubscriptionCoordinator
+
+Manages segment-to-consumer assignments for a single subscription within a
scalable topic.
+
+**Assignment strategy:** round-robin. Active segments are sorted by hash range
start; consumers are sorted by name. Segments are distributed evenly across
consumers. When a consumer is added or removed, or when the layout changes
(split/merge), the coordinator recomputes assignments and pushes updates to
affected consumers.
+
+**Key operations:**
+
+- `addConsumer(consumer)` — adds a consumer, rebalances, and returns the full
assignment map.
+- `removeConsumer(consumer)` — removes a consumer and redistributes its
segments.
+- `onLayoutChange(newLayout)` — called after a split or merge; recomputes all
assignments against the new set of active segments.
+
+#### ConsumerRegistration and ConsumerAssignment
+
+`ConsumerRegistration` is a record identifying a connected consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `consumerId` | `long` | Protocol-level consumer ID |
+| `consumerName` | `String` | Stable name chosen by the client |
+| `cnx` | `TransportCnx` | The connection to push assignment updates |
+
+`ConsumerAssignment` is the assignment sent to a consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
+| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
+
+Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
`underlyingTopicName` (the `segment://` topic name the consumer should connect
to).
+
+### Metadata Store Schema
+
+Scalable topic metadata is stored in the metadata store under a well-known
path structure:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}
+```
+
+The value at this path is a JSON-serialized `ScalableTopicMetadata`:
+
+```json
+{
+ "epoch": 3,
+ "nextSegmentId": 5,
+ "segments": {
+ "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535}, "state": "SEALED", "parentIds": [], "childIds": [1, 2], "createdAtEpoch": 0, "sealedAtEpoch": 1 },
+ "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767}, "state": "SEALED", "parentIds": [0], "childIds": [3, 4], "createdAtEpoch": 1, "sealedAtEpoch": 3 },
+ "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [], "createdAtEpoch": 1, "sealedAtEpoch": 0 },
+ "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 },
+ "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 }
+ },
+ "properties": {}
+}
+```
+
+The controller leader lock is stored at:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}/controller
+```
+
+The value is the broker service URL of the elected leader.
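+
The two paths above can be derived from the topic coordinates. The sketch below is only an illustration: the PIP does not say how `encodedTopicName` is produced, so URL-encoding is an assumption here, and `ScalableTopicPaths` is a hypothetical helper name.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; the encoding scheme is an assumption, not taken from the PIP. */
final class ScalableTopicPaths {
    /** Path holding the JSON-serialized ScalableTopicMetadata. */
    static String metadataPath(String tenant, String namespace, String topic) {
        // Assumption: the local topic name is URL-encoded so it fits in one path element.
        String encoded = URLEncoder.encode(topic, StandardCharsets.UTF_8);
        return "/topics/" + tenant + "/" + namespace + "/" + encoded;
    }

    /** Path of the controller leader lock, a child of the metadata path. */
    static String controllerLockPath(String tenant, String namespace, String topic) {
        return metadataPath(tenant, namespace, topic) + "/controller";
    }
}
```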
+
+### DAG Watch Sessions
+
+As described in [Broker Roles](#broker-roles-and-request-routing), any broker
can serve a DAG watch session because the segment metadata lives in the shared
metadata store — no controller leader involvement is required. The session is
established via the binary protocol:
+
+1. **Client sends `CommandScalableTopicLookup`** with a client-assigned
`sessionId` and the `topic://` topic name.
+2. **Broker creates a `DagWatchSession`** that:
+ - Loads the current `ScalableTopicMetadata` from the store.
+ - Resolves which broker owns each active segment's `segment://` topic (via
topic lookup).
+ - Reads the controller broker URL from the leader lock path.
+ - Registers a metadata store notification listener for changes to the
topic's metadata path.
+3. **Broker sends `CommandScalableTopicUpdate`** with the initial
`ScalableTopicDAG` containing the full segment DAG, broker addresses, and
controller URL.
+4. **On metadata change**, the notification listener fires. The broker reloads
metadata, re-resolves broker addresses, and pushes an updated
`CommandScalableTopicUpdate` to the client.
+5. **Client sends `CommandScalableTopicClose`** to tear down the session.
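+
The session flow above can be sketched with an in-memory stand-in for the metadata store. Everything here is hypothetical scaffolding (`MetadataStore`, the string payload, the `open()` method); it only aims to show that the session holds nothing except a watch registration, pushes one initial update, and then pushes again on every change until it is closed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Illustrative only; the real session uses the binary protocol and the Pulsar metadata store. */
final class DagWatchSketch {
    /** Hypothetical in-memory metadata store with change notifications. */
    static final class MetadataStore {
        private volatile String value;
        private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

        MetadataStore(String initial) { this.value = initial; }
        String get() { return value; }
        void put(String newValue) { value = newValue; listeners.forEach(Runnable::run); }
        void addListener(Runnable l) { listeners.add(l); }
        void removeListener(Runnable l) { listeners.remove(l); }
    }

    /** One session per client lookup; the only broker-side state is the listener. */
    static final class DagWatchSession implements AutoCloseable {
        private final MetadataStore store;
        private final Consumer<String> pushToClient;
        private final Runnable listener;

        DagWatchSession(MetadataStore store, Consumer<String> pushToClient) {
            this.store = store;
            this.pushToClient = pushToClient;
            // On every metadata change, reload and push the fresh DAG to the client.
            this.listener = () -> pushToClient.accept(store.get());
        }

        /** Registers the watch, then sends the initial update. */
        void open() {
            store.addListener(listener);
            pushToClient.accept(store.get());
        }

        /** Mirrors CommandScalableTopicClose: drop the watch, nothing else to clean up. */
        @Override
        public void close() { store.removeListener(listener); }
    }
}
```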
+
+The `ScalableTopicDAG` protocol message contains:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `uint64` | Layout epoch |
+| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
+| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
+| `controller_broker_url` | `string` | Controller leader's broker URL |
+| `controller_broker_url_tls` | `string` | TLS variant |
+
+---
+
+## Split Protocol
+
+Splitting a segment is the core scaling-out operation. The protocol is
carefully ordered to guarantee that **no messages are lost** and **all
subscription cursors exist before producers are redirected**.
+
+### Step-by-step
+
+```
+ Controller Metadata Store Broker(s)
+ │ │ │
+ 1. Discover │──── getSubscriptions ───────>│ │
+ subscriptions│<─── [sub-A, sub-B] ─────────│ │
+ │ │ │
+ 2. Create │──── createSegment(child1, ──>│ ─── route ────────> │
+ children │ [sub-A, sub-B]) │ │ create topic
+ │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
+ │ [sub-A, sub-B]) │ │
+ │ │ │
+ 3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
+ parent │ (parent) │ │ seal ML
+ │ │ │
+ 4. Update │──── CAS update ─────────────>│ │
+ metadata │ (atomic) │ │
+ │ │ │
+ 5. Notify │──── push to consumers ──────────────────────────> │
+ consumers │ (rebalance) │ │
+```
+
+**Step 1: Discover subscriptions.** The controller queries the parent segment
topic for its list of subscriptions. It first checks locally (if the segment is
on this broker), falling back to the admin API for remote segments.
+
+**Step 2: Create child segment topics.** Two new `segment://` topics are
created via the Segments admin API. Each creation request includes the list of
subscriptions discovered in step 1. The Segments API handler creates the
persistent topic and initializes subscription cursors at the `Earliest`
position. The admin API routes to whichever broker owns the namespace bundle
for each segment, so child segments may be created on different brokers.
+
+**Step 3: Terminate parent segment.** The parent segment topic is terminated
via the Segments admin API, which writes a termination marker to the managed
ledger. After termination, producers writing to the parent receive
`TopicTerminated` and must re-route to child segments.
+
+**Step 4: Atomic metadata update.** The controller issues a compare-and-swap
(CAS) update to the metadata store. The update transitions the parent to
`SEALED` state with child pointers, and adds the two new `ACTIVE` child
segments. The CAS guarantees atomicity — if another operation modified the
metadata concurrently, the update fails and must be retried.
+
+**Step 5: Notify consumers.** All `SubscriptionCoordinator` instances are
notified of the layout change. They recompute segment-to-consumer assignments
and push updates to connected consumers.
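+
To make the ordering concrete, here is a minimal sketch of the five steps run against fakes. `SegmentAdmin`, `Versioned`, and `demo()` are hypothetical names introduced for the example, and an `AtomicReference` stands in for the metadata store's compare-and-swap; none of this is the actual controller code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: hypothetical types, not the Pulsar implementation. */
final class SplitProtocolSketch {
    /** Hypothetical stand-in for the Segments admin API. */
    interface SegmentAdmin {
        List<String> getSubscriptions(String segment);
        void createSegment(String segment, List<String> subscriptions);
        void terminateSegment(String segment);
    }

    /** Hypothetical versioned metadata value used for compare-and-swap. */
    record Versioned(long version, String layout) {}

    /** Runs the five steps in order; returns false if the CAS lost a race. */
    static boolean split(SegmentAdmin admin, AtomicReference<Versioned> store,
                         String parent, String child1, String child2,
                         Runnable notifyConsumers) {
        Versioned before = store.get();
        List<String> subs = admin.getSubscriptions(parent);   // 1. discover subscriptions
        admin.createSegment(child1, subs);                    // 2. create children with cursors
        admin.createSegment(child2, subs);
        admin.terminateSegment(parent);                       // 3. seal the parent
        Versioned after = new Versioned(before.version() + 1,
                "sealed=" + parent + " active=" + child1 + "," + child2);
        if (!store.compareAndSet(before, after)) {            // 4. atomic metadata update
            return false; // caller must reload the metadata and retry
        }
        notifyConsumers.run();                                // 5. rebalance and push
        return true;
    }

    /** Runs one split against a recording fake and returns the observed call order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SegmentAdmin admin = new SegmentAdmin() {
            public List<String> getSubscriptions(String s) {
                log.add("subs " + s);
                return List.of("sub-A", "sub-B");
            }
            public void createSegment(String s, List<String> subs) {
                log.add("create " + s + " " + subs);
            }
            public void terminateSegment(String s) {
                log.add("terminate " + s);
            }
        };
        AtomicReference<Versioned> store = new AtomicReference<>(new Versioned(0, "active=seg-0"));
        boolean ok = split(admin, store, "seg-0", "seg-1", "seg-2", () -> log.add("notify"));
        log.add("ok=" + ok + " version=" + store.get().version());
        return log;
    }
}
```

The recorded log from `demo()` shows the children being created with both subscriptions before the parent is terminated, and the consumer notification happening only after the metadata version has advanced.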
+
+### Why this ordering matters
+
+- Steps 1-2 **before** step 4: if we updated metadata first, clients would
discover the new segments immediately. Producers would start writing to child
segments that have no subscription cursors yet, causing those messages to be
missed by consumers.
+- Step 3 **before** step 4: terminating the parent before the metadata update
ensures that by the time clients see the new layout, the parent is already
sealed. Clients therefore never observe a layout in which both the parent and
its children accept writes.
+- Step 2 creates cursors at `Earliest`: this is safe because the child topics
are brand new (empty). The cursor starts at the beginning, and the first
message written by a redirected producer will be the first message consumed.
+
+---
+
+## Merge Protocol
+
+Merging two adjacent segments is the core scaling-in operation. The protocol
follows the same ordering principle: **create first, update metadata last**.
+
+### Step-by-step
+
+**Step 1: Discover subscriptions.** The controller queries both parent
segments for their subscriptions and takes the union, so no subscription is
lost.
+
+**Step 2: Create merged segment topic.** A single new `segment://` topic is
created with all discovered subscriptions.
+
+**Step 3: Terminate both parent segments.** Both parents are terminated via
the admin API. This is done sequentially to avoid a race where producers of one
parent are still writing while the other is already sealed.
+
+**Step 4: Atomic metadata update.** The CAS update seals both parents with
child pointers to the merged segment, and adds the new `ACTIVE` merged segment.
+
+**Step 5: Notify consumers.** Same as split — rebalance and push.
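+
Two pieces of the merge are easy to get wrong: the adjacency check on the inclusive hash ranges and the union of subscriptions taken in step 1. The sketch below illustrates both under the assumption that ranges are plain inclusive integer pairs; `MergeSketch` and its method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative helpers only; names are not taken from the implementation. */
final class MergeSketch {
    record HashRange(int start, int end) {}

    /** Two inclusive ranges are adjacent when one ends right before the other starts. */
    static boolean adjacent(HashRange a, HashRange b) {
        return a.end() + 1 == b.start() || b.end() + 1 == a.start();
    }

    /** Returns the combined range, or fails if the inputs are not adjacent. */
    static HashRange merged(HashRange a, HashRange b) {
        if (!adjacent(a, b)) {
            throw new IllegalArgumentException("segments are not adjacent");
        }
        return new HashRange(Math.min(a.start(), b.start()), Math.max(a.end(), b.end()));
    }

    /** Union of both parents' subscriptions, so none is lost on the merged segment. */
    static Set<String> unionSubscriptions(List<String> first, List<String> second) {
        Set<String> all = new LinkedHashSet<>(first);
        all.addAll(second);
        return all;
    }
}
```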
+
+### Cross-broker coordination
+
+Unlike a split, which starts from a single parent, a merge starts from two
parent segments that may be owned by different brokers (assigned by the load
manager). The controller leader handles this transparently: all segment
operations (create, terminate, delete) go through the Segments admin API, which
routes each request to the broker that currently owns the target segment's
namespace bundle. The controller does not need to know or care which broker
owns which segment — the admin API routing [...]
+
+---
+
+## Segment Topic Management via Admin API
+
+Segment topics are managed exclusively through a dedicated REST API at
`/admin/v2/segments`. This is a deliberate design choice: segment topics use
the `segment://` domain and must not be confused with regular `persistent://`
topics.
+
+### Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
+| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
+| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
+
+The `{descriptor}` is the segment's hash range and ID in the format
`{hexStart}-{hexEnd}-{segmentId}` (e.g., `0000-7fff-1`).
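+
A small formatter and parser for the descriptor might look like the following. The four-digit, lower-case, zero-padded hex form is inferred from the `0000-7fff-1` example rather than stated by the PIP, and `SegmentDescriptor` is a hypothetical helper name.

```java
/** Hypothetical helper; padding and case are inferred from the example descriptor. */
final class SegmentDescriptor {
    record Parsed(int start, int end, long segmentId) {}

    /** Formats a descriptor as {hexStart}-{hexEnd}-{segmentId}. */
    static String format(int start, int end, long segmentId) {
        return String.format("%04x-%04x-%d", start, end, segmentId);
    }

    /** Parses a descriptor back into its hash range and segment ID. */
    static Parsed parse(String descriptor) {
        String[] parts = descriptor.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed descriptor: " + descriptor);
        }
        return new Parsed(
                Integer.parseInt(parts[0], 16),
                Integer.parseInt(parts[1], 16),
                Long.parseLong(parts[2]));
    }
}
```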
+
+### Create Segment
+
+When creating a segment topic, the handler:
+
+1. Constructs the full `segment://` topic name from the path components.
+2. Validates namespace bundle ownership (the request is routed to the owning
broker).
+3. Creates the persistent topic via `BrokerService.getOrCreateTopic()`.
+4. For each subscription in the request body, creates a cursor at the
`Earliest` position.
+
+### Terminate Segment
+
+Terminates the managed ledger backing the segment topic. After termination,
the topic accepts no further writes, and producers receive `TopicTerminated`.
+
+### Delete Segment
+
+Deletes the segment topic and its managed ledger. Used during scalable topic
deletion to clean up all underlying storage.
+
+---
+
+## Consumer Assignment
+
+Consumer assignment behavior depends on the consumer type:
+
+- **StreamConsumer** (ordered, cumulative ack) — requires coordinated
segment-to-consumer assignment from the controller leader. Each active segment
is owned by exactly one consumer at any time to preserve per-segment ordering.
This is the subject of the rest of this section.
+- **CheckpointConsumer** (unmanaged, for connectors) — same as StreamConsumer
from a controller perspective: it registers with the controller leader and
receives an explicit segment assignment.
+- **QueueConsumer** (unordered, individual ack) — **does not require any
controller-side assignment**. Because there is no ordering requirement, every
queue consumer attaches directly to **all** segments of the scalable topic —
both `ACTIVE` and `SEALED` — and each segment-owning broker independently
performs round-robin delivery across the queue consumers attached to that
segment. New segments produced by a split are picked up transparently via the
DAG watch session push; sealed segment [...]
+
+### Registration Flow (StreamConsumer / CheckpointConsumer)
+
+1. The consumer (a `StreamConsumer` or `CheckpointConsumer`) connects to the
controller broker (discovered via the DAG watch session's
`controller_broker_url`).
+2. The consumer sends a registration request with its `subscription` name and
`consumerName`.
+3. The controller's `registerConsumer()` method routes to the appropriate
`SubscriptionCoordinator` (created on first use for that subscription).
+4. The coordinator adds the consumer, recomputes assignments, and returns the
consumer's `ConsumerAssignment` — a list of `(segmentId, hashRange,
segmentTopicName)` tuples.
+5. The consumer connects to each assigned `segment://` topic and begins
consuming.
+
+### Rebalancing
+
+Rebalancing occurs when:
+
+- A consumer is added or removed.
+- A split or merge changes the set of active segments.
+
+The rebalancing algorithm is round-robin:
+
+1. Collect all active segments, sorted by hash range start.
+2. Collect all consumers, sorted by name (for deterministic ordering).
+3. Assign segments to consumers in round-robin order.
+4. For each consumer whose assignment changed, push the new
`ConsumerAssignment`.
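+
The four steps above can be sketched as follows. This is a minimal model that works on segment IDs and consumer names only; `RoundRobinAssignment`, its `Segment` record, and `changedConsumers` are illustrative names, not the coordinator's real API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative round-robin rebalance; not the actual SubscriptionCoordinator. */
final class RoundRobinAssignment {
    record Segment(long id, int rangeStart) {}

    /** Steps 1 to 3: sort segments and consumers, then deal segments out in turn. */
    static Map<String, List<Long>> assign(List<Segment> activeSegments, List<String> consumerNames) {
        List<Segment> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Segment::rangeStart));
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());

        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result; // nothing to assign to
        }
        for (int i = 0; i < segments.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(segments.get(i).id());
        }
        return result;
    }

    /** Step 4: only consumers whose assignment differs need a pushed update. */
    static List<String> changedConsumers(Map<String, List<Long>> before,
                                         Map<String, List<Long>> after) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : after.entrySet()) {
            if (!entry.getValue().equals(before.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

Sorting both inputs makes the result independent of registration order, so the same layout and membership always yield the same assignment.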
+
+### Consumer Session Lifecycle
+
+A consumer's registration and its segment assignment are treated as a
**persistent session**, not as a transient association tied to a TCP
connection. This is a deliberate departure from the existing Pulsar consumer
model, where consumer liveness is asserted purely by the TCP connection to the
broker.
+
+**Rationale.** Scalable topic consumer assignments have non-trivial cost: when
a consumer disappears, its segments are redistributed among the remaining
consumers, each of which may need to reconnect to different segment brokers and
(for ordered consumers) drain in-flight messages before the new assignment
takes effect. Treating a brief disconnection as a full consumer loss would
cause unnecessary rebalancing in common scenarios such as a consumer process
restart or a broker restart.
+
+**What is persisted vs. in-memory.**
+
+The **session itself is persisted** — the keep-alive state is not.
+
+- **Persisted in the metadata store (the session):** the
`ConsumerRegistration` — consumer name and its current segment assignment —
keyed by `consumerName` under the subscription path. Once a consumer
successfully registers, this entry is durable and outlives TCP disconnects,
client restarts, and controller leader failovers. The assignment survives
failover without forcing consumers to re-register.
+- **In-memory on the controller leader (the keep-alive):** whether each
consumer is currently connected, and the grace-period timer that runs while it
is disconnected. Keep-alive signals are **not** written to the metadata store;
the leader observes the consumer's connection state directly and tracks the
timer in RAM. This avoids a metadata store write on every liveness tick.
+
+**Session semantics (steady state).**
+
+- When a consumer registers, the `SubscriptionCoordinator` writes its
`ConsumerRegistration` and marks it connected in-memory.
+- If the consumer's connection drops, the leader does **not** immediately
remove the consumer. It transitions the in-memory state to "disconnected" and
starts a configurable **session grace period** timer for that consumer.
+- If the same `consumerName` reconnects within the grace period, the timer is
cancelled and the consumer resumes with the same persisted assignment — no
rebalance, no other consumer disturbed.
+- If the grace period expires, the leader deletes the `ConsumerRegistration`
from the metadata store and triggers a rebalance for the remaining consumers.
+
+**Behavior on controller leader failover.**
+
+Because liveness timers are in-memory only, they are lost when the leader
broker crashes. The new leader:
+
+1. Loads all `ConsumerRegistration` entries for each subscription from the
metadata store, restoring the prior segment assignment.
+2. Treats every consumer as "just disconnected" and starts a fresh
grace-period timer for each. No timestamps carry across from the previous
leader.
+3. Consumers reconnecting to the new leader within the fresh grace period
resume with the same assignment — seamless from their perspective.
+4. Consumers that fail to reconnect before the fresh timer expires are evicted
and their segments are redistributed.
+
+This means a leader failover always gives clients a full grace period to
reconnect, regardless of how long they had already been disconnected under the
old leader. The trade-off is explicit: we keep the metadata store writes
proportional to real membership changes (register / unregister / rebalance)
rather than to liveness ticks.
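+
The in-memory half of this design (connection state plus grace-period timers) can be sketched as below. Timestamps are passed in explicitly so the behavior is deterministic; `ConsumerSessionSketch` and its methods are hypothetical, and the persisted `ConsumerRegistration` is represented only by the list of names handed to `restoreAfterFailover`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustrative keep-alive tracking; assumes non-negative millisecond timestamps. */
final class ConsumerSessionSketch {
    private static final long CONNECTED = -1L;
    private final long gracePeriodMillis;
    // consumerName -> time of disconnect, or CONNECTED while the consumer is attached
    private final Map<String, Long> sessions = new HashMap<>();

    ConsumerSessionSketch(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    void register(String name) { sessions.put(name, CONNECTED); }

    /** A dropped connection starts the grace timer instead of evicting the consumer. */
    void onDisconnect(String name, long nowMillis) {
        if (sessions.containsKey(name)) {
            sessions.put(name, nowMillis);
        }
    }

    /** Returns true when the session is still intact and the old assignment can be reused. */
    boolean onReconnect(String name, long nowMillis) {
        Long since = sessions.get(name);
        if (since == null) {
            return false; // unknown or already evicted: must register again
        }
        if (since != CONNECTED && nowMillis - since >= gracePeriodMillis) {
            sessions.remove(name);
            return false;
        }
        sessions.put(name, CONNECTED);
        return true;
    }

    /** Evicts consumers whose grace period has elapsed; the caller would then rebalance. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            long since = entry.getValue();
            if (since != CONNECTED && nowMillis - since >= gracePeriodMillis) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    /** New leader: every persisted consumer is treated as just disconnected. */
    void restoreAfterFailover(Collection<String> persistedNames, long nowMillis) {
        for (String name : persistedNames) {
            sessions.put(name, nowMillis);
        }
    }
}
```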
+
+**Covered scenarios.**
+
+- **Consumer process restart.** A client process restarts and reconnects with
the same `consumerName` well within the grace period — the same segments are
reassigned, no redistribution.
+- **Controller leader restart.** The leader is re-elected on a different
broker. Consumer entries and assignments are restored from the metadata store;
the new leader starts a fresh grace-period timer for every consumer so
reconnects are transparent.
+- **Segment-owning broker restart.** Segment topics are reassigned by the
standard Pulsar load manager; the consumer's segment-to-consumer assignment is
unaffected and reconnects to the new owning broker are transparent at the
consumer-assignment level.
+
+The grace period and related tunables are configurable per broker and will be
specified in a follow-up sub-PIP.
+
+### Layout Change Propagation
+
+When a split or merge completes and the metadata is updated:
+
+1. The controller reloads the metadata and creates a new `SegmentLayout`.
+2. `notifySubscriptions()` is called, which iterates all
`SubscriptionCoordinator` instances.
+3. Each coordinator calls `onLayoutChange(newLayout)`, which recomputes
assignments against the new active segments and pushes updates to affected
consumers.
+
+---
+
+## Leader Election and Failover
+
+### Election
+
+Each `ScalableTopicController` participates in leader election via the
metadata store's `LeaderElection` API. The election path is:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}/controller
+```
+
+The elected leader writes its broker service URL as the election value.
Clients discover the controller by reading this value (delivered as part of the
DAG watch session response).
+
+### Failover
+
+When the leader broker fails:
+
+1. The metadata store detects the session loss and clears the leader lock.
+2. Other brokers with active controllers for the same topic receive a
`NoLeader` state change notification.
+3. The `ScalableTopicService.onLeaderStateChange()` handler re-invokes
`controller.initialize()`, which reloads metadata and re-attempts election.
+4. The new leader restores in-memory state from the persisted metadata,
including all subscriptions and their consumer registrations with current
assignments. Consumers reconnecting within the session grace period find their
sessions intact and resume with the same segment assignment without a rebalance
(see [Consumer Session Lifecycle](#consumer-session-lifecycle)).
+
+### Non-leader brokers
+
+A broker that loses leadership, or never wins it, retains its controller
instance, but every mutating operation (`splitSegment`, `mergeSegments`,
`registerConsumer`) first passes through a `checkLeader()` guard that throws
`IllegalStateException`. The controller remains available for read operations
such as `getLayout()`.
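+
A guard of this kind could be sketched as follows. The enum is a local stand-in for the metadata store's `LeaderElectionState`, and the epoch counter is a placeholder for the real layout mutation; both are assumptions made to keep the example self-contained.

```java
/** Illustrative leader guard; the mutation body is a placeholder. */
final class LeaderGuardSketch {
    /** Local stand-in for the metadata store's leader election state. */
    enum LeaderElectionState { NoLeader, Leading, Following }

    private volatile LeaderElectionState leaderState = LeaderElectionState.NoLeader;
    private long epoch = 0;

    void onLeaderStateChange(LeaderElectionState newState) {
        this.leaderState = newState;
    }

    /** Rejects mutating calls unless this controller currently holds leadership. */
    private void checkLeader() {
        if (leaderState != LeaderElectionState.Leading) {
            throw new IllegalStateException("not the controller leader: " + leaderState);
        }
    }

    /** Mutating operation: guarded. Returns the new layout epoch. */
    long splitSegment(long segmentId) {
        checkLeader();
        return ++epoch; // placeholder for the real split protocol
    }

    /** Read operation: available on leaders and non-leaders alike. */
    long getLayoutEpoch() {
        return epoch;
    }
}
```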
+
+---
+
+## Scalable Topic Lifecycle
+
+### Creation
+
+```
+Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
+ └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
+ ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
+ │ └─> Divides [0x0000, 0xFFFF] into N equal ranges
+ │ Creates N SegmentInfo records (ACTIVE, no parents)
+ │ Returns ScalableTopicMetadata with epoch=0
+ ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
+ │ └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
+ └─> For each segment: createSegmentAsync via Segments admin API
+ └─> Creates segment:// topic on owning broker
+```
+
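+The range division step in the creation flow can be illustrated as follows. This is a sketch under an assumption: the PIP says the hash space is divided into N equal ranges but does not say how a remainder is handled when N does not divide 65536 evenly, so the integer rounding below is only one possible choice. `SegmentRanges` and `divide` are hypothetical names.
+
```java
final class SegmentRanges {
    static final int HASH_SPACE = 0x10000; // number of values in [0x0000, 0xFFFF]

    private SegmentRanges() {}

    // Returns n inclusive [start, end] pairs that cover the hash space without gaps.
    static int[][] divide(int n) {
        if (n < 1 || n > HASH_SPACE) {
            throw new IllegalArgumentException("n must be in [1, " + HASH_SPACE + "]");
        }
        int[][] ranges = new int[n][2];
        for (int i = 0; i < n; i++) {
            // Use long arithmetic so i * HASH_SPACE cannot overflow an int.
            ranges[i][0] = (int) ((long) i * HASH_SPACE / n);
            ranges[i][1] = (int) ((long) (i + 1) * HASH_SPACE / n) - 1;
        }
        return ranges;
    }
}
```
+
+Because each range starts exactly one past the end of the previous one, the ranges are contiguous and the last one always ends at `0xFFFF`.
+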
+### Deletion
+
+```
+Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
+ └─> ScalableTopicService.deleteScalableTopic(topic)
+ ├─> releaseController(topic)
+ │ └─> Closes leader election, removes from controllers map
+ ├─> Load metadata from store
+ ├─> For each segment: deleteSegmentAsync via Segments admin API
+ │ └─> Deletes segment:// topic on owning broker (best-effort)
+ └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
+ └─> Removes metadata from store
+```
+
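+The ordering of the deletion flow, including the best-effort segment deletion, can be sketched with `CompletableFuture`. The functional parameters below are hypothetical stand-ins for `releaseController`, the metadata load, `deleteSegmentAsync`, and `deleteScalableTopicAsync`; swallowing segment-deletion failures is this sketch's reading of "best-effort".
+
```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

final class DeleteFlowSketch {
    private DeleteFlowSketch() {}

    static CompletableFuture<Void> deleteScalableTopic(
            Runnable releaseController,
            Supplier<CompletableFuture<List<String>>> loadSegments,
            Function<String, CompletableFuture<Void>> deleteSegment,
            Supplier<CompletableFuture<Void>> deleteMetadata) {
        // 1. Stop acting as controller before touching any data.
        releaseController.run();
        // 2. Load the segment list, then 3. delete every segment.
        return loadSegments.get()
                .thenCompose(segments -> CompletableFuture.allOf(
                        segments.stream()
                                // Best-effort: a failed deletion does not abort the flow.
                                .map(s -> deleteSegment.apply(s).exceptionally(ex -> null))
                                .toArray(CompletableFuture[]::new)))
                // 4. Remove the topic metadata only after all segment deletions settled.
                .thenCompose(ignored -> deleteMetadata.get());
    }
}
```
+
+Deleting the metadata last keeps the segment list available for a retry if the broker fails partway through.
+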
+---
+
+## Public-Facing Changes
+
+### Binary Protocol
+
+Three new protocol commands (added to `PulsarApi.proto`):
+
+| Command | Direction | Field ID | Description |
+|---------|-----------|----------|-------------|
+| `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
+| `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
+| `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
+
+New protocol messages:
+
+- `ScalableTopicDAG` — the full segment DAG with broker addresses and
controller URL.
+- `SegmentInfoProto` — per-segment record in the DAG.
+- `SegmentBrokerAddress` — maps a segment ID to its owning broker.
+- `SegmentState` enum — `ACTIVE` or `SEALED`.
+
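+To make the interplay of the three commands concrete, here is an in-memory model of a DAG watch session. It is a sketch only: the DAG is represented as a plain string, the numeric session ID is a hypothetical correlation key, and no wire encoding is shown. The methods map loosely to `CommandScalableTopicLookup` (`lookup`), `CommandScalableTopicUpdate` (listener callbacks), and `CommandScalableTopicClose` (`close`).
+
```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

final class DagWatchSketch {
    private final Map<Long, Consumer<String>> sessions = new LinkedHashMap<>();
    private String currentDag;

    DagWatchSketch(String initialDag) {
        this.currentDag = initialDag;
    }

    // Lookup: register the session and send the current DAG as the initial response.
    void lookup(long sessionId, Consumer<String> listener) {
        sessions.put(sessionId, listener);
        listener.accept(currentDag);
    }

    // Layout change (for example after a split or merge): push the new DAG to every open session.
    void layoutChanged(String newDag) {
        currentDag = newDag;
        sessions.values().forEach(listener -> listener.accept(newDag));
    }

    // Close: the session receives no further updates.
    void close(long sessionId) {
        sessions.remove(sessionId);
    }
}
```
+
+A client that opens a session therefore sees one initial update followed by one update per layout change until it closes the session.
+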
+### Admin API
+
+**Scalable Topics** (`/admin/v2/scalable`):
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `GET` | `/{tenant}/{namespace}` | List scalable topics |
+| `PUT` | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
+| `GET` | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
+| `DELETE` | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
+| `GET` | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
+| `PUT` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing `createSubscription` on each `segment://` topic via the Segments admin API. |
+| `DELETE` | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
+| `POST` | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
+| `POST` | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
+
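+As an illustration of the split and merge endpoints in the table above, the following sketch builds the corresponding HTTP requests with the JDK's `java.net.http` API. Nothing is sent. The `ScalableAdminRequests` class is hypothetical, the base URL is supplied by the caller, authentication headers are omitted, and the empty request body is an assumption because the PIP does not describe a body for these calls.
+
```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ScalableAdminRequests {
    private static final String BASE_PATH = "/admin/v2/scalable/";

    private ScalableAdminRequests() {}

    // POST /{tenant}/{namespace}/{topic}/split/{segmentId}
    static HttpRequest split(String baseUrl, String tenant, String namespace,
                             String topic, long segmentId) {
        URI uri = URI.create(baseUrl + BASE_PATH + tenant + "/" + namespace + "/"
                + topic + "/split/" + segmentId);
        return post(uri);
    }

    // POST /{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}
    static HttpRequest merge(String baseUrl, String tenant, String namespace,
                             String topic, long segmentId1, long segmentId2) {
        URI uri = URI.create(baseUrl + BASE_PATH + tenant + "/" + namespace + "/"
                + topic + "/merge/" + segmentId1 + "/" + segmentId2);
        return post(uri);
    }

    // Assumption: these endpoints take no request body.
    private static HttpRequest post(URI uri) {
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```
+
+A caller would pass the resulting request to an `HttpClient`; in practice the `ScalableTopics` admin client interface listed below is expected to wrap these calls.
+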
+**Segments** (`/admin/v2/segments`):
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
+| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
+| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
+
+### Admin Client API
+
+New `ScalableTopics` interface on `PulsarAdmin`:
+
+- `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
+- `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
+- `getMetadata(topic)` / `getMetadataAsync(topic)`
+- `getStats(topic)` / `getStatsAsync(topic)`
+- `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
+- `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
+- `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
+- `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
+- `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
+- `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
+- `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
+- `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`
+
+### Configuration
+
+| Property | Default | Description |
+|----------|---------|-------------|
+| `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
+
+Additional configuration for auto-split thresholds and consumer controller
session grace period will be specified in follow-up sub-PIPs.
+
+### Metadata Store Paths
+
+| Path | Content |
+|------|---------|
+| `/topics/{tenant}/{namespace}/{topic}` | `ScalableTopicMetadata` JSON — segment DAG and global topic state |
+| `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
+| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | `SubscriptionMetadata` — subscription-level config and the persisted set of consumer registrations |
+| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | `ConsumerRegistration` — the durable session: consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is **not** persisted here. |
+
+---
+
+## Backward & Forward Compatibility
+
+### Upgrade
+
+The controller is a new component with no interaction with existing
partitioned or non-partitioned topics. Enabling `scalableTopicEnabled`
activates the new code paths. Existing topics are unaffected.
+
+### Downgrade / Rollback
+
+Scalable topic metadata and segment data remain in the metadata store and
BookKeeper. Rolling back to a version without scalable topic support leaves
this data intact but inaccessible. Upgrading again restores access.
+
+### Client Compatibility
+
+The existing Pulsar client SDK rejects `topic://` and `segment://` domains
with a clear error message directing users to the V5 client SDK. This check is
enforced in `PulsarClientImpl` for producer, consumer, and reader creation
paths.
+
+---
+
+## Security Considerations
+
+The controller follows the same tenant/namespace/topic authorization model as
existing Pulsar topics:
+
+- Admin API operations require topic-level admin permissions.
+- DAG watch sessions require topic-level lookup permissions.
+- Consumer registration requires topic-level consume permissions.
+- Segment operations are internal and authenticated via the broker's internal
admin client.
+
+The controller leader lock in the metadata store is accessible only to
authenticated brokers.
+
+---
+
+## Links
+
+- Parent PIP: [PIP-460: Scalable Topics](pip-460.md)
+- V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
+- Mailing List discussion thread: TBD