https://github.com/apache/pulsar/pull/25455

# PIP-466: New Java Client API (V5) with Scalable Topic Support

# Background Knowledge

Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary
interface for Java
applications since Pulsar's inception. The API surface has grown
organically over the years to
support partitioned topics, multiple subscription types, transactions,
schema evolution, and more.

**API versioning precedent.** Pulsar already went through a breaking API
redesign in the 2.0
release, where the old API was moved into a separate compatibility module
(`pulsar-client-1x-base`) and the main module was replaced with the new
API. This PIP takes
a less disruptive approach: the existing `pulsar-client-api` and
`pulsar-client` modules stay
completely unchanged, and the new API is introduced in an additional
`pulsar-client-v5` module.
Existing applications are unaffected; new applications can opt in to the V5
API.

**Partitioned topics** are Pulsar's current mechanism for topic-level
parallelism. A partitioned
topic is a collection of N independent internal topics (partitions), each
backed by a separate
managed ledger. The client is responsible for routing messages to
partitions (via `MessageRouter`)
and is exposed to partition-level details through `TopicMessageId`,
`getPartitionsForTopic()`,
and partition-specific consumers.

**Subscription types** control how messages are distributed to consumers.
Pulsar supports four
types — Exclusive, Failover, Shared, and Key_Shared — all accessed through
a single `Consumer`
interface. The interface exposes all operations regardless of which
subscription type is in use,
even though some operations (e.g., `acknowledgeCumulative()` on Shared)
throw at runtime.

**Scalable topics** ([PIP-460](
https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are a new
server-side mechanism where a topic is composed of a DAG of hash-range
segments that can be
dynamically split and merged by the broker. Unlike partitioned topics, the
number of segments
is invisible to the client and can change at runtime without application
awareness. The client
receives segment layout updates via a dedicated protocol command (DAG watch
session) and routes
messages based on hash-range matching. This PIP defines the client API
designed to work with
scalable topics; the broker-side design is covered by PIP-460.


# Motivation

## 1. Remove partitioning from the client API

The current API leaks partitioning everywhere: `TopicMessageId` carries
partition indexes,
`getPartitionsForTopic()` exposes the count, `MessageRouter` forces the
application to make
routing decisions, and consumers can be bound to specific partitions. This
forces application
code to deal with what is fundamentally a server-side scalability concern.

With scalable topics, parallelism is achieved via hash-range segments
managed entirely by the
broker. The client should treat topics as opaque endpoints — per-key
ordering is guaranteed
when a key is specified, but the underlying parallelism (how many segments,
which broker owns
each) is invisible and dynamic.

The current API cannot cleanly support this model because partitioning is
baked into the type
system (`TopicMessageId`), the builder API (`MessageRouter`,
`MessageRoutingMode`), and the
consumer model (partition-specific consumers, `getPartitionsForTopic()`).

## 2. Simplify an oversized API

After years of organic growth, the API surface has accumulated significant
baggage:

- `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek,
pause, unsubscribe,
  stats, get topic name, is connected, etc.)
- `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
- Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
- Nullable returns vs empty — inconsistent across the API
- `loadConf(Map)`, `clone()`, `Serializable` on builders — rarely used,
clutters the API
- SPI via reflection hack (`DefaultImplementation`) instead of standard
`ServiceLoader`

A new module can start with a clean, minimal surface using modern Java
idioms.

## 3. Separate streaming vs queuing consumption

The current `Consumer` mixes all four subscription types behind a single
interface:

- `acknowledgeCumulative()` is available but throws at runtime for Shared
subscriptions
- `negativeAcknowledge()` semantics differ between modes
- `seek()` behavior varies depending on subscription type
- Dead-letter policy only applies to Shared/Key_Shared

This design means the compiler cannot help you — you discover misuse at
runtime. Splitting into
purpose-built consumer types where each exposes only the operations that
make sense for its model
improves both usability and correctness.

## 4. Native support for connector frameworks

Connector frameworks like Apache Flink and Apache Spark need to manage
their own offsets across
all segments of a topic, take atomic snapshots, and seek back to a
checkpoint on recovery. The
current API has no first-class support for this — connectors resort to
low-level `Reader` plus
manual partition tracking plus brittle offset management.

A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint`
objects provides a clean
integration point.


## Relationship to PIP-460 and long-term vision

This PIP is a companion to [PIP-460: Scalable Topics](
https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
which defines the broker-side segment management, metadata storage, and
admin APIs. The V5
client API is the primary interface for applications to use scalable topics
— while the
protocol commands and segment routing could theoretically be added to the
v4 client, the V5
API was designed from the ground up to support the opaque,
dynamically-segmented topic model
that scalable topics provide.

The V5 API is designed to support all use cases currently supported by the
existing API:
producing messages, consuming with ordered/shared/key-shared semantics,
transactions, schema
evolution, and end-to-end encryption. It is not a subset — it is a full
replacement API. It
also works with existing partitioned and non-partitioned topics, so
applications can adopt the
new API without changing their topic infrastructure.

The long-term vision is for scalable topics and the V5 API to become the
primary model,
eventually deprecating partitioned/non-partitioned topics and the v4 API.
However, this
deprecation is explicitly **not** planned for the 5.0 release. The 5.0
release will ship both
APIs side by side, with the V5 API recommended for new applications. A
subsequent PIP will
detail the migration path and deprecation timeline.

While this PIP covers the Java client, the same API model (purpose-built
consumer types, opaque
topics, checkpoint-based connector support) will also be introduced in
non-Java client SDKs
(Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will
mirror the same
concepts and follow the same approach of supporting both old and new topic
types side by side.
The non-Java SDKs will be covered by separate PIPs.


# Goals

## In Scope

- A new `pulsar-client-api-v5` module with new Java interfaces for
Producer, StreamConsumer,
  QueueConsumer, CheckpointConsumer, and PulsarClient
- A new `pulsar-client-v5` implementation module that wraps the existing v4
client transport
  and adds scalable topic routing
- Support for all use cases currently supported by the existing API
(produce, consume with
  ordered/shared/key-shared semantics, transactions, schema, encryption)
- Purpose-built consumer types that separate streaming (ordered, cumulative
ack) from queuing
  (parallel, individual ack) from checkpoint (unmanaged, framework-driven)
- Opaque topic model where partition/segment details are hidden from the
application
- Modern Java API conventions: `Duration`, `Instant`, `Optional`, records,
`ServiceLoader`
- First-class transaction support in the main package
- DAG watch protocol integration for live segment layout updates

## Out of Scope

- Changes to the existing `pulsar-client-api` — it remains fully supported
and unchanged
- Changes to the wire protocol beyond what is needed for scalable topic DAG
watch
- Broker-side scalable topic management (split/merge algorithms, load
balancing) — covered by
  [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)
and subsequent more
  specific PIPs
- Migration path from v4 to v5 API — will be detailed in a subsequent PIP
- Implementation details — this PIP focuses on the public API surface
- Deprecation of the existing API or partitioned/non-partitioned topic types
- TableView equivalent in v5 — may be added in a follow-up PIP


# High Level Design

The V5 client API is shipped as two new modules alongside the existing
client:

```
pulsar-client-api-v5    (interfaces and value types only — no
implementation)
pulsar-client-v5        (implementation, depends on pulsar-client for
transport)
```

The existing `pulsar-client-api` and `pulsar-client` modules are unchanged.
Applications
can use v4 and v5 in the same JVM.

## Entry point

```java
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
```

The `PulsarClient` interface provides builder methods for all
producer/consumer types:

```
PulsarClient
  .newProducer(schema)            -> ProducerBuilder    -> Producer<T>
  .newStreamConsumer(schema)      -> StreamConsumerBuilder ->
StreamConsumer<T>
  .newQueueConsumer(schema)       -> QueueConsumerBuilder  ->
QueueConsumer<T>
  .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder ->
CheckpointConsumer<T>
  .newTransaction()               -> Transaction
```

## Consumer types

Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API
provides three
distinct consumer types:

```mermaid
graph TD
    A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
    A -->|newQueueConsumer| C[QueueConsumer]
    A -->|newCheckpointConsumer| D[CheckpointConsumer]

    B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered
pipelines]
    C -->|"Parallel, individual ack"| F[Work queues, task processing]
    D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
```

**StreamConsumer** — Ordered consumption with cumulative acknowledgment.
Maps to
Exclusive/Failover subscription semantics. Messages are delivered in order
(per-key when keyed).

**QueueConsumer** — Unordered parallel consumption with individual
acknowledgment. Maps to
Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack
timeout, and
redelivery backoff.

**CheckpointConsumer** — Unmanaged consumption for connector frameworks. No
subscription, no
ack — position tracking is entirely external. Provides `checkpoint()` for
atomic position
snapshots and `seek(Checkpoint)` for recovery.

## Scalable topic integration

When a V5 client connects to a `topic://` domain topic, it establishes a
DAG watch session
with the broker. The broker sends the current segment layout (which
segments exist, their
hash ranges, and which broker owns each) and pushes updates when the layout
changes (splits,
merges).

```mermaid
sequenceDiagram
    participant Client as V5 Client
    participant Broker as Broker
    participant Meta as Metadata Store

    Client->>Broker: ScalableTopicLookup(topic)
    Broker->>Meta: Watch topic DAG
    Broker-->>Client: ScalableTopicUpdate(DAG)
    Note over Client: Create per-segment producers/consumers

    Meta-->>Broker: Segment split notification
    Broker-->>Client: ScalableTopicUpdate(new DAG)
    Note over Client: Add new segments, drain old
```

The `Producer` hashes message keys to determine which segment to send to,
maintaining one
internal producer per active segment. When segments split or merge, the
client transparently
creates new internal producers and drains old ones.

## Sync/async model

All types are sync-first with an `.async()` accessor:

```java
Producer<T>            -> producer.async()   -> AsyncProducer<T>
StreamConsumer<T>      -> consumer.async()   -> AsyncStreamConsumer<T>
QueueConsumer<T>       -> consumer.async()   -> AsyncQueueConsumer<T>
CheckpointConsumer<T>  -> consumer.async()   -> AsyncCheckpointConsumer<T>
Transaction            -> txn.async()        -> AsyncTransaction
```

Both views share the same underlying resources.


# Detailed Design

## Design & Implementation Details

### Module structure

```
pulsar-client-api-v5/
  org.apache.pulsar.client.api.v5
  ├── PulsarClient, PulsarClientBuilder, PulsarClientException
  ├── Producer, ProducerBuilder
  ├── StreamConsumer, StreamConsumerBuilder
  ├── QueueConsumer, QueueConsumerBuilder
  ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
  ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
  ├── Transaction
  ├── async/       (AsyncProducer, AsyncMessageBuilder, Async*Consumer,
AsyncTransaction)
  ├── auth/        (Authentication, AuthenticationData, CryptoKeyReader,
...)
  ├── config/      (BatchingPolicy, CompressionPolicy, TlsPolicy,
BackoffPolicy, ...)
  ├── schema/      (Schema, SchemaInfo, SchemaType)
  └── internal/    (PulsarClientProvider — ServiceLoader SPI)

pulsar-client-v5/
  org.apache.pulsar.client.impl.v5
  ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
  ├── ScalableTopicProducer, ProducerBuilderV5
  ├── ScalableStreamConsumer, StreamConsumerBuilderV5
  ├── ScalableQueueConsumer, QueueConsumerBuilderV5
  ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
  ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
  ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
  ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
  └── Async*V5 wrappers
```

### Key types

**`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared
between sync and
async message sending:

```java
interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
    BUILDER value(T value);
    BUILDER key(String key);
    BUILDER property(String name, String value);
    BUILDER eventTime(Instant eventTime);
    BUILDER deliverAfter(Duration delay);
    BUILDER deliverAt(Instant timestamp);
    BUILDER transaction(Transaction txn);
}
```

`MessageBuilder<T>` extends it with `MessageId send()`.
`AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId>
send()`.

**`Checkpoint`** — Opaque, serializable position vector across all segments:

```java
interface Checkpoint {
    byte[] toByteArray();
    Instant creationTime();

    static Checkpoint earliest();
    static Checkpoint latest();
    static Checkpoint atTimestamp(Instant timestamp);
    static Checkpoint fromByteArray(byte[] data);
}
```

Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment
IDs to positions.
The format is forward-compatible — checkpoints saved with fewer segments
can be applied after
splits/merges.

**Configuration records** — Immutable records with static factories:

| Record | Purpose | Example |
|--------|---------|---------|
| `BatchingPolicy` | Batching config |
`BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
| `CompressionPolicy` | Compression codec |
`CompressionPolicy.of(CompressionType.ZSTD)` |
| `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
| `BackoffPolicy` | Retry backoff |
`BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))`
|
| `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
| `EncryptionPolicy` | E2E encryption |
`EncryptionPolicy.forProducer(keyReader, "mykey")` |
| `ChunkingPolicy` | Large msg chunking |
`ChunkingPolicy.of(MemorySize.ofMB(10))` |

### SPI discovery

Implementation is loaded via `java.util.ServiceLoader`:

```java
// In pulsar-client-api-v5
public interface PulsarClientProvider {
    PulsarClientBuilder newClientBuilder();
    <T> Schema<T> jsonSchema(Class<T> clazz);
    // ... factory methods for all SPI types
}

// In pulsar-client-v5
//
META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
// -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
```

This replaces the reflection-based `DefaultImplementation` approach used in
the current API.

## Public-facing Changes

### Public API

This PIP introduces a new public Java API. The existing `pulsar-client-api`
is unchanged.

**New modules:**
- `pulsar-client-api-v5` — interfaces and value types (compile dependency
for applications)
- `pulsar-client-v5` — implementation (runtime dependency)

**New interfaces (summary):**

| Interface | Methods | Description |
|-----------|---------|-------------|
| `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`,
`newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`,
`close()` | Entry point |
| `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send
messages |
| `StreamConsumer<T>` | `receive()`, `receive(Duration)`,
`acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
| `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`,
`negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
| `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`,
`checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |

### Configuration

No new broker configuration is introduced by this PIP. The V5 client reuses
the existing
`ClientConfigurationData` internally.

### CLI

No new CLI commands specific to the V5 API.

### Metrics

No new metrics are introduced by the V5 client API itself. The underlying
v4 producers and
consumers continue to emit their existing metrics.


# Monitoring

The V5 client wraps v4 producers and consumers internally, so existing
producer/consumer
metrics (publish rate, latency, backlog, etc.) continue to work. Each
internal segment
producer/consumer appears as a separate instance in metrics, identified by
the segment topic
name.

Operators should monitor:
- Per-segment publish rates to detect hot segments (candidates for
splitting)
- DAG watch session reconnections (indicates broker restarts or network
issues)
- Segment producer creation/closure events in client logs during
split/merge operations


# Security Considerations

The V5 client API does not introduce new security mechanisms. It delegates
all authentication
and authorization to the underlying v4 client:

- Authentication is configured via `PulsarClientBuilder.authentication()`
and delegated to the
  v4 `AuthenticationProvider` framework via `AuthenticationAdapter`
- Topic-level authorization applies to the parent `topic://` name —
accessing the underlying
  `segment://` topics uses the same tenant/namespace permissions
- End-to-end encryption is supported via `EncryptionPolicy` on
`ProducerBuilder`, delegated to
  the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
- The new `CommandScalableTopicLookup` protocol command is sent only after
the connection is
  authenticated and in the `Connected` state, consistent with other lookup
commands

No new REST endpoints are introduced by the client API itself.


# Backward & Forward Compatibility

## Upgrade

The V5 API is a new, additive module. No changes are required to existing
applications.
This follows the same approach as the 2.0 API redesign, where the old API
was preserved in a
separate compatibility module — except this time there is no breaking
change at all. The
existing API is unchanged and existing applications require no
modifications.

- Applications using `pulsar-client-api` (v4) continue to work without
modification
- New applications can adopt the V5 API by depending on `pulsar-client-v5`
- The V5 API works with all topic types: scalable topics, partitioned
topics, and
  non-partitioned topics — applications can migrate to the new API without
changing their
  topic infrastructure
- Both APIs can coexist in the same JVM — the V5 implementation wraps the
v4 transport
  internally
- A detailed migration path from v4 to v5 will be provided in a subsequent
PIP
- A seamless migration path to convert existing partitioned and
non-partitioned topics to
  scalable topics will also be provided, allowing applications to
transition their topic
  infrastructure without data loss or downtime

To adopt the V5 API, applications add `pulsar-client-v5` as a dependency
and use
`PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.

## Downgrade / Rollback

Since the V5 API is a separate module, rollback is simply removing the
dependency and reverting
to v4 API calls. No broker-side changes are required.

Applications using `CheckpointConsumer` should note that saved `Checkpoint`
byte arrays are
specific to the V5 implementation and cannot be used with v4
`Reader`/`Consumer`.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

The V5 client API itself does not introduce geo-replication concerns — it
connects to whichever
cluster it is configured for. However, geo-replication of scalable topics
has specific
considerations (segment layout synchronization across clusters,
cross-cluster split/merge
coordination) that will be detailed in a subsequent PIP.


# Alternatives

## Extend the existing Consumer interface

We considered adding scalable topic support to the existing `Consumer`
interface by adding new
methods for checkpoint/seek and hiding segment details internally. This was
rejected because:

- The existing interface already has 60+ methods and is difficult to evolve
- Adding checkpoint semantics alongside ack semantics would further confuse
the API
- The type system cannot prevent misuse (e.g., calling
`acknowledgeCumulative()` on a Shared
  subscription)
- Removing partition-related methods (`TopicMessageId`, `MessageRouter`)
would break backward
  compatibility

## Builder-per-subscription-type on existing API

We considered keeping a single `Consumer` type but using different builder
types per subscription
mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted
methods). This was
rejected because the returned `Consumer` would still expose all methods at
the type level — the
restriction would only be in documentation, not enforced by the compiler.

## Separate module vs extending existing module

We chose a separate module (`pulsar-client-api-v5`) rather than adding new
interfaces to
`pulsar-client-api` because:

- The v5 API uses different naming conventions (`value()` vs `getValue()`),
different types
  (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs
nullable)
- Having both conventions in the same package would be confusing
- A clean module boundary makes it clear which API generation an
application is using
- The v4 API can eventually be deprecated without affecting v5 users


# General Notes

The V5 API targets Java 17, the same as the rest of Pulsar.

The implementation module (`pulsar-client-v5`) wraps the existing v4
`pulsar-client` for all
transport-level operations. This means bug fixes and performance
improvements to the v4 client
automatically benefit V5 users, and the V5 module itself is relatively thin
— primarily routing
logic and API adaptation.

--
Matteo Merli
<[email protected]>

Reply via email to