This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 18653513c50 [feat][pip] PIP-460: Scalable Topics (Topics v5) (#25315)
18653513c50 is described below

commit 18653513c50443f483b1e502c3965ee0c222e3fd
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 1 16:48:28 2026 +0300

    [feat][pip] PIP-460: Scalable Topics (Topics v5) (#25315)
---
 pip/pip-460.md | 426 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 426 insertions(+)

diff --git a/pip/pip-460.md b/pip/pip-460.md
new file mode 100644
index 00000000000..2dbb25af74f
--- /dev/null
+++ b/pip/pip-460.md
@@ -0,0 +1,426 @@
+# PIP-460: Scalable Topics (Topics v5)
+## Motivation
+
+Pulsar's partitioned topic model has three fundamental limitations that make 
it unsuitable as the sole topic type for applications requiring elastic, 
transparent scaling.
+
+### Key ordering breaks when partition count changes
+
+Many applications rely on key-based message ordering: all messages for the 
same key must be processed in sequence, guaranteeing correctness for 
deduplication, session aggregation, stateful stream processing, and similar use 
cases. Key-to-partition routing uses modulo hashing: `hash(key) % numPartitions`. 
When the partition count changes, say from 4 to 8, the modulo mapping moves at 
least half of the hash space to a different partition (exactly half when the 
count doubles). A key previously routed to partition 2 may now hash to partition 6. 
Messages produced after th [...]
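The arithmetic is easy to check. The Java sketch below models key hashes as plain non-negative integers (an assumption: a real client hashes the key first, which does not change the modulo arithmetic). It counts how many hash values land on a different partition after the partition count changes. The class and method names are illustrative only.

```java
// Sketch: how modulo routing remaps key hashes when the partition count changes.
// Assumption: key hashes are modeled as plain non-negative ints; a real client
// first hashes the key, which does not change the arithmetic below.
class ModuloRemap {

    /** Partition chosen by modulo routing for a non-negative key hash. */
    static int partitionFor(int keyHash, int numPartitions) {
        return keyHash % numPartitions;
    }

    /** Counts hash values in [0, sampleSize) that land on a different partition. */
    static int countMoved(int sampleSize, int oldPartitions, int newPartitions) {
        int moved = 0;
        for (int h = 0; h < sampleSize; h++) {
            if (partitionFor(h, oldPartitions) != partitionFor(h, newPartitions)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Hash 6 routes to partition 2 with 4 partitions, but to partition 6 with 8.
        System.out.println(partitionFor(6, 4) + " -> " + partitionFor(6, 8));
        // Doubling from 4 to 8 moves exactly half of the hash values.
        System.out.println(countMoved(10_000, 4, 8) + " of 10000 moved");
        // A non-doubling change moves even more: 4 to 5 moves 80%.
        System.out.println(countMoved(10_000, 4, 5) + " of 10000 moved");
    }
}
```

Running `main` prints `2 -> 6`, then `5000 of 10000 moved` and `8000 of 10000 moved`. Doubling or halving is the best case for modulo routing. Any other change remaps an even larger share of keys.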
+
+### Partitions can never decrease
+
+Pulsar does not support reducing the number of partitions. Once a topic is 
scaled up to 64 partitions, it remains at 64 partitions permanently — even if 
traffic drops to a level where 4 partitions would suffice. Decreasing 
partitions would require re-routing keys (breaking ordering), migrating 
unconsumed messages from removed partitions, and updating every active 
consumer's assignment. No safe mechanism exists for this today. The result is 
**partition count drift**: topics accumulate mor [...]
+
+### The cost of getting the partition count wrong
+
+Because partitions cannot shrink and growing them breaks ordering, operators 
must predict the right partition count at topic creation time. 
Over-provisioning wastes resources; under-provisioning leads to write-hot 
partitions that eventually require re-creating the topic with a higher 
partition count. Neither outcome is acceptable for a system that aims to be 
operationally simple at scale.
+
+---
+
+## Background Knowledge
+
+### Partitioned Topics Today
+
+Apache Pulsar scales topics by pre-configuring a fixed number of 
**partitions**. Each partition is an independent persistent topic backed by a 
single managed ledger on a single broker. A managed ledger is Pulsar's 
abstraction over Apache BookKeeper, providing an append-only log with 
cursor-based consumption tracking.
+
+For key-ordered messaging, producers route messages using `hash(key) % 
numPartitions`, guaranteeing that all messages with the same key land on the 
same partition and are consumed in order. Consumer groups are partition-aware: 
each consumer in a group is assigned one or more partitions, and the assignment 
is stable as long as the partition count does not change.
+
+### The Managed Ledger Layer
+
+A managed ledger is a sequence of BookKeeper ledgers forming a single logical 
stream. Subscriptions track their position in this stream using cursors. The 
managed ledger layer handles ledger rollover (sealing a full ledger and opening 
a new one), cursor advancement, and interaction with BookKeeper for durable 
storage.
+
+Pulsar's existing topic termination feature allows a managed ledger to be 
permanently sealed — a capability that is relevant to scalable topics when a 
range segment is split or merged.
+
+### Relevant Existing PIPs
+
+- **[PIP-379](pip-379.md)**: Key_Shared Draining Hashes — improves per-key 
ordering guarantees during consumer changes. The "draining hashes" concept and 
its mechanisms are directly relevant to scalable topics, where a 
Key_Shared-style dispatch mode is used to fan out consumption within a single 
range segment.
+- **[PIP-335](pip-335.md)**: Oxia Metadata Store — Oxia is the preferred 
metadata store backend for Pulsar 5.0 and is particularly relevant for scalable 
topics because it supports streaming watch sessions, which the scalable topics 
lookup mechanism is designed to use.
+
+---
+
+## Goals
+
+### In Scope
+
+- Introduce **Scalable Topics** as a new, distinct topic type in Pulsar 
(referred to as Topics v5 in the codebase).
+- Support **transparent range splitting** — increasing the number of parallel 
write and read streams without disrupting in-flight messages or breaking key 
ordering.
+- Support **range merging** — reducing the number of parallel streams when 
traffic decreases, recovering resources without requiring a topic re-creation.
+- Preserve **key ordering guarantees** across splits and merges through 
range-based routing (replacing modulo hashing with range-hash assignment).
+- Support **Key_Shared-style dispatch** within a single range segment, 
allowing multiple consumers to share a segment while preserving key-ordered 
delivery.
+- Introduce a **new client API** designed from the ground up for scalable 
topics, with separate type-safe interfaces for each consumer type.
+- Expose a **persistent, push-based lookup session** so clients receive 
topology updates without polling.
+- Introduce a **consumer controller** with leader election and persistent 
assignment state for coordinated, stable segment assignment.
+- Improve the **consumer model** to reduce out-of-order and duplicate message 
processing in clients by introducing a persistent lease and consumer session 
model.
+- Improve the **producer protocol** to support application-level flow control, 
removing the dependency on TCP/IP flow control.
+- Change the **message entry format** to support scalable topics and future 
use cases.
+- Define a **phased delivery path** (Pulsar 4.x → 5.0 LTS) that allows 
incremental delivery and community feedback before the LTS commitment.
+- This PIP defines the overall design direction. The high-level architecture and 
detailed design for each subproblem are covered by dedicated sub-PIPs (see 
[Sub-PIPs](#sub-pips)).
+
+### Postponed after Pulsar 5.0 release
+
+In some of the above areas, the core design must accommodate features that may 
be deferred beyond the Pulsar 5.0 LTS timeline so that the implementation can 
be extended later without a redesign. Potentially deferred features include:
+
+- Full Geo-replication support including Replicated Subscriptions
+- Transaction support
+- Stream processing connector implementations (Flink, Beam)
+- Tooling for migrating existing partitioned topics to scalable topics
+- Any in-scope features that are lower priority and cannot be completed before 
the Pulsar 5.0 LTS release due to time constraints.
+
+---
+
+## Prior Art
+
+### Northguard (LinkedIn)
+
+Northguard introduces a **range-based** data model. A topic is a named 
collection of ranges that together cover the full keyspace. Each range is a 
sequence of segments (the unit of replication), where each segment is assigned 
to a different set of brokers — a technique called **log striping** that keeps 
the cluster balanced by design.
+
+Scaling is achieved through **range splitting and merging** using a **buddy 
algorithm**: a range can be split into exactly two child ranges, and only two 
ranges previously split from the same parent can be merged back. This provides 
clean ordering guarantees — all records in a parent range happen-before records 
in any child range, and all records in sibling ranges happen-before records in 
their merged parent. It also naturally aligns keyspaces across different 
topics, eliminating the nee [...]
+
+Unlike indexed partitions, which require a global synchronization barrier when 
the partition count changes, range splits only interrupt producers writing to 
the specific range being split.
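The buddy constraint can be stated compactly in code. The Java sketch below is an illustration and not Northguard's implementation. It makes the following assumptions:

- Ranges are half-open, power-of-two-sized intervals of a hypothetical 16-bit hash keyspace `[0, 65536)`.
- A split halves a range at its midpoint.
- Two ranges may merge only when they are the two halves of the same parent.

```java
// Sketch of Northguard-style buddy splitting and merging over a hash keyspace.
// Assumption: ranges are half-open [start, end) intervals with power-of-two
// sizes, aligned to their size, starting from a hypothetical [0, 65536).
record BuddyRange(int start, int end) {

    BuddyRange {
        int size = end - start;
        if (size <= 0 || Integer.bitCount(size) != 1 || start % size != 0) {
            throw new IllegalArgumentException("not a buddy-aligned range: [" + start + ", " + end + ")");
        }
    }

    int size() {
        return end - start;
    }

    /** Splits this range into exactly two equal children. */
    BuddyRange[] split() {
        if (size() < 2) throw new IllegalStateException("range too small to split");
        int mid = start + size() / 2;
        return new BuddyRange[] {new BuddyRange(start, mid), new BuddyRange(mid, end)};
    }

    /** The range this one was split from. */
    BuddyRange parent() {
        int parentSize = size() * 2;
        int parentStart = start - (start % parentSize);
        return new BuddyRange(parentStart, parentStart + parentSize);
    }

    /** True only if both ranges are the two halves of the same parent. */
    boolean isBuddyOf(BuddyRange other) {
        return size() == other.size() && !equals(other) && parent().equals(other.parent());
    }

    /** Merges two buddies back into their parent. */
    static BuddyRange merge(BuddyRange a, BuddyRange b) {
        if (!a.isBuddyOf(b)) throw new IllegalArgumentException("only buddies can merge");
        return a.parent();
    }
}
```

Consider the case after splitting `[0, 65536)` twice. The ranges `[16384, 32768)` and `[32768, 49152)` are adjacent but are not buddies, so this model refuses to merge them. Pravega's neighbor merging, described below, would allow it.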
+
+### Pravega
+
+Pravega uses a similar segment-based model with splitting and merging. 
Neighboring keyspace ranges can be merged, which is slightly more flexible than 
Northguard's strict buddy constraint. Pravega's consumer-side abstractions 
include:
+
+- **Reader Groups** that balance segments across readers (consumers).
+- **Reader scaling** where applications (or Flink integration) can dynamically 
adjust the number of reader tasks.
+- **Checkpoints** that store the state of all active positions across a reader 
group.
+- **Transactions** for atomic writes across segments.
+
+Pravega's Flink connector experience highlights a key challenge: when range 
topology changes, the stream processing connector must handle checkpoint 
restoration across segment boundaries and coordinate watermarks correctly 
across a dynamic number of streams. For key-ordered processing, Pravega routes 
events to segments by routing key, ensuring all events for a given key are read 
by the same parallel Flink instance. However, stateful downstream operators 
that require key-based ordering st [...]
+
+---
+
+## Preliminary Design Direction
+
+> **Note:** This section does not contain the high-level design for Scalable 
Topics. The full high-level design — covering functionality, capabilities and 
quality characteristics, concepts, components, interactions, contracts, and 
related design decisions — will be defined in a dedicated sub-PIP once PIP-460 
has received high-level acceptance from the Pulsar community to proceed with 
planning.
+>
+> PIP-460 is intentionally a vision and problem statement document. Its 
purpose is to establish the motivation, goals, and key challenges that need to 
be resolved and to seek community alignment on the direction before detailed 
design work begins.
+
+The sections below sketch some preliminary design directions to illustrate the 
problem space. These are subject to change as the detailed design evolves in 
sub-PIPs.
+
+### Design Principles
+
+1. **Works on existing Pulsar architecture.** Scalable topics layer on top of 
Pulsar's existing broker, managed ledger, and BookKeeper stack rather than 
replacing them. The range segment maps directly to one managed ledger.
+2. **Simplicity for users.** The user mental model — a single topic name that 
scales transparently — should be simpler than partitioned topics, not more 
complex.
+3. **Long-term default.** The long-term vision is that scalable topics 
eventually become the default topic type, replacing the current partitioned and 
non-partitioned topic model.
+4. **Clean break for the client API.** Existing Pulsar clients are not 
compatible with scalable topics. This is intentional: it creates the 
opportunity to revisit the client API, revisit the binary protocol, and change 
the message entry format without carrying forward legacy constraints.
+
+### Concepts/Vocabulary (Preliminary)
+
+| Concept | Description |
+|---------|-------------|
+| **Scalable Topic** | A new topic type identified by the `topic://` URL scheme. Scales transparently through range splitting and merging without requiring recreation or downtime. |
+| **Range Segment** | The fundamental unit of a scalable topic. Covers a contiguous portion of the keyspace (a hash range) and is backed by exactly one managed ledger on one broker. Often referred to as "range" for brevity. |
+| **Keyspace** | The full space of possible key hashes. Each range segment is assigned a non-overlapping slice of the keyspace; together all active segments cover the full keyspace. |
+| **Segment DAG** | The directed acyclic graph that records the full history of range segments for a topic. Parent-child edges are added when segments are split or merged. Encodes a strict happens-before relationship: all messages in a parent segment precede messages in its children. |
+| **Split** | An operation that seals a range segment and creates two child segments, each covering a sub-portion of the parent's keyspace. A local operation on the owning broker. |
+| **Merge** | An operation that seals two or more adjacent range segments and creates a single child segment covering their combined keyspace. Requires cross-broker coordination. |
+| **Merge Leader** | The broker that coordinates a merge operation, responsible for sealing all involved segments and creating the merged child. |
+| **Watch Session** | A persistent connection between a client and a broker through which the broker pushes the initial topic state and subsequent DAG topology updates. Replaces one-shot lookup requests. |
+| **Consumer Controller** | A broker elected via leader election that manages segment-to-consumer assignments for a subscription. Persists assignment state to survive broker failures. |
+| **Consumer Controller Session** | A persistent bidirectional stream between a consumer and the controller broker used to push assignments and receive progress reports. |
+| **Consumer Identity** | A stable name chosen by the client that identifies a consumer controller session. Allows the session to be restored after a reconnect within the grace period. |
+| **Consumer Session Grace Period** | Acts as a persistent lease on segment assignments. When a consumer disconnects, its assignments are held in reserve for the duration of the grace period rather than immediately redistributed. If the consumer reconnects within this window, the lease is renewed and assignments are restored unchanged. |
+| **StreamConsumer** | A consumer type registered with the controller that receives exclusive segment assignments and processes messages in key order within each segment. |
+| **QueueConsumer** | A consumer type that subscribes to all active segments with shared dispatch. No cross-segment ordering guarantees. |
+| **CheckpointConsumer** | A consumer type registered with the controller that tracks read positions externally via a serializable `Checkpoint` snapshot. Designed for stream processing frameworks such as Flink. |
+| **Checkpoint** | A serializable snapshot of read positions across all assigned segments, used by the CheckpointConsumer to restore processing state after a restart or topology change. |
+| **Tailing Reads** | A consumption pattern where consumers follow closely behind producers on active (non-sealed) segments, making consumer-side I/O metrics potentially useful for scaling decisions. |
+
+### Range Segments and the Segment Directed Acyclic Graph (DAG)
+
+A scalable topic is a **DAG of range segments**. A topic starts with one or 
more range segments. Each range segment covers a contiguous portion of the 
keyspace (expressed as a hash range). At produce time, a keyed message is 
routed to the active segment whose hash range covers the message's key hash.
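Finding the covering segment is an ordered-map lookup. The Java sketch below keeps active segments in a `TreeMap` keyed by the start of each hash range. It uses `floorEntry` to find the candidate and then checks the range end. The following are assumptions for illustration, since the PIP does not fix a hash function or keyspace size:

- the 16-bit keyspace;
- the use of `String.hashCode()` as a stand-in key hash;
- all names.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: route a keyed message to the active segment covering its key hash.
// Assumptions: 16-bit keyspace [0, 65536), String.hashCode() as a stand-in
// key hash, and segments identified by a plain long id.
class RangeRouter {
    static final int KEYSPACE = 1 << 16;

    record ActiveSegment(long segmentId, int start, int end) {}

    // Active (non-sealed) segments indexed by the start of their [start, end) range.
    private final TreeMap<Integer, ActiveSegment> byStart = new TreeMap<>();

    void addActive(ActiveSegment segment) {
        byStart.put(segment.start(), segment);
    }

    void removeActive(ActiveSegment segment) {
        byStart.remove(segment.start(), segment);
    }

    static int keyHash(String key) {
        return Math.floorMod(key.hashCode(), KEYSPACE);
    }

    /** Returns the active segment whose [start, end) range contains the hash. */
    ActiveSegment route(int hash) {
        Map.Entry<Integer, ActiveSegment> entry = byStart.floorEntry(hash);
        if (entry == null || hash >= entry.getValue().end()) {
            throw new IllegalStateException("no active segment covers hash " + hash);
        }
        return entry.getValue();
    }

    ActiveSegment route(String key) {
        return route(keyHash(key));
    }
}
```

Suppose a segment covering `[32768, 65536)` is replaced by children covering `[32768, 49152)` and `[49152, 65536)`. Hashes in `[0, 32768)` keep routing to the same segment as before. Only keys in the split range change destination.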
+
+A range segment can be **split** into two child segments, each inheriting a 
sub-portion of the parent's keyspace. Adjacent range segments can be **merged** 
into a single child segment covering the combined keyspace. These operations 
extend the DAG by adding parent-child edges.
+
+The DAG encodes a strict happens-before relationship: all messages in a parent 
segment happen-before messages in any of its child segments. Consumers must 
traverse the DAG in this order to maintain correctness across splits and merges.
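One way to honor this order is a topological sort of the DAG. The Java sketch below applies Kahn's algorithm to a hypothetical in-memory map from segment id to parent ids; the PIP does not define this structure. A real consumer may only need the part of the DAG that covers its assigned ranges.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: compute a read order in which every segment comes after all of its
// parents (Kahn's topological sort). Segment ids and the in-memory map are
// illustrative assumptions.
class SegmentDag {
    // segment id -> parent ids; initial segments have no parents
    private final Map<Long, List<Long>> parents = new LinkedHashMap<>();

    /** Adds a segment; parents must already exist, which rules out cycles. */
    void addSegment(long id, List<Long> parentIds) {
        if (parents.containsKey(id)) throw new IllegalArgumentException("duplicate segment " + id);
        for (long p : parentIds) {
            if (!parents.containsKey(p)) throw new IllegalArgumentException("unknown parent " + p);
        }
        parents.put(id, List.copyOf(parentIds));
    }

    /** Returns all segment ids ordered so that each parent precedes its children. */
    List<Long> readOrder() {
        Map<Long, Integer> unreadParents = new LinkedHashMap<>();
        Map<Long, List<Long>> children = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : parents.entrySet()) {
            unreadParents.put(e.getKey(), e.getValue().size());
            for (long p : e.getValue()) {
                children.computeIfAbsent(p, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<Long> ready = new ArrayDeque<>();
        unreadParents.forEach((id, count) -> {
            if (count == 0) ready.add(id);
        });
        List<Long> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            long id = ready.poll();
            order.add(id);
            for (long child : children.getOrDefault(id, List.of())) {
                // A child becomes readable once its last parent has been read.
                if (unreadParents.merge(child, -1, Integer::sum) == 0) ready.add(child);
            }
        }
        return order;
    }
}
```

For example, take a segment 1 split into 2 and 3, then 3 split into 4 and 5, then adjacent 2 and 4 merged into 6. The sketch returns the order `[1, 2, 3, 4, 5, 6]`, and segment 6 is read only after both of its parents.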
+
+A "range segment" is often referred to as a "range" for brevity. Each range is 
backed by exactly one managed ledger and is owned by exactly one broker at any 
given time.
+
+### Splitting and Merging Protocol
+
+**Splitting** does not require cross-broker coordination — since a range is 
owned by a single broker, the split is a local operation on the owning broker:
+
+1. The current segment is sealed. A termination marker entry is written to the 
managed ledger.
+2. Two child segments are created with defined keyspace boundaries.
+3. The updated DAG is pushed to all connected clients via their watch sessions.
+4. Producers begin routing to the appropriate child segment. The previous 
consumer for the parent segment is preferentially assigned to one of the 
children to minimize rebalancing.
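The four split steps above map onto a short broker-local routine. The Java sketch below is hypothetical throughout. Setting `sealed` stands in for terminating the managed ledger, and `WatchSession` stands in for a connected client. Step 4, re-routing and consumer reassignment, is left to producers and the consumer controller and is only noted in a comment.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the broker-local split steps. All types and methods are
// hypothetical; setting `sealed` stands in for terminating the segment's
// managed ledger (writing the termination marker).
class LocalSplit {
    static final class Segment {
        final long id;
        final int start, end; // half-open hash range [start, end)
        boolean sealed;
        final List<Long> childIds = new ArrayList<>();

        Segment(long id, int start, int end) {
            this.id = id;
            this.start = start;
            this.end = end;
        }
    }

    record TopologyUpdate(long sealedParentId, Segment left, Segment right) {}

    /** Hypothetical stand-in for a client's watch session. */
    interface WatchSession {
        void onTopologyChange(TopologyUpdate update);
    }

    private final List<WatchSession> watchSessions = new ArrayList<>();
    private long nextSegmentId;

    LocalSplit(long firstFreeSegmentId) {
        this.nextSegmentId = firstFreeSegmentId;
    }

    void addWatchSession(WatchSession session) {
        watchSessions.add(session);
    }

    TopologyUpdate split(Segment parent, int splitPoint) {
        if (parent.sealed) throw new IllegalStateException("segment " + parent.id + " is already sealed");
        if (splitPoint <= parent.start || splitPoint >= parent.end) {
            throw new IllegalArgumentException("split point must fall strictly inside the range");
        }
        // Step 1: seal the parent so it accepts no further writes.
        parent.sealed = true;
        // Step 2: create two children that together cover the parent's range.
        Segment left = new Segment(nextSegmentId++, parent.start, splitPoint);
        Segment right = new Segment(nextSegmentId++, splitPoint, parent.end);
        parent.childIds.add(left.id);
        parent.childIds.add(right.id);
        // Step 3: push the updated topology to every connected watch session.
        TopologyUpdate update = new TopologyUpdate(parent.id, left, right);
        watchSessions.forEach(s -> s.onTopologyChange(update));
        // Step 4 happens elsewhere: producers re-route on receiving the update,
        // and the consumer controller prefers to keep the parent's consumer
        // on one of the children.
        return update;
    }
}
```

No other broker is consulted at any point, which is why a split can stay a local operation.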
+
+**Merging** requires cross-broker coordination since the segments being merged 
may be owned by different brokers. One broker acts as the **merge leader**, 
coordinating the seal of all involved segments and the creation of the merged 
child. Merging multiple adjacent segments at once is supported. As with 
splitting, the updated DAG is pushed to clients via their active watch sessions.
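Before sealing anything, a merge leader has to check that the requested segments form one contiguous range. The Java sketch below shows only that validation step and uses hypothetical names. The cross-broker sealing and the creation of the merged child are not modeled. It checks plain adjacency as described in this section; a Northguard-style buddy restriction would add a further check.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch: validate that two or more segments are adjacent and compute the
// combined range for the merged child. Names are hypothetical; sealing the
// inputs on their owning brokers is not modeled.
class MergePlanner {
    record Range(long segmentId, int start, int end) {}

    static Range planMerge(List<Range> segments, long mergedSegmentId) {
        if (segments.size() < 2) {
            throw new IllegalArgumentException("a merge needs at least two segments");
        }
        List<Range> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingInt(Range::start));
        for (int i = 1; i < sorted.size(); i++) {
            Range previous = sorted.get(i - 1);
            Range current = sorted.get(i);
            if (previous.end() != current.start()) {
                throw new IllegalArgumentException("segments " + previous.segmentId()
                        + " and " + current.segmentId() + " are not adjacent");
            }
        }
        Range first = sorted.get(0);
        Range last = sorted.get(sorted.size() - 1);
        return new Range(mergedSegmentId, first.start(), last.end());
    }
}
```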
+
+### Topic URL Scheme
+
+Scalable topics use a dedicated URL scheme to make the topic type unambiguous 
at the API and routing level:
+
+- **Topic URL:** `topic://{tenant}/{namespace}/{name}`
+- **Range segment URL:** `segment://{tenant}/{namespace}/{name}/{segment-id}`
+
+Using a distinct scheme — rather than a prefix in the topic name — avoids 
ambiguity with existing `persistent://` and `non-persistent://` URLs and 
enables topic type detection without an extra metadata round-trip.
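Because the type is encoded in the scheme, a client or broker can classify a name with a string check alone. The Java sketch below is one way to do that. It assumes no path component contains `/`. It treats the segment id as an opaque string, since the PIP does not specify its format. The class and record names are hypothetical.

```java
import java.util.Optional;

// Sketch: classify and split scalable-topic URLs using only the scheme.
// Assumptions: no component contains '/', and the segment id is opaque.
class ScalableTopicUrl {
    enum Kind { TOPIC, SEGMENT }

    record Parsed(Kind kind, String tenant, String namespace, String name, String segmentId) {}

    private static final String TOPIC_SCHEME = "topic://";
    private static final String SEGMENT_SCHEME = "segment://";

    /** Returns empty for other schemes (persistent://, non-persistent://) or malformed URLs. */
    static Optional<Parsed> parse(String url) {
        Kind kind;
        String rest;
        if (url.startsWith(TOPIC_SCHEME)) {
            kind = Kind.TOPIC;
            rest = url.substring(TOPIC_SCHEME.length());
        } else if (url.startsWith(SEGMENT_SCHEME)) {
            kind = Kind.SEGMENT;
            rest = url.substring(SEGMENT_SCHEME.length());
        } else {
            return Optional.empty();
        }
        String[] parts = rest.split("/", -1);
        int expectedParts = kind == Kind.TOPIC ? 3 : 4;
        if (parts.length != expectedParts) return Optional.empty();
        for (String part : parts) {
            if (part.isEmpty()) return Optional.empty();
        }
        String segmentId = kind == Kind.SEGMENT ? parts[3] : null;
        return Optional.of(new Parsed(kind, parts[0], parts[1], parts[2], segmentId));
    }
}
```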
+
+### Lookup: Persistent Watch Sessions
+
+Rather than a one-shot request/response, the lookup for a scalable topic 
establishes a **persistent watch session** between the client and a broker. The 
broker returns the initial topic state — current active segments, their 
keyspace boundaries, and broker addresses — and then **proactively pushes 
updates** whenever the segment DAG changes due to a split or merge. This avoids 
polling and the need for clients to re-issue lookups on topology changes.
+
+For producers, any broker can serve the watch session, since segment metadata 
lives in the shared metadata store. For controller-coordinated consumers 
(StreamConsumer and CheckpointConsumer), the watch session additionally 
delivers the address of the elected **controller broker** for the subscription.
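On the client side, a watch session amounts to a snapshot followed by a stream of deltas applied to a local view. The Java sketch below is hypothetical throughout. The listener interface, the message shapes, and especially the per-update version number (used here to ignore stale or duplicate pushes) are assumptions, not part of the PIP.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a client-side view maintained from a watch session. The
// listener shape and the version number are illustrative assumptions.
class WatchSessionClient {
    record SegmentInfo(long segmentId, int start, int end, String brokerUrl) {}

    interface TopologyListener {
        void onInitialState(long version, List<SegmentInfo> activeSegments);

        void onUpdate(long version, List<Long> sealedSegmentIds, List<SegmentInfo> newSegments);
    }

    /** Local copy of the active segments, kept current by broker pushes. */
    static final class ActiveView implements TopologyListener {
        private final Map<Long, SegmentInfo> active = new HashMap<>();
        private long version = -1;

        @Override
        public synchronized void onInitialState(long version, List<SegmentInfo> activeSegments) {
            active.clear();
            activeSegments.forEach(s -> active.put(s.segmentId(), s));
            this.version = version;
        }

        @Override
        public synchronized void onUpdate(long version, List<Long> sealedSegmentIds,
                                          List<SegmentInfo> newSegments) {
            if (version <= this.version) {
                return; // stale or duplicate push
            }
            sealedSegmentIds.forEach(active::remove);
            newSegments.forEach(s -> active.put(s.segmentId(), s));
            this.version = version;
        }

        synchronized int activeCount() {
            return active.size();
        }

        synchronized long version() {
            return version;
        }
    }
}
```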
+
+### Consumer Models
+
+Three consumption patterns are supported:
+
+- **StreamConsumer:** Registered with the controller; receives exclusive 
assignment of a subset of range segments; processes messages in key order 
within each assigned segment. Analogous to Kafka's consumer group model, 
Pravega's reader groups, or Pulsar's existing exclusive and failover 
subscription models. Can additionally use a Key_Shared-style dispatch within a 
single segment to scale the number of consumers beyond the segment count, 
without requiring a split.
+- **QueueConsumer:** Subscribes independently to all active segments using 
shared dispatch; no controller interaction required. Messages are delivered 
without cross-segment ordering guarantees, analogous to Pulsar's existing 
shared subscription model.
+- **CheckpointConsumer:** Registered with the controller; receives segment 
assignments like the StreamConsumer, but tracks read positions externally via a 
serializable `Checkpoint` snapshot rather than subscription cursors. Designed 
for use by stream processing frameworks (Flink, Beam) that manage their own 
state and require restorable read positions across topology changes.
+
+### Consumer Controller and Leader Election
+
+StreamConsumer and CheckpointConsumer require coordinated segment assignment. 
A single broker, elected as the **controller** via leader election in the 
metadata store, manages segment-to-consumer assignments for a given 
subscription. The controller persists the assignment state so that it can be 
restored after a broker failure without requiring consumers to rebalance from 
scratch.
+
+Consumers register with the controller using a **persistent bidirectional 
stream**, establishing a **consumer controller session**. The controller pushes 
segment assignment updates through this stream, and consumers report segment 
completions and acknowledgment positions back. Each session is identified by a 
stable **consumer identity** (a name chosen by the client). If the underlying 
connection drops, the session enters a **grace period** during which the 
consumer's assigned segments ar [...]
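The grace-period lease can be sketched as a small piece of controller bookkeeping. In the hypothetical Java class below, time is passed in explicitly so the behavior is deterministic:

- A reconnect within the grace period returns the same segments.
- Once the period runs out, the segments are redistributed round-robin to connected consumers. Round-robin is a placeholder policy.
- Persisting the state is not modeled.
- `markAllDisconnected` mirrors the failover behavior described under Consumer Controller Failure, where a newly elected controller starts the grace period for every session.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of grace-period leases on segment assignments. Names, the
// round-robin redistribution and the explicit clock are assumptions.
class LeaseController {
    private final long graceMillis;
    private final Map<String, Set<Long>> assignments = new HashMap<>(); // identity -> segments
    private final Map<String, Long> disconnectedAt = new HashMap<>();   // identity -> millis
    private final Set<Long> unassigned = new HashSet<>();

    LeaseController(Duration gracePeriod) {
        this.graceMillis = gracePeriod.toMillis();
    }

    /** Connects or reconnects a consumer identity and returns the segments it holds. */
    Set<Long> connect(String identity, long nowMillis) {
        expireLeases(nowMillis);
        disconnectedAt.remove(identity);
        Set<Long> held = assignments.computeIfAbsent(identity, k -> new HashSet<>());
        held.addAll(unassigned); // segments orphaned while nobody was connected
        unassigned.clear();
        return Set.copyOf(held);
    }

    void assign(String identity, long segmentId) {
        assignments.computeIfAbsent(identity, k -> new HashSet<>()).add(segmentId);
    }

    /** Starts the grace period; the assignments are held in reserve, not moved. */
    void disconnect(String identity, long nowMillis) {
        disconnectedAt.put(identity, nowMillis);
    }

    /** After a controller failover, every restored session starts its grace period. */
    void markAllDisconnected(long nowMillis) {
        assignments.keySet().forEach(id -> disconnectedAt.put(id, nowMillis));
    }

    /** Releases leases whose grace period has ended and redistributes their segments. */
    List<Long> expireLeases(long nowMillis) {
        List<Long> released = new ArrayList<>();
        disconnectedAt.entrySet().removeIf(e -> {
            if (nowMillis - e.getValue() < graceMillis) return false;
            Set<Long> segments = assignments.remove(e.getKey());
            if (segments != null) released.addAll(segments);
            return true;
        });
        released.sort(null);
        List<String> live = assignments.keySet().stream()
                .filter(id -> !disconnectedAt.containsKey(id))
                .sorted()
                .toList();
        if (live.isEmpty()) {
            unassigned.addAll(released);
        } else {
            for (int i = 0; i < released.size(); i++) {
                assignments.get(live.get(i % live.size())).add(released.get(i));
            }
        }
        return released;
    }
}
```

With a one-minute grace period, a consumer that drops at t=40s and has not returned by t=100s loses its segments to the remaining consumers. A consumer that reconnects within the minute gets its segments back unchanged.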
+
+### New Client API
+
+A new API namespace is introduced (e.g., `org.apache.pulsar.client.api.v5`) 
designed from the ground up for scalable topics. The existing API remains 
unchanged and fully supported in 5.0 LTS releases. Deprecation of the existing 
API would be planned for a later LTS release (6.0 LTS), with removal in the LTS 
release after that (7.0 LTS).
+
+The new API introduces **type-safe, separate consumer interfaces** — 
`StreamConsumer`, `QueueConsumer`, and `CheckpointConsumer` — each exposing 
only the operations valid for that consumption pattern. This makes invalid 
operations unrepresentable at compile time rather than producing runtime 
exceptions. Each interface has its own builder exposing only the configuration 
options that apply to it.
+
+This replaces Pulsar's existing subscription type model (Exclusive, Failover, 
Shared, Key_Shared), where a single `Consumer` interface exposes operations 
that either fail at runtime or are silently ignored depending on the 
subscription type. For example, cumulative acknowledgement is rejected on 
Shared subscriptions.
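A few interfaces illustrate the difference. In the Java sketch below, only the names `StreamConsumer`, `QueueConsumer`, `CheckpointConsumer`, and `Checkpoint` come from the PIP. Every method signature is a hypothetical example of how operations could be partitioned, not the proposed v5 API.

```java
import java.time.Duration;
import java.util.Optional;

// Sketch: one interface per consumption pattern, so an operation that makes
// no sense for a pattern is simply absent. All method signatures are
// hypothetical illustrations.
interface Message<T> {
    T value();

    String key();
}

/** Serializable snapshot of read positions across assigned segments. */
interface Checkpoint {
    byte[] toBytes();
}

interface StreamConsumer<T> extends AutoCloseable {
    Optional<Message<T>> receive(Duration timeout);

    // Sequential processing within a segment allows cumulative acknowledgement.
    void acknowledgeCumulative(Message<T> message);

    // Needed when Key_Shared-style dispatch fans a segment out to several consumers.
    void acknowledge(Message<T> message);
}

interface QueueConsumer<T> extends AutoCloseable {
    Optional<Message<T>> receive(Duration timeout);

    // Messages may be processed out of order, so only individual acks exist.
    void acknowledge(Message<T> message);

    void negativeAcknowledge(Message<T> message);
}

interface CheckpointConsumer<T> extends AutoCloseable {
    Optional<Message<T>> receive(Duration timeout);

    // Positions are tracked in the caller's checkpoint instead of acknowledgements.
    Checkpoint checkpoint();
}
```

With these types, a call such as `queueConsumer.acknowledgeCumulative(message)` is a compile error rather than a runtime failure.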
+
+New client API implementations are required for all supported language client 
SDKs, such as Java, Python, Go, C++, DotNet, and Node.js.
+
+### Phased Delivery
+
+Given the Pulsar 5.0 LTS timeline (targeting October 2026) and the strategy of 
shipping pre-LTS features in 4.x for community feedback:
+
+1. **Phase 1 (4.3.0 or 4.4.0):** Introduce the new client API, range segment 
abstraction, a basic scalable topic type starting with a single segment, and 
manual splitting via admin API. Validates the storage and metadata model 
end-to-end.
+2. **Phase 2 (4.4.0):** Consumer controller, stream consumer rebalancing on 
segment changes, auto-split based on I/O thresholds, and the queue consumer 
model. Limited Geo-Replication support (no replicated subscriptions).
+3. **Phase 3 (5.0 LTS):** Range merging and finalized client API.
+4. **Phase 4 (5.1 and later):** Replicated subscriptions, transaction support 
across ranges, stream processing connector integration.
+
+The above phases are subject to change as the design evolves.
+
+---
+
+## Key Design Decisions (Preliminary)
+
+| Decision | Approach |
+|----------|----------|
+| Scaling mechanism | Range splitting and merging via a segment DAG |
+| Storage per range | One managed ledger per range segment |
+| Key routing | Range-based hash assignment (replaces `hash(key) % numPartitions`) |
+| Split coordination | Single-broker operation (range is owned by one broker) |
+| Merge coordination | Cross-broker, leader-based; supports merging multiple adjacent ranges at once |
+| Lookup | Persistent watch session with server-pushed DAG updates |
+| Consumer coordination | Elected controller broker; persistent bidirectional stream; persisted assignment state |
+| Consumer controller session | Identified by consumer identity; grace period (~1 min) holds assigned segments in reserve on disconnect, allowing reconnection without rebalancing |
+| Consumer model | StreamConsumer + QueueConsumer + CheckpointConsumer (replaces Exclusive/Failover/Shared/Key_Shared) |
+| Read-side scaling within a range | Key_Shared-style dispatch within a segment (StreamConsumer mode) |
+| Client API | New API namespace; type-safe separate consumer interfaces |
+| Metadata store | Oxia (preferred for Pulsar 5.0); enables streaming watch sessions |
+| Delivery | Phased: 4.3.0 → 4.4.0 → 5.0 LTS |
+
+---
+
+## Challenges and Preliminary Solutions
+
+This section summarizes the key challenges that scalable topics introduce. 
Each is addressed at a high level here; detailed solutions are deferred to 
sub-PIPs. The presented solutions are subject to change as the design evolves.
+
+### Scaling Writes
+
+When a single range segment becomes a write bottleneck, the system should 
allow splitting the range segment. The split must be coordinated: the current 
segment is sealed, child segments are created, and producers are redirected. 
The key-to-range mapping must update atomically from the client's perspective. 
Both automatic (I/O-threshold-based) and manual (admin API) triggers should be 
supported.
+
+Splitting and merging are fundamentally producer-side decisions: they affect 
where new messages are routed. In tailing reads mode — where consumers follow 
closely behind producers — consumer-side I/O metrics could also inform scaling 
decisions, and the number of splits could adapt to the number of available 
consumers. Once a segment is sealed, its topology is fixed.
+
+Auto-merge is the symmetric case: when I/O on adjacent segments drops below a 
low-water mark, those segments become candidates for merging. This is harder in 
practice than auto-split. Merging requires cross-broker coordination, and the 
buddy algorithm constraint means only segments that share split ancestry are 
eligible. There is also a risk of thrashing under a fluctuating load if the 
threshold is naive. Manual merge via the admin API is also supported, which 
makes it possible to drive  [...]
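A common way to avoid thrashing is hysteresis, which combines three mechanisms:

- separate high and low water marks;
- a requirement that a condition persist for several samples;
- a cooldown after each topology change.

The Java sketch below applies that idea to a single segment's write throughput. The thresholds, sample counts, and names are hypothetical tuning knobs, not values from the PIP. A `MERGE_CANDIDATE` result would still need an eligible neighboring segment that is also under the low-water mark.

```java
// Sketch of a hysteresis-based scaling decision for one segment. The
// thresholds, the sustained-sample rule and the cooldown are hypothetical.
class ScalingPolicy {
    enum Action { NONE, SPLIT, MERGE_CANDIDATE }

    private final double highWater;      // bytes/s above which a split is considered
    private final double lowWater;       // bytes/s below which a merge is considered
    private final int sustainedSamples;  // consecutive samples required to act
    private final int cooldownSamples;   // samples ignored after any action

    private int aboveCount;
    private int belowCount;
    private int cooldownLeft;

    ScalingPolicy(double highWater, double lowWater, int sustainedSamples, int cooldownSamples) {
        if (lowWater >= highWater) throw new IllegalArgumentException("low water must be below high water");
        this.highWater = highWater;
        this.lowWater = lowWater;
        this.sustainedSamples = sustainedSamples;
        this.cooldownSamples = cooldownSamples;
    }

    /** Feeds one throughput sample (bytes/s) and returns the recommended action. */
    Action onSample(double bytesPerSecond) {
        if (cooldownLeft > 0) {
            cooldownLeft--;
            return Action.NONE;
        }
        aboveCount = bytesPerSecond > highWater ? aboveCount + 1 : 0;
        belowCount = bytesPerSecond < lowWater ? belowCount + 1 : 0;
        if (aboveCount >= sustainedSamples) {
            startCooldown();
            return Action.SPLIT;
        }
        if (belowCount >= sustainedSamples) {
            startCooldown();
            return Action.MERGE_CANDIDATE;
        }
        return Action.NONE;
    }

    private void startCooldown() {
        aboveCount = 0;
        belowCount = 0;
        cooldownLeft = cooldownSamples;
    }
}
```

Load that alternates between the two thresholds never triggers an action here, because neither condition holds for consecutive samples.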
+
+### Scaling Reads
+
+When segments are split, the system must make newly created segments available 
to additional consumer instances. The consumer controller handles rebalancing. 
In container orchestration environments such as Kubernetes, the current segment 
count can be exposed as a scaling metric so that the consumer deployment scales 
out and in automatically.
+
+### Scaling Reads beyond the Segment Count
+
+Segment count is determined by produce-time topology and is fixed once 
segments are sealed. To scale reads independently of writes, the StreamConsumer 
supports a **Key_Shared-style dispatch mode within a single segment**, fanning 
out consumption to multiple consumers without requiring the segment to be split.
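One way to picture this is to cut the segment's own hash range into contiguous sub-ranges, one per consumer, much like the hash ranges Key_Shared already uses. The Java sketch below assumes equal-width sub-ranges and uses hypothetical names. The PIP leaves the actual assignment policy, and its interaction with the consumer controller, to the detailed design.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: fan one segment out to several consumers by dividing the segment's
// hash range into contiguous sub-ranges. Equal widths are an assumption.
class InSegmentDispatch {
    record SubRange(String consumer, int start, int end) {}

    /** Divides [segStart, segEnd) into one contiguous sub-range per consumer. */
    static List<SubRange> divide(int segStart, int segEnd, List<String> consumers) {
        int n = consumers.size();
        if (n == 0) throw new IllegalArgumentException("no consumers");
        long width = (long) segEnd - segStart;
        List<SubRange> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            int start = (int) (segStart + width * i / n);
            int end = (int) (segStart + width * (i + 1) / n);
            result.add(new SubRange(consumers.get(i), start, end));
        }
        return result;
    }

    /** Every message with a given key hash goes to the same consumer. */
    static String consumerFor(int keyHash, List<SubRange> subRanges) {
        for (SubRange r : subRanges) {
            if (keyHash >= r.start() && keyHash < r.end()) return r.consumer();
        }
        throw new IllegalArgumentException("hash " + keyHash + " is outside this segment");
    }
}
```

Here three consumers share one segment covering `[32768, 65536)`, so read parallelism exceeds the segment count without any split.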
+
+### Message Entry Format: Batch Routing and Key_Shared-style Dispatch
+
+Current Pulsar Key_Shared consumption requires that producers use 
`BatcherBuilder.KEY_BASED` so the broker can route batch entries to the correct 
consumer. For scalable topics, eliminating this producer-side requirement is 
necessary.
+
+The new message entry format must allow the broker to route or filter 
individual messages within a batch entry without requiring the producer and 
broker to perform costly operations. It must also support selective 
decompression and decryption so that only the relevant portion of an entry is 
processed by each consumer or replication path. The format should support both 
keyed and non-keyed (total-order) modes, and its design affects the `MessageId` 
representation, the acknowledgement model [...]
+
+### Key-Ordered Consumption and Exactly-Once Semantics
+
+Key-ordered processing must be maintained across splits and merges. When a 
segment is split, consumers must finish processing all messages in the parent 
segment before reading from its children — this is the happens-before guarantee 
encoded by the segment DAG. Consumers that traverse sealed segments during 
catch-up reads must follow this parent-before-child order to preserve 
correctness.
+
+Exactly-once messaging semantics depend on three mechanisms: idempotent 
producers (sequence-number-based deduplication at the broker), broker-side 
deduplication state, and transactions for atomic multi-segment writes. Each of 
these is currently scoped to a single topic or partition. Scalable topics 
introduce a dynamic segment lifecycle where segments are created, sealed, and 
replaced over time. Deduplication state must be carried over correctly during 
splits. Transactions that span multi [...]
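For the deduplication part, one simple option follows the spirit of Pulsar's existing per-producer, sequence-id-based deduplication. Each child of a split starts from a copy of the parent's highest accepted sequence id per producer, and a merged child takes the maximum across its parents. This covers a producer whose message was persisted in the parent just before the seal but whose acknowledgement was lost. When the retry is routed to a child, the child still recognizes it as a duplicate. The Java class below is a hypothetical sketch of that idea only. The PIP defers the actual mechanism to sub-PIPs.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: per-producer deduplication state that survives splits and merges.
// A message is a duplicate if its sequence id is not above the highest id
// already accepted from the same producer. Names are hypothetical.
class DedupState {
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    /** Returns true if the message is new and should be persisted. */
    boolean accept(String producerName, long sequenceId) {
        Long highest = highestSequenceId.get(producerName);
        if (highest != null && sequenceId <= highest) {
            return false;
        }
        highestSequenceId.put(producerName, sequenceId);
        return true;
    }

    /** A child created by a split starts from the sealed parent's state. */
    DedupState copyForChild() {
        DedupState child = new DedupState();
        child.highestSequenceId.putAll(highestSequenceId);
        return child;
    }

    /** A child created by a merge takes the highest id per producer across parents. */
    static DedupState mergeOf(DedupState... parents) {
        DedupState child = new DedupState();
        for (DedupState parent : parents) {
            parent.highestSequenceId.forEach((producer, seq) ->
                    child.highestSequenceId.merge(producer, seq, Long::max));
        }
        return child;
    }
}
```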
+
+Key_Shared-style dispatch within a segment may require independent per-range 
cursors rather than the single shared cursor used in the current Key_Shared 
model, allowing the consumer controller to track and rebalance progress at 
hash-range granularity. Key distribution imbalance — where a small number of 
hot keys dominate throughput — is a known challenge in the current Key_Shared 
implementation; the consumer controller's ability to reassign and drain 
individual hash ranges addresses this [...]
+
+### Consumer Controller Failure
+
+If the controller broker fails, a new controller is elected and restores the 
persisted assignment state. Consumers can continue consuming from their 
already-assigned segments until the new controller is ready. On startup, the 
new controller treats all sessions as disconnected and immediately starts the 
grace period for each. Consumers that reconnect within the grace period have 
their sessions restored and receive the same segment assignments; those that do 
not reconnect in time have thei [...]
+
+### Acknowledgment Model
+
+The StreamConsumer could use cumulative acknowledgements when a single 
consumer processes a segment sequentially, reducing overhead and metadata store 
load. However, when Key_Shared-style dispatch fans out messages from a single 
segment to multiple consumers, messages may be processed out of order, 
requiring individual acknowledgements. The QueueConsumer also requires 
individual acknowledgements for the same reason. An additional complication is 
that a single entry can be delivered to tw [...]
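
The sketch below shows how individual acknowledgments for one segment can be collapsed into a single cumulative position, similar in spirit to the mark-delete position that Pulsar's existing cursors persist. `SegmentAckTracker` and the use of a single increasing `long` entry ID per segment are assumptions of this sketch; it tracks one subscription on one segment only.

```java
import java.util.TreeSet;

/** Hypothetical per-segment ack tracker: individual acks collapse into a cumulative position. */
final class SegmentAckTracker {
    // Highest entry ID such that it and every earlier entry are acknowledged; -1 means none yet.
    private long markDelete = -1;
    // Individually acknowledged entries above markDelete; gaps between them are unacked holes.
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    /** Cumulative ack, as a single sequential consumer could send: everything up to entryId. */
    void ackCumulative(long entryId) {
        if (entryId > markDelete) {
            markDelete = entryId;
            individuallyAcked.headSet(entryId, true).clear();
            advance();
        }
    }

    /** Individual ack, needed when messages from one segment are processed out of order. */
    void ackIndividual(long entryId) {
        if (entryId > markDelete) {
            individuallyAcked.add(entryId);
            advance();
        }
    }

    // Slide markDelete forward while the next entry has already been acknowledged.
    private void advance() {
        while (individuallyAcked.remove(markDelete + 1)) {
            markDelete++;
        }
    }

    long markDeletePosition() {
        return markDelete;
    }

    int pendingIndividualAcks() {
        return individuallyAcked.size();
    }
}
```

Only the cumulative position and the acknowledged entries above it would need to be stored, so a sequentially consumed segment degenerates to a single number.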
+
+### Delayed Messages
+
+Delayed message delivery requires tracking timer state across all active 
(non-sealed) range segments, since delayed messages can be spread across 
multiple segments. The delay timer tracking and dispatch mechanism must account 
for the multi-segment topology. One approach is to maintain an active 
subscription and consumer on every segment that still contains undelivered 
delayed messages.
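
A minimal sketch of the approach above: one time-ordered queue of delayed messages spanning every segment, plus a per-segment count that identifies which segments still need an active subscription and consumer. `MultiSegmentDelayTracker` and its message representation are hypothetical.

```java
import java.util.*;

/** Hypothetical delay tracker spanning several range segments of one subscription. */
final class MultiSegmentDelayTracker {
    record Delayed(long segmentId, long entryId, long deliverAtMillis) {}

    // Delayed messages from all segments, ordered by delivery time.
    private final PriorityQueue<Delayed> queue =
            new PriorityQueue<>(Comparator.comparingLong(Delayed::deliverAtMillis));
    // segment ID -> number of delayed messages not yet delivered from that segment
    private final Map<Long, Integer> pendingPerSegment = new HashMap<>();

    void add(long segmentId, long entryId, long deliverAtMillis) {
        queue.add(new Delayed(segmentId, entryId, deliverAtMillis));
        pendingPerSegment.merge(segmentId, 1, Integer::sum);
    }

    /** Removes and returns every message whose delivery time has been reached. */
    List<Delayed> pollDue(long nowMillis) {
        List<Delayed> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().deliverAtMillis() <= nowMillis) {
            Delayed d = queue.poll();
            due.add(d);
            // Drop the segment from the map once its last delayed message is delivered.
            pendingPerSegment.computeIfPresent(d.segmentId(), (id, n) -> n == 1 ? null : n - 1);
        }
        return due;
    }

    /** Segments that still need an active subscription and consumer for delayed delivery. */
    Set<Long> segmentsWithPendingDelays() {
        return new TreeSet<>(pendingPerSegment.keySet());
    }
}
```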
+
+### Pulsar Transactions
+
+Pulsar's transaction system is currently partition/topic-oriented. Extending 
transactions to span records across multiple range segments of the same 
scalable topic requires careful coordination, especially if a segment is split 
or merged during the transaction's lifetime. Transaction support is deferred to 
Phase 4.
+
+### Stream Processing Integration (Flink / Apache Beam)
+
+Stream processing frameworks manage their own state and require stable, 
restorable read positions. When a range is split or merged, the Flink source 
connector must handle the new segment topology and map restored checkpoints — 
which may reference now-sealed segments — to correct positions in the successor 
segments. Watermark propagation must account for a dynamically changing number 
of input streams. A clean integration with Flink's `Source`/`SplitEnumerator` 
model is a design goal; the  [...]
+
+### Geo-Replication
+
+A foundational design decision for geo-replication is that **range segments 
are independent per cluster** — the segment topology in one cluster is not 
required to match, or be coordinated with, the topology in another. A cluster 
is free to split or merge its own segments based on its local traffic patterns, 
independently of what remote clusters do. Coupling segment topologies across 
clusters would reintroduce a global coordination requirement that scalable 
topics are specifically designe [...]
+
+For message replication, this independence means that a replicator cannot 
assume that a segment with a given identifier exists on the remote cluster. 
Instead, replication must route each message to whichever active segment on the 
remote cluster covers the message's key hash at the time of replication — the 
same routing logic used by producers.
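
The routing rule that replicators share with producers reduces to a lookup by hash range. The sketch below keeps the active segments in a `TreeMap` keyed by range start and uses `floorEntry` to find the covering segment. `SegmentRouter`, the integer hash space (for example `[0, 65535]`), and the assumption that the key hash is computed elsewhere are illustrative; the hash function and range representation are left to the sub-PIPs.

```java
import java.util.*;

/** Hypothetical routing table: active range segments keyed by the start of their hash range. */
final class SegmentRouter {
    record Segment(long id, int rangeStart, int rangeEndInclusive) {}

    private final NavigableMap<Integer, Segment> byRangeStart = new TreeMap<>();

    /** Replaces the routing table, e.g. after a split or merge changes the active segments. */
    void update(Collection<Segment> activeSegments) {
        byRangeStart.clear();
        for (Segment s : activeSegments) {
            byRangeStart.put(s.rangeStart(), s);
        }
    }

    /** Returns the active segment whose hash range covers the given key hash. */
    Segment route(int keyHash) {
        // The covering segment is the one with the greatest range start <= keyHash.
        Map.Entry<Integer, Segment> e = byRangeStart.floorEntry(keyHash);
        if (e == null || keyHash > e.getValue().rangeEndInclusive()) {
            throw new IllegalStateException("no active segment covers hash " + keyHash);
        }
        return e.getValue();
    }
}
```

Because the lookup depends only on the key hash and the locally active segments, a message replicated from a cluster with a different segment topology still lands in the correct local segment, and only the table needs to be refreshed when the local topology changes.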
+
+**Replicated subscriptions** require a fundamentally new mechanism. The 
existing approach assumes a subscription position is a single point in a linear 
stream; with scalable topics, it is a set of positions across multiple active 
range segments whose DAGs can diverge independently between clusters. The 
detailed design is deferred to a dedicated sub-PIP.
+
+Geo-replication also requires the new message entry format described in 
[Message Entry 
Format](#message-entry-format-batch-routing-and-key_shared-style-dispatch), 
allowing the receiving broker to route an incoming batch to the correct 
destination segment without decompressing or decrypting the payload. The 
interaction between batch boundaries, compression, and encryption at the 
replication layer will be addressed jointly by the message entry format sub-PIP 
and the geo-replication sub-PIP.
+
+---
+
+## Public-Facing Changes
+
+This section gives a high-level overview of the changes visible to operators 
and application developers. Each area is intentionally kept brief here — the 
precise API surface, configuration keys, metric names, and protocol details are 
defined in the dedicated sub-PIPs.
+
+### New Topic Type
+
+A new topic type — scalable topic — is introduced and registered in Pulsar's 
topic type system. It is created via the admin API with a new topic type 
identifier and is addressable using the `topic://` URL scheme.
+
+### Admin API
+
+New admin API endpoints for scalable topics:
+
+- Create / delete a scalable topic
+- List scalable topics in a namespace
+- Get topic metadata (segment DAG, active segments)
+- Trigger a manual split of a range segment
+- Trigger a manual merge of adjacent range segments
+- Get subscription-specific segment assignment state
+
+### Binary Protocol
+
+The existing Pulsar binary protocol topic lookup command is not used for 
scalable topics. Broker discovery is handled entirely through the two new 
bidirectional streams described below — the DAG watch session for segment 
topology and broker addresses, and the consumer controller stream for 
assignment coordination.
+
+Two new bidirectional protocol streams are introduced:
+
+- **DAG watch session**: initiated by any client (producer or consumer) 
against any broker. The broker returns the initial topic state (segment DAG, 
active segment broker addresses, controller broker address) and then pushes 
updates whenever the segment topology changes. The same response message type 
is reused for both the initial reply and later pushes.
+- **Consumer controller stream**: initiated by StreamConsumer and 
CheckpointConsumer clients against the elected controller broker. The client 
registers its identity; the controller pushes segment assignment updates; the 
client sends graceful unregistration on close.
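
A sketch of the client side of a DAG watch session. Because the initial reply and later pushes share one message type, a single handler can process both. `DagWatchClient`, the `TopicState` fields, and in particular the monotonically increasing `version` used to discard stale or duplicate updates are assumptions of this sketch, not the protocol the producer protocol sub-PIP will define.

```java
import java.util.*;
import java.util.function.Consumer;

/** Hypothetical client-side handler for a DAG watch session. */
final class DagWatchClient {
    /** Illustrative shape of the single message used for the initial reply and for pushes. */
    record TopicState(long version, Map<Long, String> activeSegmentBrokers, String controllerBroker) {}

    private TopicState current;
    private final List<Consumer<TopicState>> listeners = new ArrayList<>();

    /** Registers a callback, e.g. a producer refreshing its routing table. */
    void addListener(Consumer<TopicState> listener) {
        listeners.add(listener);
    }

    /** Handles both the initial reply and later pushes; returns false if the update was ignored. */
    boolean onTopicState(TopicState update) {
        // Ignore stale or duplicate updates (the version field is an assumption of this sketch).
        if (current != null && update.version() <= current.version()) {
            return false;
        }
        current = update;
        listeners.forEach(l -> l.accept(update));
        return true;
    }

    TopicState current() {
        return current;
    }
}
```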
+
+Additional protocol changes:
+
+- Extended `MessageId` format that includes the range segment identifier. 
Support for the new message entry format also impacts the message ID format.
+- New `segment://` topic domain alongside the new `topic://` domain used for scalable topics
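
Pulsar's current message IDs carry a ledger ID, an entry ID, a partition index and, for batched messages, a batch index. A hypothetical extended ID might replace the partition index with the range segment identifier, as sketched below. The `SegmentMessageId` name and field set are assumptions, since the actual format also depends on the message entry format sub-PIP.

```java
import java.util.Comparator;

/** Hypothetical extended message ID that carries the range segment identifier. */
record SegmentMessageId(long segmentId, long ledgerId, long entryId, int batchIndex) {

    // Position order inside one segment: ledger, then entry, then index within the batch.
    static final Comparator<SegmentMessageId> WITHIN_SEGMENT =
            Comparator.comparingLong(SegmentMessageId::ledgerId)
                    .thenComparingLong(SegmentMessageId::entryId)
                    .thenComparingInt(SegmentMessageId::batchIndex);

    /** Compares two IDs from the same segment; IDs from different segments are rejected. */
    static int compareWithinSegment(SegmentMessageId a, SegmentMessageId b) {
        if (a.segmentId() != b.segmentId()) {
            throw new IllegalArgumentException("message IDs from different segments are not ordered");
        }
        return WITHIN_SEGMENT.compare(a, b);
    }
}
```

Ordering is defined only within a segment here; across segments, ordering follows the segment DAG, so the sketch refuses to compare IDs from different segments rather than inventing a total order.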
+
+### Client API
+
+A new client module (`pulsar-client-v5`) introduces type-safe interfaces for 
scalable topics:
+
+- `ScalableTopicProducer`: opens a DAG watch session; routes each message to 
the active segment covering the message's key hash; adapts automatically as the 
topology changes.
+- `StreamConsumer`: registers with the controller broker; receives exclusive 
segment assignments; consumes in key order within each assigned segment; uses 
subscription cursors for position tracking.
+- `QueueConsumer`: subscribes to all active segments with shared dispatch; no 
controller interaction; messages delivered without cross-segment ordering 
guarantees.
+- `CheckpointConsumer`: registers with the controller like a StreamConsumer, 
but tracks positions externally via a `Checkpoint` snapshot instead of 
subscription cursors. Requires a `group` identifier to coordinate parallel 
instances. Designed for Flink and similar stream processing frameworks.
+- Dedicated builders for each type, exposing only the configuration options 
applicable to that type.
+
+### Configuration
+
+Each sub-PIP introduces its own configuration options. The examples below are 
illustrative; the complete list will grow as sub-PIPs are written. All options 
are expected to be configurable at broker level, with selected options also 
overridable at namespace and topic level.
+
+Examples of anticipated configuration options:
+
+- Consumer controller session grace period (default: ~1 minute; the time a 
disconnected consumer's segment assignments are held in reserve before being 
redistributed)
+- Auto-split I/O threshold
+- Segment count target for auto-split
+
+### Metrics
+
+Each sub-PIP defines the metrics relevant to its area. The examples below are 
illustrative; the complete set will be specified in a dedicated metrics sub-PIP.
+
+Examples of anticipated metrics:
+
+- Per-topic: active segment count, total message rate, total throughput
+- Per-segment: message rate in/out, backlog, consumer count
+- Controller: assignment events, rebalance events, controller failover count
+- Segment lifecycle: split count, merge count, seal events
+
+---
+
+## Backward & Forward Compatibility
+
+### Upgrade
+
+Scalable topics are introduced as a new, distinct topic type. No changes are 
made to the existing partitioned or non-partitioned topic implementation, and 
existing clients continue to work with partitioned and non-partitioned topics 
without modification.
+
+Scalable topics require a compatible broker and client version. Clients 
connecting to older brokers will receive an error when attempting to create or 
connect to a scalable topic, and older clients cannot connect to scalable 
topics even on a compatible broker.
+
+### Downgrade / Rollback
+
+Scalable topics are not usable on broker versions that predate this feature. 
Rolling back to an older broker version is possible; existing non-partitioned 
and partitioned topics continue to work normally. Scalable topic data and 
metadata should be retained in BookKeeper and the metadata store during a 
rollback so that upgrading again to a compatible broker version resumes access 
to scalable topics.
+
+### Pulsar Geo-Replication
+
+Geo-replication for scalable topics is a new protocol path. Clusters 
participating in geo-replication of scalable topics must all run a compatible 
broker version. Replicating scalable topics to a cluster running an older 
Pulsar version is not supported. Detailed upgrade considerations will be 
specified in the geo-replication sub-PIP.
+
+---
+
+## Security Considerations
+
+Scalable topics follow the same tenant/namespace/topic authorization model as 
existing Pulsar topics. The new admin API endpoints for segment operations 
(split, merge, metadata read) will require the same permissions as equivalent 
topic-level admin operations.
+
+The DAG watch session and the consumer controller stream are secured using the 
same mechanisms as existing Pulsar binary protocol connections (TLS transport 
encryption, token authentication, mutual TLS).
+
+Multi-tenancy isolation is preserved: a client can only watch or interact with 
segments that belong to topics it has permission to access. The controller does 
not expose the assignment state across subscriptions or tenants.
+
+---
+
+## Sub-PIPs
+
+This is a parent PIP. The following sub-PIPs are planned to cover each major 
design area in detail. Each sub-PIP will include its own public-facing changes, 
detailed design, and backward compatibility sections. A single sub-PIP may 
address multiple related areas from this list if they are tightly coupled.
+
+| Sub-PIP | Area |
+| ------- | ---- |
+| TBD | High-level architecture: functionality, capabilities, concepts, components, interactions, contracts, and design decisions |
+| TBD | Range segment data model and metadata store schema |
+| TBD | StreamConsumer API and protocol |
+| TBD | QueueConsumer API and protocol |
+| TBD | CheckpointConsumer API and protocol |
+| TBD | Migration strategy |
+| TBD | Producer protocol: watch sessions, range routing, topology change handling |
+| TBD | Consumer controller: leader election, assignment protocol, persistent state, grace period |
+| TBD | Range splitting: broker-side protocol, admin API, auto-split triggers |
+| TBD | Range merging: cross-broker coordination protocol, admin API |
+| TBD | Message entry format for scalable topics |
+| TBD | Scalable topic metrics |
+| TBD | Transaction support across range segments |
+| TBD | Geo-replication for scalable topics |
+| TBD | Replicated subscriptions for scalable topics |
+| TBD | Stream processing connector for scalable topics (Flink / Apache Beam) |
+
+---
+
+## Alternatives
+
+### Continue with Partitioned Topics
+
+Partitioned topics could be improved incrementally (e.g., online 
repartitioning with a migration period, smarter placement). However, 
modulo-based routing is fundamentally incompatible with transparent 
key-preserving repartitioning. A clean-break design is necessary to deliver the 
desired semantics without carrying forward the modulo constraint.
+
+---
+
+## General Notes
+
+The scalable topic feature is targeted at Pulsar 5.0 as the LTS milestone. The 
phased delivery path through 4.3.0 and 4.4.0 is designed to collect 
implementation experience and community feedback before the LTS commitment.
+
+---
+
+## Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/kthcmtxzv678d1gn9f5vo3psmwz4j3yr
+* Mailing List voting thread: 
https://lists.apache.org/thread/jd9kklo3qlbmt2cxg1s7oynof14gmsn4

