lhotari commented on code in PR #25516:
URL: https://github.com/apache/pulsar/pull/25516#discussion_r3111777609


##########
pip/pip-468.md:
##########
@@ -0,0 +1,615 @@
+# PIP-468: Scalable Topic Controller
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Motivation
+
+[PIP-460](pip-460.md) introduces scalable topics as a new topic type in 
Pulsar, built on a DAG of range segments that can be split and merged to scale 
independently of the initial configuration. PIP-460 defines the overall vision 
but defers the detailed design of each component to dedicated sub-PIPs.
+
+This PIP specifies the **Scalable Topic Controller** — the broker-side 
component responsible for:
+
+1. **Segment lifecycle management** — creating, terminating, and deleting the 
underlying segment topics that back a scalable topic.
+2. **Segment layout coordination** — executing split and merge operations with 
correct ordering guarantees so that no messages are lost and all subscription 
cursors exist before producers are redirected.
+3. **Consumer assignment** — coordinating which consumers receive messages 
from which segments for a given subscription.
+4. **Leader election** — ensuring exactly one broker acts as the controller 
for each scalable topic, with automatic failover.
+5. **DAG watch sessions** — pushing topology updates to connected clients 
(producers and consumers) when the segment layout changes.
+
+Without a well-defined controller, split and merge operations would be unsafe: 
producers could write to segments that have no subscription cursors, consumers 
could miss messages during topology changes, and concurrent layout 
modifications could corrupt the segment DAG.
+
+---
+
+## Design
+
+### Architecture Overview
+
+The controller subsystem consists of four layers:
+
+```
+┌──────────────────────────────────────────────────────────┐
+│                    Admin API Layer                        │
+│         ScalableTopics REST  +  Segments REST             │
+└──────────────┬───────────────────────┬───────────────────┘
+               │                       │
+┌──────────────▼───────────────────────▼───────────────────┐
+│                  ScalableTopicService                     │
+│         Per-broker singleton; manages controllers         │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│              ScalableTopicController (per topic)          │
+│   Leader-elected; owns layout, split/merge, consumers     │
+│                                                           │
+│   ┌─────────────────┐  ┌───────────────────────────────┐ │
+│   │  SegmentLayout   │  │  SubscriptionCoordinator (×N) │ │
+│   │  (immutable DAG) │  │  per-subscription assignments │ │
+│   └─────────────────┘  └───────────────────────────────┘ │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│              ScalableTopicResources                       │
+│         Metadata store access (read/write/watch)          │
+└──────────────────────────────────────────────────────────┘
+```
+
+### Broker Roles and Request Routing
+
+A scalable topic involves three distinct broker roles. Understanding which 
broker handles which operation is critical to the design.
+
+#### Any broker — DAG watch sessions
+
+A **DAG watch session** (scalable topic lookup) can be served by **any 
broker** in the cluster. The session reads segment metadata from the shared 
metadata store and registers a watch for changes. No state is held on the 
broker beyond the watch registration. This means producers and consumers can 
connect to any broker for topic discovery — the same way regular topic lookups 
work today.
+
+When the metadata changes (due to a split or merge performed by the controller 
leader), the metadata store notification fires on whichever broker is serving 
the watch session, and that broker pushes the updated DAG to the client.
+
+#### Controller leader — layout mutations and consumer assignment
+
+Each scalable topic has exactly one **controller leader** across the cluster, 
elected via the metadata store. The controller leader is the only broker that 
can:
+
+- Execute **split** and **merge** operations (the multi-step protocols 
described below).
+- Accept **consumer registrations** and compute segment-to-consumer 
assignments.
+- **Notify consumers** of assignment changes after topology updates.
+
+Clients discover the controller leader's broker URL through the DAG watch 
session response (`controller_broker_url` field). StreamConsumer and 
CheckpointConsumer clients then connect directly to the controller leader to 
register and receive assignments.
+
+Admin API requests for split and merge can be sent to any broker. The receiving broker's `ScalableTopicService` obtains (or creates) a local `ScalableTopicController`, which participates in leader election. If the local controller is the leader, it executes the operation directly; otherwise, the request must be redirected to the current leader, or the local controller must first win the election. Callers therefore do not need to send split and merge requests to the leader broker: the `ScalableTopicService` handles this routing transparently.
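The leader-or-redirect decision can be sketched as follows. This is a minimal illustration, not the PIP's implementation: `ControllerView`, `RoutingDecision` and `SplitMergeRouter` are hypothetical names, and the sketch assumes the local controller can report whether it holds the leader lock and, if it does not, the broker URL stored in that lock.

```java
import java.util.Optional;

// Hypothetical snapshot of the local controller's leader-election state.
record ControllerView(boolean isLeader, Optional<String> leaderBrokerUrl) {}

// Possible outcomes for a split/merge admin request on the receiving broker.
interface RoutingDecision {}
record ExecuteLocally() implements RoutingDecision {}
record RedirectTo(String brokerUrl) implements RoutingDecision {}
record AwaitElection() implements RoutingDecision {}

final class SplitMergeRouter {
    static RoutingDecision route(ControllerView view) {
        if (view.isLeader()) {
            // The local controller holds the leader lock: run the protocol here.
            return new ExecuteLocally();
        }
        // Otherwise forward to the broker URL stored in the leader lock; if no
        // leader is elected yet, wait for (or take part in) the election first.
        return view.leaderBrokerUrl()
                .<RoutingDecision>map(RedirectTo::new)
                .orElseGet(AwaitElection::new);
    }
}
```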
+
+#### Segment-owning brokers — produce and consume
+
+Individual **segment topics** (`segment://`) are regular persistent topics 
from the broker's perspective. They are distributed across the cluster by the 
standard Pulsar **load manager** and namespace bundle assignment — the same 
mechanism used for `persistent://` topics. Each segment topic is owned by 
exactly one broker at any time, determined by its namespace bundle hash.
+
+The controller leader does not need to own any of the segment topics. When the 
controller needs to operate on a segment (create, terminate, delete, discover 
subscriptions), it uses the **Segments admin API**, which routes the request to 
whichever broker currently owns that segment's namespace bundle. This 
decoupling means the controller can coordinate segments spread across many 
brokers without requiring co-location.
+
+```
+┌─────────────┐     ┌─────────────┐     ┌─────────────┐
+│  Broker A   │     │  Broker B   │     │  Broker C   │
+│             │     │ (controller │     │             │
+│  segment-0  │     │  leader)    │     │  segment-2  │
+│  segment-1  │     │             │     │             │
+│             │     │  DAG watch  │     │  DAG watch  │
+│  DAG watch  │     │  sessions   │     │  sessions   │
+│  sessions   │     │             │     │             │
+└─────────────┘     └─────────────┘     └─────────────┘
+                           │
+                    Split/merge ops
+                    use admin API to
+                    reach segments on
+                    Broker A and C
+```
+
+In this example, Broker B is the controller leader but owns no segments. When it executes a split of segment-0, it calls the Segments admin API, which routes the request to Broker A (the owner of segment-0).
+
+### Components
+
+#### ScalableTopicService
+
+A per-broker singleton, created and owned by `BrokerService`. It manages the 
lifecycle of all `ScalableTopicController` instances on the broker.
+
+**Responsibilities:**
+
+- **Controller management**: maintains a map of active controllers keyed by 
topic name. Creates controllers on demand via `getOrCreateController()` and 
releases them on topic unload or broker shutdown.
+- **Topic lifecycle**: handles `createScalableTopic()` and 
`deleteScalableTopic()` admin operations, including creating/deleting the 
underlying segment topics via the Segments admin API.
+- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to 
the appropriate controller.
+- **Leader state monitoring**: listens for leader election state changes and 
re-attempts election when the current leader is lost.
+
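A minimal sketch of the controller map, assuming a `ConcurrentHashMap` keyed by topic name. `ControllerRegistry` and `TopicController` are hypothetical stand-ins for `ScalableTopicService` and `ScalableTopicController`; `computeIfAbsent` keeps `getOrCreateController()` race-free when several requests for the same topic arrive at once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a per-topic ScalableTopicController.
final class TopicController implements AutoCloseable {
    final String topicName;
    volatile boolean closed;

    TopicController(String topicName) { this.topicName = topicName; }

    // A real controller would release its leader lock and watches here.
    @Override
    public void close() { closed = true; }
}

// Sketch of the per-broker controller map kept by ScalableTopicService.
final class ControllerRegistry {
    private final Map<String, TopicController> controllers = new ConcurrentHashMap<>();
    private final Function<String, TopicController> factory;

    ControllerRegistry(Function<String, TopicController> factory) { this.factory = factory; }

    // Returns the existing controller, or atomically creates one on first use.
    TopicController getOrCreateController(String topicName) {
        return controllers.computeIfAbsent(topicName, factory);
    }

    // Topic unload: remove the controller and close it, if one exists.
    void release(String topicName) {
        TopicController controller = controllers.remove(topicName);
        if (controller != null) {
            controller.close();
        }
    }

    // Broker shutdown: release every controller. ConcurrentHashMap iterators
    // tolerate removal during iteration.
    void closeAll() {
        controllers.keySet().forEach(this::release);
    }

    int size() { return controllers.size(); }
}
```

Because `computeIfAbsent` runs the factory while the map entry is locked, a real factory would presumably only construct the object and let `initialize()` (metadata load and leader election) run asynchronously afterwards; that split is an assumption, not something the PIP specifies.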
+#### ScalableTopicController
+
+A per-topic coordinator. Only one instance across the cluster is the 
**leader** for a given scalable topic, ensured by leader election through the 
metadata store. The leader is the only instance that modifies the segment 
layout or assigns consumers.
+
+**State:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
+| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
+| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
+| `leaderState` | `LeaderElectionState` | Current leader election state |
+| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
+
+**Key operations:**
+
+- `initialize()` — loads the current metadata from the store, then attempts 
leader election. The elected leader stores its broker service URL so that 
clients can discover and connect to it.
+- `splitSegment(segmentId)` — splits an active segment at its midpoint (see 
[Split Protocol](#split-protocol)).
+- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active 
segments (see [Merge Protocol](#merge-protocol)).
+- `registerConsumer(subscription, consumer)` — registers a consumer and 
returns its initial segment assignment.
+- `unregisterConsumer(subscription, consumer)` — removes a consumer and 
triggers rebalancing.
+- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by 
clients to connect to the controller.
+
+#### SegmentLayout
+
+An immutable, versioned snapshot of the segment DAG. All mutation methods 
(`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new** 
`SegmentLayout` instance — the original is never modified. This ensures safe 
concurrent reads without locking.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
+| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
+| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
+| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
+
+**Key operations:**
+
+- `findActiveSegment(hash)` — returns the active segment whose hash range 
contains the given hash value. Used by producers for message routing.
+- `splitSegment(segmentId)` — validates the segment is active, computes the 
midpoint of its hash range, creates two child `SegmentInfo` records covering 
`[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout 
with incremented epoch.
+- `mergeSegments(id1, id2)` — validates that both segments are active and adjacent (one segment's `end` is exactly one less than the other's `start`), creates a merged `SegmentInfo` covering the combined range, seals both parents, and returns a new layout with incremented epoch.
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a 
segment, used for DAG traversal during catch-up reads.
+- `toMetadata()` / `fromMetadata()` — converts to/from the persisted 
`ScalableTopicMetadata` format.
+
+#### SegmentInfo
+
+A record representing a single segment in the DAG:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `segmentId` | `long` | Unique, monotonically increasing identifier |
+| `hashRange` | `HashRange` | The `[start, end]` inclusive hash range this segment covers |
+| `state` | `SegmentState` | `ACTIVE` or `SEALED` |
+| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
+| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
+| `createdAtEpoch` | `long` | Layout epoch in which this segment was created (`0` for the initial segments) |
+| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when `state == SEALED`. It is `0` while the segment is `ACTIVE`. This sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch `0`. |
+
+Factory methods: `SegmentInfo.active(id, range, epoch)` and 
`SegmentInfo.sealed(...)`.
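A sketch of `SegmentInfo` as an immutable Java record, showing how the `sealedAtEpoch == 0` sentinel can be enforced at construction time. The hash range is flattened into two `int` fields to keep the example self-contained. The `active(...)` signature used here (which adds `parentIds`) and the `sealedAt(...)` transition are hypothetical, since the PIP does not spell out the arguments of `SegmentInfo.sealed(...)`.

```java
import java.util.List;

enum SegmentState { ACTIVE, SEALED }

record SegmentInfo(long segmentId, int rangeStart, int rangeEnd, SegmentState state,
                   List<Long> parentIds, List<Long> childIds,
                   long createdAtEpoch, long sealedAtEpoch) {

    SegmentInfo {
        parentIds = List.copyOf(parentIds); // defensive, immutable copies
        childIds = List.copyOf(childIds);
        if (state == SegmentState.ACTIVE && (sealedAtEpoch != 0 || !childIds.isEmpty())) {
            throw new IllegalArgumentException("ACTIVE segments have no children and sealedAtEpoch == 0");
        }
        if (state == SegmentState.SEALED && sealedAtEpoch == 0) {
            // 0 is reserved as the "not sealed" sentinel; sealing always bumps the epoch.
            throw new IllegalArgumentException("SEALED segments must record a non-zero sealedAtEpoch");
        }
    }

    // Hypothetical factory: a new ACTIVE segment created in the given epoch.
    static SegmentInfo active(long id, int start, int end, List<Long> parentIds, long epoch) {
        return new SegmentInfo(id, start, end, SegmentState.ACTIVE, parentIds, List.of(), epoch, 0);
    }

    // Hypothetical transition: returns a SEALED copy; this record is never mutated.
    SegmentInfo sealedAt(List<Long> children, long epoch) {
        if (state != SegmentState.ACTIVE) throw new IllegalStateException("segment already sealed");
        return new SegmentInfo(segmentId, rangeStart, rangeEnd, SegmentState.SEALED,
                parentIds, children, createdAtEpoch, epoch);
    }
}
```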
+
+#### SubscriptionCoordinator
+
+Manages segment-to-consumer assignments for a single subscription within a 
scalable topic.
+
+**Assignment strategy:** round-robin. Active segments are sorted by hash range 
start; consumers are sorted by name. Segments are distributed evenly across 
consumers. When a consumer is added or removed, or when the layout changes 
(split/merge), the coordinator recomputes assignments and pushes updates to 
affected consumers.
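The round-robin strategy can be sketched as a pure function of the active segments and the consumer names. `RoundRobinAssigner` and `ActiveSegment` are hypothetical, and consumer names are assumed to be unique. Sorting both inputs makes the result deterministic, so recomputing after a split, merge, or membership change always yields the same map for the same inputs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

record ActiveSegment(long segmentId, int rangeStart) {}

final class RoundRobinAssigner {
    // Returns consumerName -> assigned segment ids (consumers in name order).
    static Map<String, List<Long>> assign(List<ActiveSegment> segments, List<String> consumerNames) {
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());
        Map<String, List<Long>> result = new LinkedHashMap<>();
        consumers.forEach(c -> result.put(c, new ArrayList<>()));
        if (consumers.isEmpty()) {
            return result; // nothing to assign to
        }

        List<ActiveSegment> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingInt(ActiveSegment::rangeStart));
        // The i-th segment (by hash range start) goes to consumer i mod N.
        for (int i = 0; i < sorted.size(); i++) {
            result.get(consumers.get(i % consumers.size())).add(sorted.get(i).segmentId());
        }
        return result;
    }
}
```

With the three active segments from the metadata example and consumers `c1` and `c2`, segments 3 and 2 go to `c1` and segment 4 goes to `c2`.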
+
+**Key operations:**
+
+- `addConsumer(consumer)` — adds a consumer, rebalances, and returns the full 
assignment map.
+- `removeConsumer(consumer)` — removes a consumer and redistributes its 
segments.
+- `onLayoutChange(newLayout)` — called after a split or merge; recomputes all 
assignments against the new set of active segments.
+
+#### ConsumerRegistration and ConsumerAssignment
+
+`ConsumerRegistration` is a record identifying a connected consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `consumerId` | `long` | Protocol-level consumer ID |
+| `consumerName` | `String` | Stable name chosen by the client |
+| `cnx` | `TransportCnx` | The connection to push assignment updates |
+
+`ConsumerAssignment` is the assignment sent to a consumer:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
+| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
+
+Each `AssignedSegment` contains: `segmentId`, `hashRange`, and 
`underlyingTopicName` (the `segment://` topic name the consumer should connect 
to).
+
+### Metadata Store Schema
+
+Scalable topic metadata is stored in the metadata store under a well-known 
path structure:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}
+```
+
+The value at this path is a JSON-serialized `ScalableTopicMetadata`:
+
+```json
+{
+  "epoch": 3,
+  "nextSegmentId": 5,
+  "segments": {
+    "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535}, "state": "SEALED", "parentIds": [], "childIds": [1, 2], "createdAtEpoch": 0, "sealedAtEpoch": 1 },
+    "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767}, "state": "SEALED", "parentIds": [0], "childIds": [3, 4], "createdAtEpoch": 1, "sealedAtEpoch": 3 },
+    "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [], "createdAtEpoch": 1, "sealedAtEpoch": 0 },
+    "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 },
+    "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 }
+  },
+  "properties": {}
+}
+```
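In this example the active segments (2, 3 and 4) should together cover the hash space exactly once. A small check of that invariant is sketched below; `Range` and `LayoutInvariants` are hypothetical, and the `0`..`65535` bounds are taken from the example rather than from a specification of the hash space.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

record Range(long segmentId, int start, int end) {}

final class LayoutInvariants {
    // True if the active ranges exactly tile [min, max]: no gaps and no overlaps.
    static boolean tilesHashSpace(List<Range> active, int min, int max) {
        List<Range> sorted = new ArrayList<>(active);
        sorted.sort(Comparator.comparingInt(Range::start));
        long expectedStart = min;
        for (Range r : sorted) {
            // Each range must begin right after the previous one ended.
            if (r.start() != expectedStart || r.end() < r.start()) {
                return false;
            }
            expectedStart = (long) r.end() + 1;
        }
        // The last range must end exactly at max.
        return expectedStart == (long) max + 1;
    }
}
```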
+
+The controller leader lock is stored at:
+
+```
+/topics/{tenant}/{namespace}/{encodedTopicName}/controller
+```
+
+The value is the broker service URL of the elected leader.
+
+### DAG Watch Sessions
+
+As described in [Broker Roles](#broker-roles-and-request-routing), any broker 
can serve a DAG watch session because the segment metadata lives in the shared 
metadata store — no controller leader involvement is required. The session is 
established via the binary protocol:
+
+1. **Client sends `CommandScalableTopicLookup`** with a client-assigned 
`sessionId` and the `topic://` topic name.
+2. **Broker creates a `DagWatchSession`** that:
+   - Loads the current `ScalableTopicMetadata` from the store.
+   - Resolves which broker owns each active segment's `segment://` topic (via 
topic lookup).
+   - Reads the controller broker URL from the leader lock path.
+   - Registers a metadata store notification listener for changes to the 
topic's metadata path.
+3. **Broker sends `CommandScalableTopicUpdate`** with the initial 
`ScalableTopicDAG` containing the full segment DAG, broker addresses, and 
controller URL.
+4. **On metadata change**, the notification listener fires. The broker reloads 
metadata, re-resolves broker addresses, and pushes an updated 
`CommandScalableTopicUpdate` to the client.
+5. **Client sends `CommandScalableTopicClose`** to tear down the session.
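The session lifecycle above can be sketched with an in-memory stand-in for the metadata store. `ToyMetadataStore` and `DagWatchSession` are hypothetical; they push plain strings instead of `CommandScalableTopicUpdate` messages and omit the broker-address and controller-URL resolution. One design choice in this sketch: the listener is registered before the initial snapshot is read, so a change landing between the two is not missed (at the cost of a possible duplicate push).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy metadata store: path -> value, notifying listeners with the changed path.
final class ToyMetadataStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    String get(String path) { return data.get(path); }

    void put(String path, String value) {
        data.put(path, value);
        listeners.forEach(l -> l.accept(path));
    }

    void addListener(Consumer<String> l) { listeners.add(l); }
    void removeListener(Consumer<String> l) { listeners.remove(l); }
}

// Holds no state beyond the watch registration.
final class DagWatchSession implements AutoCloseable {
    private final ToyMetadataStore store;
    private final Consumer<String> listener;

    DagWatchSession(ToyMetadataStore store, String topicPath, Consumer<String> pushToClient) {
        this.store = store;
        this.listener = changedPath -> {
            if (changedPath.equals(topicPath)) {
                pushToClient.accept(store.get(topicPath)); // reload and push the new DAG
            }
        };
        store.addListener(listener);               // register the watch
        pushToClient.accept(store.get(topicPath)); // send the initial snapshot
    }

    // Corresponds to the client sending CommandScalableTopicClose.
    @Override
    public void close() { store.removeListener(listener); }
}
```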
+
+The `ScalableTopicDAG` protocol message contains:
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `uint64` | Layout epoch |
+| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
+| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
+| `controller_broker_url` | `string` | Controller leader's broker URL |
+| `controller_broker_url_tls` | `string` | TLS variant |
+
+---
+
+## Split Protocol
+
+Splitting a segment is the core scaling-out operation. The protocol is 
carefully ordered to guarantee that **no messages are lost** and **all 
subscription cursors exist before producers are redirected**.
+
+### Step-by-step
+
+```
+           Controller                   Metadata Store          Broker(s)
+               │                             │                     │
+  1. Discover  │──── getSubscriptions ───────>│                     │
+  subscriptions│<─── [sub-A, sub-B] ─────────│                     │
+               │                             │                     │
+  2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │
+  children     │     [sub-A, sub-B])          │                     │ create topic
+               │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
+               │     [sub-A, sub-B])          │                     │
+               │                             │                     │
+  3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
+  parent       │     (parent)                 │                     │ seal ML
+               │                             │                     │
+  4. Update    │──── CAS update ─────────────>│                     │
+  metadata     │     (atomic)                 │                     │
+               │                             │                     │
+  5. Notify    │──── push to consumers ──────────────────────────> │
+  consumers    │     (rebalance)              │                     │
+```
+
+**Step 1: Discover subscriptions.** The controller queries the parent segment 
topic for its list of subscriptions. It first checks locally (if the segment is 
on this broker), falling back to the admin API for remote segments.
+
+**Step 2: Create child segment topics.** Two new `segment://` topics are 
created via the Segments admin API. Each creation request includes the list of 
subscriptions discovered in step 1. The Segments API handler creates the 
persistent topic and initializes subscription cursors at the `Earliest` 
position. The admin API routes to whichever broker owns the namespace bundle 
for each segment, so child segments may be created on different brokers.
+
+**Step 3: Terminate parent segment.** The parent segment topic is terminated 
via the Segments admin API, which writes a termination marker to the managed 
ledger. After termination, producers writing to the parent receive 
`TopicTerminated` and must re-route to child segments.
+
+**Step 4: Atomic metadata update.** The controller issues a compare-and-swap 
(CAS) update to the metadata store. The update transitions the parent to 
`SEALED` state with child pointers, and adds the two new `ACTIVE` child 
segments. The CAS guarantees atomicity — if another operation modified the 
metadata concurrently, the update fails and must be retried.
+
+**Step 5: Notify consumers.** All `SubscriptionCoordinator` instances are 
notified of the layout change. They recompute segment-to-consumer assignments 
and push updates to connected consumers.
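Step 4's compare-and-swap can be sketched with a version-checked store. `ToyVersionedStore`, `Versioned` and `CasUpdater` are hypothetical; a real implementation would rely on the metadata store's versioned writes. Each retry re-reads the current value and re-applies the mutation to it, so a concurrent change is never silently overwritten.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// A value plus the version it was read at.
record Versioned<T>(T value, long version) {}

// Toy single-key store with compare-and-swap on version.
final class ToyVersionedStore<T> {
    private final AtomicReference<Versioned<T>> cell;

    ToyVersionedStore(T initial) { cell = new AtomicReference<>(new Versioned<>(initial, 0)); }

    Versioned<T> read() { return cell.get(); }

    // Writes only if the stored version still equals expectedVersion.
    boolean compareAndSet(long expectedVersion, T newValue) {
        Versioned<T> current = cell.get();
        if (current.version() != expectedVersion) {
            return false;
        }
        return cell.compareAndSet(current, new Versioned<>(newValue, expectedVersion + 1));
    }
}

final class CasUpdater {
    // Re-reads and re-applies the mutation until the CAS succeeds or attempts run out.
    static <T> T updateWithRetry(ToyVersionedStore<T> store, UnaryOperator<T> mutation, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Versioned<T> snapshot = store.read();
            T updated = mutation.apply(snapshot.value());
            if (store.compareAndSet(snapshot.version(), updated)) {
                return updated;
            }
            // Another writer changed the value since our read: retry on fresh state.
        }
        throw new IllegalStateException("CAS update failed after " + maxAttempts + " attempts");
    }
}
```

In the split protocol, the mutation would presumably need to re-validate against the fresh layout (for example, that the parent is still `ACTIVE`) and fail rather than reapply, since steps 2 and 3 have already run; the sketch simply propagates any exception the mutation throws.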
+
+### Why this ordering matters
+
+- Steps 1-2 **before** step 4: if we updated metadata first, clients would 
discover the new segments immediately. Producers would start writing to child 
segments that have no subscription cursors yet, causing those messages to be 
missed by consumers.
+- Step 3 **before** step 4: terminating the parent before the metadata update 
ensures that by the time clients see the new layout, the parent is already 
sealed. There is no window where both parent and children are writable.
+- Step 2 creates cursors at `Earliest`: this is safe because the child topics 
are brand new (empty). The cursor starts at the beginning, and the first 
message written by a redirected producer will be the first message consumed.
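These ordering constraints can be expressed as a chain of asynchronous stages, in which each stage starts only after the previous one succeeded and any failure skips all later stages. `SplitOps` and `SplitProtocol` are hypothetical. The sketch assumes the Segments admin API calls and the metadata update are exposed as `CompletableFuture`-returning methods, and it creates the two children concurrently, since both only need to exist before step 3.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async facade over the Segments admin API and the metadata store.
interface SplitOps {
    CompletableFuture<List<String>> getSubscriptions(long segmentId);
    CompletableFuture<Void> createSegment(long segmentId, List<String> subscriptions);
    CompletableFuture<Void> terminateSegment(long segmentId);
    CompletableFuture<Void> casUpdateLayout(long parentId, long child1, long child2);
    void notifyCoordinators();
}

final class SplitProtocol {
    // Runs steps 1-5 strictly in order; a failed stage prevents all later ones.
    static CompletableFuture<Void> split(SplitOps ops, long parent, long child1, long child2) {
        return ops.getSubscriptions(parent)                                      // 1. discover
                .thenCompose(subs -> CompletableFuture.allOf(                     // 2. create both
                        ops.createSegment(child1, subs),                         //    children with
                        ops.createSegment(child2, subs)))                        //    all cursors
                .thenCompose(v -> ops.terminateSegment(parent))                 // 3. seal parent
                .thenCompose(v -> ops.casUpdateLayout(parent, child1, child2))  // 4. publish layout
                .thenRun(ops::notifyCoordinators);                              // 5. rebalance
    }
}
```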
+
+---
+
+## Merge Protocol
+
+Merging two adjacent segments is the core scaling-in operation. The protocol 
follows the same ordering principle: **create first, update metadata last**.
+
+### Step-by-step
+
+**Step 1: Discover subscriptions.** The controller queries both parent 
segments for their subscriptions and takes the union, so no subscription is 
lost.
+
+**Step 2: Create merged segment topic.** A single new `segment://` topic is 
created with all discovered subscriptions.
+
+**Step 3: Terminate both parent segments.** Both parents are terminated via 
the admin API. This is done sequentially to avoid a race where producers of one 
parent are still writing while the other is already sealed.
+
+**Step 4: Atomic metadata update.** The CAS update seals both parents with 
child pointers to the merged segment, and adds the new `ACTIVE` merged segment.
+
+**Step 5: Notify consumers.** Same as split — rebalance and push.
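Merge steps 1 and 2 need two inputs: the union of the parents' subscriptions and the combined hash range. A sketch of that planning step follows, using hypothetical `Parent`, `MergePlan` and `MergePlanner` types and the same adjacency rule as `SegmentLayout.mergeSegments`; sorting the union gives a deterministic subscription list for the create request.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

record Parent(long segmentId, int start, int end, List<String> subscriptions) {}

record MergePlan(List<Long> parentIds, int start, int end, List<String> subscriptions) {}

final class MergePlanner {
    static MergePlan plan(Parent a, Parent b) {
        // Order the parents by hash range so "lo" covers the lower range.
        Parent lo = a.start() <= b.start() ? a : b;
        Parent hi = lo == a ? b : a;
        if (lo.end() + 1 != hi.start()) {
            throw new IllegalArgumentException(
                    "segments " + a.segmentId() + " and " + b.segmentId() + " are not adjacent");
        }
        // Union: a subscription present on either parent must exist on the merged segment.
        Set<String> subs = new TreeSet<>(lo.subscriptions());
        subs.addAll(hi.subscriptions());
        return new MergePlan(List.of(lo.segmentId(), hi.segmentId()), lo.start(), hi.end(), List.copyOf(subs));
    }
}
```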
+
+### Cross-broker coordination
+
+Merges can span multiple brokers even before any new topic is created, because the two parent segments may be owned by different brokers (as assigned by the load manager). The controller leader handles this transparently: all segment operations (create, terminate, delete) go through the Segments admin API, which routes each request to the broker that currently owns the target segment's namespace bundle. The controller therefore does not need to know which broker owns which segment; the admin API routing handles it.
+
+---
+
+## Segment Topic Management via Admin API
+
+Segment topics are managed exclusively through a dedicated REST API at 
`/admin/v2/segments`. This is a deliberate design choice: segment topics use 
the `segment://` domain and must not be confused with regular `persistent://` 
topics.
+
+### Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `PUT` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
+| `POST` | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
+| `DELETE` | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
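A sketch of how a broker-side caller might build these requests with the JDK's `java.net.http.HttpRequest`. `SegmentsRequests` is hypothetical, the base URL and descriptor value are placeholders, and the create body is assumed to be a JSON array of subscription names, because the PIP only says the body carries subscription names. The requests are built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

final class SegmentsRequests {
    private final String base; // e.g. "http://<broker>:8080/admin/v2/segments"

    SegmentsRequests(String base) { this.base = base; }

    // Assumes every path component is already URL-safe.
    private URI uri(String tenant, String ns, String topic, String descriptor, String suffix) {
        return URI.create(String.join("/", base, tenant, ns, topic, descriptor) + suffix);
    }

    // PUT: create the segment topic, pre-creating the given subscriptions.
    HttpRequest create(String tenant, String ns, String topic, String descriptor, List<String> subscriptions) {
        // Assumed body shape: a JSON array of subscription names.
        String body = subscriptions.stream()
                .map(s -> "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return HttpRequest.newBuilder(uri(tenant, ns, topic, descriptor, ""))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // POST .../terminate: seal the segment topic.
    HttpRequest terminate(String tenant, String ns, String topic, String descriptor) {
        return HttpRequest.newBuilder(uri(tenant, ns, topic, descriptor, "/terminate"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // DELETE: remove the segment topic.
    HttpRequest delete(String tenant, String ns, String topic, String descriptor) {
        return HttpRequest.newBuilder(uri(tenant, ns, topic, descriptor, "")).DELETE().build();
    }
}
```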

Review Comment:
   Are these APIs intended to be used by external clients, or only by other brokers? If external clients aren't supposed to use them, we should consider exposing them as an internal API endpoint on a separate port that is reachable only within the cluster.
   With the internal API separated, it wouldn't end up in the public OpenAPI specs and there would be no risk of it being misused.



##########
pip/pip-468.md:
##########
@@ -0,0 +1,615 @@
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a 
segment, used for DAG traversal during catch-up reads.

Review Comment:
   In a catch-up read, the subscription needs to be taken into account. Where would this be handled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
