lhotari commented on code in PR #25516: URL: https://github.com/apache/pulsar/pull/25516#discussion_r3112291568
##########
pip/pip-468.md:
##########
@@ -0,0 +1,615 @@
+# PIP-468: Scalable Topic Controller
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Motivation
+
+[PIP-460](pip-460.md) introduces scalable topics as a new topic type in Pulsar, built on a DAG of range segments that can be split and merged to scale independently of the initial configuration. PIP-460 defines the overall vision but defers the detailed design of each component to dedicated sub-PIPs.
+
+This PIP specifies the **Scalable Topic Controller** — the broker-side component responsible for:
+
+1. **Segment lifecycle management** — creating, terminating, and deleting the underlying segment topics that back a scalable topic.
+2. **Segment layout coordination** — executing split and merge operations with correct ordering guarantees so that no messages are lost and all subscription cursors exist before producers are redirected.
+3. **Consumer assignment** — coordinating which consumers receive messages from which segments for a given subscription.
+4. **Leader election** — ensuring exactly one broker acts as the controller for each scalable topic, with automatic failover.
+5. **DAG watch sessions** — pushing topology updates to connected clients (producers and consumers) when the segment layout changes.
+
+Without a well-defined controller, split and merge operations would be unsafe: producers could write to segments that have no subscription cursors, consumers could miss messages during topology changes, and concurrent layout modifications could corrupt the segment DAG.
+
+---
+
+## Design
+
+### Architecture Overview
+
+The controller subsystem consists of four layers:
+
+```
+┌──────────────────────────────────────────────────────────┐
+│                     Admin API Layer                      │
+│           ScalableTopics REST + Segments REST            │
+└──────────────┬───────────────────────┬───────────────────┘
+               │                       │
+┌──────────────▼───────────────────────▼───────────────────┐
+│                   ScalableTopicService                   │
+│        Per-broker singleton; manages controllers         │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│           ScalableTopicController (per topic)            │
+│   Leader-elected; owns layout, split/merge, consumers    │
+│                                                          │
+│  ┌─────────────────┐  ┌───────────────────────────────┐  │
+│  │  SegmentLayout  │  │ SubscriptionCoordinator (×N)  │  │
+│  │ (immutable DAG) │  │ per-subscription assignments  │  │
+│  └─────────────────┘  └───────────────────────────────┘  │
+└──────────────────────────┬───────────────────────────────┘
+                           │
+┌──────────────────────────▼───────────────────────────────┐
+│                  ScalableTopicResources                  │
+│         Metadata store access (read/write/watch)         │
+└──────────────────────────────────────────────────────────┘
+```
+
+### Broker Roles and Request Routing
+
+A scalable topic involves three distinct broker roles. Understanding which broker handles which operation is critical to the design.
+
+#### Any broker — DAG watch sessions
+
+A **DAG watch session** (scalable topic lookup) can be served by **any broker** in the cluster. The session reads segment metadata from the shared metadata store and registers a watch for changes. No state is held on the broker beyond the watch registration. This means producers and consumers can connect to any broker for topic discovery — the same way regular topic lookups work today.
+
+When the metadata changes (due to a split or merge performed by the controller leader), the metadata store notification fires on whichever broker is serving the watch session, and that broker pushes the updated DAG to the client.
+
+#### Controller leader — layout mutations and consumer assignment
+
+Each scalable topic has exactly one **controller leader** across the cluster, elected via the metadata store. The controller leader is the only broker that can:
+
+- Execute **split** and **merge** operations (the multi-step protocols described below).
+- Accept **consumer registrations** and compute segment-to-consumer assignments.
+- **Notify consumers** of assignment changes after topology updates.
+
+Clients discover the controller leader's broker URL through the DAG watch session response (`controller_broker_url` field). StreamConsumer and CheckpointConsumer clients then connect directly to the controller leader to register and receive assignments.
+
+Admin API requests for split and merge are routed to the `ScalableTopicService` on any broker, which obtains (or creates) a local `ScalableTopicController` that participates in leader election. If the local controller is the leader, it executes the operation directly. If not, the request must be redirected to the leader (or the controller must first win the election). The split and merge admin endpoints do not require the request to land on the leader broker — the `ScalableTopicService` handles this transparently.
+
+#### Segment-owning brokers — produce and consume
+
+Individual **segment topics** (`segment://`) are regular persistent topics from the broker's perspective. They are distributed across the cluster by the standard Pulsar **load manager** and namespace bundle assignment — the same mechanism used for `persistent://` topics. Each segment topic is owned by exactly one broker at any time, determined by its namespace bundle hash.
+
+The controller leader does not need to own any of the segment topics. When the controller needs to operate on a segment (create, terminate, delete, discover subscriptions), it uses the **Segments admin API**, which routes the request to whichever broker currently owns that segment's namespace bundle. This decoupling means the controller can coordinate segments spread across many brokers without requiring co-location.
+
+```
+┌─────────────┐     ┌─────────────┐     ┌─────────────┐
+│  Broker A   │     │  Broker B   │     │  Broker C   │
+│             │     │ (controller │     │             │
+│ segment-0   │     │   leader)   │     │ segment-2   │
+│ segment-1   │     │             │     │             │
+│             │     │ DAG watch   │     │ DAG watch   │
+│ DAG watch   │     │ sessions    │     │ sessions    │
+│ sessions    │     │             │     │             │
+└─────────────┘     └─────────────┘     └─────────────┘
+                           │
+                    Split/merge ops
+                    use admin API to
+                    reach segments on
+                    Broker A and C
+```
+
+In this example, Broker B is the controller leader but owns no segments. When it executes a split of segment-0, it calls the Segments admin API which routes to Broker A (the owner of segment-0).
+
+### Components
+
+#### ScalableTopicService
+
+A per-broker singleton, created and owned by `BrokerService`. It manages the lifecycle of all `ScalableTopicController` instances on the broker.
+
+**Responsibilities:**
+
+- **Controller management**: maintains a map of active controllers keyed by topic name. Creates controllers on demand via `getOrCreateController()` and releases them on topic unload or broker shutdown.
+- **Topic lifecycle**: handles `createScalableTopic()` and `deleteScalableTopic()` admin operations, including creating/deleting the underlying segment topics via the Segments admin API.
+- **Delegation**: routes `splitSegment()` and `mergeSegments()` requests to the appropriate controller.
+- **Leader state monitoring**: listens for leader election state changes and re-attempts election when the current leader is lost.
+
+#### ScalableTopicController
+
+A per-topic coordinator. Only one instance across the cluster is the **leader** for a given scalable topic, ensured by leader election through the metadata store. The leader is the only instance that modifies the segment layout or assigns consumers.
+
+**State:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `topicName` | `TopicName` | The `topic://` name of the scalable topic |
+| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
+| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
+| `leaderState` | `LeaderElectionState` | Current leader election state |
+| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
+
+**Key operations:**
+
+- `initialize()` — loads the current metadata from the store, then attempts leader election. The elected leader stores its broker service URL so that clients can discover and connect to it.
+- `splitSegment(segmentId)` — splits an active segment at its midpoint (see [Split Protocol](#split-protocol)).
+- `mergeSegments(segmentId1, segmentId2)` — merges two adjacent active segments (see [Merge Protocol](#merge-protocol)).
+- `registerConsumer(subscription, consumer)` — registers a consumer and returns its initial segment assignment.
+- `unregisterConsumer(subscription, consumer)` — removes a consumer and triggers rebalancing.
+- `getLeaderBrokerUrl()` — returns the current leader's broker URL, used by clients to connect to the controller.
+
+#### SegmentLayout
+
+An immutable, versioned snapshot of the segment DAG. All mutation methods (`splitSegment`, `mergeSegments`, `pruneSegment`) return a **new** `SegmentLayout` instance — the original is never modified. This ensures safe concurrent reads without locking.
+
+**Fields:**
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
+| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
+| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
+| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only `ACTIVE` segments |
+
+**Key operations:**
+
+- `findActiveSegment(hash)` — returns the active segment whose hash range contains the given hash value. Used by producers for message routing.
+- `splitSegment(segmentId)` — validates the segment is active, computes the midpoint of its hash range, creates two child `SegmentInfo` records covering `[start, mid]` and `[mid+1, end]`, seals the parent, and returns a new layout with incremented epoch.
+- `mergeSegments(id1, id2)` — validates both segments are active and adjacent (the end of one equals `start - 1` of the other), creates a merged `SegmentInfo` covering the combined range, seals both parents, and returns a new layout.
+- `getLineage(segmentId)` — returns the full ancestor + descendant chain for a segment, used for DAG traversal during catch-up reads.

Review Comment:
   Discussed. This is covered in future PIPs.
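
For readers following the thread, the range arithmetic described in the quoted `splitSegment` and `mergeSegments` bullets can be sketched as below. This is a minimal illustration under stated assumptions, not code from the PR: the `SegmentRangeSketch` and `HashRange` names, the use of non-negative `int` bounds, and the 16-bit example range `[0, 65535]` are all hypothetical. The only rules taken from the hunk are the inclusive child ranges `[start, mid]` and `[mid+1, end]` and the adjacency test (the end of one range equals `start - 1` of the other).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the range arithmetic; not the PR's SegmentLayout. */
final class SegmentRangeSketch {

    /** Immutable inclusive hash range [start, end]; non-negative bounds assumed. */
    static final class HashRange {
        final int start;
        final int end;

        HashRange(int start, int end) {
            if (start < 0 || start > end) {
                throw new IllegalArgumentException("require 0 <= start <= end");
            }
            this.start = start;
            this.end = end;
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Splits at the midpoint into [start, mid] and [mid+1, end]. */
    static List<HashRange> split(HashRange r) {
        if (r.start == r.end) {
            throw new IllegalArgumentException("cannot split a single-value range");
        }
        // Written this way so start + end is never computed (avoids int overflow).
        int mid = r.start + (r.end - r.start) / 2;
        return Arrays.asList(new HashRange(r.start, mid), new HashRange(mid + 1, r.end));
    }

    /** Adjacent when the end of one range equals start - 1 of the other. */
    static boolean adjacent(HashRange a, HashRange b) {
        // Widen to long so end + 1 cannot overflow at Integer.MAX_VALUE.
        return (long) a.end + 1 == b.start || (long) b.end + 1 == a.start;
    }

    /** Merges two adjacent ranges into one range covering both. */
    static HashRange merge(HashRange a, HashRange b) {
        if (!adjacent(a, b)) {
            throw new IllegalArgumentException("ranges are not adjacent: " + a + ", " + b);
        }
        return new HashRange(Math.min(a.start, b.start), Math.max(a.end, b.end));
    }

    public static void main(String[] args) {
        // Assumed 16-bit hash space, chosen only for illustration.
        List<HashRange> children = split(new HashRange(0, 65535));
        System.out.println(children);
        System.out.println(merge(children.get(0), children.get(1)));
    }
}
```

Because each operation returns new `HashRange` objects and never mutates its inputs, the sketch follows the same copy-on-write style the hunk describes for `SegmentLayout`. Sealing parents, epoch increments, and segment ID allocation are deliberately left out.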
