This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch pip/pause-subscription-dispatching
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 55ec4602cb487146468f2fe89f4a666fb5666b1a
Author: mattisonchao <[email protected]>
AuthorDate: Tue Mar 24 15:44:29 2026 +0800

[PIP-459] Admin API to pause and resume subscription dispatching

pip/pip-459.md | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 184 insertions(+)

# PIP-459: Admin API to Pause and Resume Subscription Dispatching

# Background knowledge

Pulsar dispatchers read entries from a topic's managed ledger and deliver them to the consumers of a subscription. The broker already has several internal mechanisms that can halt or slow dispatching:

- **Unacked message blocking**: Stops dispatch when the number of unacknowledged messages exceeds the configured limit.
- **Ack state persistence pause (PIP-299)**: Stops dispatch when the cursor's acknowledgment state exceeds what can be persisted in cursor metadata.
- **Dispatch rate limiting**: Throttles delivery to a configured rate in msgs/sec or bytes/sec.
- **Delayed delivery tracker**: Pauses reads while delayed messages must be held back.

All of these are automatic and reactive. None of them gives operators a direct, intentional way to say "stop delivering messages to this subscription."

The client-side `Consumer.pause()` API only suppresses permit flow from a single consumer instance. It does not prevent the broker from reading entries or from dispatching to other consumers on the same subscription. Pausing a whole subscription this way requires coordinating every consumer instance.

# Motivation

Pulsar has broker-side controls for rate limiting, delayed delivery, and unacked message blocking. It has no simple admin command to say "stop delivering to this subscription right now."

Operators dealing with downstream incidents, consumer deployments, or backlog investigations need a single admin command that immediately halts dispatch for the whole subscription. It must do this without disconnecting consumers, losing the cursor position, or coordinating across client instances.

**Concrete scenarios:**

1. **Incident response**: A downstream system is degraded. Every dispatched message triggers retries, DLQ entries, and more load. The operator needs to stop the bleeding immediately with one command.
2. **Consumer deployment**: The operator is rolling out a new consumer version with a schema change. They want to pause dispatch, let in-flight messages drain, deploy, and then resume.
3. **Backlog investigation**: While the operator analyzes backlog composition via `analyzeBacklog` or peek, ongoing dispatch changes the state being inspected. Pausing stops dispatch from changing the backlog during the investigation.

# Goals

## In Scope

- Admin API (REST, Java client, CLI) to pause and resume dispatching for a specific subscription.
- Pause state persisted in cursor metadata, so that it survives broker restarts and topic ownership transfers.
- Connected consumers remain connected but receive no new messages while paused.
- Paused state visible in subscription stats.

## Out of Scope

- Topic-level pause (pausing all subscriptions at once).
- Automatic pause triggers (e.g., pausing when the error rate exceeds a threshold).
- Per-consumer pause within a shared subscription.

# High Level Design

A `paused` flag is added to the subscription and stored as a cursor property (`__paused`). When an operator calls the pause endpoint, the flag is set and the dispatcher's read loop stops initiating new reads. Connected consumers stay attached but receive no new messages. On resume, the flag is cleared and the dispatcher restarts its read loop. Pause and resume never move the cursor, so no messages are skipped or lost.
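The high-level flow can be sketched in plain Java. This is a minimal, self-contained model under stated assumptions, not broker code:

- `CursorStub` is a hypothetical stand-in for the managed cursor's persisted property map.
- `ToyDispatcher` counts reads instead of reading a managed ledger.
- `PausableSubscription` is a hypothetical class exposing the three methods proposed for the `Subscription` interface.

The sketch persists the flag under `__paused`, restores it when the subscription is loaded, checks it at the top of `readMoreEntries()`, and clears it on resume before restarting the read loop.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the managed cursor's persisted property map.
class CursorStub {
    final Map<String, String> properties = new ConcurrentHashMap<>();

    CompletableFuture<Void> putProperty(String key, String value) {
        properties.put(key, value);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> removeProperty(String key) {
        properties.remove(key);
        return CompletableFuture.completedFuture(null);
    }
}

// Hypothetical dispatcher: counts reads instead of reading from a managed ledger.
class ToyDispatcher {
    private final PausableSubscription subscription;
    int readsIssued = 0;

    ToyDispatcher(PausableSubscription subscription) {
        this.subscription = subscription;
    }

    void readMoreEntries() {
        // The pause check sits before any cursor read is initiated.
        if (subscription.isDispatchingPaused()) {
            return;
        }
        readsIssued++; // a real dispatcher would issue an asynchronous cursor read here
    }

    // In this toy, resuming simply re-enters the read loop.
    void checkAndResumeIfPaused() {
        readMoreEntries();
    }
}

// Hypothetical subscription exposing the three proposed methods.
class PausableSubscription {
    static final String PAUSED_KEY = "__paused";
    private final CursorStub cursor;
    private volatile boolean paused;
    final ToyDispatcher dispatcher;

    PausableSubscription(CursorStub cursor) {
        this.cursor = cursor;
        // Restore the flag from cursor metadata when the subscription is loaded.
        this.paused = Boolean.parseBoolean(cursor.properties.get(PAUSED_KEY));
        this.dispatcher = new ToyDispatcher(this);
    }

    CompletableFuture<Void> pauseDispatching() {
        paused = true; // stop issuing new reads immediately
        return cursor.putProperty(PAUSED_KEY, "true")
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        paused = false; // roll back if the metadata write failed
                    }
                });
    }

    CompletableFuture<Void> resumeDispatching() {
        return cursor.removeProperty(PAUSED_KEY).thenRun(() -> {
            paused = false;
            dispatcher.checkAndResumeIfPaused(); // restart the read loop
        });
    }

    boolean isDispatchingPaused() {
        return paused;
    }
}
```

One design choice in the sketch: `pauseDispatching()` flips the in-memory flag before the metadata write completes, so reads stop without waiting on storage. If the write fails, it rolls the flag back. The PIP does not specify this ordering. Persisting first and then flipping the flag is also valid, at the cost of a short window in which reads continue.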

# Detailed Design

## Design & Implementation Details

### Subscription Interface

Add three methods to the `Subscription` interface:

- `CompletableFuture<Void> pauseDispatching()`: Sets the paused flag and persists it.
- `CompletableFuture<Void> resumeDispatching()`: Clears the paused flag and triggers the dispatcher to resume reading.
- `boolean isDispatchingPaused()`: Returns the current paused state.

### PersistentSubscription

The paused state is stored as a cursor property with key `__paused`. When the subscription is loaded, the state is restored from cursor metadata. On resume, `dispatcher.checkAndResumeIfPaused()` is called to restart the read loop.

### Dispatcher Changes

Both `PersistentDispatcherMultipleConsumers.readMoreEntries()` and `PersistentDispatcherSingleActiveConsumer.readMoreEntries()` get a check for `subscription.isDispatchingPaused()` at the top of the method. When it returns `true`, the method returns before any cursor read is initiated.

### In-flight Messages

Messages dispatched to consumers before the pause call are still processed and can be acknowledged as usual. The pause only prevents new reads from the cursor. The check runs only when a read is about to be issued, so a cursor read already outstanding when the pause is applied may still complete and dispatch its entries.

## Public-facing Changes

### Public API

**Pause subscription dispatching:**

- **Path:** `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/dispatching/pause`
- **Query parameters:** None
- **HTTP body:** None
- **Response codes:**
  - `204 No Content`: Dispatching paused successfully.
  - `401 Unauthorized`: Client is not authenticated.
  - `403 Forbidden`: Client does not have permission to pause dispatching for this subscription.
  - `404 Not Found`: Topic or subscription does not exist.
  - `412 Precondition Failed`: Topic is not a persistent topic.
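Any HTTP client can drive the pause endpoint. Below is a minimal sketch using the JDK's `java.net.http` (Java 11+). It assumes the broker's admin service URL is known and that no authentication is required. `SubscriptionDispatchingRequests` and `dispatchingRequest` are hypothetical helper names, not part of the Pulsar admin client. The path follows the endpoint definition in this PIP.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds requests for the proposed dispatching endpoints.
final class SubscriptionDispatchingRequests {
    private SubscriptionDispatchingRequests() {}

    // Percent-encode one path segment. URLEncoder targets form data,
    // so the '+' it emits for spaces is mapped back to %20.
    private static String segment(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // action is "pause" or "resume", the final segment of the two proposed endpoints.
    static HttpRequest dispatchingRequest(String serviceUrl, String tenant, String namespace,
                                          String topic, String subscription, String action) {
        if (!action.equals("pause") && !action.equals("resume")) {
            throw new IllegalArgumentException("action must be pause or resume: " + action);
        }
        String path = String.format(
                "/admin/v2/persistent/%s/%s/%s/subscription/%s/dispatching/%s",
                segment(tenant), segment(namespace), segment(topic),
                segment(subscription), action);
        return HttpRequest.newBuilder(URI.create(serviceUrl + path))
                .PUT(HttpRequest.BodyPublishers.noBody()) // the endpoint takes no body
                .build();
    }
}
```

To send the request, use something like `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())` and check for a `204` status. Brokers with authentication enabled also need the matching header (for token authentication, `Authorization: Bearer <token>`). The same helper covers the resume endpoint, which differs only in its final path segment.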

**Resume subscription dispatching:**

- **Path:** `PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/dispatching/resume`
- **Query parameters:** None
- **HTTP body:** None
- **Response codes:**
  - `204 No Content`: Dispatching resumed successfully.
  - `401 Unauthorized`: Client is not authenticated.
  - `403 Forbidden`: Client does not have permission to resume dispatching for this subscription.
  - `404 Not Found`: Topic or subscription does not exist.
  - `412 Precondition Failed`: Topic is not a persistent topic.

**Stats (existing endpoint, new field):**

- **Path:** `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats`
- **New field in subscription stats:**
  ```json
  {
    "subscriptions": {
      "my-sub": {
        "dispatchingPaused": true
      }
    }
  }
  ```

### Binary protocol

No changes.

### Configuration

No new broker configuration. The feature uses the existing subscription properties (cursor metadata) infrastructure.

### CLI

```
pulsar-admin topics pause-subscription \
  persistent://tenant/ns/topic \
  --subscription my-sub
```

```
pulsar-admin topics resume-subscription \
  persistent://tenant/ns/topic \
  --subscription my-sub
```

### Metrics

| Name | Description | Labels | Unit |
|---|---|---|---|
| `pulsar_subscription_dispatching_paused` | Whether dispatching is paused for this subscription | `tenant`, `namespace`, `topic`, `subscription` | boolean (0/1) |

# Monitoring

Operators can alert when `pulsar_subscription_dispatching_paused == 1` holds for longer than expected, for example via a Prometheus alerting rule with a `for` duration. Such an alert catches subscriptions that were paused and never resumed, such as one paused during an incident and then forgotten. Dashboards can also read the `dispatchingPaused` field in subscription stats through the admin API.

# Security Considerations

Pause and resume follow the same authorization model as other subscription admin operations.
The endpoints require the caller to have admin permissions on the topic's namespace or tenant. No new permissions or roles are introduced.

Multi-tenancy is preserved: a tenant can only pause or resume subscriptions on topics it has admin access to. The cursor property key `__paused` uses a reserved prefix to prevent collisions with user-defined subscription properties.

# Backward & Forward Compatibility

## Upgrade

No special upgrade steps are required. The feature is additive: it adds new API endpoints and a new cursor property. Existing subscriptions are unaffected.

## Downgrade / Rollback

When rolling back to a version without this feature:

1. Resume all paused subscriptions before downgrading.
2. If a subscription is left paused, the `__paused` cursor property remains in metadata but is ignored by older brokers, so dispatching resumes normally once an older broker loads the subscription.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

Cursor properties are local to each cluster. Pausing a subscription in one cluster does not affect the replicated subscription in another cluster, so each cluster must be paused and resumed independently.

# Alternatives

| Alternative | Limitation |
|---|---|
| Client-side `Consumer.pause()` | Requires coordinating all consumer instances. Does not prevent broker-side reads. Not practical during incidents. |
| Set dispatch rate to 0 or -1 | A rate of 0 means unlimited in Pulsar. Repurposing -1 as a magic "paused" value conflates rate limiting with lifecycle control, and the original rate setting is lost. |
| Unsubscribe + resubscribe | Destroys cursor position and consumer state. Requires re-creating consumers. |
| Disconnect all consumers | Causes reconnection storms and consumer rebalancing. Dispatching resumes immediately on reconnect. |

# General Notes

This PIP intentionally keeps the scope narrow (single subscription, admin-initiated, no automatic triggers) to minimize risk and review surface.
Topic-level pause and automatic pause triggers can be follow-up PIPs built on top of this foundation.

# Links

* Mailing List discussion thread:
* Mailing List voting thread:
