This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e4e7e1277a [feat] [pip] PIP-466: New Java Client API (V5) with Scalable Topic Support (#25455)
9e4e7e1277a is described below
commit 9e4e7e1277af795bf7a93294944563b4083ba7af
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 20 10:31:54 2026 -0700
[feat] [pip] PIP-466: New Java Client API (V5) with Scalable Topic Support (#25455)
---
pip/pip-466.md | 505 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 505 insertions(+)
diff --git a/pip/pip-466.md b/pip/pip-466.md
new file mode 100644
index 00000000000..b222f5f6b14
--- /dev/null
+++ b/pip/pip-466.md
@@ -0,0 +1,505 @@
+# PIP-466: New Java Client API (V5) with Scalable Topic Support
+
+# Background Knowledge
+
+Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary interface for Java
+applications since Pulsar's inception. The API surface has grown organically over the years to
+support partitioned topics, multiple subscription types, transactions, schema evolution, and more.
+
+**API versioning precedent.** Pulsar already went through a breaking API redesign in the 2.0
+release, where the old API was moved into a separate compatibility module
+(`pulsar-client-1x-base`) and the main module was replaced with the new API. This PIP takes
+a less disruptive approach: the existing `pulsar-client-api` and `pulsar-client` modules stay
+completely unchanged, and the new API is introduced in additional modules
+(`pulsar-client-api-v5` for the interfaces and `pulsar-client-v5` for the implementation).
+Existing applications are unaffected; new applications can opt in to the V5 API.
+
+**Partitioned topics** are Pulsar's current mechanism for topic-level parallelism. A partitioned
+topic is a collection of N independent internal topics (partitions), each backed by a separate
+managed ledger. The client is responsible for routing messages to partitions (via `MessageRouter`)
+and is exposed to partition-level details through `TopicMessageId`, `getPartitionsForTopic()`,
+and partition-specific consumers.
+
+**Subscription types** control how messages are distributed to consumers. Pulsar supports four
+types — Exclusive, Failover, Shared, and Key_Shared — all accessed through a single `Consumer`
+interface. The interface exposes all operations regardless of which subscription type is in use,
+even though some operations (e.g., `acknowledgeCumulative()` on Shared) throw at runtime.
+
+**Scalable topics** ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are a new
+server-side mechanism where a topic is composed of a DAG of hash-range segments that can be
+dynamically split and merged by the broker. Unlike partitioned topics, the number of segments
+is invisible to the client and can change at runtime without application awareness. The client
+receives segment layout updates via a dedicated protocol command (DAG watch session) and routes
+messages based on hash-range matching. This PIP defines the client API designed to work with
+scalable topics; the broker-side design is covered by PIP-460.
+
+
+# Motivation
+
+## 1. Remove partitioning from the client API
+
+The current API leaks partitioning everywhere: `TopicMessageId` carries partition indexes,
+`getPartitionsForTopic()` exposes the count, `MessageRouter` forces the application to make
+routing decisions, and consumers can be bound to specific partitions. This forces application
+code to deal with what is fundamentally a server-side scalability concern.
+
+With scalable topics, parallelism is achieved via hash-range segments managed entirely by the
+broker. The client should treat topics as opaque endpoints — per-key ordering is guaranteed
+when a key is specified, but the underlying parallelism (how many segments, which broker owns
+each) is invisible and dynamic.
+
+The current API cannot cleanly support this model because partitioning is baked into the type
+system (`TopicMessageId`), the builder API (`MessageRouter`, `MessageRoutingMode`), and the
+consumer model (partition-specific consumers, `getPartitionsForTopic()`).
+
+## 2. Simplify an oversized API
+
+After years of organic growth, the API surface has accumulated significant baggage:
+
+- `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe,
+  stats, get topic name, is connected, etc.)
+- `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
+- Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
+- Absent values are signaled inconsistently: some methods return `null`, others an empty value
+- `loadConf(Map)`, `clone()`, and `Serializable` on builders are rarely used and clutter the API
+- Implementations are discovered via a reflection hack (`DefaultImplementation`) instead of the
+  standard `ServiceLoader` SPI
+
+A new module can start with a clean, minimal surface using modern Java idioms.
+
+## 3. Separate streaming vs queuing consumption
+
+The current `Consumer` mixes all four subscription types behind a single interface:
+
+- `acknowledgeCumulative()` is available but throws at runtime for Shared subscriptions
+- `negativeAcknowledge()` semantics differ between modes
+- `seek()` behavior varies depending on subscription type
+- Dead-letter policy only applies to Shared/Key_Shared
+
+This design means the compiler cannot help you — you discover misuse at runtime. Splitting the
+API into purpose-built consumer types, each exposing only the operations that make sense for its
+model, improves both usability and correctness.
+
+## 4. Native support for connector frameworks
+
+Connector frameworks like Apache Flink and Apache Spark need to manage their own offsets across
+all segments of a topic, take atomic snapshots, and seek back to a checkpoint on recovery. The
+current API has no first-class support for this — connectors resort to the low-level `Reader` API
+combined with manual partition tracking and brittle offset management.
+
+A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint` objects provides a clean
+integration point.
+
+
+## Relationship to PIP-460 and long-term vision
+
+This PIP is a companion to [PIP-460: Scalable Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
+which defines the broker-side segment management, metadata storage, and admin APIs. The V5
+client API is the primary interface for applications to use scalable topics. Although the
+protocol commands and segment routing could theoretically be added to the v4 client, the V5
+API was designed from the ground up to support the opaque, dynamically-segmented topic model
+that scalable topics provide.
+
+The V5 API is designed to support all use cases currently supported by the existing API:
+producing messages, consuming with ordered/shared/key-shared semantics, transactions, schema
+evolution, and end-to-end encryption. It is not a subset — it is a full replacement API. It
+also works with existing partitioned and non-partitioned topics, so applications can adopt the
+new API without changing their topic infrastructure.
+
+The long-term vision is for scalable topics and the V5 API to become the primary model,
+eventually deprecating partitioned/non-partitioned topics and the v4 API. However, this
+deprecation is explicitly **not** planned for the 5.0 release. The 5.0 release will ship both
+APIs side by side, with the V5 API recommended for new applications. A subsequent PIP will
+detail the migration path and deprecation timeline.
+
+While this PIP covers the Java client, the same API model (purpose-built consumer types, opaque
+topics, checkpoint-based connector support) will also be introduced in non-Java client SDKs
+(Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will mirror the same
+concepts and follow the same approach of supporting both old and new topic types side by side.
+The non-Java SDKs will be covered by separate PIPs.
+
+
+# Goals
+
+## In Scope
+
+- A new `pulsar-client-api-v5` module with new Java interfaces for Producer, StreamConsumer,
+  QueueConsumer, CheckpointConsumer, and PulsarClient
+- A new `pulsar-client-v5` implementation module that wraps the existing v4 client transport
+  and adds scalable topic routing
+- Support for all use cases currently supported by the existing API (produce, consume with
+  ordered/shared/key-shared semantics, transactions, schema, encryption)
+- Purpose-built consumer types that separate streaming (ordered, cumulative ack), queuing
+  (parallel, individual ack), and checkpoint-based (unmanaged, framework-driven) consumption
+- Opaque topic model where partition/segment details are hidden from the application
+- Modern Java API conventions: `Duration`, `Instant`, `Optional`, records, `ServiceLoader`
+- First-class transaction support in the main package
+- DAG watch protocol integration for live segment layout updates
+
+## Out of Scope
+
+- Changes to the existing `pulsar-client-api` — it remains fully supported and unchanged
+- Changes to the wire protocol beyond what is needed for the scalable topic DAG watch
+- Broker-side scalable topic management (split/merge algorithms, load balancing) — covered by
+  [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md) and subsequent, more
+  specific PIPs
+- Migration path from v4 to v5 API — will be detailed in a subsequent PIP
+- Implementation details — this PIP focuses on the public API surface
+- Deprecation of the existing API or partitioned/non-partitioned topic types
+- TableView equivalent in v5 — may be added in a follow-up PIP
+
+
+# High Level Design
+
+The V5 client API is shipped as two new modules alongside the existing client:
+
+```
+pulsar-client-api-v5   (interfaces and value types only — no implementation)
+pulsar-client-v5       (implementation, depends on pulsar-client for transport)
+```
+
+The existing `pulsar-client-api` and `pulsar-client` modules are unchanged. Applications
+can use v4 and v5 in the same JVM.
+
+## Entry point
+
+```java
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+```
+
+The `PulsarClient` interface provides builder methods for all producer/consumer types:
+
+```
+PulsarClient
+  .newProducer(schema)           -> ProducerBuilder           -> Producer<T>
+  .newStreamConsumer(schema)     -> StreamConsumerBuilder     -> StreamConsumer<T>
+  .newQueueConsumer(schema)      -> QueueConsumerBuilder      -> QueueConsumer<T>
+  .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
+  .newTransaction()              -> Transaction
+```
+
+## Consumer types
+
+Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API provides three
+distinct consumer types:
+
+```mermaid
+graph TD
+ A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
+ A -->|newQueueConsumer| C[QueueConsumer]
+ A -->|newCheckpointConsumer| D[CheckpointConsumer]
+
+ B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
+ C -->|"Parallel, individual ack"| F[Work queues, task processing]
+ D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
+```
+
+**StreamConsumer** — Ordered consumption with cumulative acknowledgment. Maps to
+Exclusive/Failover subscription semantics. Messages are delivered in order (per-key when keyed).
+
+**QueueConsumer** — Unordered parallel consumption with individual acknowledgment. Maps to
+Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack timeout, and
+redelivery backoff.
+
+**CheckpointConsumer** — Unmanaged consumption for connector frameworks. No subscription, no
+ack — position tracking is entirely external. Provides `checkpoint()` for atomic position
+snapshots and `seek(Checkpoint)` for recovery.
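The type-level separation between consumer models can be illustrated with a self-contained sketch. The `Sketch*` interfaces, the `Msg` record, and the in-memory implementations are hypothetical simplifications for illustration only; they are not the proposed V5 interfaces and ignore subscriptions, segments, and brokers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical message: a position within the topic plus a payload.
record Msg(long position, String value) {}

// Ordered consumption: only cumulative acknowledgment is exposed.
interface SketchStreamConsumer {
    Optional<Msg> receive();
    void acknowledgeCumulative(long position);
    long ackedUpTo(); // highest position covered by a cumulative ack, -1 if none
}

// Parallel consumption: only individual ack and negative ack are exposed.
interface SketchQueueConsumer {
    Optional<Msg> receive();
    void acknowledge(long position);
    void negativeAcknowledge(long position);
}

// Toy stream consumer: delivers in order and keeps a single ack cursor.
final class InMemoryStreamConsumer implements SketchStreamConsumer {
    private final ArrayDeque<Msg> backlog;
    private long ackedUpTo = -1;

    InMemoryStreamConsumer(List<Msg> messages) { this.backlog = new ArrayDeque<>(messages); }

    public Optional<Msg> receive() { return Optional.ofNullable(backlog.poll()); }

    public void acknowledgeCumulative(long position) {
        // A cumulative ack covers everything up to and including `position`; it never moves back.
        ackedUpTo = Math.max(ackedUpTo, position);
    }

    public long ackedUpTo() { return ackedUpTo; }
}

// Toy queue consumer: negatively acknowledged messages go back to the queue.
final class InMemoryQueueConsumer implements SketchQueueConsumer {
    private final ArrayDeque<Msg> backlog;
    private final Map<Long, Msg> inFlight = new HashMap<>();
    private final List<Long> acked = new ArrayList<>();

    InMemoryQueueConsumer(List<Msg> messages) { this.backlog = new ArrayDeque<>(messages); }

    public Optional<Msg> receive() {
        Msg m = backlog.poll();
        if (m != null) {
            inFlight.put(m.position(), m);
        }
        return Optional.ofNullable(m);
    }

    public void acknowledge(long position) {
        if (inFlight.remove(position) != null) {
            acked.add(position);
        }
    }

    public void negativeAcknowledge(long position) {
        Msg m = inFlight.remove(position);
        if (m != null) {
            backlog.addLast(m); // schedule for redelivery
        }
    }
}
```

With this split, calling `acknowledgeCumulative` on a `SketchQueueConsumer` is a compile-time error rather than a runtime exception.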
+
+## Scalable topic integration
+
+When a V5 client connects to a `topic://` domain topic, it establishes a DAG watch session
+with the broker. The broker sends the current segment layout (which segments exist, their
+hash ranges, and which broker owns each) and pushes updates when the layout changes (splits,
+merges).
+
+```mermaid
+sequenceDiagram
+ participant Client as V5 Client
+ participant Broker as Broker
+ participant Meta as Metadata Store
+
+ Client->>Broker: ScalableTopicLookup(topic)
+ Broker->>Meta: Watch topic DAG
+ Broker-->>Client: ScalableTopicUpdate(DAG)
+ Note over Client: Create per-segment producers/consumers
+
+ Meta-->>Broker: Segment split notification
+ Broker-->>Client: ScalableTopicUpdate(new DAG)
+ Note over Client: Add new segments, drain old
+```
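On each layout update, the client has to decide which per-segment producers/consumers to create and which to drain. A minimal sketch, assuming a layout can be reduced to a map from segment ID to hash range and that a split or merge always yields segments with new IDs; `HashRange`, `LayoutDiff`, and `LayoutDiffs` are hypothetical names, not types from this PIP or the wire protocol.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical half-open hash range [start, end) owned by one segment.
record HashRange(int start, int end) {
    HashRange {
        if (start < 0 || start >= end) {
            throw new IllegalArgumentException("invalid range [" + start + ", " + end + ")");
        }
    }
}

// Segments to create (added) and segments to drain and close (removed).
record LayoutDiff(Set<Long> added, Set<Long> removed) {}

final class LayoutDiffs {
    private LayoutDiffs() {}

    static LayoutDiff diff(Map<Long, HashRange> oldLayout, Map<Long, HashRange> newLayout) {
        // Present only in the new layout: open internal producers/consumers.
        Set<Long> added = new TreeSet<>(newLayout.keySet());
        added.removeAll(oldLayout.keySet());
        // Present only in the old layout: drain in-flight work, then close.
        Set<Long> removed = new TreeSet<>(oldLayout.keySet());
        removed.removeAll(newLayout.keySet());
        return new LayoutDiff(added, removed);
    }
}
```

For example, splitting segment 1 (covering the whole hash space) into segments 2 and 3 yields `added` = {2, 3} and `removed` = {1}.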
+
+The `Producer` hashes message keys to determine which segment to send to, maintaining one
+internal producer per active segment. When segments split or merge, the client transparently
+creates new internal producers and drains old ones.
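Key-to-segment routing by hash range can be sketched with a sorted map keyed by each range's start. The 16-bit hash space, the use of `String.hashCode()`, and the `SegmentRouterSketch` class are assumptions for illustration; this PIP does not specify the hash function, and the sketch ignores unkeyed messages.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical router: the hash space [0, 65536) is split into contiguous,
// non-overlapping ranges, each owned by exactly one active segment.
final class SegmentRouterSketch {
    static final int HASH_SPACE = 1 << 16;

    // Range start (inclusive) -> segment ID; a range ends where the next one starts.
    private final TreeMap<Integer, Long> rangeStartToSegment = new TreeMap<>();

    // Replace the whole layout, e.g. after a split or merge update from the broker.
    void updateLayout(Map<Integer, Long> startToSegment) {
        if (!startToSegment.containsKey(0)) {
            throw new IllegalArgumentException("layout must start at hash 0");
        }
        rangeStartToSegment.clear();
        rangeStartToSegment.putAll(startToSegment);
    }

    // Illustrative hash only (assumption): fold String.hashCode() into the 16-bit space.
    static int hash(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    // The owning segment is the one with the greatest range start <= hash(key).
    long segmentFor(String key) {
        Map.Entry<Integer, Long> entry = rangeStartToSegment.floorEntry(hash(key));
        if (entry == null) {
            throw new IllegalStateException("no segment layout received yet");
        }
        return entry.getValue();
    }
}
```

The lookup is O(log n) in the number of active segments. A production router would also need to handle layout updates that race with sends, which this sketch does not.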
+
+## Sync/async model
+
+All types are sync-first with an `.async()` accessor:
+
+```
+Producer<T>           -> producer.async() -> AsyncProducer<T>
+StreamConsumer<T>     -> consumer.async() -> AsyncStreamConsumer<T>
+QueueConsumer<T>      -> consumer.async() -> AsyncQueueConsumer<T>
+CheckpointConsumer<T> -> consumer.async() -> AsyncCheckpointConsumer<T>
+Transaction           -> txn.async()      -> AsyncTransaction
+```
+
+Both views share the same underlying resources.
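One way to build sync-first types that share resources with their async view is to make the sync call block on the async one. This is a hypothetical sketch: `SketchProducer`, `SketchAsyncProducer`, and `InMemoryProducer` are made-up names, the "broker" is an in-memory list, and the PIP does not prescribe how `pulsar-client-v5` implements the two views.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async view: completes with the assigned position.
interface SketchAsyncProducer {
    CompletableFuture<Long> send(String value);
}

// Hypothetical sync view with an accessor for the async view.
interface SketchProducer {
    long send(String value);
    SketchAsyncProducer async();
}

final class InMemoryProducer implements SketchProducer {
    // Stand-in for the broker; shared by both views.
    private final List<String> log = new ArrayList<>();

    // The async view appends to the shared log and completes immediately.
    private final SketchAsyncProducer asyncView = value -> {
        synchronized (log) {
            log.add(value);
            return CompletableFuture.completedFuture((long) (log.size() - 1));
        }
    };

    @Override
    public long send(String value) {
        // The sync view blocks on the async result.
        return asyncView.send(value).join();
    }

    @Override
    public SketchAsyncProducer async() {
        return asyncView;
    }

    int size() {
        synchronized (log) {
            return log.size();
        }
    }
}
```

Because both views close over the same `log`, messages sent through either one land in the same place.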
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Module structure
+
+```
+pulsar-client-api-v5/
+  org.apache.pulsar.client.api.v5
+  ├── PulsarClient, PulsarClientBuilder, PulsarClientException
+  ├── Producer, ProducerBuilder
+  ├── StreamConsumer, StreamConsumerBuilder
+  ├── QueueConsumer, QueueConsumerBuilder
+  ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
+  ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
+  ├── Transaction
+  ├── async/     (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
+  ├── auth/      (Authentication, AuthenticationData, CryptoKeyReader, ...)
+  ├── config/    (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
+  ├── schema/    (Schema, SchemaInfo, SchemaType)
+  └── internal/  (PulsarClientProvider — ServiceLoader SPI)
+
+pulsar-client-v5/
+  org.apache.pulsar.client.impl.v5
+  ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
+  ├── ScalableTopicProducer, ProducerBuilderV5
+  ├── ScalableStreamConsumer, StreamConsumerBuilderV5
+  ├── ScalableQueueConsumer, QueueConsumerBuilderV5
+  ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
+  ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
+  ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
+  ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
+  └── Async*V5 wrappers
+```
+
+### Key types
+
+**`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared between sync and
+async message sending:
+
+```java
+import java.time.Duration;
+import java.time.Instant;
+
+interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
+    BUILDER value(T value);
+    BUILDER key(String key);
+    BUILDER property(String name, String value);
+    BUILDER eventTime(Instant eventTime);
+    BUILDER deliverAfter(Duration delay);
+    BUILDER deliverAt(Instant timestamp);
+    BUILDER transaction(Transaction txn);
+}
+```
+
+`MessageBuilder<T>` extends it with `MessageId send()`.
+`AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId> send()`.
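The self-referential generic lets chained setters return the concrete builder type, so one set of setters serves both the sync and the async `send()`. Below is a runnable sketch of the pattern; it uses an abstract class rather than an interface to stay self-contained, models only `value`, `key`, and `property`, and returns placeholder string IDs. All `Sketch*` names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Shared setters; BUILDER is the concrete subtype, so chained calls keep their type.
abstract class SketchMetadata<T, BUILDER extends SketchMetadata<T, BUILDER>> {
    protected T value;
    protected String key;
    protected final Map<String, String> properties = new LinkedHashMap<>();

    protected abstract BUILDER self();

    public BUILDER value(T value) { this.value = value; return self(); }
    public BUILDER key(String key) { this.key = key; return self(); }
    public BUILDER property(String name, String v) { properties.put(name, v); return self(); }

    protected String describe() { return key + "=" + value + " " + properties; }
}

final class SketchMessageBuilder<T> extends SketchMetadata<T, SketchMessageBuilder<T>> {
    @Override protected SketchMessageBuilder<T> self() { return this; }

    // Sync terminal operation (placeholder message ID).
    public String send() { return "sent:" + describe(); }
}

final class SketchAsyncMessageBuilder<T> extends SketchMetadata<T, SketchAsyncMessageBuilder<T>> {
    @Override protected SketchAsyncMessageBuilder<T> self() { return this; }

    // Async terminal operation over the same setters.
    public CompletableFuture<String> send() {
        return CompletableFuture.completedFuture("sent:" + describe());
    }
}
```

Without the `BUILDER` parameter, `key("k")` would return the base type and the subsequent `send()` call would not compile.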
+
+**`Checkpoint`** — Opaque, serializable position vector across all segments:
+
+```java
+import java.time.Instant;
+
+interface Checkpoint {
+    byte[] toByteArray();
+    Instant creationTime();
+
+    // Static factories (method bodies omitted in this listing)
+    static Checkpoint earliest();
+    static Checkpoint latest();
+    static Checkpoint atTimestamp(Instant timestamp);
+    static Checkpoint fromByteArray(byte[] data);
+}
+```
+
+Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment IDs to positions.
+The format is forward-compatible — checkpoints saved under an earlier segment layout (for
+example, with fewer segments) can still be applied after splits/merges.
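To make the idea of an opaque, serializable position vector concrete, here is a sketch of one possible encoding. It is an assumption for illustration: positions are simplified to one `long` per segment (the PIP stores a `MessageId`), creation time is kept at millisecond precision, and the layout (version byte, creation time, entry count, then segment/position pairs) is not the format `CheckpointV5` uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical checkpoint: segment ID -> position, simplified to a long.
record CheckpointSketch(Instant creationTime, Map<Long, Long> positions) {
    static final byte FORMAT_VERSION = 1;

    CheckpointSketch {
        Objects.requireNonNull(creationTime, "creationTime");
        positions = Map.copyOf(positions); // defensive, immutable copy
    }

    byte[] toByteArray() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(FORMAT_VERSION);
            out.writeLong(creationTime.toEpochMilli());
            out.writeInt(positions.size());
            // Sorted by segment ID so equal checkpoints serialize identically.
            for (Map.Entry<Long, Long> entry : new TreeMap<>(positions).entrySet()) {
                out.writeLong(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static CheckpointSketch fromByteArray(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte version = in.readByte();
            if (version != FORMAT_VERSION) {
                throw new IllegalArgumentException("unsupported checkpoint format version " + version);
            }
            Instant created = Instant.ofEpochMilli(in.readLong());
            int count = in.readInt();
            Map<Long, Long> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                long segmentId = in.readLong();
                positions.put(segmentId, in.readLong());
            }
            return new CheckpointSketch(created, positions);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The leading version byte leaves room to evolve the encoding; how positions are resolved for segments created after a checkpoint was taken is outside the scope of this sketch.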
+
+**Configuration records** — Immutable records with static factories:
+
+| Record | Purpose | Example |
+|--------|---------|---------|
+| `BatchingPolicy` | Batching config | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
+| `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
+| `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
+| `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
+| `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
+| `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
+| `ChunkingPolicy` | Large msg chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
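The configuration records pair immutability with validating static factories. A sketch of that pattern, modeled on the `BatchingPolicy.of(...)` example in the table; the component names (`maxPublishDelay`, `maxMessages`, `maxSize`), the `withMaxMessages` method, and the `*Sketch` types are assumptions, not the proposed API.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical byte-size value type, mirroring MemorySize.ofMB(...) from the table.
record MemorySizeSketch(long bytes) {
    MemorySizeSketch {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be >= 0");
        }
    }

    static MemorySizeSketch ofMB(long megabytes) {
        return new MemorySizeSketch(megabytes * 1024 * 1024);
    }
}

// Hypothetical batching limits: maximum publish delay, message count, and batch size.
record BatchingPolicySketch(Duration maxPublishDelay, int maxMessages, MemorySizeSketch maxSize) {
    // Compact constructor: validation runs for every instance, however it is created.
    BatchingPolicySketch {
        Objects.requireNonNull(maxPublishDelay, "maxPublishDelay");
        Objects.requireNonNull(maxSize, "maxSize");
        if (maxPublishDelay.isNegative()) {
            throw new IllegalArgumentException("maxPublishDelay must not be negative");
        }
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be > 0");
        }
    }

    static BatchingPolicySketch of(Duration maxPublishDelay, int maxMessages, MemorySizeSketch maxSize) {
        return new BatchingPolicySketch(maxPublishDelay, maxMessages, maxSize);
    }

    // Records are immutable, so a "wither" returns a modified copy.
    BatchingPolicySketch withMaxMessages(int newMaxMessages) {
        return new BatchingPolicySketch(maxPublishDelay, newMaxMessages, maxSize);
    }
}
```

Putting validation in the compact constructor means an invalid policy cannot be constructed at all, rather than failing later when a producer is built.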
+
+### SPI discovery
+
+Implementation is loaded via `java.util.ServiceLoader`:
+
+```java
+// In pulsar-client-api-v5
+public interface PulsarClientProvider {
+    PulsarClientBuilder newClientBuilder();
+    <T> Schema<T> jsonSchema(Class<T> clazz);
+    // ... factory methods for all SPI types
+}
+
+// In pulsar-client-v5
+// META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
+//   -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
+```
+
+This replaces the reflection-based `DefaultImplementation` approach used in the current API.
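The lookup side of the SPI can be sketched with `ServiceLoader.findFirst()` (Java 9+). `ClientProviderSketch` and `ProviderLookup` are hypothetical names; with no `META-INF/services` registration on the classpath, as in this standalone snippet, `find` returns an empty `Optional` and `require` fails with a descriptive error.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical SPI interface standing in for PulsarClientProvider.
interface ClientProviderSketch {
    String name();
}

final class ProviderLookup {
    private ProviderLookup() {}

    // Returns the first implementation registered under META-INF/services, if any.
    static <S> Optional<S> find(Class<S> spi) {
        return ServiceLoader.load(spi).findFirst();
    }

    // Fails with a clear message when no implementation is on the classpath.
    static <S> S require(Class<S> spi) {
        return find(spi).orElseThrow(() -> new IllegalStateException(
                "No implementation of " + spi.getName()
                        + " found; is the implementation module on the classpath?"));
    }
}
```

Unlike a reflective `Class.forName` on a hard-coded class name, this works with any registered implementation and produces an explicit error when none is present.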
+
+## Public-facing Changes
+
+### Public API
+
+This PIP introduces a new public Java API. The existing `pulsar-client-api` is unchanged.
+
+**New modules:**
+- `pulsar-client-api-v5` — interfaces and value types (compile dependency for applications)
+- `pulsar-client-v5` — implementation (runtime dependency)
+
+**New interfaces (summary):**
+
+| Interface | Methods | Description |
+|-----------|---------|-------------|
+| `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
+| `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
+| `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
+| `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
+| `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
+
+### Configuration
+
+No new broker configuration is introduced by this PIP. The V5 client reuses the existing
+`ClientConfigurationData` internally.
+
+### CLI
+
+No new CLI commands specific to the V5 API.
+
+### Metrics
+
+No new metrics are introduced by the V5 client API itself. The underlying v4 producers and
+consumers continue to emit their existing metrics.
+
+
+# Monitoring
+
+The V5 client wraps v4 producers and consumers internally, so existing producer/consumer
+metrics (publish rate, latency, backlog, etc.) continue to work. Each internal segment
+producer/consumer appears as a separate instance in metrics, identified by the segment topic
+name.
+
+Operators should monitor:
+- Per-segment publish rates to detect hot segments (candidates for splitting)
+- DAG watch session reconnections (indicates broker restarts or network issues)
+- Segment producer creation/closure events in client logs during split/merge operations
+
+
+# Security Considerations
+
+The V5 client API does not introduce new security mechanisms. It delegates all authentication
+and authorization to the underlying v4 client:
+
+- Authentication is configured via `PulsarClientBuilder.authentication()` and delegated to the
+  v4 client's `Authentication` plugin framework via `AuthenticationAdapter`
+- Topic-level authorization applies to the parent `topic://` name — accessing the underlying
+  `segment://` topics uses the same tenant/namespace permissions
+- End-to-end encryption is supported via `EncryptionPolicy` on `ProducerBuilder`, delegated to
+  the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
+- The new `CommandScalableTopicLookup` protocol command is sent only after the connection is
+  authenticated and in the `Connected` state, consistent with other lookup commands
+
+No new REST endpoints are introduced by the client API itself.
+
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+The V5 API is a new, additive set of modules. No changes are required to existing applications.
+This follows the same spirit as the 2.0 API redesign, where the old API was preserved in a
+separate compatibility module, except that this time there is no breaking change at all: the
+existing API stays in its current module, unchanged.
+
+- Applications using `pulsar-client-api` (v4) continue to work without modification
+- New applications can adopt the V5 API by depending on `pulsar-client-v5`
+- The V5 API works with all topic types: scalable topics, partitioned topics, and
+  non-partitioned topics — applications can migrate to the new API without changing their
+  topic infrastructure
+- Both APIs can coexist in the same JVM — the V5 implementation wraps the v4 transport
+  internally
+- A detailed migration path from v4 to v5 will be provided in a subsequent PIP
+- A seamless migration path to convert existing partitioned and non-partitioned topics to
+  scalable topics will also be provided, allowing applications to transition their topic
+  infrastructure without data loss or downtime
+
+To adopt the V5 API, applications add `pulsar-client-v5` as a dependency and use
+`PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.
+
+## Downgrade / Rollback
+
+Since the V5 API is a separate module, rollback is simply removing the dependency and reverting
+to v4 API calls. No broker-side changes are required.
+
+Applications using `CheckpointConsumer` should note that saved `Checkpoint` byte arrays are
+specific to the V5 implementation and cannot be used with v4 `Reader`/`Consumer`.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+The V5 client API itself does not introduce geo-replication concerns — it connects to whichever
+cluster it is configured for. However, geo-replication of scalable topics has specific
+considerations (segment layout synchronization across clusters, cross-cluster split/merge
+coordination) that will be detailed in a subsequent PIP.
+
+
+# Alternatives
+
+## Extend the existing Consumer interface
+
+We considered adding scalable topic support to the existing `Consumer` interface by adding new
+methods for checkpoint/seek and hiding segment details internally. This was rejected because:
+
+- The existing interface already has 60+ methods and is difficult to evolve
+- Adding checkpoint semantics alongside ack semantics would further confuse the API
+- The type system cannot prevent misuse (e.g., calling `acknowledgeCumulative()` on a Shared
+  subscription)
+- Removing partition-related types and methods (`TopicMessageId`, `MessageRouter`) would break
+  backward compatibility
+
+## Builder-per-subscription-type on existing API
+
+We considered keeping a single `Consumer` type but using different builder types per subscription
+mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted methods). This was
+rejected because the returned `Consumer` would still expose all methods at the type level — the
+restriction would only be in documentation, not enforced by the compiler.
+
+## Separate module vs extending existing module
+
+We chose a separate module (`pulsar-client-api-v5`) rather than adding new interfaces to
+`pulsar-client-api` because:
+
+- The v5 API uses different naming conventions (`value()` vs `getValue()`), different types
+  (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs nullable)
+- Having both conventions in the same package would be confusing
+- A clean module boundary makes it clear which API generation an application
is using
+- The v4 API can eventually be deprecated without affecting v5 users
+
+
+# General Notes
+
+The V5 API targets Java 17, the same as the rest of Pulsar.
+
+The implementation module (`pulsar-client-v5`) wraps the existing v4 `pulsar-client` for all
+transport-level operations. This means bug fixes and performance improvements to the v4 client
+automatically benefit V5 users, and the V5 module itself is relatively thin — primarily routing
+logic and API adaptation.
+
+
+# Links
+
+* Mailing List discussion thread:
+* Mailing List voting thread: