This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 18653513c50 [feat][pip] PIP-460: Scalable Topics (Topics v5) (#25315)
18653513c50 is described below
commit 18653513c50443f483b1e502c3965ee0c222e3fd
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 1 16:48:28 2026 +0300
[feat][pip] PIP-460: Scalable Topics (Topics v5) (#25315)
---
pip/pip-460.md | 426 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 426 insertions(+)
diff --git a/pip/pip-460.md b/pip/pip-460.md
new file mode 100644
index 00000000000..2dbb25af74f
--- /dev/null
+++ b/pip/pip-460.md
@@ -0,0 +1,426 @@
+# PIP-460: Scalable Topics (Topics v5)
+## Motivation
+
+Pulsar's partitioned topic model has three fundamental limitations that make
it unsuitable as the sole topic type for applications requiring elastic,
transparent scaling.
+
+### Key ordering breaks when partition count changes
+
+Many applications rely on key-based message ordering: all messages for the
same key must be processed in sequence, guaranteeing correctness for
deduplication, session aggregation, stateful stream processing, and similar use
cases. Key-to-partition routing uses modulo hashing: `hash(key) %
numPartitions`. When the partition count changes — say from 4 to 8 — the modulo
mapping changes completely. A key previously routed to partition 2 may now hash
to partition 6. Messages produced after th [...]
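+
+A minimal sketch of the remapping effect, assuming `zlib.crc32` as a stand-in hash (it is not the hash function Pulsar uses) and hypothetical key names. Because 8 is a multiple of 4, every key either stays on partition `p` or moves to `p + 4`, which matches the 2-to-6 example above:
+
```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # zlib.crc32 is only a stand-in hash for this sketch.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical keys, used only for illustration.
keys = [f"user-{i}" for i in range(1000)]

before = {k: partition_for(k, 4) for k in keys}  # routing with 4 partitions
after = {k: partition_for(k, 8) for k in keys}   # routing with 8 partitions

# Keys whose partition differs after the change lose per-key ordering,
# because older messages for the same key sit on a different partition.
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys changed partition")
```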
+
+### Partitions can never decrease
+
+Pulsar does not support reducing the number of partitions. Once a topic is
scaled up to 64 partitions, it remains at 64 partitions permanently — even if
traffic drops to a level where 4 partitions would suffice. Decreasing
partitions would require re-routing keys (breaking ordering), migrating
unconsumed messages from removed partitions, and updating every active
consumer's assignment. No safe mechanism exists for this today. The result is
**partition count drift**: topics accumulate mor [...]
+
+### The cost of getting the partition count wrong
+
+Because partitions cannot shrink and growing them breaks ordering, operators
must predict the right partition count at topic creation time.
Over-provisioning wastes resources; under-provisioning leads to write-hot
partitions that eventually require re-creating the topic with a higher
partition count. Neither outcome is acceptable for a system that aims to be
operationally simple at scale.
+
+---
+
+## Background Knowledge
+
+### Partitioned Topics Today
+
+Apache Pulsar scales topics by pre-configuring a fixed number of
**partitions**. Each partition is an independent persistent topic backed by a
single managed ledger on a single broker. A managed ledger is Pulsar's
abstraction over Apache BookKeeper, providing an append-only log with
cursor-based consumption tracking.
+
+For key-ordered messaging, producers route messages using `hash(key) %
numPartitions`, guaranteeing that all messages with the same key land on the
same partition and are consumed in order. Consumer groups are partition-aware:
each consumer in a group is assigned one or more partitions, and the assignment
is stable as long as the partition count does not change.
+
+### The Managed Ledger Layer
+
+A managed ledger is a sequence of BookKeeper ledgers forming a single logical
stream. Subscriptions track their position in this stream using cursors. The
managed ledger layer handles ledger rollover (sealing a full ledger and opening
a new one), cursor advancement, and interaction with BookKeeper for durable
storage.
+
+Pulsar's existing topic termination feature allows a managed ledger to be
permanently sealed — a capability that is relevant to scalable topics when a
range segment is split or merged.
+
+### Relevant Existing PIPs
+
+- **[PIP-379](pip-379.md)**: Key_Shared Draining Hashes — improves per-key
ordering guarantees during consumer changes. The "draining hashes" concept and
its mechanisms are directly relevant to scalable topics, where a
Key_Shared-style dispatch mode is used to fan out consumption within a single
range segment.
+- **[PIP-335](pip-335.md)**: Oxia Metadata Store — Oxia is the preferred
metadata store backend for Pulsar 5.0 and is particularly relevant for scalable
topics because it supports streaming watch sessions, which the scalable topics
lookup mechanism is designed to use.
+
+---
+
+## Goals
+
+### In Scope
+
+- Introduce **Scalable Topics** as a new, distinct topic type in Pulsar
(referred to as Topics v5 in the codebase).
+- Support **transparent range splitting** — increasing the number of parallel
write and read streams without disrupting in-flight messages or breaking key
ordering.
+- Support **range merging** — reducing the number of parallel streams when
traffic decreases, recovering resources without requiring a topic re-creation.
+- Preserve **key ordering guarantees** across splits and merges through
range-based routing (replacing modulo hashing with range-hash assignment).
+- Support **Key_Shared-style dispatch** within a single range segment,
allowing multiple consumers to share a segment while preserving key-ordered
delivery.
+- Introduce a **new client API** designed from the ground up for scalable
topics, with separate type-safe interfaces for each consumer type.
+- Expose a **persistent, push-based lookup session** so clients receive
topology updates without polling.
+- Introduce a **consumer controller** with leader election and persistent
assignment state for coordinated, stable segment assignment.
+- Improve the **consumer model** to reduce out-of-order and duplicate message
processing in clients by introducing a persistent lease and consumer session
model.
+- Improve the **producer protocol** to support application-level flow control,
removing the dependency on TCP/IP flow control.
+- Change the **message entry format** to support scalable topics and future
use cases.
+- Define a **phased delivery path** (Pulsar 4.x → 5.0 LTS) that allows
incremental delivery and community feedback before the LTS commitment.
+- This PIP defines the overall design direction. High-level architecture and
detailed design for each subproblem are covered by dedicated sub-PIPs (see
[Sub-PIPs](#sub-pips)).
+
+### Postponed until after the Pulsar 5.0 release
+
+In some of the above areas, the core design must accommodate features that may
be deferred beyond the Pulsar 5.0 LTS timeline so that the implementation can
be extended later without a redesign. Potentially deferred features include:
+
+- Full Geo-replication support including Replicated Subscriptions
+- Transaction support
+- Stream processing connector implementations (Flink, Beam)
+- Tooling for migrating existing partitioned topics to scalable topics
+- Any in-scope features that are lower priority and cannot be completed before
the Pulsar 5.0 LTS release due to time constraints.
+
+---
+
+## Prior Art
+
+### Northguard (LinkedIn)
+
+Northguard introduces a **range-based** data model. A topic is a named
collection of ranges that together cover the full keyspace. Each range is a
sequence of segments (the unit of replication), where each segment is assigned
to a different set of brokers — a technique called **log striping** that keeps
the cluster balanced by design.
+
+Scaling is achieved through **range splitting and merging** using a **buddy
algorithm**: a range can be split into exactly two child ranges, and only two
ranges previously split from the same parent can be merged back. This provides
clean ordering guarantees — all records in a parent range happen-before records
in any child range, and all records in sibling ranges happen-before records in
their merged parent. It also naturally aligns keyspaces across different
topics, eliminating the nee [...]
+
+Unlike indexed partitions, which require a global synchronization barrier when
the partition count changes, range splits only interrupt producers writing to
the specific range being split.
+
+### Pravega
+
+Pravega uses a similar segment-based model with splitting and merging.
Neighboring keyspace ranges can be merged, which is slightly more flexible than
Northguard's strict buddy constraint. Pravega's consumer-side abstractions
include:
+
+- **Reader Groups** that balance segments across readers (consumers).
+- **Reader scaling** where applications (or Flink integration) can dynamically
adjust the number of reader tasks.
+- **Checkpoints** that store the state of all active positions across a reader
group.
+- **Transactions** for atomic writes across segments.
+
+Pravega's Flink connector experience highlights a key challenge: when range
topology changes, the stream processing connector must handle checkpoint
restoration across segment boundaries and coordinate watermarks correctly
across a dynamic number of streams. For key-ordered processing, Pravega routes
events to segments by routing key, ensuring all events for a given key are read
by the same parallel Flink instance. However, stateful downstream operators
that require key-based ordering st [...]
+
+---
+
+## Preliminary Design Direction
+
+> **Note:** This section does not contain the high-level design for Scalable
Topics. The full high-level design — covering functionality, capabilities and
quality characteristics, concepts, components, interactions, contracts, and
related design decisions — will be defined in a dedicated sub-PIP once PIP-460
has received high-level acceptance from the Pulsar community to proceed with
planning.
+>
+> PIP-460 is intentionally a vision and problem statement document. Its
purpose is to establish the motivation, goals, and key challenges that need to
be resolved and to seek community alignment on the direction before detailed
design work begins.
+
+The sections below sketch some preliminary design directions to illustrate the
problem space. These are subject to change as the detailed design evolves in
sub-PIPs.
+
+### Design Principles
+
+1. **Works on existing Pulsar architecture.** Scalable topics layer on top of
Pulsar's existing broker, managed ledger, and BookKeeper stack rather than
replacing them. Each range segment maps directly to one managed ledger.
+2. **Simplicity for users.** The user mental model — a single topic name that
scales transparently — should be simpler than partitioned topics, not more
complex.
+3. **Long-term default.** The long-term vision is that scalable topics
eventually become the default topic type, replacing the current partitioned and
non-partitioned topic model.
+4. **Clean break for the client API.** Existing Pulsar clients are not
compatible with scalable topics. This is intentional: it creates the
opportunity to revisit the client API, revisit the binary protocol, and change
the message entry format without carrying forward legacy constraints.
+
+### Concepts/Vocabulary (Preliminary)
+
+| Concept | Description |
+|---------|-------------|
+| **Scalable Topic** | A new topic type identified by the `topic://` URL scheme. Scales transparently through range splitting and merging without requiring recreation or downtime. |
+| **Range Segment** | The fundamental unit of a scalable topic. Covers a contiguous portion of the keyspace (a hash range) and is backed by exactly one managed ledger on one broker. Often referred to as "range" for brevity. |
+| **Keyspace** | The full space of possible key hashes. Each range segment is assigned a non-overlapping slice of the keyspace; together all active segments cover the full keyspace. |
+| **Segment DAG** | The directed acyclic graph that records the full history of range segments for a topic. Parent-child edges are added when segments are split or merged. Encodes a strict happens-before relationship: all messages in a parent segment precede messages in its children. |
+| **Split** | An operation that seals a range segment and creates two child segments, each covering a sub-portion of the parent's keyspace. A local operation on the owning broker. |
+| **Merge** | An operation that seals two or more adjacent range segments and creates a single child segment covering their combined keyspace. Requires cross-broker coordination. |
+| **Merge Leader** | The broker that coordinates a merge operation, responsible for sealing all involved segments and creating the merged child. |
+| **Watch Session** | A persistent connection between a client and a broker through which the broker pushes the initial topic state and subsequent DAG topology updates. Replaces one-shot lookup requests. |
+| **Consumer Controller** | A broker elected via leader election that manages segment-to-consumer assignments for a subscription. Persists assignment state to survive broker failures. |
+| **Consumer Controller Session** | A persistent bidirectional stream between a consumer and the controller broker used to push assignments and receive progress reports. |
+| **Consumer Identity** | A stable name chosen by the client that identifies a consumer controller session. Allows the session to be restored after a reconnect within the grace period. |
+| **Consumer Session Grace Period** | Acts as a persistent lease on segment assignments. When a consumer disconnects, its assignments are held in reserve for the duration of the grace period rather than immediately redistributed. If the consumer reconnects within this window, the lease is renewed and assignments are restored unchanged. |
+| **StreamConsumer** | A consumer type registered with the controller that receives exclusive segment assignments and processes messages in key order within each segment. |
+| **QueueConsumer** | A consumer type that subscribes to all active segments with shared dispatch. No cross-segment ordering guarantees. |
+| **CheckpointConsumer** | A consumer type registered with the controller that tracks read positions externally via a serializable `Checkpoint` snapshot. Designed for stream processing frameworks such as Flink. |
+| **Checkpoint** | A serializable snapshot of read positions across all assigned segments, used by the CheckpointConsumer to restore processing state after a restart or topology change. |
+| **Tailing Reads** | A consumption pattern where consumers follow closely behind producers on active (non-sealed) segments, making consumer-side I/O metrics potentially useful for scaling decisions. |
+
+### Range Segments and the Segment Directed Acyclic Graph (DAG)
+
+A scalable topic is a **DAG of range segments**. A topic starts with one or
more range segments. Each range segment covers a contiguous portion of the
keyspace (expressed as a hash range). At produce time, a keyed message is
routed to the active segment whose hash range covers the message's key hash.
+
+A range segment can be **split** into two child segments, each inheriting a
sub-portion of the parent's keyspace. Adjacent range segments can be **merged**
into a single child segment covering the combined keyspace. These operations
extend the DAG by adding parent-child edges.
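+
+The routing rule above can be sketched as a lookup over the sorted list of active ranges. This is an illustration only: the 16-bit keyspace size, the `Segment` type, and the midpoint split are assumptions made for the sketch, not part of this proposal.
+
```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import List

HASH_SPACE = 1 << 16  # assumed keyspace size for this sketch


@dataclass(frozen=True)
class Segment:
    segment_id: int
    start: int  # inclusive lower bound of the hash range
    end: int    # exclusive upper bound of the hash range


def route(active: List[Segment], key_hash: int) -> Segment:
    """Return the active segment covering key_hash.

    `active` must be sorted by start and cover [0, HASH_SPACE) without gaps.
    """
    starts = [s.start for s in active]
    return active[bisect_right(starts, key_hash) - 1]


def split(active: List[Segment], target: Segment, next_id: int) -> List[Segment]:
    """Replace `target` with two children that halve its hash range."""
    mid = (target.start + target.end) // 2
    children = [Segment(next_id, target.start, mid),
                Segment(next_id + 1, mid, target.end)]
    remaining = [s for s in active if s != target]
    return sorted(remaining + children, key=lambda s: s.start)


before = [Segment(0, 0, 32768), Segment(1, 32768, HASH_SPACE)]
after = split(before, before[1], next_id=2)
```
+
+In this sketch, key hashes below 32768 route to segment 0 both before and after the split; only keys in the split range are redirected to a child.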
+
+The DAG encodes a strict happens-before relationship: all messages in a parent
segment happen-before messages in any of its child segments. Consumers must
traverse the DAG in this order to maintain correctness across splits and merges.
+
+A "range segment" is often referred to as a "range" for brevity. Each range is
backed by exactly one managed ledger and is owned by exactly one broker at any
given time.
+
+### Splitting and Merging Protocol
+
+**Splitting** does not require cross-broker coordination — since a range is
owned by a single broker, the split is a local operation on the owning broker:
+
+1. The current segment is sealed. A termination marker entry is written to the
managed ledger.
+2. Two child segments are created with defined keyspace boundaries.
+3. The updated DAG is pushed to all connected clients via their watch sessions.
+4. Producers begin routing to the appropriate child segment. The previous
consumer for the parent segment is preferentially assigned to one of the
children to minimize rebalancing.
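+
+The first three split steps can be sketched as an in-memory model. All names here (`RangeSegment`, `ScalableTopic`, the marker string, the watcher callbacks) are hypothetical and stand in for broker-side state; the real protocol is left to the sub-PIPs.
+
```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class RangeSegment:
    segment_id: int
    start: int  # inclusive
    end: int    # exclusive
    sealed: bool = False
    entries: List[str] = field(default_factory=list)
    children: List[int] = field(default_factory=list)

    def append(self, entry: str) -> None:
        if self.sealed:
            raise RuntimeError(f"segment {self.segment_id} is sealed")
        self.entries.append(entry)


class ScalableTopic:
    def __init__(self) -> None:
        # Start with one segment covering an assumed 16-bit keyspace.
        self.segments: Dict[int, RangeSegment] = {0: RangeSegment(0, 0, 1 << 16)}
        self.watchers: List[Callable[[List[RangeSegment]], None]] = []

    def active(self) -> List[RangeSegment]:
        return [s for s in self.segments.values() if not s.sealed]

    def split(self, segment_id: int) -> None:
        parent = self.segments[segment_id]
        # Step 1: write a termination marker, then seal the parent.
        parent.append("<termination-marker>")
        parent.sealed = True
        # Step 2: create two children with defined keyspace boundaries.
        mid = (parent.start + parent.end) // 2
        base = max(self.segments) + 1
        for i, (lo, hi) in enumerate([(parent.start, mid), (mid, parent.end)]):
            child = RangeSegment(base + i, lo, hi)
            self.segments[child.segment_id] = child
            parent.children.append(child.segment_id)
        # Step 3: push the updated topology to every watch session.
        for notify in self.watchers:
            notify(self.active())


topic = ScalableTopic()
seen: List[List[tuple]] = []
topic.watchers.append(
    lambda active: seen.append([(s.segment_id, s.start, s.end) for s in active]))
topic.split(0)
```
+
+Step 4 happens on the client: once a producer receives the pushed update, it routes new messages to whichever child covers the key hash, and further appends to the sealed parent are rejected.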
+
+**Merging** requires cross-broker coordination since the segments being merged
may be owned by different brokers. One broker acts as the **merge leader**,
coordinating the seal of all involved segments and the creation of the merged
child. Merging multiple adjacent segments at once is supported. As with
splitting, the updated DAG is pushed to clients via their active watch sessions.
+
+### Topic URL Scheme
+
+Scalable topics use a dedicated URL scheme to make the topic type unambiguous
at the API and routing level:
+
+- **Topic URL:** `topic://{tenant}/{namespace}/{name}`
+- **Range segment URL:** `segment://{tenant}/{namespace}/{name}/{segment-id}`
+
+Using a distinct scheme — rather than a prefix in the topic name — avoids
ambiguity with existing `persistent://` and `non-persistent://` URLs and
enables topic type detection without an extra metadata round-trip.
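+
+As an illustration of scheme-based type detection, the sketch below parses the two URL shapes listed above. The `ScalableName` type and `parse` helper are hypothetical names, and the validation rules (no empty path components) are an assumption.
+
```python
from typing import NamedTuple, Optional


class ScalableName(NamedTuple):
    scheme: str
    tenant: str
    namespace: str
    name: str
    segment_id: Optional[str] = None


def parse(url: str) -> ScalableName:
    """Parse topic://{tenant}/{namespace}/{name} or
    segment://{tenant}/{namespace}/{name}/{segment-id}."""
    scheme, sep, rest = url.partition("://")
    parts = rest.split("/")
    expected = {"topic": 3, "segment": 4}.get(scheme)
    if sep and expected == len(parts) and all(parts):
        return ScalableName(scheme, *parts)
    raise ValueError(f"not a scalable topic or segment URL: {url!r}")


topic_name = parse("topic://acme/orders/events")
segment_name = parse("segment://acme/orders/events/7")
```
+
+The scheme alone tells the caller which kind of resource it is dealing with; a `persistent://` URL is rejected by this parser without consulting any metadata.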
+
+### Lookup: Persistent Watch Sessions
+
+Rather than a one-shot request/response, the lookup for a scalable topic
establishes a **persistent watch session** between the client and a broker. The
broker returns the initial topic state — current active segments, their
keyspace boundaries, and broker addresses — and then **proactively pushes
updates** whenever the segment DAG changes due to a split or merge. This avoids
polling and the need for clients to re-issue lookups on topology changes.
+
+For producers, any broker can serve the watch session, since segment metadata
lives in the shared metadata store. For controller-coordinated consumers
(StreamConsumer and CheckpointConsumer), the watch session additionally
delivers the address of the elected **controller broker** for the subscription.
+
+### Consumer Models
+
+Three consumption patterns are supported:
+
+- **StreamConsumer:** Registered with the controller; receives exclusive
assignment of a subset of range segments; processes messages in key order
within each assigned segment. Analogous to Kafka's consumer group model,
Pravega's reader groups, or Pulsar's existing exclusive and failover
subscription models. Can additionally use a Key_Shared-style dispatch within a
single segment to scale the number of consumers beyond the segment count,
without requiring a split.
+- **QueueConsumer:** Subscribes independently to all active segments using
shared dispatch; no controller interaction required. Messages are delivered
without cross-segment ordering guarantees, analogous to Pulsar's existing
shared subscription model.
+- **CheckpointConsumer:** Registered with the controller; receives segment
assignments like the StreamConsumer, but tracks read positions externally via a
serializable `Checkpoint` snapshot rather than subscription cursors. Designed
for use by stream processing frameworks (Flink, Beam) that manage their own
state and require restorable read positions across topology changes.
+
+### Consumer Controller and Leader Election
+
+StreamConsumer and CheckpointConsumer require coordinated segment assignment.
A single broker, elected as the **controller** via leader election in the
metadata store, manages segment-to-consumer assignments for a given
subscription. The controller persists the assignment state so that it can be
restored after a broker failure without requiring consumers to rebalance from
scratch.
+
+Consumers register with the controller using a **persistent bidirectional
stream**, establishing a **consumer controller session**. The controller pushes
segment assignment updates through this stream, and consumers report segment
completions and acknowledgment positions back. Each session is identified by a
stable **consumer identity** (a name chosen by the client). If the underlying
connection drops, the session enters a **grace period** during which the
consumer's assigned segments ar [...]
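+
+The grace period behaves like a lease on the assignment. A minimal sketch, assuming a 60-second grace period and explicit timestamps passed in by the caller; the `Controller` class and its method names are hypothetical and do not describe the actual controller protocol:
+
```python
from typing import Dict, Iterable, Optional, Set

GRACE_PERIOD_S = 60.0  # assumed value for this sketch


class Controller:
    def __init__(self) -> None:
        self.assignments: Dict[str, Set[int]] = {}     # consumer identity -> segments
        self.disconnected_at: Dict[str, float] = {}    # consumer identity -> timestamp

    def assign(self, consumer: str, segments: Iterable[int]) -> None:
        self.assignments[consumer] = set(segments)

    def on_disconnect(self, consumer: str, now: float) -> None:
        # Hold the assignment in reserve instead of redistributing it.
        if consumer in self.assignments:
            self.disconnected_at[consumer] = now

    def on_reconnect(self, consumer: str, now: float) -> Optional[Set[int]]:
        # Returns the restored assignment, or None if the lease already expired.
        self.expire(now)
        self.disconnected_at.pop(consumer, None)
        return self.assignments.get(consumer)

    def expire(self, now: float) -> Set[int]:
        # Release segments of consumers that stayed away past the grace period.
        released: Set[int] = set()
        for consumer, since in list(self.disconnected_at.items()):
            if now - since > GRACE_PERIOD_S:
                released |= self.assignments.pop(consumer)
                del self.disconnected_at[consumer]
        return released


controller = Controller()
controller.assign("worker-a", {1, 2})
controller.on_disconnect("worker-a", now=0.0)
restored = controller.on_reconnect("worker-a", now=30.0)   # within the grace period
controller.on_disconnect("worker-a", now=100.0)
released = controller.expire(now=161.0)                    # grace period exceeded
late = controller.on_reconnect("worker-a", now=170.0)
```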
+
+### New Client API
+
+A new API namespace is introduced (e.g., `org.apache.pulsar.client.api.v5`)
designed from the ground up for scalable topics. The existing API remains
unchanged and fully supported in 5.0 LTS releases. The existing API would be
deprecated in a later LTS release (6.0 LTS) and removed in the LTS release
after that (7.0 LTS).
+
+The new API introduces **type-safe, separate consumer interfaces** —
`StreamConsumer`, `QueueConsumer`, and `CheckpointConsumer` — each exposing
only the operations valid for that consumption pattern. This makes invalid
operations unrepresentable at compile time rather than producing runtime
exceptions. Each interface has its own builder exposing only the configuration
options that apply to it.
+
+This replaces Pulsar's existing subscription type model (Exclusive, Failover,
Shared, Key_Shared), where a single `Consumer` interface exposes operations
that are silently no-ops depending on the subscription type.
+
+New client API implementations are required for all supported language client
SDKs, such as Java, Python, Go, C++, DotNet, and Node.js.
+
+### Phased Delivery
+
+Given the Pulsar 5.0 LTS timeline (targeting October 2026) and the strategy of
shipping pre-LTS features in 4.x for community feedback:
+
+1. **Phase 1 (4.3.0 or 4.4.0):** Introduce the new client API, range segment
abstraction, a basic scalable topic type starting with a single segment, and
manual splitting via admin API. Validates the storage and metadata model
end-to-end.
+2. **Phase 2 (4.4.0):** Consumer controller, stream consumer rebalancing on
segment changes, auto-split based on I/O thresholds, and the queue consumer
model. Limited Geo-Replication support (no replicated subscriptions).
+3. **Phase 3 (5.0 LTS):** Range merging and finalized client API.
+4. **Phase 4 (5.1 and later):** Replicated subscriptions, transaction support
across ranges, stream processing connector integration.
+
+The above phases are subject to change as the design evolves.
+
+---
+
+## Key Design Decisions (Preliminary)
+
+| Decision | Approach |
+|----------|----------|
+| Scaling mechanism | Range splitting and merging via a segment DAG |
+| Storage per range | One managed ledger per range segment |
+| Key routing | Range-based hash assignment (replaces `hash(key) % numPartitions`) |
+| Split coordination | Single-broker operation (range is owned by one broker) |
+| Merge coordination | Cross-broker, leader-based; supports merging multiple adjacent ranges at once |
+| Lookup | Persistent watch session with server-pushed DAG updates |
+| Consumer coordination | Elected controller broker; persistent bidirectional stream; persisted assignment state |
+| Consumer controller session | Identified by consumer identity; grace period (~1 min) holds assigned segments in reserve on disconnect, allowing reconnection without rebalancing |
+| Consumer model | StreamConsumer + QueueConsumer + CheckpointConsumer (replaces Exclusive/Failover/Shared/Key_Shared) |
+| Read-side scaling within a range | Key_Shared-style dispatch within a segment (StreamConsumer mode) |
+| Client API | New API namespace; type-safe separate consumer interfaces |
+| Metadata store | Oxia (preferred for Pulsar 5.0); enables streaming watch sessions |
+| Delivery | Phased: 4.3.0 → 4.4.0 → 5.0 LTS |
+
+---
+
+## Challenges and Preliminary Solutions
+
+This section summarizes the key challenges that scalable topics introduce.
Each is addressed at a high level here; detailed solutions are deferred to
sub-PIPs. The presented solutions are subject to change as the design evolves.
+
+### Scaling Writes
+
+When a single range segment becomes a write bottleneck, the system should
allow splitting the range segment. The split must be coordinated: the current
segment is sealed, child segments are created, and producers are redirected.
The key-to-range mapping must update atomically from the client's perspective.
Both automatic (I/O-threshold-based) and manual (admin API) triggers should be
supported.
+
+Splitting and merging are fundamentally producer-side decisions: they affect
where new messages are routed. In tailing reads mode — where consumers follow
closely behind producers — consumer-side I/O metrics could also inform scaling
decisions, and the number of splits could adapt to the number of available
consumers. Once a segment is sealed, its topology is fixed.
+
+Auto-merge is the symmetric case: when I/O on adjacent segments drops below a
low-water mark, those segments become candidates for merging. This is harder in
practice than auto-split. Merging requires cross-broker coordination, and the
buddy algorithm constraint means only segments that share split ancestry are
eligible. There is also a risk of thrashing under fluctuating load if the
threshold is naive. Manual merge via the admin API is also supported, which
makes it possible to drive [...]
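+
+To illustrate the thrashing risk, the sketch below compares a single naive threshold with separate high-water and low-water marks. Using two marks is one common mitigation and an assumption of this sketch, not a decision of this PIP; the function names and all numeric values are hypothetical.
+
```python
from typing import List


def naive_decision(rate_mb_s: float, threshold: float = 30.0) -> str:
    # One threshold: any load near it flips the decision on every sample.
    return "split" if rate_mb_s > threshold else "merge"


def watermark_decision(rate_mb_s: float,
                       high_water: float = 50.0,
                       low_water: float = 10.0) -> str:
    # Two marks leave a band in which the topology is left alone.
    if rate_mb_s > high_water:
        return "split"
    if rate_mb_s < low_water:
        return "merge"
    return "hold"


# A load that hovers around the naive threshold.
loads: List[float] = [28.0, 32.0, 28.0, 32.0]
naive = [naive_decision(r) for r in loads]
banded = [watermark_decision(r) for r in loads]
```
+
+With the hovering load above, the naive rule alternates between merging and splitting on every sample, while the two-mark rule holds the topology steady until the load clearly leaves the band.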
+
+### Scaling Reads
+
+When segments are split, the system must make newly created segments available
to additional consumer instances. The consumer controller handles rebalancing.
In container orchestration environments such as Kubernetes, the current segment
count can be exposed as a scaling metric so that the consumer deployment scales
out and in automatically.
+
+### Scaling Reads beyond the Segment Count
+
+Segment count is determined by produce-time topology and is fixed once
segments are sealed. To scale reads independently of writes, the StreamConsumer
supports a **Key_Shared-style dispatch mode within a single segment**, fanning
out consumption to multiple consumers without requiring the segment to be split.
+
+### Message Entry Format: Batch Routing and Key_Shared-style Dispatch
+
+Current Pulsar Key_Shared consumption requires that producers use
`BatcherBuilder.KEY_BASED` so the broker can route batch entries to the correct
consumer. For scalable topics, eliminating this producer-side requirement is
necessary.
+
+The new message entry format must allow the broker to route or filter
individual messages within a batch entry without requiring the producer and
broker to perform costly operations. It must also support selective
decompression and decryption so that only the relevant portion of an entry is
processed by each consumer or replication path. The format should support both
keyed and non-keyed (total-order) modes, and its design affects the `MessageId`
representation, the acknowledgement model [...]
+
+### Key-Ordered Consumption and Exactly-Once Semantics
+
+Key-ordered processing must be maintained across splits and merges. When a
segment is split, consumers must finish processing all messages in the parent
segment before reading from its children — this is the happens-before guarantee
encoded by the segment DAG. Consumers that traverse sealed segments during
catch-up reads must follow this parent-before-child order to preserve
correctness.
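+
+A parent-before-child traversal is a topological order of the segment DAG. The sketch below uses Python's standard `graphlib` module (Python 3.9+) on a hypothetical DAG in which `s0` is split into `s1` and `s2`, which are later merged into `s3`. A single global order is stricter than required, since only segments on the same ancestry path need ordering, but it is sufficient to show the rule.
+
```python
from graphlib import TopologicalSorter

# Maps each segment to the set of its parents (hypothetical segment names).
parents = {
    "s0": set(),
    "s1": {"s0"},          # s0 split into s1 and s2
    "s2": {"s0"},
    "s3": {"s1", "s2"},    # s1 and s2 merged into s3
}

# static_order() yields every segment after all of its parents.
order = list(TopologicalSorter(parents).static_order())
position = {segment: i for i, segment in enumerate(order)}
```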
+
+Exactly-once messaging semantics depend on three mechanisms: idempotent
producers (sequence-number-based deduplication at the broker), broker-side
deduplication state, and transactions for atomic multi-segment writes. Each of
these is currently scoped to a single topic or partition. Scalable topics
introduce a dynamic segment lifecycle where segments are created, sealed, and
replaced over time. Deduplication state must be carried over correctly during
splits. Transactions that span multi [...]
+
+Key_Shared-style dispatch within a segment may require independent per-range
cursors rather than the single shared cursor used in the current Key_Shared
model, allowing the consumer controller to track and rebalance progress at
hash-range granularity. Key distribution imbalance — where a small number of
hot keys dominate throughput — is a known challenge in the current Key_Shared
implementation; the consumer controller's ability to reassign and drain
individual hash ranges addresses this [...]
+
+### Consumer Controller Failure
+
+If the controller broker fails, a new controller is elected and restores the
persisted assignment state. Consumers can continue consuming from their
already-assigned segments until the new controller is ready. On startup, the
new controller treats all sessions as disconnected and immediately starts the
grace period for each. Consumers that reconnect within the grace period have
their sessions restored and receive the same segment assignments; those that do
not reconnect in time have thei [...]
+
+### Acknowledgment Model
+
+The StreamConsumer could use cumulative acknowledgements when a single
consumer processes a segment sequentially, reducing overhead and metadata store
load. However, when Key_Shared-style dispatch fans out messages from a single
segment to multiple consumers, messages may be processed out of order,
requiring individual acknowledgements. The QueueConsumer also requires
individual acknowledgements for the same reason. An additional complication is
that a single entry can be delivered to tw [...]
+
+### Delayed Messages
+
+Delayed message delivery requires tracking timer state across all active
(non-sealed) range segments, since delayed messages can be spread across
multiple segments. The delay timer tracking and dispatch mechanism must account
for the multi-segment topology. One approach is to maintain an active
subscription and consumer on every segment that still contains undelivered
delayed messages.
+
+### Pulsar Transactions
+
+Pulsar's transaction system is currently partition/topic-oriented. Extending
transactions to span records across multiple range segments of the same
scalable topic requires careful coordination, especially if a segment is split
or merged during the transaction's lifetime. Transaction support is deferred to
Phase 4.
+
+### Stream Processing Integration (Flink / Apache Beam)
+
+Stream processing frameworks manage their own state and require stable,
restorable read positions. When a range is split or merged, the Flink source
connector must handle the new segment topology and map restored checkpoints —
which may reference now-sealed segments — to correct positions in the successor
segments. Watermark propagation must account for a dynamically changing number
of input streams. A clean integration with Flink's `Source`/`SplitEnumerator`
model is a design goal; the [...]
+
+### Geo-Replication
+
+A foundational design decision for geo-replication is that **range segments
are independent per cluster** — the segment topology in one cluster is not
required to match, or be coordinated with, the topology in another. A cluster
is free to split or merge its own segments based on its local traffic patterns,
independently of what remote clusters do. Coupling segment topologies across
clusters would reintroduce a global coordination requirement that scalable
topics are specifically designe [...]
+
+For message replication, this independence means that a replicator cannot
assume that a segment with a given identifier exists on the remote cluster.
Instead, replication must route each message to whichever active segment on the
remote cluster covers the message's key hash at the time of replication — the
same routing logic used by producers.
+
+**Replicated subscriptions** require a fundamentally new mechanism. The
existing approach assumes a subscription position is a single point in a linear
stream; with scalable topics, it is a set of positions across multiple active
range segments whose DAGs can diverge independently between clusters. The
detailed design is deferred to a dedicated sub-PIP.
+
+Geo-replication also requires the new message entry format described in
[Message Entry
Format](#message-entry-format-batch-routing-and-key_shared-style-dispatch),
allowing the receiving broker to route an incoming batch to the correct
destination segment without decompressing or decrypting the payload. The
interaction between batch boundaries, compression, and encryption at the
replication layer will be addressed jointly by the message entry format sub-PIP
and the geo-replication sub-PIP.
+
+---
+
+## Public-Facing Changes
+
+This section gives a high-level overview of the changes visible to operators
and application developers. Each area is intentionally kept brief here — the
precise API surface, configuration keys, metric names, and protocol details are
defined in the dedicated sub-PIPs.
+
+### New Topic Type
+
+A new topic type — scalable topic — is introduced and registered in Pulsar's
topic type system. It is created via the admin API with a new topic type
identifier and is addressable using the `topic://` URL scheme.
+
+### Admin API
+
+New admin API endpoints for scalable topics:
+
+- Create / delete a scalable topic
+- List scalable topics in a namespace
+- Get topic metadata (segment DAG, active segments)
+- Trigger a manual split of a range segment
+- Trigger a manual merge of adjacent range segments
+- Get subscription-specific segment assignment state
+
+### Binary Protocol
+
+The existing Pulsar binary protocol topic lookup command is not used for
scalable topics. Broker discovery is handled entirely through the two new
bidirectional streams described below — the DAG watch session for segment
topology and broker addresses, and the consumer controller stream for
assignment coordination.
+
+Two new bidirectional protocol streams are introduced:
+
+- **DAG watch session**: initiated by any client (producer or consumer)
against any broker. The broker returns the initial topic state (segment DAG,
active segment broker addresses, controller broker address) and then pushes
updates whenever the segment topology changes. The same response message type
is reused for both the initial reply and later pushes.
+- **Consumer controller stream**: initiated by StreamConsumer and
CheckpointConsumer clients against the elected controller broker. The client
registers its identity; the controller pushes segment assignment updates; the
client sends graceful unregistration on close.
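+
+Because the DAG watch session reuses one message type for the initial reply
and later pushes, a client can handle both through a single code path. The
sketch below illustrates this under stated assumptions: `TopologyView`,
`TopologyUpdate`, and all field names are hypothetical, and the monotonically
increasing `version` used to drop stale updates is an assumption, not something
this PIP specifies.
+
```java
import java.util.Map;

/** Hypothetical client-side cache fed by a DAG watch session. */
final class TopologyView {

    /** Illustrative update payload; the fields are assumptions, not the wire format. */
    static final class TopologyUpdate {
        final long version;                       // assumed to increase with every change
        final Map<String, String> segmentBrokers; // active segment id -> broker address
        final String controllerBroker;

        TopologyUpdate(long version, Map<String, String> segmentBrokers, String controllerBroker) {
            this.version = version;
            this.segmentBrokers = Map.copyOf(segmentBrokers);
            this.controllerBroker = controllerBroker;
        }
    }

    private TopologyUpdate current; // null until the initial reply arrives

    /** Handles the initial reply and later pushes identically; returns false for stale updates. */
    synchronized boolean onUpdate(TopologyUpdate update) {
        if (current != null && update.version <= current.version) {
            return false;
        }
        current = update;
        return true;
    }

    /** Returns the broker for an active segment, or null if the segment is not active. */
    synchronized String brokerFor(String segmentId) {
        if (current == null) {
            throw new IllegalStateException("no topology received yet");
        }
        return current.segmentBrokers.get(segmentId);
    }

    public static void main(String[] args) {
        TopologyView view = new TopologyView();
        view.onUpdate(new TopologyUpdate(1, Map.of("seg-0", "broker-a:6650"), "broker-c:6650"));
        // A split of seg-0 arrives as a later push of the same message type.
        view.onUpdate(new TopologyUpdate(2,
                Map.of("seg-1", "broker-a:6650", "seg-2", "broker-b:6650"), "broker-c:6650"));
        System.out.println(view.brokerFor("seg-2")); // prints "broker-b:6650"
    }
}
```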
+
+Additional protocol changes:
+
+- Extended `MessageId` format that includes the range segment identifier.
Support for the new message entry format also impacts the message ID format.
+- New `segment://` topic domain alongside the existing `topic://` domain.
+
+### Client API
+
+A new client module (`pulsar-client-v5`) introduces type-safe interfaces for
scalable topics:
+
+- `ScalableTopicProducer`: opens a DAG watch session; routes each message to
the active segment covering the message's key hash; adapts automatically as the
topology changes.
+- `StreamConsumer`: registers with the controller broker; receives exclusive
segment assignments; consumes in key order within each assigned segment; uses
subscription cursors for position tracking.
+- `QueueConsumer`: subscribes to all active segments with shared dispatch; no
controller interaction; messages delivered without cross-segment ordering
guarantees.
+- `CheckpointConsumer`: registers with the controller like a StreamConsumer,
but tracks positions externally via a `Checkpoint` snapshot instead of
subscription cursors. Requires a `group` identifier to coordinate parallel
instances. Designed for Flink and similar stream processing frameworks.
+- Dedicated builders for each type, exposing only the configuration options
applicable to that type.
+
+### Configuration
+
+Each sub-PIP introduces its own configuration options. The examples below are
illustrative; the complete list will grow as sub-PIPs are written. All options
are expected to be configurable at broker level, with selected options also
overridable at namespace and topic level.
+
+Examples of anticipated configuration options:
+
+- Consumer controller session grace period (default: ~1 minute; the time a
disconnected consumer's segment assignments are held in reserve before being
redistributed)
+- Auto-split I/O threshold
+- Segment count target for auto-split
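+
+To illustrate the session grace period, the sketch below holds a disconnected
consumer's assignments in reserve and releases them only once the grace period
has elapsed. `GracePeriodTracker` and its methods are hypothetical names used
for illustration; the actual controller design is left to the consumer
controller sub-PIP.
+
```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of grace-period bookkeeping in a consumer controller. */
final class GracePeriodTracker {
    private final Duration gracePeriod;
    private final Map<String, Instant> disconnectedAt = new HashMap<>();

    GracePeriodTracker(Duration gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    /** Records the first disconnect time; assignments stay reserved for now. */
    void onDisconnect(String consumerId, Instant now) {
        disconnectedAt.putIfAbsent(consumerId, now);
    }

    /** A reconnect within the grace period keeps the existing assignments. */
    void onReconnect(String consumerId) {
        disconnectedAt.remove(consumerId);
    }

    /** Removes and returns consumers whose grace period has elapsed at 'now'. */
    List<String> expire(Instant now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Instant>> it = disconnectedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (!now.isBefore(entry.getValue().plus(gracePeriod))) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        GracePeriodTracker tracker = new GracePeriodTracker(Duration.ofMinutes(1));
        Instant t0 = Instant.parse("2026-01-01T00:00:00Z");
        tracker.onDisconnect("consumer-1", t0);
        System.out.println(tracker.expire(t0.plusSeconds(30))); // prints "[]"
        System.out.println(tracker.expire(t0.plusSeconds(60))); // prints "[consumer-1]"
    }
}
```
+
+Only the consumers returned by `expire` would have their segments
redistributed; a consumer that reconnects earlier is simply dropped from the
tracker.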
+
+### Metrics
+
+Each sub-PIP defines the metrics relevant to its area. The examples below are
illustrative; the complete set will be specified in a dedicated metrics sub-PIP.
+
+Examples of anticipated metrics:
+
+- Per-topic: active segment count, total message rate, total throughput
+- Per-segment: message rate in/out, backlog, consumer count
+- Controller: assignment events, rebalance events, controller failover count
+- Segment lifecycle: split count, merge count, seal events
+
+---
+
+## Backward & Forward Compatibility
+
+### Upgrade
+
+Scalable topics are introduced as a new, distinct topic type. No changes are
made to the existing partitioned or non-partitioned topic implementation.
Existing clients continue to work with partitioned and non-partitioned topics
without modification.
+
+Scalable topics require a compatible broker and client version. Clients
connecting to older brokers will receive an error when attempting to create or
connect to a scalable topic, and older clients cannot connect to scalable
topics even on a compatible broker.
+
+### Downgrade / Rollback
+
+Scalable topics are not usable on broker versions that predate this feature.
Rolling back to an older broker version is possible; existing non-partitioned
and partitioned topics continue to work normally. Scalable topic data and
metadata should be retained in BookKeeper and the metadata store during a
rollback so that upgrading again to a compatible broker version resumes access
to scalable topics.
+
+### Pulsar Geo-Replication
+
+Geo-replication for scalable topics is a new protocol path. Clusters
participating in geo-replication of scalable topics must all run a compatible
broker version. Replicating scalable topics to a cluster running an older
Pulsar version is not supported. Detailed upgrade considerations will be
specified in the geo-replication sub-PIP.
+
+---
+
+## Security Considerations
+
+Scalable topics follow the same tenant/namespace/topic authorization model as
existing Pulsar topics. The new admin API endpoints for segment operations
(split, merge, metadata read) will require the same permissions as equivalent
topic-level admin operations.
+
+The watch session and the consumer controller stream are authenticated using
the same mechanisms as existing Pulsar binary protocol connections (TLS, token
authentication, mutual TLS).
+
+Multi-tenancy isolation is preserved: a client can only watch or interact with
segments that belong to topics it has permission to access. The controller does
not expose assignment state across subscriptions or tenants.
+
+---
+
+## Sub-PIPs
+
+This is a parent PIP. The following sub-PIPs are planned to cover each major
design area in detail. Each sub-PIP will include its own public-facing changes,
detailed design, and backward compatibility sections. A single sub-PIP may
address multiple related areas from this list if they are tightly coupled.
+
+| Sub-PIP | Area |
+| ------- | ---- |
+| TBD | High-level architecture: functionality, capabilities, concepts, components, interactions, contracts, and design decisions |
+| TBD | Range segment data model and metadata store schema |
+| TBD | StreamConsumer API and protocol |
+| TBD | QueueConsumer API and protocol |
+| TBD | CheckpointConsumer API and protocol |
+| TBD | Migration strategy |
+| TBD | Producer protocol: watch sessions, range routing, topology change handling |
+| TBD | Consumer controller: leader election, assignment protocol, persistent state, grace period |
+| TBD | Range splitting: broker-side protocol, admin API, auto-split triggers |
+| TBD | Range merging: cross-broker coordination protocol, admin API |
+| TBD | Message entry format for Scalable Topics |
+| TBD | Scalable topic metrics |
+| TBD | Transaction support across range segments |
+| TBD | Geo-replication for scalable topics |
+| TBD | Replicated Subscriptions for scalable topics |
+| TBD | Stream processing connector for scalable topics (Flink / Apache Beam) |
+
+---
+
+## Alternatives
+
+### Continue with Partitioned Topics
+
+Partitioned topics could be improved incrementally (e.g., online
repartitioning with a migration period, smarter placement). However,
modulo-based routing is fundamentally incompatible with transparent
key-preserving repartitioning. A clean-break design is necessary to deliver the
desired semantics without carrying forward the modulo constraint.
+
+---
+
+## General Notes
+
+The scalable topic feature is targeted at Pulsar 5.0 as the LTS milestone. The
phased delivery path through 4.3.0 and 4.4.0 is designed to collect
implementation experience and community feedback before the LTS commitment.
+
+---
+
+## Links
+
+* Mailing List discussion thread:
https://lists.apache.org/thread/kthcmtxzv678d1gn9f5vo3psmwz4j3yr
+* Mailing List voting thread:
https://lists.apache.org/thread/jd9kklo3qlbmt2cxg1s7oynof14gmsn4