This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4e7e1277a [feat] [pip] PIP-466: New Java Client API (V5) with Scalable Topic Support (#25455)
9e4e7e1277a is described below

commit 9e4e7e1277af795bf7a93294944563b4083ba7af
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 20 10:31:54 2026 -0700

    [feat] [pip] PIP-466: New Java Client API (V5) with Scalable Topic Support (#25455)
---
 pip/pip-466.md | 505 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 505 insertions(+)

diff --git a/pip/pip-466.md b/pip/pip-466.md
new file mode 100644
index 00000000000..b222f5f6b14
--- /dev/null
+++ b/pip/pip-466.md
@@ -0,0 +1,505 @@
+# PIP-466: New Java Client API (V5) with Scalable Topic Support
+
+# Background Knowledge
+
+Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary 
interface for Java
+applications since Pulsar's inception. The API surface has grown organically 
over the years to
+support partitioned topics, multiple subscription types, transactions, schema 
evolution, and more.
+
+**API versioning precedent.** Pulsar already went through a breaking API 
redesign in the 2.0
+release, where the old API was moved into a separate compatibility module
+(`pulsar-client-1x-base`) and the main module was replaced with the new API. 
This PIP takes
+a less disruptive approach: the existing `pulsar-client-api` and 
`pulsar-client` modules stay
+completely unchanged, and the new API is introduced in an additional 
`pulsar-client-v5` module.
+Existing applications are unaffected; new applications can opt in to the V5 
API.
+
+**Partitioned topics** are Pulsar's current mechanism for topic-level 
parallelism. A partitioned
+topic is a collection of N independent internal topics (partitions), each 
backed by a separate
+managed ledger. The client is responsible for routing messages to partitions 
(via `MessageRouter`)
+and is exposed to partition-level details through `TopicMessageId`, 
`getPartitionsForTopic()`,
+and partition-specific consumers.
+
+**Subscription types** control how messages are distributed to consumers. 
Pulsar supports four
+types — Exclusive, Failover, Shared, and Key_Shared — all accessed through a 
single `Consumer`
+interface. The interface exposes all operations regardless of which 
subscription type is in use,
+even though some operations (e.g., `acknowledgeCumulative()` on Shared) throw 
at runtime.
+
+**Scalable topics** ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are a new
+server-side mechanism where a topic is composed of a DAG of hash-range 
segments that can be
+dynamically split and merged by the broker. Unlike partitioned topics, the 
number of segments
+is invisible to the client and can change at runtime without application 
awareness. The client
+receives segment layout updates via a dedicated protocol command (DAG watch 
session) and routes
+messages based on hash-range matching. This PIP defines the client API 
designed to work with
+scalable topics; the broker-side design is covered by PIP-460.
+
+
+# Motivation
+
+## 1. Remove partitioning from the client API
+
+The current API leaks partitioning everywhere: `TopicMessageId` carries 
partition indexes,
+`getPartitionsForTopic()` exposes the count, `MessageRouter` forces the 
application to make
+routing decisions, and consumers can be bound to specific partitions. This 
forces application
+code to deal with what is fundamentally a server-side scalability concern.
+
+With scalable topics, parallelism is achieved via hash-range segments managed 
entirely by the
+broker. The client should treat topics as opaque endpoints — per-key ordering 
is guaranteed
+when a key is specified, but the underlying parallelism (how many segments, 
which broker owns
+each) is invisible and dynamic.
+
+The current API cannot cleanly support this model because partitioning is 
baked into the type
+system (`TopicMessageId`), the builder API (`MessageRouter`, 
`MessageRoutingMode`), and the
+consumer model (partition-specific consumers, `getPartitionsForTopic()`).
+
+## 2. Simplify an oversized API
+
+After years of organic growth, the API surface has accumulated significant 
baggage:
+
+- `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe,
+  stats, get topic name, is connected, etc.)
+- `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
+- Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
+- Absent values are signaled inconsistently: some methods return `null`, others return an empty value
+- `loadConf(Map)`, `clone()`, and `Serializable` on builders are rarely used and clutter the API
+- SPI discovery relies on a reflection hack (`DefaultImplementation`) instead of the standard `ServiceLoader`
+
+A new module can start with a clean, minimal surface using modern Java idioms.
+
+## 3. Separate streaming vs queuing consumption
+
+The current `Consumer` mixes all four subscription types behind a single 
interface:
+
+- `acknowledgeCumulative()` is available but throws at runtime for Shared 
subscriptions
+- `negativeAcknowledge()` semantics differ between modes
+- `seek()` behavior varies depending on subscription type
+- Dead-letter policy only applies to Shared/Key_Shared
+
+This design means the compiler cannot help you — you discover misuse at 
runtime. Splitting into
+purpose-built consumer types where each exposes only the operations that make 
sense for its model
+improves both usability and correctness.
+
+## 4. Native support for connector frameworks
+
+Connector frameworks like Apache Flink and Apache Spark need to manage their 
own offsets across
+all segments of a topic, take atomic snapshots, and seek back to a checkpoint 
on recovery. The
+current API has no first-class support for this, so connectors resort to the low-level `Reader`,
+manual partition tracking, and brittle offset management.
+
+A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint` 
objects provides a clean
+integration point.
+
+
+## Relationship to PIP-460 and long-term vision
+
+This PIP is a companion to [PIP-460: Scalable 
Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
+which defines the broker-side segment management, metadata storage, and admin 
APIs. The V5
+client API is the primary interface for applications to use scalable topics — 
while the
+protocol commands and segment routing could theoretically be added to the v4 
client, the V5
+API was designed from the ground up to support the opaque, 
dynamically-segmented topic model
+that scalable topics provide.
+
+The V5 API is designed to support all use cases currently supported by the 
existing API:
+producing messages, consuming with ordered/shared/key-shared semantics, 
transactions, schema
+evolution, and end-to-end encryption. It is not a subset — it is a full 
replacement API. It
+also works with existing partitioned and non-partitioned topics, so 
applications can adopt the
+new API without changing their topic infrastructure.
+
+The long-term vision is for scalable topics and the V5 API to become the 
primary model,
+eventually deprecating partitioned/non-partitioned topics and the v4 API. 
However, this
+deprecation is explicitly **not** planned for the 5.0 release. The 5.0 release 
will ship both
+APIs side by side, with the V5 API recommended for new applications. A 
subsequent PIP will
+detail the migration path and deprecation timeline.
+
+While this PIP covers the Java client, the same API model (purpose-built 
consumer types, opaque
+topics, checkpoint-based connector support) will also be introduced in 
non-Java client SDKs
+(Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will 
mirror the same
+concepts and follow the same approach of supporting both old and new topic 
types side by side.
+The non-Java SDKs will be covered by separate PIPs.
+
+
+# Goals
+
+## In Scope
+
+- A new `pulsar-client-api-v5` module with new Java interfaces for Producer, 
StreamConsumer,
+  QueueConsumer, CheckpointConsumer, and PulsarClient
+- A new `pulsar-client-v5` implementation module that wraps the existing v4 
client transport
+  and adds scalable topic routing
+- Support for all use cases currently supported by the existing API (produce, 
consume with
+  ordered/shared/key-shared semantics, transactions, schema, encryption)
+- Purpose-built consumer types that separate streaming (ordered, cumulative 
ack) from queuing
+  (parallel, individual ack) from checkpoint (unmanaged, framework-driven)
+- Opaque topic model where partition/segment details are hidden from the 
application
+- Modern Java API conventions: `Duration`, `Instant`, `Optional`, records, 
`ServiceLoader`
+- First-class transaction support in the main package
+- DAG watch protocol integration for live segment layout updates
+
+## Out of Scope
+
+- Changes to the existing `pulsar-client-api` — it remains fully supported and 
unchanged
+- Changes to the wire protocol beyond what is needed for scalable topic DAG 
watch
+- Broker-side scalable topic management (split/merge algorithms, load 
balancing) — covered by
+  [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md) and 
subsequent more
+  specific PIPs
+- Migration path from v4 to v5 API — will be detailed in a subsequent PIP
+- Implementation details — this PIP focuses on the public API surface
+- Deprecation of the existing API or partitioned/non-partitioned topic types
+- TableView equivalent in v5 — may be added in a follow-up PIP
+
+
+# High Level Design
+
+The V5 client API is shipped as two new modules alongside the existing client:
+
+```
+pulsar-client-api-v5    (interfaces and value types only — no implementation)
+pulsar-client-v5        (implementation, depends on pulsar-client for transport)
+```
+
+The existing `pulsar-client-api` and `pulsar-client` modules are unchanged. 
Applications
+can use v4 and v5 in the same JVM.
+
+## Entry point
+
+```java
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+```
+
+The `PulsarClient` interface provides builder methods for all 
producer/consumer types:
+
+```
+PulsarClient
+  .newProducer(schema)            -> ProducerBuilder    -> Producer<T>
+  .newStreamConsumer(schema)      -> StreamConsumerBuilder -> StreamConsumer<T>
+  .newQueueConsumer(schema)       -> QueueConsumerBuilder  -> QueueConsumer<T>
+  .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
+  .newTransaction()               -> Transaction
+```
+
+## Consumer types
+
+Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API 
provides three
+distinct consumer types:
+
+```mermaid
+graph TD
+    A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
+    A -->|newQueueConsumer| C[QueueConsumer]
+    A -->|newCheckpointConsumer| D[CheckpointConsumer]
+
+    B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
+    C -->|"Parallel, individual ack"| F[Work queues, task processing]
+    D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
+```
+
+**StreamConsumer** — Ordered consumption with cumulative acknowledgment. Maps 
to
+Exclusive/Failover subscription semantics. Messages are delivered in order 
(per-key when keyed).
+
+**QueueConsumer** — Unordered parallel consumption with individual 
acknowledgment. Maps to
+Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack 
timeout, and
+redelivery backoff.
+
+**CheckpointConsumer** — Unmanaged consumption for connector frameworks. No 
subscription, no
+ack — position tracking is entirely external. Provides `checkpoint()` for 
atomic position
+snapshots and `seek(Checkpoint)` for recovery.
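+
The effect of the split can be illustrated with a reduced sketch. The interfaces below are hypothetical stand-ins, not the proposed V5 signatures: they only show that cumulative ack exists on the stream type alone and that nack exists on the queue type alone, so misuse fails at compile time. `InMemoryQueueConsumer` and its non-blocking `receive()` are assumptions made purely so the example can run without a broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, reduced stand-ins for the V5 consumer types.
interface StreamConsumerSketch<T> {
    Optional<T> receive();
    void acknowledgeCumulative(T upToInclusive); // the only ack operation offered
}

interface QueueConsumerSketch<T> {
    Optional<T> receive();
    void acknowledge(T message);                 // individual ack
    void negativeAcknowledge(T message);         // ask for redelivery
}

// In-memory queue consumer, used only to exercise the interface.
final class InMemoryQueueConsumer<T> implements QueueConsumerSketch<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private final Set<T> inFlight = new HashSet<>();

    InMemoryQueueConsumer(List<T> messages) {
        pending.addAll(messages);
    }

    @Override
    public Optional<T> receive() {
        T next = pending.poll();                 // non-blocking simplification
        if (next != null) {
            inFlight.add(next);
        }
        return Optional.ofNullable(next);
    }

    @Override
    public void acknowledge(T message) {
        inFlight.remove(message);
    }

    @Override
    public void negativeAcknowledge(T message) {
        // Only messages that are currently in flight can be redelivered.
        if (inFlight.remove(message)) {
            pending.addLast(message);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

With these types, calling `acknowledgeCumulative()` on a `QueueConsumerSketch` is a compile error rather than a runtime exception.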
+
+## Scalable topic integration
+
+When a V5 client connects to a `topic://` domain topic, it establishes a DAG 
watch session
+with the broker. The broker sends the current segment layout (which segments 
exist, their
+hash ranges, and which broker owns each) and pushes updates when the layout 
changes (splits,
+merges).
+
+```mermaid
+sequenceDiagram
+    participant Client as V5 Client
+    participant Broker as Broker
+    participant Meta as Metadata Store
+
+    Client->>Broker: ScalableTopicLookup(topic)
+    Broker->>Meta: Watch topic DAG
+    Broker-->>Client: ScalableTopicUpdate(DAG)
+    Note over Client: Create per-segment producers/consumers
+
+    Meta-->>Broker: Segment split notification
+    Broker-->>Client: ScalableTopicUpdate(new DAG)
+    Note over Client: Add new segments, drain old
+```
+
+The `Producer` hashes message keys to determine which segment to send to, 
maintaining one
+internal producer per active segment. When segments split or merge, the client 
transparently
+creates new internal producers and drains old ones.
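+
A minimal sketch of hash-range routing, under stated assumptions: the class name `HashRangeRouter`, the 16-bit hash space, and the use of `String.hashCode()` are all placeholders chosen for illustration and are not taken from the PIP. The sketch only shows the general technique of looking up the segment whose range contains the key hash, and of replacing one range with two after a split.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of hash-range routing; not the actual SegmentRouter.
final class HashRangeRouter {
    static final int HASH_SPACE = 1 << 16; // assumed size of the hash space

    // Inclusive start of each hash range -> segment id.
    private final TreeMap<Integer, Long> segmentsByRangeStart = new TreeMap<>();

    HashRangeRouter(Map<Integer, Long> layout) {
        if (!layout.containsKey(0)) {
            throw new IllegalArgumentException("layout must cover hash 0");
        }
        segmentsByRangeStart.putAll(layout);
    }

    // Placeholder hash function; a real client would use a stable hash.
    static int hash(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    // The owning segment is the one with the greatest range start <= hash.
    long segmentFor(String key) {
        return segmentsByRangeStart.floorEntry(hash(key)).getValue();
    }

    // Returns a new layout in which the range starting at rangeStart is
    // replaced by two child segments divided at splitPoint.
    HashRangeRouter split(int rangeStart, int splitPoint, long lowerId, long upperId) {
        Integer nextStart = segmentsByRangeStart.higherKey(rangeStart);
        int rangeEnd = (nextStart == null) ? HASH_SPACE : nextStart;
        if (!segmentsByRangeStart.containsKey(rangeStart)
                || splitPoint <= rangeStart || splitPoint >= rangeEnd) {
            throw new IllegalArgumentException("invalid split");
        }
        Map<Integer, Long> next = new TreeMap<>(segmentsByRangeStart);
        next.put(rangeStart, lowerId);
        next.put(splitPoint, upperId);
        return new HashRangeRouter(next);
    }
}
```

Keeping each layout immutable and swapping in a new router on update is one way to let in-flight sends finish against the old layout while new sends use the new one.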
+
+## Sync/async model
+
+All types are sync-first with an `.async()` accessor:
+
+```
+Producer<T>            -> producer.async()   -> AsyncProducer<T>
+StreamConsumer<T>      -> consumer.async()   -> AsyncStreamConsumer<T>
+QueueConsumer<T>       -> consumer.async()   -> AsyncQueueConsumer<T>
+CheckpointConsumer<T>  -> consumer.async()   -> AsyncCheckpointConsumer<T>
+Transaction            -> txn.async()        -> AsyncTransaction
+```
+
+Both views share the same underlying resources.
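+
One way to picture the sync-first model is a sync type that owns the state and hands out a single async view over it. The sketch below is an assumption-laden illustration: `ProducerSketch`, `AsyncProducerSketch`, and the direct `send(value)` method are hypothetical simplifications (the proposed API sends through `newMessage()`), and an in-memory list stands in for the shared connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async view: returns the sequence number of the sent value.
interface AsyncProducerSketch<T> {
    CompletableFuture<Long> send(T value);
}

// Hypothetical sync-first producer exposing an async() accessor.
final class ProducerSketch<T> {
    // Stands in for the connection and buffers shared by both views.
    private final List<T> log = new ArrayList<>();

    // Created once, so every call to async() returns the same view.
    private final AsyncProducerSketch<T> asyncView =
            value -> CompletableFuture.supplyAsync(() -> append(value));

    private synchronized long append(T value) {
        log.add(value);
        return log.size() - 1;
    }

    // The sync call is the async call plus a blocking wait.
    long send(T value) {
        return asyncView.send(value).join();
    }

    AsyncProducerSketch<T> async() {
        return asyncView;
    }

    synchronized int size() {
        return log.size();
    }
}
```

Because both views append to the same log, a message sent through either one is visible through the other.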
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Module structure
+
+```
+pulsar-client-api-v5/
+  org.apache.pulsar.client.api.v5
+  ├── PulsarClient, PulsarClientBuilder, PulsarClientException
+  ├── Producer, ProducerBuilder
+  ├── StreamConsumer, StreamConsumerBuilder
+  ├── QueueConsumer, QueueConsumerBuilder
+  ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
+  ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
+  ├── Transaction
+  ├── async/       (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
+  ├── auth/        (Authentication, AuthenticationData, CryptoKeyReader, ...)
+  ├── config/      (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
+  ├── schema/      (Schema, SchemaInfo, SchemaType)
+  └── internal/    (PulsarClientProvider — ServiceLoader SPI)
+
+pulsar-client-v5/
+  org.apache.pulsar.client.impl.v5
+  ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
+  ├── ScalableTopicProducer, ProducerBuilderV5
+  ├── ScalableStreamConsumer, StreamConsumerBuilderV5
+  ├── ScalableQueueConsumer, QueueConsumerBuilderV5
+  ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
+  ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
+  ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
+  ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
+  └── Async*V5 wrappers
+```
+
+### Key types
+
+**`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared 
between sync and
+async message sending:
+
+```java
+interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
+    BUILDER value(T value);
+    BUILDER key(String key);
+    BUILDER property(String name, String value);
+    BUILDER eventTime(Instant eventTime);
+    BUILDER deliverAfter(Duration delay);
+    BUILDER deliverAt(Instant timestamp);
+    BUILDER transaction(Transaction txn);
+}
+```
+
+`MessageBuilder<T>` extends it with `MessageId send()`.
+`AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId> send()`.
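+
The self-referential bound is what lets the shared setters return the concrete builder type, so that `send()` stays reachable at the end of a chain. A reduced sketch follows; `MetadataSketch`, `AbstractMetadata`, `SyncBuilder`, and `AsyncBuilder` are hypothetical names, and `send()` returns a formatted string here instead of a `MessageId` so the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced version of the self-referential builder base.
interface MetadataSketch<T, B extends MetadataSketch<T, B>> {
    B value(T value);
    B key(String key);
}

// Shared setter logic; self() returns the concrete builder type.
abstract class AbstractMetadata<T, B extends AbstractMetadata<T, B>>
        implements MetadataSketch<T, B> {
    protected T value;
    protected String key;

    protected abstract B self();

    @Override
    public B value(T value) {
        this.value = value;
        return self();
    }

    @Override
    public B key(String key) {
        this.key = key;
        return self();
    }
}

// Sync flavor: send() returns the result directly.
final class SyncBuilder<T> extends AbstractMetadata<T, SyncBuilder<T>> {
    @Override
    protected SyncBuilder<T> self() {
        return this;
    }

    String send() {
        return key + "=" + value;
    }
}

// Async flavor: same setters, but send() returns a future.
final class AsyncBuilder<T> extends AbstractMetadata<T, AsyncBuilder<T>> {
    @Override
    protected AsyncBuilder<T> self() {
        return this;
    }

    CompletableFuture<String> send() {
        return CompletableFuture.completedFuture(key + "=" + value);
    }
}
```

For example, `new SyncBuilder<String>().key("k").value("v").send()` compiles because `key()` and `value()` return `SyncBuilder<String>` rather than the base type.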
+
+**`Checkpoint`** — Opaque, serializable position vector across all segments:
+
+```java
+interface Checkpoint {
+    byte[] toByteArray();
+    Instant creationTime();
+
+    // Static factories: signatures only, bodies are supplied by the implementation.
+    static Checkpoint earliest();
+    static Checkpoint latest();
+    static Checkpoint atTimestamp(Instant timestamp);
+    static Checkpoint fromByteArray(byte[] data);
+}
+```
+
+Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment IDs 
to positions.
+The format is forward-compatible — checkpoints saved with fewer segments can 
be applied after
+splits/merges.
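+
A round trip through `toByteArray()` and `fromByteArray()` could look roughly like the sketch below. The wire layout (an entry count followed by pairs of 64-bit values) and the use of a plain `long` in place of `MessageId` are assumptions made for illustration; the PIP does not specify the encoding, and `CheckpointSketch` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: segment id -> position, with a long standing in for MessageId.
final class CheckpointSketch {
    private final TreeMap<Long, Long> positions;

    CheckpointSketch(Map<Long, Long> positions) {
        this.positions = new TreeMap<>(positions);
    }

    // Returns a copy so callers cannot mutate the checkpoint.
    Map<Long, Long> positions() {
        return new TreeMap<>(positions);
    }

    // Assumed layout: entry count, then (segmentId, position) pairs.
    byte[] toByteArray() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(positions.size());
            for (Map.Entry<Long, Long> entry : positions.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    static CheckpointSketch fromByteArray(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            Map<Long, Long> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                long segmentId = in.readLong();
                long position = in.readLong();
                positions.put(segmentId, position);
            }
            return new CheckpointSketch(positions);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

A connector would store the byte array alongside its own snapshot and pass the decoded checkpoint back to `seek()` on recovery.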
+
+**Configuration records** — Immutable records with static factories:
+
+| Record | Purpose | Example |
+|--------|---------|---------|
+| `BatchingPolicy` | Batching config | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
+| `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
+| `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
+| `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
+| `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
+| `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
+| `ChunkingPolicy` | Large msg chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
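+
As an illustration of the "immutable record with static factory" shape, here is a sketch modeled on the `BackoffPolicy.exponential(...)` row. The record name `BackoffPolicySketch`, its validation rules, and the `delayForAttempt` helper with its doubling schedule are assumptions; the PIP only shows the factory call.

```java
import java.time.Duration;

// Hypothetical sketch of an immutable configuration record.
record BackoffPolicySketch(Duration initialDelay, Duration maxDelay) {

    // Compact constructor: validate once, at construction time.
    BackoffPolicySketch {
        if (initialDelay.isNegative() || initialDelay.isZero()) {
            throw new IllegalArgumentException("initialDelay must be positive");
        }
        if (maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("maxDelay must be >= initialDelay");
        }
    }

    // Static factory mirroring the call shape shown in the table.
    static BackoffPolicySketch exponential(Duration initialDelay, Duration maxDelay) {
        return new BackoffPolicySketch(initialDelay, maxDelay);
    }

    // Assumed schedule: initialDelay * 2^attempt (attempt is 0-based), capped at maxDelay.
    Duration delayForAttempt(int attempt) {
        Duration delay = initialDelay;
        // Stop doubling once the cap is reached, which also avoids overflow.
        for (int i = 0; i < attempt && delay.compareTo(maxDelay) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }
}
```

Records give value-based `equals`, `hashCode`, and `toString` for free, which suits configuration objects that are compared and logged.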
+
+### SPI discovery
+
+Implementation is loaded via `java.util.ServiceLoader`:
+
+```java
+// In pulsar-client-api-v5
+public interface PulsarClientProvider {
+    PulsarClientBuilder newClientBuilder();
+    <T> Schema<T> jsonSchema(Class<T> clazz);
+    // ... factory methods for all SPI types
+}
+
+// In pulsar-client-v5
+// META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
+// -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
+```
+
+This replaces the reflection-based `DefaultImplementation` approach used in 
the current API.
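+
On the API side, the lookup could be as small as the sketch below. `ClientProviderSketch` and `ProviderLookup` are hypothetical names, and the error message is illustrative; only `java.util.ServiceLoader` itself is a real JDK API. The sketch shows the general pattern of taking the first registered provider and failing with a clear message when the implementation module is missing from the classpath.

```java
import java.util.ServiceLoader;

// Hypothetical service interface standing in for PulsarClientProvider.
interface ClientProviderSketch {
    String name();
}

final class ProviderLookup {
    private ProviderLookup() {
    }

    // Picks the first provider registered under META-INF/services,
    // or fails with a clear message when none is present.
    static ClientProviderSketch load() {
        return ServiceLoader.load(ClientProviderSketch.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No provider found; is the implementation module on the classpath?"));
    }
}
```

Unlike a reflective class-name lookup, this fails through a typed, documented mechanism and needs no hard-coded implementation class name in the API module.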
+
+## Public-facing Changes
+
+### Public API
+
+This PIP introduces a new public Java API. The existing `pulsar-client-api` is 
unchanged.
+
+**New modules:**
+- `pulsar-client-api-v5` — interfaces and value types (compile dependency for 
applications)
+- `pulsar-client-v5` — implementation (runtime dependency)
+
+**New interfaces (summary):**
+
+| Interface | Methods | Description |
+|-----------|---------|-------------|
+| `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
+| `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
+| `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
+| `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
+| `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
+
+### Configuration
+
+No new broker configuration is introduced by this PIP. The V5 client reuses 
the existing
+`ClientConfigurationData` internally.
+
+### CLI
+
+No new CLI commands specific to the V5 API.
+
+### Metrics
+
+No new metrics are introduced by the V5 client API itself. The underlying v4 
producers and
+consumers continue to emit their existing metrics.
+
+
+# Monitoring
+
+The V5 client wraps v4 producers and consumers internally, so existing 
producer/consumer
+metrics (publish rate, latency, backlog, etc.) continue to work. Each internal 
segment
+producer/consumer appears as a separate instance in metrics, identified by the 
segment topic
+name.
+
+Operators should monitor:
+- Per-segment publish rates to detect hot segments (candidates for splitting)
+- DAG watch session reconnections (indicates broker restarts or network issues)
+- Segment producer creation/closure events in client logs during split/merge 
operations
+
+
+# Security Considerations
+
+The V5 client API does not introduce new security mechanisms. It delegates all 
authentication
+and authorization to the underlying v4 client:
+
+- Authentication is configured via `PulsarClientBuilder.authentication()` and 
delegated to the
+  v4 `AuthenticationProvider` framework via `AuthenticationAdapter`
+- Topic-level authorization applies to the parent `topic://` name — accessing 
the underlying
+  `segment://` topics uses the same tenant/namespace permissions
+- End-to-end encryption is supported via `EncryptionPolicy` on 
`ProducerBuilder`, delegated to
+  the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
+- The new `CommandScalableTopicLookup` protocol command is sent only after the 
connection is
+  authenticated and in the `Connected` state, consistent with other lookup 
commands
+
+No new REST endpoints are introduced by the client API itself.
+
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+The V5 API is a new, additive module. No changes are required to existing 
applications.
+Unlike the 2.0 API redesign, which moved the old API into a separate compatibility module,
+the V5 API introduces no breaking change: the existing modules stay in place.
+
+- Applications using `pulsar-client-api` (v4) continue to work without 
modification
+- New applications can adopt the V5 API by depending on `pulsar-client-v5`
+- The V5 API works with all topic types: scalable topics, partitioned topics, 
and
+  non-partitioned topics — applications can migrate to the new API without 
changing their
+  topic infrastructure
+- Both APIs can coexist in the same JVM — the V5 implementation wraps the v4 
transport
+  internally
+- A detailed migration path from v4 to v5 will be provided in a subsequent PIP
+- A seamless migration path to convert existing partitioned and 
non-partitioned topics to
+  scalable topics will also be provided, allowing applications to transition 
their topic
+  infrastructure without data loss or downtime
+
+To adopt the V5 API, applications add `pulsar-client-v5` as a dependency and 
use
+`PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.
+
+## Downgrade / Rollback
+
+Since the V5 API is a separate module, rollback is simply removing the 
dependency and reverting
+to v4 API calls. No broker-side changes are required.
+
+Applications using `CheckpointConsumer` should note that saved `Checkpoint` 
byte arrays are
+specific to the V5 implementation and cannot be used with v4 
`Reader`/`Consumer`.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+The V5 client API itself does not introduce geo-replication concerns — it 
connects to whichever
+cluster it is configured for. However, geo-replication of scalable topics has 
specific
+considerations (segment layout synchronization across clusters, cross-cluster 
split/merge
+coordination) that will be detailed in a subsequent PIP.
+
+
+# Alternatives
+
+## Extend the existing Consumer interface
+
+We considered adding scalable topic support to the existing `Consumer` 
interface by adding new
+methods for checkpoint/seek and hiding segment details internally. This was 
rejected because:
+
+- The existing interface already has 60+ methods and is difficult to evolve
+- Adding checkpoint semantics alongside ack semantics would further confuse 
the API
+- The type system cannot prevent misuse (e.g., calling 
`acknowledgeCumulative()` on a Shared
+  subscription)
+- Removing partition-related methods (`TopicMessageId`, `MessageRouter`) would 
break backward
+  compatibility
+
+## Builder-per-subscription-type on existing API
+
+We considered keeping a single `Consumer` type but using different builder 
types per subscription
+mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted 
methods). This was
+rejected because the returned `Consumer` would still expose all methods at the 
type level — the
+restriction would only be in documentation, not enforced by the compiler.
+
+## Separate module vs extending existing module
+
+We chose a separate module (`pulsar-client-api-v5`) rather than adding new 
interfaces to
+`pulsar-client-api` because:
+
+- The v5 API uses different naming conventions (`value()` vs `getValue()`), 
different types
+  (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs 
nullable)
+- Having both conventions in the same package would be confusing
+- A clean module boundary makes it clear which API generation an application 
is using
+- The v4 API can eventually be deprecated without affecting v5 users
+
+
+# General Notes
+
+The V5 API targets Java 17, the same as the rest of Pulsar.
+
+The implementation module (`pulsar-client-v5`) wraps the existing v4 
`pulsar-client` for all
+transport-level operations. This means bug fixes and performance improvements 
to the v4 client
+automatically benefit V5 users, and the V5 module itself is relatively thin — 
primarily routing
+logic and API adaptation.
+
+
+# Links
+
+* Mailing List discussion thread:
+* Mailing List voting thread:
