lhotari commented on code in PR #25516:
URL: https://github.com/apache/pulsar/pull/25516#discussion_r3112289961
##########
pip/pip-468.md:
##########
@@ -0,0 +1,615 @@
+# PIP-468: Scalable Topic Controller
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Motivation
+
+[PIP-460](pip-460.md) introduces scalable topics as a new topic type in
Pulsar, built on a DAG of range segments that can be split and merged to scale
independently of the initial configuration. PIP-460 defines the overall vision
but defers the detailed design of each component to dedicated sub-PIPs.
+
+This PIP specifies the **Scalable Topic Controller** — the broker-side
component responsible for:
+
+1. **Segment lifecycle management** — creating, terminating, and deleting the
underlying segment topics that back a scalable topic.
+2. **Segment layout coordination** — executing split and merge operations with
correct ordering guarantees so that no messages are lost and all subscription
cursors exist before producers are redirected.
+3. **Consumer assignment** — coordinating which consumers receive messages
from which segments for a given subscription.
+4. **Leader election** — ensuring exactly one broker acts as the controller
for each scalable topic, with automatic failover.
+5. **DAG watch sessions** — pushing topology updates to connected clients
(producers and consumers) when the segment layout changes.
+
+Without a well-defined controller, split and merge operations would be unsafe:
producers could write to segments that have no subscription cursors, consumers
could miss messages during topology changes, and concurrent layout
modifications could corrupt the segment DAG.
+
+---
+
+## Design
+
+### Architecture Overview
+
+The controller subsystem consists of four layers:
+
+```
+┌──────────────────────────────────────────────────────────┐
+│ Admin API Layer │
+│ ScalableTopics REST + Segments REST │
+└──────────────┬───────────────────────┬───────────────────┘
+ │ │
+┌──────────────▼───────────────────────▼───────────────────┐
+│ ScalableTopicService │
+│ Per-broker singleton; manages controllers │
+└──────────────────────────┬───────────────────────────────┘
+ │
+┌──────────────────────────▼───────────────────────────────┐
+│ ScalableTopicController (per topic) │
+│ Leader-elected; owns layout, split/merge, consumers │
+│ │
+│ ┌─────────────────┐ ┌───────────────────────────────┐ │
+│ │ SegmentLayout │ │ SubscriptionCoordinator (×N) │ │
+│ │ (immutable DAG) │ │ per-subscription assignments │ │
+│ └─────────────────┘ └───────────────────────────────┘ │
+└──────────────────────────┬───────────────────────────────┘
+ │
+┌──────────────────────────▼───────────────────────────────┐
+│ ScalableTopicResources │
+│ Metadata store access (read/write/watch) │
+└──────────────────────────────────────────────────────────┘
+```
+
+### Broker Roles and Request Routing
+
+A scalable topic involves three distinct broker roles. Understanding which
broker handles which operation is critical to the design.
+
+#### Any broker — DAG watch sessions
+
+A **DAG watch session** (scalable topic lookup) can be served by **any
broker** in the cluster. The session reads segment metadata from the shared
metadata store and registers a watch for changes. No state is held on the
broker beyond the watch registration. This means producers and consumers can
connect to any broker for topic discovery — the same way regular topic lookups
work today.
+
+When the metadata changes (due to a split or merge performed by the controller
leader), the metadata store notification fires on whichever broker is serving
the watch session, and that broker pushes the updated DAG to the client.
+
+#### Controller leader — layout mutations and consumer assignment
+
+Each scalable topic has exactly one **controller leader** across the cluster,
elected via the metadata store. The controller leader is the only broker that
can:
+
+- Execute **split** and **merge** operations (the multi-step protocols
described below).
+- Accept **consumer registrations** and compute segment-to-consumer
assignments.
+- **Notify consumers** of assignment changes after topology updates.
+
+Clients discover the controller leader's broker URL through the DAG watch
session response (`controller_broker_url` field). StreamConsumer and
CheckpointConsumer clients then connect directly to the controller leader to
register and receive assignments.
+
+Admin API requests for split and merge are routed to the
`ScalableTopicService` on any broker, which obtains (or creates) a local
`ScalableTopicController` that participates in leader election. If the local
controller is the leader, it executes the operation directly. If not, the
request must be redirected to the leader (or the controller must first win the
election). The split and merge admin endpoints do not require the request to
land on the leader broker — the `ScalableTopicService` handles this
transparently.
+
+#### Segment-owning brokers — produce and consume
+
+Individual **segment topics** (`segment://`) are regular persistent topics
from the broker's perspective. They are distributed across the cluster by the
standard Pulsar **load manager** and namespace bundle assignment — the same
mechanism used for `persistent://` topics. Each segment topic is owned by
exactly one broker at any time, determined by its namespace bundle hash.
+
+The controller leader does not need to own any of the segment topics. When the
controller needs to operate on a segment (create, terminate, delete, discover
subscriptions), it uses the **Segments admin API**, which routes the request to
whichever broker currently owns that segment's namespace bundle. This
decoupling means the controller can coordinate segments spread across many
brokers without requiring co-location.
+
+```
+┌─────────────┐ ┌─────────────┐ ┌─────────────┐
+│ Broker A │ │ Broker B │ │ Broker C │
+│ │ │ (controller │ │ │
+│ segment-0 │ │ leader) │ │ segment-2 │
+│ segment-1 │ │ │ │ │
+│ │ │ DAG watch │ │ DAG watch │
+│ DAG watch │ │ sessions │ │ sessions │
+│ sessions │ │ │ │ │
+└─────────────┘ └─────────────┘ └─────────────┘
+ │
+ Split/merge ops
+ use admin API to
+ reach segments on
+ Broker A and C
+```
+
+In this example, Broker B is the controller leader but owns no segments. When
it executes a split of segment-0, it calls the Segments admin API which routes
to Broker A (the owner of segment-0).
+
+### Components
+
+#### ScalableTopicService
+
+A per-broker singleton, created and owned by `BrokerService`. It manages the
lifecycle of all `ScalableTopicController` instances on the broker.
+
+**Responsibilities:**
+
+- **Controller management**: maintains a map of active controllers keyed by
topic name. Creates controllers on demand via `getOrCreateController()` and
releases them on topic unload or broker shutdown.
+- **Topic lifecycle**: handles `createScalableTopic()` and
`deleteScalableTopic()` admin operations, including creating/deleting the
underlying segment topics via the Segments admin API.
+- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to
the appropriate controller.
+- **Leader state monitoring**: listens for leader election state changes and
re-attempts election when the current leader is lost.
+
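The controller-management responsibility above can be sketched as a concurrent map with atomic create-on-demand. This is a minimal illustration, not the actual `ScalableTopicService` code: the class name `ControllerRegistry`, the `release` method, and the factory parameter are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch; not the actual ScalableTopicService implementation.
final class ControllerRegistry<C> {
    private final Map<String, C> controllers = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    ControllerRegistry(Function<String, C> factory) {
        this.factory = factory;
    }

    // Returns the existing controller for the topic, or creates one atomically.
    C getOrCreateController(String topicName) {
        return controllers.computeIfAbsent(topicName, factory);
    }

    // Drops the controller on topic unload; returns it so the caller can close it.
    C release(String topicName) {
        return controllers.remove(topicName);
    }

    int size() {
        return controllers.size();
    }
}
```

Using `computeIfAbsent` means two concurrent requests for the same topic share one controller instance, which matters if each instance participates in leader election.
+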
+#### ScalableTopicController
+
+A per-topic coordinator. Only one instance across the cluster is the
**leader** for a given scalable topic, ensured by leader election through the
metadata store. The leader is the only instance that modifies the segment
layout or assigns consumers.
+
+**State:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
+| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the
segment DAG |
+| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription
consumer coordinators |
+| `leaderState` | `LeaderElectionState` | Current leader election state |
+| `leaderElection` | `LeaderElection<String>` | Metadata store leader election
handle; value is the broker URL |
+
+**Key operations:**
+
+- `initialize()` — loads the current metadata from the store, then attempts
leader election. The elected leader stores its broker service URL so that
clients can discover and connect to it.
+- `splitSegment(segmentId)` — splits an active segment at its midpoint (see
[Split Protocol](#split-protocol)).
+- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active
segments (see [Merge Protocol](#merge-protocol)).
+- `registerConsumer(subscription, consumer)` — registers a consumer and
returns its initial segment assignment.
+- `unregisterConsumer(subscription, consumer)` — removes a consumer and
triggers rebalancing.
+- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by
clients to connect to the controller.
+
+#### SegmentLayout
+
+An immutable, versioned snapshot of the segment DAG. All mutation methods
(`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new**
`SegmentLayout` instance — the original is never modified. This ensures safe
concurrent reads without locking.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `long` | Monotonically increasing version; incremented on every
layout change |
+| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
+| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments
(active + sealed) |
+| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE`
segments |
+
+**Key operations:**
+
+- `findActiveSegment(hash)` — returns the active segment whose hash range
contains the given hash value. Used by producers for message routing.
+- `splitSegment(segmentId)` — validates the segment is active, computes the
midpoint of its hash range, creates two child `SegmentInfo` records covering
`[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout
with incremented epoch.
+- `mergeSegments(id1, id2)` — validates both segments are active and adjacent
(the end of one equals `start - 1` of the other), creates a merged
`SegmentInfo` covering the combined range, seals both parents, and returns a
new layout.
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a
segment, used for DAG traversal during catch-up reads.
+- `toMetadata()` / `fromMetadata()` — converts to/from the persisted
`ScalableTopicMetadata` format.
+
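The range arithmetic behind `splitSegment` and `mergeSegments` can be sketched as follows. This is an illustration under assumptions: the `HashRange` shape shown here (two `int` fields and these method names) is invented for the example and may not match the real type.

```java
// Hypothetical sketch of the range arithmetic; the real HashRange type may differ.
record HashRange(int start, int end) {
    HashRange {
        if (start > end) throw new IllegalArgumentException("start > end");
    }

    // Splits into [start, mid] and [mid + 1, end].
    HashRange[] split() {
        if (start == end) throw new IllegalStateException("cannot split a single-value range");
        int mid = start + (end - start) / 2;
        return new HashRange[] { new HashRange(start, mid), new HashRange(mid + 1, end) };
    }

    // Two inclusive ranges are adjacent when one ends right before the other starts.
    boolean isAdjacentTo(HashRange other) {
        return end + 1 == other.start || other.end + 1 == start;
    }

    // Combines two adjacent ranges into the range that covers both.
    HashRange merge(HashRange other) {
        if (!isAdjacentTo(other)) throw new IllegalArgumentException("ranges are not adjacent");
        return new HashRange(Math.min(start, other.start), Math.max(end, other.end));
    }
}
```

With inclusive bounds, splitting `[0, 65535]` yields `[0, 32767]` and `[32768, 65535]`, and merging those two children restores the parent range.
+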
+#### SegmentInfo
+
+A record representing a single segment in the DAG:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `segmentId` | `long` | Unique, monotonically increasing identifier |
+| `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this
segment covers |
+| `state` | `SegmentState` | `ACTIVE` or `SEALED` |
+| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
+| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
+| `createdAtEpoch` | `long` | Layout epoch in which this segment was created
(`0` for the initial segments) |
+| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed;
only meaningful when `state == SEALED`. `0` when the segment is `ACTIVE`; this
sentinel is unambiguous because sealing always happens as part of a split or
merge, which increments the epoch — so a segment can never be sealed at epoch
`0`. |
+
+Factory methods: `SegmentInfo.active(id, range, epoch)` and
`SegmentInfo.sealed(...)`.
+
+#### SubscriptionCoordinator
+
+Manages segment-to-consumer assignments for a single subscription within a
scalable topic.
+
+**Assignment strategy:** round-robin. Active segments are sorted by hash range
start; consumers are sorted by name. Segments are distributed evenly across
consumers. When a consumer is added or removed, or when the layout changes
(split/merge), the coordinator recomputes assignments and pushes updates to
affected consumers.
+
+**Key operations:**
+
+- `addConsumer(consumer)` — adds a consumer, rebalances, and returns the full
assignment map.
+- `removeConsumer(consumer)` — removes a consumer and redistributes its
segments.
+- `onLayoutChange(newLayout)` — called after a split or merge; recomputes all
assignments against the new set of active segments.
+
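One way to read the round-robin strategy is sketched below. The class `RoundRobinAssigner`, the `Segment` record, and the use of consumer names as map keys are assumptions for illustration; the real coordinator works with `ConsumerRegistration` and `ConsumerAssignment` objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; names and types are illustrative only.
final class RoundRobinAssigner {
    record Segment(long segmentId, int hashStart) {}

    // Maps each consumer name to the segment IDs it should read.
    static Map<String, List<Long>> assign(List<Segment> activeSegments, List<String> consumerNames) {
        Map<String, List<Long>> result = new TreeMap<>();
        if (consumerNames.isEmpty()) {
            return result;
        }
        // Consumers are ordered by name, segments by the start of their hash range.
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        List<Segment> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Segment::hashStart));
        // The i-th segment goes to the (i mod n)-th consumer.
        for (int i = 0; i < segments.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(segments.get(i).segmentId());
        }
        return result;
    }
}
```

Because both inputs are sorted before distribution, the result depends only on the set of active segments and the set of consumer names, not on arrival order.
+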
+#### ConsumerRegistration and ConsumerAssignment
+
+`ConsumerRegistration` is a record identifying a connected consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `consumerId` | `long` | Protocol-level consumer ID |
+| `consumerName` | `String` | Stable name chosen by the client |
+| `cnx` | `TransportCnx` | The connection to push assignment updates |
+
+`ConsumerAssignment` is the assignment sent to a consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
+| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this
consumer |
+
+Each `AssignedSegment` contains: `segmentId`, `hashRange`, and
`underlyingTopicName` (the `segment://` topic name the consumer should connect
to).
+
+### Metadata Store Schema
+
+Scalable topic metadata is stored in the metadata store under a well-known
path structure:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}
+```
+
+The value at this path is a JSON-serialized `ScalableTopicMetadata`:
+
+```json
+{
+ "epoch": 3,
+ "nextSegmentId": 5,
+ "segments": {
+ "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535}, "state":
"SEALED", "parentIds": [], "childIds": [1, 2], "createdAtEpoch": 0,
"sealedAtEpoch": 1 },
+ "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767}, "state":
"SEALED", "parentIds": [0], "childIds": [3, 4], "createdAtEpoch": 1,
"sealedAtEpoch": 3 },
+ "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535},
"state": "ACTIVE", "parentIds": [0], "childIds": [], "createdAtEpoch": 1,
"sealedAtEpoch": 0 },
+ "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383}, "state":
"ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3,
"sealedAtEpoch": 0 },
+ "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767},
"state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3,
"sealedAtEpoch": 0 }
+ },
+ "properties": {}
+}
+```
+
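The example above has three `ACTIVE` segments (2, 3, and 4) whose ranges together cover `0` to `65535` without gaps. A minimal sketch of that invariant check and of a hash lookup follows. It assumes the root range in the example is the full hash space; `LayoutCheck` and `ActiveSegment` are hypothetical names, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; checks the invariant that the example metadata satisfies.
final class LayoutCheck {
    record ActiveSegment(long segmentId, int start, int end) {}

    // True when the active ranges cover [min, max] with no gaps or overlaps.
    static boolean tilesRange(List<ActiveSegment> active, int min, int max) {
        List<ActiveSegment> sorted = new ArrayList<>(active);
        sorted.sort(Comparator.comparingInt(ActiveSegment::start));
        int expectedStart = min;
        for (ActiveSegment s : sorted) {
            if (s.start() != expectedStart || s.end() < s.start()) {
                return false;
            }
            expectedStart = s.end() + 1;
        }
        return expectedStart == max + 1;
    }

    // Returns the active segment whose inclusive range contains the hash, if any.
    static Optional<ActiveSegment> findActiveSegment(List<ActiveSegment> active, int hash) {
        return active.stream()
                .filter(s -> s.start() <= hash && hash <= s.end())
                .findFirst();
    }
}
```

For the example metadata, a hash of `20000` falls in segment 4 and a hash of `40000` falls in segment 2.
+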
+The controller leader lock is stored at:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}/controller
+```
+
+The value is the broker service URL of the elected leader.
+
+### DAG Watch Sessions
+
+As described in [Broker Roles](#broker-roles-and-request-routing), any broker
can serve a DAG watch session because the segment metadata lives in the shared
metadata store — no controller leader involvement is required. The session is
established via the binary protocol:
+
+1. **Client sends `CommandScalableTopicLookup`** with a client-assigned
`sessionId` and the `topic://` topic name.
+2. **Broker creates a `DagWatchSession`** that:
+ - Loads the current `ScalableTopicMetadata` from the store.
+ - Resolves which broker owns each active segment's `segment://` topic (via
topic lookup).
+ - Reads the controller broker URL from the leader lock path.
+ - Registers a metadata store notification listener for changes to the
topic's metadata path.
+3. **Broker sends `CommandScalableTopicUpdate`** with the initial
`ScalableTopicDAG` containing the full segment DAG, broker addresses, and
controller URL.
+4. **On metadata change**, the notification listener fires. The broker reloads
metadata, re-resolves broker addresses, and pushes an updated
`CommandScalableTopicUpdate` to the client.
+5. **Client sends `CommandScalableTopicClose`** to tear down the session.
+
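The session flow above (load, watch, initial push, push on change, close) can be illustrated with an in-memory stand-in. Everything in this sketch is hypothetical: `Store` replaces the metadata store, plain strings replace `ScalableTopicMetadata`, and a `Consumer<String>` replaces the client connection. Broker address resolution is omitted.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory sketch; it does not use the real metadata store or protocol classes.
final class WatchSketch {
    // Stand-in for the metadata store: one value plus change listeners.
    static final class Store {
        private volatile String metadata;
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        Store(String initial) { this.metadata = initial; }

        String read() { return metadata; }

        void registerListener(Consumer<String> listener) { listeners.add(listener); }

        void write(String updated) {
            metadata = updated;
            for (Consumer<String> listener : listeners) {
                listener.accept(updated);
            }
        }
    }

    // Stand-in for a DAG watch session: one initial push, then one push per change.
    static final class Session {
        private final Consumer<String> client;
        private volatile boolean closed;

        Session(Store store, Consumer<String> client) {
            this.client = client;
            String initial = store.read();          // step 2: load current metadata
            store.registerListener(this::onChange); // step 2: register the watch
            client.accept(initial);                 // step 3: initial update
        }

        private void onChange(String updated) {     // step 4: push on metadata change
            if (!closed) {
                client.accept(updated);
            }
        }

        void close() { closed = true; }             // step 5: stop pushing updates
    }
}
```

The sketch ignores the window between the initial read and the listener registration; a real session would need to handle a change that lands in that window.
+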
+The `ScalableTopicDAG` protocol message contains:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `uint64` | Layout epoch |
+| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
+| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each
active segment |
+| `controller_broker_url` | `string` | Controller leader's broker URL |
+| `controller_broker_url_tls` | `string` | TLS variant |
+
+---
+
+## Split Protocol
+
+Splitting a segment is the core scaling-out operation. The protocol is
carefully ordered to guarantee that **no messages are lost** and **all
subscription cursors exist before producers are redirected**.
+
+### Step-by-step
+
+```
+ Controller Metadata Store Broker(s)
+ │ │ │
+ 1. Discover │──── getSubscriptions ───────>│ │
+ subscriptions│<─── [sub-A, sub-B] ─────────│ │
+ │ │ │
+ 2. Create │──── createSegment(child1, ──>│ ─── route ────────> │
+ children │ [sub-A, sub-B]) │ │ create topic
+ │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
+ │ [sub-A, sub-B]) │ │
+ │ │ │
+ 3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
+ parent │ (parent) │ │ seal ML
+ │ │ │
+ 4. Update │──── CAS update ─────────────>│ │
+ metadata │ (atomic) │ │
+ │ │ │
+ 5. Notify │──── push to consumers ──────────────────────────> │
+ consumers │ (rebalance) │ │
+```
+
+**Step 1: Discover subscriptions.** The controller queries the parent segment
topic for its list of subscriptions. It first checks locally (if the segment is
on this broker), falling back to the admin API for remote segments.
+
+**Step 2: Create child segment topics.** Two new `segment://` topics are
created via the Segments admin API. Each creation request includes the list of
subscriptions discovered in step 1. The Segments API handler creates the
persistent topic and initializes subscription cursors at the `Earliest`
position. The admin API routes to whichever broker owns the namespace bundle
for each segment, so child segments may be created on different brokers.
+
+**Step 3: Terminate parent segment.** The parent segment topic is terminated
via the Segments admin API, which writes a termination marker to the managed
ledger. After termination, producers writing to the parent receive
`TopicTerminated` and must re-route to child segments.
+
+**Step 4: Atomic metadata update.** The controller issues a compare-and-swap
(CAS) update to the metadata store. The update transitions the parent to
`SEALED` state with child pointers, and adds the two new `ACTIVE` child
segments. The CAS guarantees atomicity — if another operation modified the
metadata concurrently, the update fails and must be retried.
+
+**Step 5: Notify consumers.** All `SubscriptionCoordinator` instances are
notified of the layout change. They recompute segment-to-consumer assignments
and push updates to connected consumers.
+
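The compare-and-swap in step 4 can be sketched with a version counter. This is not the metadata store API: `VersionedStore` and its methods are invented for the example, and the stored value is a plain string rather than `ScalableTopicMetadata`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a versioned compare-and-swap; not the metadata store API.
final class CasSketch {
    record Versioned(long version, String value) {}

    static final class VersionedStore {
        private final AtomicReference<Versioned> ref;

        VersionedStore(String initial) {
            ref = new AtomicReference<>(new Versioned(0, initial));
        }

        Versioned get() {
            return ref.get();
        }

        // Writes only if the stored version still equals expectedVersion.
        boolean compareAndSet(long expectedVersion, String newValue) {
            Versioned current = ref.get();
            if (current.version() != expectedVersion) {
                return false;
            }
            return ref.compareAndSet(current, new Versioned(expectedVersion + 1, newValue));
        }
    }
}
```

A writer reads the current version, computes the new layout, and calls `compareAndSet` with the version it read. If another writer got there first, the call returns `false` and the writer has to reload and decide whether to retry.
+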
+### Why this ordering matters
+
+- Steps 1-2 **before** step 4: if we updated metadata first, clients would
discover the new segments immediately. Producers would start writing to child
segments that have no subscription cursors yet, causing those messages to be
missed by consumers.
+- Step 3 **before** step 4: terminating the parent before the metadata update
ensures that by the time clients see the new layout, the parent is already
sealed. Clients never observe a layout in which both the parent and its
children accept writes.
+- Step 2 creates cursors at `Earliest`: this is safe because the child topics
are brand new (empty). The cursor starts at the beginning, and the first
message written by a redirected producer will be the first message consumed.
+
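The ordering constraints above can be made concrete with a small orchestration sketch. `SegmentOps` is an invented interface standing in for the Segments admin API, the metadata store, and the subscription coordinators; none of its method signatures come from the proposal.

```java
import java.util.List;

// Hypothetical sketch; SegmentOps is an invented interface, not the Segments admin API.
final class SplitSketch {
    interface SegmentOps {
        List<String> getSubscriptions(long segmentId);
        void createSegment(long segmentId, List<String> subscriptions);
        void terminateSegment(long segmentId);
        boolean casUpdateLayout(long parentId, long child1, long child2);
        void notifyConsumers();
    }

    // Runs the five steps in order; returns false if the CAS lost a race.
    static boolean split(SegmentOps ops, long parentId, long child1, long child2) {
        List<String> subs = ops.getSubscriptions(parentId);   // step 1: discover subscriptions
        ops.createSegment(child1, subs);                      // step 2: children with cursors
        ops.createSegment(child2, subs);
        ops.terminateSegment(parentId);                       // step 3: seal the parent
        if (!ops.casUpdateLayout(parentId, child1, child2)) { // step 4: publish the new layout
            return false;
        }
        ops.notifyConsumers();                                // step 5: rebalance
        return true;
    }
}
```

The sketch only reports a failed CAS to the caller. How a retry treats the children that were already created and the parent that was already terminated is left open here.
+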
+---
+
+## Merge Protocol
+
+Merging two adjacent segments is the core scaling-in operation. The protocol
follows the same ordering principle: **create first, update metadata last**.
+
+### Step-by-step
+
+**Step 1: Discover subscriptions.** The controller queries both parent
segments for their subscriptions and takes the union, so no subscription is
lost.
+
+**Step 2: Create merged segment topic.** A single new `segment://` topic is
created with all discovered subscriptions.
+
+**Step 3: Terminate both parent segments.** Both parents are terminated via
the admin API. This is done sequentially to avoid a race where producers of one
parent are still writing while the other is already sealed.
+
+**Step 4: Atomic metadata update.** The CAS update seals both parents with
child pointers to the merged segment, and adds the new `ACTIVE` merged segment.
+
+**Step 5: Notify consumers.** Same as split — rebalance and push.
+
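Step 1 of the merge takes the union of the two parents' subscriptions. A minimal sketch, with `MergeSketch` and `unionSubscriptions` as hypothetical names:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of step 1 of the merge; names are illustrative.
final class MergeSketch {
    // Union of both parents' subscriptions, sorted and without duplicates.
    static Set<String> unionSubscriptions(List<String> parent1Subs, List<String> parent2Subs) {
        Set<String> union = new TreeSet<>(parent1Subs);
        union.addAll(parent2Subs);
        return union;
    }
}
```

If one parent has `sub-A` and `sub-B` and the other has `sub-B` and `sub-C`, the merged segment is created with cursors for all three.
+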
+### Cross-broker coordination
+
+Merges often involve multiple brokers, because the two parent segments and
the merged segment may each be owned by a different broker (assigned by the
load manager). The controller leader handles this transparently: all segment
operations (create, terminate, delete) go through the Segments admin API, which
routes each request to the broker that currently owns the target segment's
namespace bundle. The controller does not need to know or care which broker
owns which segment — the admin API routing handles it.
+
+---
+
+## Segment Topic Management via Admin API
+
+Segment topics are managed exclusively through a dedicated REST API at
`/admin/v2/segments`. This is a deliberate design choice: segment topics use
the `segment://` domain and must not be confused with regular `persistent://`
topics.
+
+### Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment
topic. Optional request body with subscription names to pre-create. |
+| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate
(seal) a segment topic. |
+| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment
topic. |
Review Comment:
discussed. we can keep these available on the current API endpoint.