+1 (binding) -Lari
On 2026/04/06 16:17:01 Matteo Merli wrote:
> Matteo Merli <[email protected]>
> Wed, Apr 1, 9:45 AM (5 days ago)
> to Dev
> https://github.com/apache/pulsar/pull/25455
>
> # PIP-466: New Java Client API (V5) with Scalable Topic Support
>
> # Background Knowledge
>
> Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary interface for Java applications since Pulsar's inception. The API surface has grown organically over the years to support partitioned topics, multiple subscription types, transactions, schema evolution, and more.
>
> **API versioning precedent.** Pulsar already went through a breaking API redesign in the 2.0 release, where the old API was moved into a separate compatibility module (`pulsar-client-1x-base`) and the main module was replaced with the new API. This PIP takes a less disruptive approach: the existing `pulsar-client-api` and `pulsar-client` modules stay completely unchanged, and the new API is introduced in additional modules (`pulsar-client-api-v5` for the interfaces, `pulsar-client-v5` for the implementation). Existing applications are unaffected; new applications can opt in to the V5 API.
>
> **Partitioned topics** are Pulsar's current mechanism for topic-level parallelism. A partitioned topic is a collection of N independent internal topics (partitions), each backed by a separate managed ledger. The client is responsible for routing messages to partitions (via `MessageRouter`) and is exposed to partition-level details through `TopicMessageId`, `getPartitionsForTopic()`, and partition-specific consumers.
>
> **Subscription types** control how messages are distributed to consumers. Pulsar supports four types (Exclusive, Failover, Shared, and Key_Shared), all accessed through a single `Consumer` interface. The interface exposes all operations regardless of which subscription type is in use, even though some operations (e.g., `acknowledgeCumulative()` on Shared) throw at runtime.
>
> **Scalable topics** ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are a new server-side mechanism where a topic is composed of a DAG of hash-range segments that the broker can dynamically split and merge. Unlike with partitioned topics, the number of segments is invisible to the client and can change at runtime without application awareness. The client receives segment layout updates via a dedicated protocol command (DAG watch session) and routes messages based on hash-range matching. This PIP defines the client API designed to work with scalable topics; the broker-side design is covered by PIP-460.
>
>
> # Motivation
>
> ## 1. Remove partitioning from the client API
>
> The current API leaks partitioning everywhere: `TopicMessageId` carries partition indexes, `getPartitionsForTopic()` exposes the count, `MessageRouter` forces the application to make routing decisions, and consumers can be bound to specific partitions. This forces application code to deal with what is fundamentally a server-side scalability concern.
>
> With scalable topics, parallelism is achieved via hash-range segments managed entirely by the broker. The client should treat topics as opaque endpoints: per-key ordering is guaranteed when a key is specified, but the underlying parallelism (how many segments exist, which broker owns each) is invisible and dynamic.
>
> The current API cannot cleanly support this model because partitioning is baked into the type system (`TopicMessageId`), the builder API (`MessageRouter`, `MessageRoutingMode`), and the consumer model (partition-specific consumers, `getPartitionsForTopic()`).
>
> ## 2. Simplify an oversized API
>
> After years of organic growth, the API surface has accumulated significant baggage:
>
> - `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe, stats, get topic name, is connected, etc.)
> - `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
> - Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
> - Returning `null` vs. an empty value is inconsistent across the API
> - `loadConf(Map)`, `clone()`, and `Serializable` on builders are rarely used and clutter the API
> - SPI discovery uses a reflection hack (`DefaultImplementation`) instead of the standard `ServiceLoader`
>
> A new module can start with a clean, minimal surface using modern Java idioms.
>
> ## 3. Separate streaming vs queuing consumption
>
> The current `Consumer` mixes all four subscription types behind a single interface:
>
> - `acknowledgeCumulative()` is available but throws at runtime for Shared subscriptions
> - `negativeAcknowledge()` semantics differ between modes
> - `seek()` behavior varies depending on subscription type
> - Dead-letter policy only applies to Shared/Key_Shared
>
> With this design the compiler cannot help: misuse is only discovered at runtime. Splitting the API into purpose-built consumer types, where each exposes only the operations that make sense for its model, improves both usability and correctness.
>
> ## 4. Native support for connector frameworks
>
> Connector frameworks like Apache Flink and Apache Spark need to manage their own offsets across all segments of a topic, take atomic snapshots, and seek back to a checkpoint on recovery. The current API has no first-class support for this; connectors resort to the low-level `Reader` combined with manual partition tracking and brittle offset management.
>
> A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint` objects provides a clean integration point.
>
>
> ## Relationship to PIP-460 and long-term vision
>
> This PIP is a companion to [PIP-460: Scalable Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md), which defines the broker-side segment management, metadata storage, and admin APIs.
> The V5 client API is the primary interface for applications to use scalable topics. While the protocol commands and segment routing could theoretically be added to the v4 client, the V5 API was designed from the ground up to support the opaque, dynamically segmented topic model that scalable topics provide.
>
> The V5 API is designed to support all use cases currently supported by the existing API: producing messages, consuming with ordered/shared/key-shared semantics, transactions, schema evolution, and end-to-end encryption. It is not a subset but a full replacement API. It also works with existing partitioned and non-partitioned topics, so applications can adopt the new API without changing their topic infrastructure.
>
> The long-term vision is for scalable topics and the V5 API to become the primary model, eventually deprecating partitioned/non-partitioned topics and the v4 API. However, this deprecation is explicitly **not** planned for the 5.0 release. The 5.0 release will ship both APIs side by side, with the V5 API recommended for new applications. A subsequent PIP will detail the migration path and deprecation timeline.
>
> While this PIP covers the Java client, the same API model (purpose-built consumer types, opaque topics, checkpoint-based connector support) will also be introduced in non-Java client SDKs (Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will mirror the same concepts and follow the same approach of supporting both old and new topic types side by side. The non-Java SDKs will be covered by separate PIPs.
>
>
> # Goals
>
> ## In Scope
>
> - A new `pulsar-client-api-v5` module with new Java interfaces for Producer, StreamConsumer, QueueConsumer, CheckpointConsumer, and PulsarClient
> - A new `pulsar-client-v5` implementation module that wraps the existing v4 client transport and adds scalable topic routing
> - Support for all use cases currently supported by the existing API (produce, consume with ordered/shared/key-shared semantics, transactions, schema, encryption)
> - Purpose-built consumer types that separate streaming (ordered, cumulative ack), queuing (parallel, individual ack), and checkpoint (unmanaged, framework-driven) consumption
> - Opaque topic model where partition/segment details are hidden from the application
> - Modern Java API conventions: `Duration`, `Instant`, `Optional`, records, `ServiceLoader`
> - First-class transaction support in the main package
> - DAG watch protocol integration for live segment layout updates
>
> ## Out of Scope
>
> - Changes to the existing `pulsar-client-api`, which remains fully supported and unchanged
> - Changes to the wire protocol beyond what is needed for scalable topic DAG watch
> - Broker-side scalable topic management (split/merge algorithms, load balancing), covered by [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md) and subsequent, more specific PIPs
> - Migration path from the v4 to the v5 API, which will be detailed in a subsequent PIP
> - Implementation details; this PIP focuses on the public API surface
> - Deprecation of the existing API or partitioned/non-partitioned topic types
> - A TableView equivalent in v5, which may be added in a follow-up PIP
>
>
> # High Level Design
>
> The V5 client API is shipped as two new modules alongside the existing client:
>
> ```
> pulsar-client-api-v5   (interfaces and value types only; no implementation)
> pulsar-client-v5       (implementation; depends on pulsar-client for transport)
> ```
>
> The existing `pulsar-client-api` and
> `pulsar-client` modules are unchanged. Applications can use v4 and v5 in the same JVM.
>
> ## Entry point
>
> ```java
> PulsarClient client = PulsarClient.builder()
>     .serviceUrl("pulsar://localhost:6650")
>     .build();
> ```
>
> The `PulsarClient` interface provides builder methods for all producer/consumer types:
>
> ```
> PulsarClient
>   .newProducer(schema)           -> ProducerBuilder           -> Producer<T>
>   .newStreamConsumer(schema)     -> StreamConsumerBuilder     -> StreamConsumer<T>
>   .newQueueConsumer(schema)      -> QueueConsumerBuilder      -> QueueConsumer<T>
>   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
>   .newTransaction()              -> Transaction
> ```
>
> ## Consumer types
>
> Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API provides three distinct consumer types:
>
> ```mermaid
> graph TD
>     A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
>     A -->|newQueueConsumer| C[QueueConsumer]
>     A -->|newCheckpointConsumer| D[CheckpointConsumer]
>
>     B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
>     C -->|"Parallel, individual ack"| F[Work queues, task processing]
>     D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
> ```
>
> **StreamConsumer**: ordered consumption with cumulative acknowledgment. Maps to Exclusive/Failover subscription semantics. Messages are delivered in order (per key when keyed).
>
> **QueueConsumer**: unordered parallel consumption with individual acknowledgment. Maps to Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack timeout, and redelivery backoff.
>
> **CheckpointConsumer**: unmanaged consumption for connector frameworks. No subscription, no ack; position tracking is entirely external. Provides `checkpoint()` for atomic position snapshots and `seek(Checkpoint)` for recovery.
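
For illustration only, here is a minimal sketch of what an opaque, serializable checkpoint could look like, following the PIP's note that a `Checkpoint` internally maps segment IDs to positions. `CheckpointSketch`, `Position`, `positionFor()` and the byte layout are all hypothetical. Positions are modeled as (ledgerId, entryId) pairs instead of the real `MessageId`, and the PIP does not specify a serialized format.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch (not the PIP-466 implementation): an opaque checkpoint that
// records one position per segment ID and round-trips through a byte array.
final class CheckpointSketch {

    // Hypothetical stand-in for a per-segment MessageId.
    public record Position(long ledgerId, long entryId) {}

    private final Map<Long, Position> positions; // segment ID -> position, sorted by ID
    private final Instant creationTime;

    CheckpointSketch(Map<Long, Position> positions, Instant creationTime) {
        this.positions = new TreeMap<>(positions);
        this.creationTime = creationTime;
    }

    Map<Long, Position> positions() { return Collections.unmodifiableMap(positions); }

    Instant creationTime() { return creationTime; }

    // A segment created after this checkpoint was taken has no entry; how a real
    // client would resolve such segments after a split/merge is not modeled here.
    Optional<Position> positionFor(long segmentId) {
        return Optional.ofNullable(positions.get(segmentId));
    }

    // Assumed layout: creation time (epoch millis, so sub-millisecond precision is
    // dropped), entry count, then (segmentId, ledgerId, entryId) for each entry.
    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES
                + positions.size() * 3 * Long.BYTES);
        buf.putLong(creationTime.toEpochMilli());
        buf.putInt(positions.size());
        for (Map.Entry<Long, Position> e : positions.entrySet()) {
            buf.putLong(e.getKey());
            buf.putLong(e.getValue().ledgerId());
            buf.putLong(e.getValue().entryId());
        }
        return buf.array();
    }

    static CheckpointSketch fromByteArray(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        Instant created = Instant.ofEpochMilli(buf.getLong());
        int count = buf.getInt();
        Map<Long, Position> positions = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            long segmentId = buf.getLong();
            long ledgerId = buf.getLong();
            long entryId = buf.getLong();
            positions.put(segmentId, new Position(ledgerId, entryId));
        }
        return new CheckpointSketch(positions, created);
    }
}
```

In this model, a connector would keep the bytes in its own state store and rebuild the object on recovery before seeking. Keeping the byte format private to the class is what makes the checkpoint "opaque" to the framework.
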
>
> ## Scalable topic integration
>
> When a V5 client connects to a `topic://` domain topic, it establishes a DAG watch session with the broker. The broker sends the current segment layout (which segments exist, their hash ranges, and which broker owns each) and pushes updates when the layout changes (splits, merges).
>
> ```mermaid
> sequenceDiagram
>     participant Client as V5 Client
>     participant Broker as Broker
>     participant Meta as Metadata Store
>
>     Client->>Broker: ScalableTopicLookup(topic)
>     Broker->>Meta: Watch topic DAG
>     Broker-->>Client: ScalableTopicUpdate(DAG)
>     Note over Client: Create per-segment producers/consumers
>
>     Meta-->>Broker: Segment split notification
>     Broker-->>Client: ScalableTopicUpdate(new DAG)
>     Note over Client: Add new segments, drain old
> ```
>
> The `Producer` hashes message keys to determine which segment to send to, maintaining one internal producer per active segment. When segments split or merge, the client transparently creates new internal producers and drains old ones.
>
> ## Sync/async model
>
> All types are sync-first with an `.async()` accessor:
>
> ```
> Producer<T>           -> producer.async() -> AsyncProducer<T>
> StreamConsumer<T>     -> consumer.async() -> AsyncStreamConsumer<T>
> QueueConsumer<T>      -> consumer.async() -> AsyncQueueConsumer<T>
> CheckpointConsumer<T> -> consumer.async() -> AsyncCheckpointConsumer<T>
> Transaction           -> txn.async()      -> AsyncTransaction
> ```
>
> The sync and async views of each object share the same underlying resources.
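
To make the hash-range routing described under "Scalable topic integration" concrete, here is a minimal sketch of how a client might map a message key to the active segment whose range covers the key's hash, and re-route after a split. Everything here is an assumption for illustration. `SegmentRouterSketch` and `split()` are hypothetical names. CRC32 stands in for whatever hash the real `SegmentRouter` uses, and the 16-bit hash space is arbitrary.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.zip.CRC32;

// Hypothetical sketch (not the PIP-466 SegmentRouter): routes keys to the active
// segment whose hash range covers the key's hash.
final class SegmentRouterSketch {

    // Assumed size of the hash space; the PIP does not specify one.
    static final int HASH_SPACE = 1 << 16;

    // A segment owns the half-open hash range [start, end).
    record Segment(long id, int start, int end) {
        boolean contains(int hash) {
            return hash >= start && hash < end;
        }
    }

    // Replaced wholesale on each update so a reader never sees a partial layout.
    private volatile List<Segment> layout;

    SegmentRouterSketch(List<Segment> initialLayout) {
        this.layout = List.copyOf(initialLayout);
    }

    // In a real client this would be driven by updates from the DAG watch session.
    void updateLayout(List<Segment> newLayout) {
        this.layout = List.copyOf(newLayout);
    }

    // CRC32 as a stand-in hash, reduced into [0, HASH_SPACE).
    static int hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % HASH_SPACE);
    }

    // Returns the segment covering the key's hash, if the layout covers it.
    Optional<Segment> route(String key) {
        int h = hash(key);
        return layout.stream().filter(s -> s.contains(h)).findFirst();
    }

    // Splits a segment's range into two halves with new segment IDs.
    static List<Segment> split(Segment parent, long leftId, long rightId) {
        int mid = parent.start() + (parent.end() - parent.start()) / 2;
        return List.of(new Segment(leftId, parent.start(), mid),
                new Segment(rightId, mid, parent.end()));
    }
}
```

With a complete, non-overlapping layout, every key resolves to exactly one segment. A given key therefore stays on one segment until the layout changes, and after a split it lands on exactly one of the two children. The draining of old segments mentioned in the PIP is presumably what preserves per-key ordering across that transition. This sketch does not model draining.
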
>
>
> # Detailed Design
>
> ## Design & Implementation Details
>
> ### Module structure
>
> ```
> pulsar-client-api-v5/
>   org.apache.pulsar.client.api.v5
>   ├── PulsarClient, PulsarClientBuilder, PulsarClientException
>   ├── Producer, ProducerBuilder
>   ├── StreamConsumer, StreamConsumerBuilder
>   ├── QueueConsumer, QueueConsumerBuilder
>   ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
>   ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
>   ├── Transaction
>   ├── async/      (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
>   ├── auth/       (Authentication, AuthenticationData, CryptoKeyReader, ...)
>   ├── config/     (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
>   ├── schema/     (Schema, SchemaInfo, SchemaType)
>   └── internal/   (PulsarClientProvider: ServiceLoader SPI)
>
> pulsar-client-v5/
>   org.apache.pulsar.client.impl.v5
>   ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
>   ├── ScalableTopicProducer, ProducerBuilderV5
>   ├── ScalableStreamConsumer, StreamConsumerBuilderV5
>   ├── ScalableQueueConsumer, QueueConsumerBuilderV5
>   ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
>   ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
>   ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
>   ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
>   └── Async*V5 wrappers
> ```
>
> ### Key types
>
> **`MessageMetadata<T, BUILDER>`**: a self-referential builder base shared between sync and async message sending:
>
> ```java
> import java.time.Duration;
> import java.time.Instant;
>
> interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
>     BUILDER value(T value);
>     BUILDER key(String key);
>     BUILDER property(String name, String value);
>     BUILDER eventTime(Instant eventTime);
>     BUILDER deliverAfter(Duration delay);
>     BUILDER deliverAt(Instant timestamp);
>     BUILDER transaction(Transaction txn);
> }
> ```
>
> `MessageBuilder<T>` extends it with `MessageId send()`.
> `AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId> send()`.
>
> **`Checkpoint`**: an opaque, serializable position vector across all segments:
>
> ```java
> import java.time.Instant;
>
> interface Checkpoint {
>     byte[] toByteArray();
>     Instant creationTime();
>
>     // Static factories (method bodies omitted in this API summary)
>     static Checkpoint earliest();
>     static Checkpoint latest();
>     static Checkpoint atTimestamp(Instant timestamp);
>     static Checkpoint fromByteArray(byte[] data);
> }
> ```
>
> Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment IDs to positions. The format is forward-compatible: checkpoints saved with fewer segments can be applied after splits/merges.
>
> **Configuration records**: immutable records with static factories:
>
> | Record | Purpose | Example |
> |--------|---------|---------|
> | `BatchingPolicy` | Batching configuration | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
> | `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
> | `TlsPolicy` | TLS/mTLS configuration | `TlsPolicy.of("/path/to/ca.pem")` |
> | `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
> | `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
> | `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
> | `ChunkingPolicy` | Large message chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
>
> ### SPI discovery
>
> The implementation is loaded via `java.util.ServiceLoader`:
>
> ```java
> // In pulsar-client-api-v5
> public interface PulsarClientProvider {
>     PulsarClientBuilder newClientBuilder();
>     <T> Schema<T> jsonSchema(Class<T> clazz);
>     // ... factory methods for all SPI types
> }
>
> // In pulsar-client-v5
> // META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
> //   -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
> ```
>
> This replaces the reflection-based `DefaultImplementation` approach used in the current API.
>
> ## Public-facing Changes
>
> ### Public API
>
> This PIP introduces a new public Java API. The existing `pulsar-client-api` is unchanged.
>
> **New modules:**
> - `pulsar-client-api-v5`: interfaces and value types (compile dependency for applications)
> - `pulsar-client-v5`: implementation (runtime dependency)
>
> **New interfaces (summary):**
>
> | Interface | Methods | Description |
> |-----------|---------|-------------|
> | `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
> | `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
> | `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
> | `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
> | `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
>
> ### Configuration
>
> No new broker configuration is introduced by this PIP. The V5 client reuses the existing `ClientConfigurationData` internally.
>
> ### CLI
>
> No new CLI commands specific to the V5 API.
>
> ### Metrics
>
> No new metrics are introduced by the V5 client API itself. The underlying v4 producers and consumers continue to emit their existing metrics.
>
>
> # Monitoring
>
> The V5 client wraps v4 producers and consumers internally, so existing producer/consumer metrics (publish rate, latency, backlog, etc.)
> continue to work. Each internal segment producer/consumer appears as a separate instance in metrics, identified by the segment topic name.
>
> Operators should monitor:
> - Per-segment publish rates to detect hot segments (candidates for splitting)
> - DAG watch session reconnections (which indicate broker restarts or network issues)
> - Segment producer creation/closure events in client logs during split/merge operations
>
>
> # Security Considerations
>
> The V5 client API does not introduce new security mechanisms. It delegates all authentication and authorization to the underlying v4 client:
>
> - Authentication is configured via `PulsarClientBuilder.authentication()` and delegated to the v4 `AuthenticationProvider` framework via `AuthenticationAdapter`
> - Topic-level authorization applies to the parent `topic://` name; accessing the underlying `segment://` topics uses the same tenant/namespace permissions
> - End-to-end encryption is supported via `EncryptionPolicy` on `ProducerBuilder`, delegated to the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
> - The new `CommandScalableTopicLookup` protocol command is sent only after the connection is authenticated and in the `Connected` state, consistent with other lookup commands
>
> No new REST endpoints are introduced by the client API itself.
>
>
> # Backward & Forward Compatibility
>
> ## Upgrade
>
> The V5 API consists of new, additive modules. No changes are required to existing applications. This follows the same approach as the 2.0 API redesign, where the old API was preserved in a separate compatibility module, except that this time there is no breaking change at all because the existing API is unchanged.
>
> - Applications using `pulsar-client-api` (v4) continue to work without modification
> - New applications can adopt the V5 API by depending on `pulsar-client-v5`
> - The V5 API works with all topic types (scalable, partitioned, and non-partitioned), so applications can migrate to the new API without changing their topic infrastructure
> - Both APIs can coexist in the same JVM; the V5 implementation wraps the v4 transport internally
> - A detailed migration path from v4 to v5 will be provided in a subsequent PIP
> - A seamless migration path to convert existing partitioned and non-partitioned topics to scalable topics will also be provided, allowing applications to transition their topic infrastructure without data loss or downtime
>
> To adopt the V5 API, applications add `pulsar-client-v5` as a dependency and use `PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.
>
> ## Downgrade / Rollback
>
> Since the V5 API is a separate module, rollback consists of removing the dependency and reverting to v4 API calls. No broker-side changes are required.
>
> Applications using `CheckpointConsumer` should note that saved `Checkpoint` byte arrays are specific to the V5 implementation and cannot be used with the v4 `Reader`/`Consumer`.
>
> ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
>
> The V5 client API itself does not introduce geo-replication concerns; it connects to whichever cluster it is configured for. However, geo-replication of scalable topics has specific considerations (segment layout synchronization across clusters, cross-cluster split/merge coordination) that will be detailed in a subsequent PIP.
>
>
> # Alternatives
>
> ## Extend the existing Consumer interface
>
> We considered adding scalable topic support to the existing `Consumer` interface by adding new methods for checkpoint/seek and hiding segment details internally.
> This was rejected because:
>
> - The existing interface already has 60+ methods and is difficult to evolve
> - Adding checkpoint semantics alongside ack semantics would further confuse the API
> - The type system cannot prevent misuse (e.g., calling `acknowledgeCumulative()` on a Shared subscription)
> - Removing partition-related APIs (`TopicMessageId`, `MessageRouter`) would break backward compatibility
>
> ## Builder-per-subscription-type on existing API
>
> We considered keeping a single `Consumer` type but using different builder types per subscription mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted methods). This was rejected because the returned `Consumer` would still expose all methods at the type level; the restriction would exist only in documentation and would not be enforced by the compiler.
>
> ## Separate module vs extending existing module
>
> We chose a separate module (`pulsar-client-api-v5`) rather than adding new interfaces to `pulsar-client-api` because:
>
> - The v5 API uses different naming conventions (`value()` vs `getValue()`), different types (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs nullable)
> - Having both conventions in the same package would be confusing
> - A clean module boundary makes it clear which API generation an application is using
> - The v4 API can eventually be deprecated without affecting v5 users
>
>
> # General Notes
>
> The V5 API targets Java 17, the same as the rest of Pulsar.
>
> The implementation module (`pulsar-client-v5`) wraps the existing v4 `pulsar-client` for all transport-level operations. This means bug fixes and performance improvements to the v4 client automatically benefit V5 users, and the V5 module itself is relatively thin, consisting primarily of routing logic and API adaptation.
>
>
> --
> Matteo Merli
> <[email protected]>
