lhotari commented on code in PR #25516:
URL: https://github.com/apache/pulsar/pull/25516#discussion_r3112291568


##########
pip/pip-468.md:
##########
@@ -0,0 +1,615 @@
+# PIP-468: Scalable Topic Controller
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Motivation
+
+[PIP-460](pip-460.md) introduces scalable topics as a new topic type in 
Pulsar, built on a DAG of range segments that can be split and merged to scale 
independently of the initial configuration. PIP-460 defines the overall vision 
but defers the detailed design of each component to dedicated sub-PIPs.
+
+This PIP specifies the **Scalable Topic Controller** — the broker-side 
component responsible for:
+
+1. **Segment lifecycle management** — creating, terminating, and deleting the 
underlying segment topics that back a scalable topic.
+2. **Segment layout coordination** — executing split and merge operations with 
correct ordering guarantees so that no messages are lost and all subscription 
cursors exist before producers are redirected.
+3. **Consumer assignment** — coordinating which consumers receive messages 
from which segments for a given subscription.
+4. **Leader election** — ensuring exactly one broker acts as the controller 
for each scalable topic, with automatic failover.
+5. **DAG watch sessions** — pushing topology updates to connected clients 
(producers and consumers) when the segment layout changes.
+
+Without a well-defined controller, split and merge operations would be unsafe: 
producers could write to segments that have no subscription cursors, consumers 
could miss messages during topology changes, and concurrent layout 
modifications could corrupt the segment DAG.
+
+---
+
+## Design
+
+### Architecture Overview
+
+The controller subsystem consists of four layers:
+
+```
+┌──────────────────────────────────────────────────────────┐
+│                    Admin API Layer                        │
+│         ScalableTopics REST  +  Segments REST             │
+└──────────────┬───────────────────────┬───────────────────┘
+               │                       │
+┌──────────────▼───────────────────────▼───────────────────┐
+│                  ScalableTopicService                     │
+│         Per-broker singleton; manages controllers         │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│              ScalableTopicController (per topic)          │
+│   Leader-elected; owns layout, split/merge, consumers     │
+│                                                           │
+│   ┌─────────────────┐  ┌───────────────────────────────┐ │
+│   │  SegmentLayout   │  │  SubscriptionCoordinator (×N) │ │
+│   │  (immutable DAG) │  │  per-subscription assignments │ │
+│   └─────────────────┘  └───────────────────────────────┘ │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│              ScalableTopicResources                       │
+│         Metadata store access (read/write/watch)          │
+└──────────────────────────────────────────────────────────┘
+```
+
+### Broker Roles and Request Routing
+
+A scalable topic involves three distinct broker roles. Understanding which 
broker handles which operation is critical to the design.
+
+#### Any broker — DAG watch sessions
+
+A **DAG watch session** (scalable topic lookup) can be served by **any 
broker** in the cluster. The session reads segment metadata from the shared 
metadata store and registers a watch for changes. No state is held on the 
broker beyond the watch registration. This means producers and consumers can 
connect to any broker for topic discovery — the same way regular topic lookups 
work today.
+
+When the metadata changes (due to a split or merge performed by the controller 
leader), the metadata store notification fires on whichever broker is serving 
the watch session, and that broker pushes the updated DAG to the client.
+
+#### Controller leader — layout mutations and consumer assignment
+
+Each scalable topic has exactly one **controller leader** across the cluster, 
elected via the metadata store. The controller leader is the only broker that 
can:
+
+- Execute **split** and **merge** operations (the multi-step protocols 
described below).
+- Accept **consumer registrations** and compute segment-to-consumer 
assignments.
+- **Notify consumers** of assignment changes after topology updates.
+
+Clients discover the controller leader's broker URL through the DAG watch 
session response (`controller_broker_url` field). StreamConsumer and 
CheckpointConsumer clients then connect directly to the controller leader to 
register and receive assignments.
+
+Admin API requests for split and merge are routed to the 
`ScalableTopicService` on any broker, which obtains (or creates) a local 
`ScalableTopicController` that participates in leader election. If the local 
controller is the leader, it executes the operation directly. If not, the 
request must be redirected to the leader (or the controller must first win the 
election). The split and merge admin endpoints do not require the request to 
land on the leader broker — the `ScalableTopicService` handles this 
transparently.
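
A minimal sketch of the routing decision described above, under stated assumptions: `RoutingSketch`, `Action`, and `Decision` are hypothetical names for illustration and are not part of the PIP, and the leader's broker URL is assumed to be readable from the leader election value.

```java
import java.util.Optional;

// Hypothetical helper (not from the PIP): decides how a broker should handle a
// split/merge admin request, given the leader URL it currently observes.
final class RoutingSketch {
    enum Action { EXECUTE_LOCALLY, REDIRECT_TO_LEADER, RETRY_AFTER_ELECTION }

    static final class Decision {
        final Action action;
        final String targetBrokerUrl; // null unless action is REDIRECT_TO_LEADER

        Decision(Action action, String targetBrokerUrl) {
            this.action = action;
            this.targetBrokerUrl = targetBrokerUrl;
        }
    }

    static Decision route(String localBrokerUrl, Optional<String> leaderBrokerUrl) {
        if (!leaderBrokerUrl.isPresent()) {
            // No leader is known yet: the local controller must first take part
            // in (and possibly win) the election before the request can proceed.
            return new Decision(Action.RETRY_AFTER_ELECTION, null);
        }
        if (leaderBrokerUrl.get().equals(localBrokerUrl)) {
            // The local controller is the leader and runs the operation itself.
            return new Decision(Action.EXECUTE_LOCALLY, null);
        }
        // Another broker is the leader: hand the request over to it.
        return new Decision(Action.REDIRECT_TO_LEADER, leaderBrokerUrl.get());
    }
}
```

Whether the redirect is an HTTP redirect or an internal forward is left open here; the sketch only captures the three cases the paragraph distinguishes.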
+
+#### Segment-owning brokers — produce and consume
+
+Individual **segment topics** (`segment://`) are regular persistent topics 
from the broker's perspective. They are distributed across the cluster by the 
standard Pulsar **load manager** and namespace bundle assignment — the same 
mechanism used for `persistent://` topics. Each segment topic is owned by 
exactly one broker at any time, determined by its namespace bundle hash.
+
+The controller leader does not need to own any of the segment topics. When the 
controller needs to operate on a segment (create, terminate, delete, discover 
subscriptions), it uses the **Segments admin API**, which routes the request to 
whichever broker currently owns that segment's namespace bundle. This 
decoupling means the controller can coordinate segments spread across many 
brokers without requiring co-location.
+
+```
+┌─────────────┐     ┌─────────────┐     ┌─────────────┐
+│  Broker A    │     │  Broker B    │     │  Broker C    │
+│              │     │  (controller │     │              │
+│  segment-0   │     │   leader)    │     │  segment-2   │
+│  segment-1   │     │              │     │              │
+│              │     │  DAG watch   │     │  DAG watch   │
+│  DAG watch   │     │  sessions    │     │  sessions    │
+│  sessions    │     │              │     │              │
+└─────────────┘     └─────────────┘     └─────────────┘
+                           │
+                    Split/merge ops
+                    use admin API to
+                    reach segments on
+                    Broker A and C
+```
+
+In this example, Broker B is the controller leader but owns no segments. When 
it executes a split of segment-0, it calls the Segments admin API which routes 
to Broker A (the owner of segment-0).
+
+### Components
+
+#### ScalableTopicService
+
+A per-broker singleton, created and owned by `BrokerService`. It manages the 
lifecycle of all `ScalableTopicController` instances on the broker.
+
+**Responsibilities:**
+
+- **Controller management**: maintains a map of active controllers keyed by 
topic name. Creates controllers on demand via `getOrCreateController()` and 
releases them on topic unload or broker shutdown.
+- **Topic lifecycle**: handles `createScalableTopic()` and 
`deleteScalableTopic()` admin operations, including creating/deleting the 
underlying segment topics via the Segments admin API.
+- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to 
the appropriate controller.
+- **Leader state monitoring**: listens for leader election state changes and 
re-attempts election when the current leader is lost.
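
The controller-management responsibility above could be sketched as follows. This is an illustration under assumptions, not the PIP's implementation: `ControllerRegistrySketch` and `release()` are hypothetical names, the controller type is left generic, and asynchronous creation via `CompletableFuture` is assumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-broker registry: at most one controller future per topic name.
final class ControllerRegistrySketch<C> {
    private final ConcurrentHashMap<String, CompletableFuture<C>> controllers =
            new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<C>> factory;

    ControllerRegistrySketch(Function<String, CompletableFuture<C>> factory) {
        this.factory = factory;
    }

    CompletableFuture<C> getOrCreateController(String topicName) {
        // computeIfAbsent invokes the factory at most once per key, even when
        // several threads ask for the same topic concurrently.
        CompletableFuture<C> future = controllers.computeIfAbsent(topicName, factory);
        // Evict failed creations so a later call can retry. The callback is
        // attached outside computeIfAbsent because the mapping function must
        // not modify the map itself.
        future.whenComplete((controller, error) -> {
            if (error != null) {
                controllers.remove(topicName, future);
            }
        });
        return future;
    }

    // Called on topic unload or broker shutdown; returns true if an entry existed.
    boolean release(String topicName) {
        return controllers.remove(topicName) != null;
    }

    int activeCount() {
        return controllers.size();
    }
}
```

Caching the future rather than the controller lets concurrent callers share one in-flight initialization instead of racing to create duplicates.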
+
+#### ScalableTopicController
+
+A per-topic coordinator. Only one instance across the cluster is the 
**leader** for a given scalable topic, ensured by leader election through the 
metadata store. The leader is the only instance that modifies the segment 
layout or assigns consumers.
+
+**State:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
+| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the 
segment DAG |
+| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription 
consumer coordinators |
+| `leaderState` | `LeaderElectionState` | Current leader election state |
+| `leaderElection` | `LeaderElection<String>` | Metadata store leader election 
handle; value is the broker URL |
+
+**Key operations:**
+
+- `initialize()` — loads the current metadata from the store, then attempts 
leader election. The elected leader stores its broker service URL so that 
clients can discover and connect to it.
+- `splitSegment(segmentId)` — splits an active segment at its midpoint (see 
[Split Protocol](#split-protocol)).
+- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active 
segments (see [Merge Protocol](#merge-protocol)).
+- `registerConsumer(subscription, consumer)` — registers a consumer and 
returns its initial segment assignment.
+- `unregisterConsumer(subscription, consumer)` — removes a consumer and 
triggers rebalancing.
+- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by 
clients to connect to the controller.
+
+#### SegmentLayout
+
+An immutable, versioned snapshot of the segment DAG. All mutation methods 
(`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new** 
`SegmentLayout` instance — the original is never modified. This ensures safe 
concurrent reads without locking.
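
A minimal sketch of this copy-on-write pattern, assuming inclusive integer hash ranges. `SegmentInfoSketch` and `SegmentLayoutSketch` are simplified, hypothetical stand-ins for the PIP's `SegmentInfo` and `SegmentLayout`; the `initial()` factory and the field layout are assumptions.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical segment record: an inclusive hash range plus a state flag.
final class SegmentInfoSketch {
    final long id;
    final int start;      // inclusive
    final int end;        // inclusive
    final boolean active; // false once the segment is sealed

    SegmentInfoSketch(long id, int start, int end, boolean active) {
        this.id = id;
        this.start = start;
        this.end = end;
        this.active = active;
    }

    SegmentInfoSketch seal() {
        return new SegmentInfoSketch(id, start, end, false);
    }
}

final class SegmentLayoutSketch {
    final long epoch;
    final long nextSegmentId;
    final Map<Long, SegmentInfoSketch> allSegments; // unmodifiable view of a private copy

    SegmentLayoutSketch(long epoch, long nextSegmentId, Map<Long, SegmentInfoSketch> segments) {
        this.epoch = epoch;
        this.nextSegmentId = nextSegmentId;
        this.allSegments = Collections.unmodifiableMap(new LinkedHashMap<>(segments));
    }

    // Assumed bootstrap: a single active segment covering the whole range.
    static SegmentLayoutSketch initial(int start, int end) {
        Map<Long, SegmentInfoSketch> m = new LinkedHashMap<>();
        m.put(0L, new SegmentInfoSketch(0L, start, end, true));
        return new SegmentLayoutSketch(0L, 1L, m);
    }

    // Returns a NEW layout; this instance is left untouched.
    SegmentLayoutSketch splitSegment(long segmentId) {
        SegmentInfoSketch parent = allSegments.get(segmentId);
        if (parent == null || !parent.active) {
            throw new IllegalArgumentException("segment " + segmentId + " is not active");
        }
        if (parent.start == parent.end) {
            throw new IllegalArgumentException("range of segment " + segmentId + " cannot be split");
        }
        int mid = parent.start + (parent.end - parent.start) / 2; // overflow-safe midpoint
        Map<Long, SegmentInfoSketch> next = new LinkedHashMap<>(allSegments);
        next.put(parent.id, parent.seal());
        next.put(nextSegmentId, new SegmentInfoSketch(nextSegmentId, parent.start, mid, true));
        next.put(nextSegmentId + 1,
                new SegmentInfoSketch(nextSegmentId + 1, mid + 1, parent.end, true));
        return new SegmentLayoutSketch(epoch + 1, nextSegmentId + 2, next);
    }

    SegmentInfoSketch findActiveSegment(int hash) {
        for (SegmentInfoSketch s : allSegments.values()) {
            if (s.active && hash >= s.start && hash <= s.end) {
                return s;
            }
        }
        throw new IllegalStateException("no active segment covers hash " + hash);
    }
}
```

Because a reader holding the old layout keeps a consistent view while the new one is built, the controller can publish a layout change by swapping a single reference.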
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `long` | Monotonically increasing version; incremented on every 
layout change |
+| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
+| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments 
(active + sealed) |
+| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` 
segments |
+
+**Key operations:**
+
+- `findActiveSegment(hash)` — returns the active segment whose hash range 
contains the given hash value. Used by producers for message routing.
+- `splitSegment(segmentId)` — validates the segment is active, computes the 
midpoint of its hash range, creates two child `SegmentInfo` records covering 
`[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout 
with incremented epoch.
+- `mergeSegments(id1, id2)` — validates both segments are active and adjacent 
(the end of one equals `start - 1` of the other), creates a merged 
`SegmentInfo` covering the combined range, seals both parents, and returns a 
new layout.
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a 
segment, used for DAG traversal during catch-up reads.

Review Comment:
   Discussed. This is covered in future PIPs.


