This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab1331e02c [improve][broker] PIP-483: Scalable Topic Auto Split/Merge (#25938)
1ab1331e02c is described below

commit 1ab1331e02ccc0a365cdaa3ed0383a11343637d8
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 09:39:21 2026 -0700

    [improve][broker] PIP-483: Scalable Topic Auto Split/Merge (#25938)
---
 pip/pip-483.md | 285 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 285 insertions(+)

diff --git a/pip/pip-483.md b/pip/pip-483.md
new file mode 100644
index 00000000000..2575c541c02
--- /dev/null
+++ b/pip/pip-483.md
@@ -0,0 +1,285 @@
+# PIP-483: Scalable Topic Auto Split/Merge
+
+*Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
+
+## Motivation
+
+[PIP-468](pip-468.md) gives the Scalable Topic Controller the ability to 
**split** and **merge** segments, but only on explicit operator request via the 
admin API. An operator has to watch the topic, decide it is hot, and issue a 
split — and later notice it has gone cold and issue a merge. This is the same 
operational toil that partition-count management imposes on classic Pulsar 
topics, which scalable topics were meant to eliminate.
+
+This PIP adds an **auto-scaling policy** to the controller: the controller 
leader observes per-segment load and per-subscription consumer pressure, and 
autonomously splits hot segments and merges cold ones, within hard caps that 
prevent runaway growth and split/merge flip-flopping.
+
+The design is built around three principles that came out of the design 
discussion:
+
+1. **Splits are fast; merges are lazy.** A split protects throughput and 
latency under load, so it fires quickly — with only a short cooldown to 
coalesce bursts of near-simultaneous triggers (e.g. a group of consumers 
connecting in rapid succession). A merge is purely an efficiency reclaim, so it 
can wait, be rate-limited, and be skipped when in doubt.
+2. **The controller reacts, it does not poll.** New stream/checkpoint 
consumers register directly with the controller, so consumer-count changes are 
handled event-driven within seconds. Load data is *pushed* into the metadata 
store by each segment's owning broker (only when it changes materially) and 
read by the controller leader. The controller never fans out RPCs to segment 
owners.
+3. **The decision is a pure function.** Given a snapshot of load + layout + 
policy, the split/merge decision is deterministic and unit-testable in 
isolation from all I/O.
+
+---
+
+## Goals
+
+- Automatically increase segment count when a topic is under ingest/dispatch 
load or has more stream/checkpoint consumers than segments.
+- Automatically decrease segment count when load subsides, reclaiming broker 
resources.
+- Bound growth (`maxSegments`) and bound split↔merge churn (`maxDagDepth`, 
asymmetric cooldown).
+- Default-on cluster-wide, with per-namespace and per-topic overrides, 
following Pulsar's existing policy-resolution conventions.
+
+### Non-goals
+
+- **Broker placement / rebalancing.** Which broker owns a segment's bundle is 
the load balancer's job; this PIP only changes *how many* segments exist.
+- **Key-aware or non-midpoint splits.** Splits use the existing midpoint-split 
mechanism from PIP-468.
+- **Cross-topic global optimization.** Each topic's controller decides 
independently.
+
+---
+
+## Design
+
+### Overview
+
+```
+┌────────────────────────────────────────────────────────────────┐
+│ Segment-owning broker (per active segment)                      │
+│   SegmentLoadReporter                                            │
+│   - samples the segment topic's TopicStats                      │
+│   - writes SegmentLoadStats to metadata ONLY on material change │
+└───────────────────────────────┬────────────────────────────────┘
+                                 │ (metadata store, push-on-change)
+                                 ▼
+┌────────────────────────────────────────────────────────────────┐
+│ Controller leader (per scalable topic)                          │
+│                                                                  │
+│  Event-driven — within seconds:                                 │
+│    on STREAM/CHECKPOINT consumer register/unregister            │
+│      (consumers already register with the controller — no poll) │
+│      → evaluate the consumer-count split rule immediately       │
+│                                                                  │
+│  Periodic AutoScaleTick — traffic, default 60s:                 │
+│    1. read SegmentLoadStats for all active segments             │
+│    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None   │
+│    3. dispatch to existing splitSegment / mergeSegments         │
+└────────────────────────────────────────────────────────────────┘
+```
+
+The two trigger sources reflect their different latency needs: a new consumer 
should get its own segment **within seconds**, so it is handled the instant the 
consumer registers with the controller; traffic shifts up or down over **a 
minute or more**, so they are evaluated on a slower periodic tick. The only new 
persistent state is `SegmentLoadStats`. The split/merge *mechanics* are 
entirely reused from PIP-468.
+
+### Load reporting: push-to-metadata, not pull-per-tick
+
+Each segment's owning broker runs a **`SegmentLoadReporter`** for every ACTIVE 
`segment://` topic it hosts. The broker writes `SegmentLoadStats` **directly to 
the metadata store** — it already has the rates in memory, so there is no REST 
round-trip and no controller-initiated pull. On a fixed sampling interval it 
compares the current rates to the last ones written and writes **only when a 
rate changes by more than a significant threshold** (default ±25%) since the 
last write. A steady-st [...]
+
+#### `SegmentLoadStats` (new metadata record)
+
+Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
+
+```json
+{
+  "msgRateIn": 12000.0,
+  "bytesRateIn": 64000000.0,
+  "msgRateOut": 48000.0,
+  "bytesRateOut": 256000000.0
+}
+```
+
+| Field | Source on the owning broker | Meaning for auto split/merge |
+|-------|------------------------------|---------------------------|
+| `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
+| `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |
+
+The record carries no timestamp of its own: the metadata store's `Stat` for 
the znode already exposes creation and last-modified timestamps, and the 
controller uses the **modified timestamp** for windowing. A record that still 
reads "cold" with an old modified time proves the segment has been cold for 
`now − modifiedAt` — so split/merge **windows derive from the store's `Stat`** 
with no per-tick history buffer and no extra field.
+
+#### Significant-change threshold
+
+To avoid rewriting on every minor wobble, the reporter only writes when a rate 
has moved by more than `scalableTopicLoadReportRateChangeThreshold` (default 
25%) relative to the last value written for that segment. Sampling cadence is 
`scalableTopicLoadReportInterval` (default 10s). Both are configurable via 
`broker.conf`.
+
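The significant-change rule above can be sketched as a small filter. This is a minimal illustration, not the reporter's actual code: the class name `LoadReportFilter`, its method names, and the handling of a previously idle segment (last written rate of zero) are assumptions made for the example.

```java
/** Hypothetical helper: decides whether a new SegmentLoadStats write is warranted. */
final class LoadReportFilter {
    // Relative change that counts as "material", e.g. 0.25 for the default 25%.
    private final double threshold;

    LoadReportFilter(double threshold) {
        this.threshold = threshold;
    }

    /** True when `current` moved by more than the threshold relative to the last written value. */
    boolean rateChanged(double lastWritten, double current) {
        if (lastWritten == 0.0) {
            // Assumption: any traffic on a previously idle segment is treated as material.
            return current > 0.0;
        }
        return Math.abs(current - lastWritten) / lastWritten > threshold;
    }

    /** Rates are compared pairwise: msgRateIn, bytesRateIn, msgRateOut, bytesRateOut. */
    boolean shouldWrite(double[] lastWritten, double[] current) {
        for (int i = 0; i < current.length; i++) {
            if (rateChanged(lastWritten[i], current[i])) {
                return true;
            }
        }
        return false;
    }
}
```

With the default 25% threshold, a segment last reported at 12000 msg/s is not rewritten at 14000 msg/s (about a 17% move) but is rewritten at 16000 msg/s (about a 33% move).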
+### Subscription types and what each load type drives
+
+Recall from PIP-468 that scalable-topic subscriptions are `STREAM` 
(controller-managed, 1:1 segment↔consumer assignment; covers both 
StreamConsumer and CheckpointConsumer) or `QUEUE` (controller-bypassing; every 
consumer attaches to every segment and the broker round-robins).
+
+| Trigger | STREAM subscriptions | QUEUE subscriptions |
+|---------|----------------------|----------------------|
+| Consumer-count scale-up | **Yes** — more segments give more 1:1 parallelism | **No** — queue consumers share segments; more segments don't add parallelism for them |
+| Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still loads the segment's broker and counts toward the per-segment rate |
+
+So a topic with only QUEUE subscriptions never splits on consumer count, but 
still splits when any segment's in/out rate crosses threshold.
+
+### The decision: `AutoScalePolicyEvaluator`
+
+A pure function with no I/O:
+
+```
+decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
+    → Split(segmentId) | Merge(segmentId1, segmentId2) | None
+```
+
+It runs in two passes — **split first (short cooldown), then merge (long 
cooldown)** — and emits at most one action per invocation.
+
+#### Pass 1 — SPLIT (fast, lightly coalesced)
+
+Splits fire as soon as conditions are met, bounded by `maxSegments`, an 
in-flight-operation guard, and a **short `splitCooldown` (default 1 min)**. The 
cooldown is deliberately short: it exists only to coalesce a burst of 
near-simultaneous triggers — e.g. a group of consumers connecting in rapid 
succession should cause one split, not N — while still letting a genuinely 
growing topic split again on the next minute.
+
+```
+if activeSegments >= maxSegments: skip split pass
+if now - lastSplitAtMs < splitCooldown: skip split pass
+
+(a) Consumer-driven:
+      required = max over STREAM subscriptions of consumerCount   // per-subscription max
+      if required > activeSegments:
+          → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
+
+(b) Load-driven (if (a) didn't fire):
+      candidate segments = active segments where ANY of:
+        - msgRateIn   > splitMsgRateInThreshold
+        - bytesRateIn > splitBytesRateInThreshold
+        - msgRateOut  > splitMsgRateOutThreshold
+        - bytesRateOut> splitBytesRateOutThreshold
+      if candidates non-empty:
+          → Split(most-overloaded candidate); set lastSplitAtMs = now
+```
+
+The consumer-driven rule (a) is what the **event-driven path** evaluates the 
moment a consumer registers, so a new consumer gets a segment within seconds 
(subject to `splitCooldown`). The load-driven rule (b) runs on the periodic 
tick. Because `msgRateIn` etc. are already 60-second rolling averages on the 
broker, a value over threshold already represents *sustained* load — no extra 
split window is needed to filter transient spikes.
+
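As a rough illustration of the split pass described above, the following Java sketch implements rules (a) and (b) as a pure function. All names here (`SplitPassSketch`, `Load`, `Thresholds`, `splitPass`) are hypothetical, segment IDs are assumed to be `long`, and "most-overloaded" is assumed to mean the highest rate-to-threshold ratio across the four metrics, which the text does not define.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

final class SplitPassSketch {
    record Load(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}
    record Thresholds(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

    /** Worst rate/threshold ratio; a value above 1.0 means at least one threshold is exceeded. */
    static double overload(Load l, Thresholds t) {
        return Math.max(
                Math.max(l.msgRateIn() / t.msgRateIn(), l.bytesRateIn() / t.bytesRateIn()),
                Math.max(l.msgRateOut() / t.msgRateOut(), l.bytesRateOut() / t.bytesRateOut()));
    }

    /** Returns the segment to split, or empty. The caller records lastSplitAtMs when a split is returned. */
    static Optional<Long> splitPass(Map<Long, Load> loadByActiveSegment,
                                    Collection<Integer> streamConsumerCounts,
                                    Thresholds t, int maxSegments,
                                    long nowMs, long lastSplitAtMs, long splitCooldownMs) {
        int active = loadByActiveSegment.size();
        if (active >= maxSegments || nowMs - lastSplitAtMs < splitCooldownMs) {
            return Optional.empty();
        }
        // (a) Consumer-driven: the largest STREAM subscription needs one segment per consumer.
        int required = streamConsumerCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
        if (required > active) {
            return loadByActiveSegment.entrySet().stream()
                    .max(Comparator.comparingDouble((Map.Entry<Long, Load> e) -> e.getValue().msgRateIn()))
                    .map(Map.Entry::getKey);
        }
        // (b) Load-driven: pick the candidate with the worst overload ratio (assumed ranking).
        return loadByActiveSegment.entrySet().stream()
                .filter(e -> overload(e.getValue(), t) > 1.0)
                .max(Comparator.comparingDouble((Map.Entry<Long, Load> e) -> overload(e.getValue(), t)))
                .map(Map.Entry::getKey);
    }
}
```

Keeping the cooldown timestamps as plain arguments, rather than reading a clock or the metadata store inside the function, is what lets the decision be unit-tested in isolation from I/O.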
+#### Pass 2 — MERGE (lazy, rate-limited)
+
+Merges run only if no split fired this tick, the topic is not within 
`mergeCooldown` of its last merge, and the result respects `maxDagDepth`.
+
+```
+if a split fired this tick: skip merge pass
+if now - lastMergeAtMs < mergeCooldown: skip merge pass
+if activeSegments <= minSegments: skip merge pass
+
+candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
+                  for at least mergeWindow (checked via the store Stat's modified time):
+    - msgRateIn   < mergeMsgRateInThreshold
+    - bytesRateIn < mergeBytesRateInThreshold
+    - msgRateOut  < mergeMsgRateOutThreshold
+    - bytesRateOut< mergeBytesRateOutThreshold
+  AND neither segment's lineage is already at maxDagDepth merges
+
+if candidate pairs non-empty:
+    → Merge(coldest pair by combined rate); set lastMergeAtMs = now
+```
+
+Adjacency is required because the existing `mergeSegments` API only merges 
hash-range-adjacent active segments.
+
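The merge pass can be sketched in the same style. This is an illustrative example under stated assumptions, not the proposed implementation: `MergePassSketch` and its records are hypothetical, the input list is assumed to hold ACTIVE segments already sorted by hash range (so neighbours in the list are adjacent), and "coldest pair by combined rate" is assumed to mean the lowest sum of message rates in and out.

```java
import java.util.List;
import java.util.Optional;

final class MergePassSketch {
    /** One ACTIVE segment; modifiedAtMs is the modified time of its load record in the store. */
    record Segment(long id, double msgRateIn, double bytesRateIn, double msgRateOut,
                   double bytesRateOut, long modifiedAtMs, int dagDepth) {}
    record Limits(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut,
                  long mergeWindowMs, int maxDagDepth, int minSegments) {}
    record Pair(long left, long right) {}

    /** Cold on all four rates, unchanged for at least mergeWindow, and below the depth cap. */
    static boolean mergeable(Segment s, Limits l, long nowMs) {
        return s.msgRateIn() < l.msgRateIn()
                && s.bytesRateIn() < l.bytesRateIn()
                && s.msgRateOut() < l.msgRateOut()
                && s.bytesRateOut() < l.bytesRateOut()
                && nowMs - s.modifiedAtMs() >= l.mergeWindowMs()
                && s.dagDepth() < l.maxDagDepth();
    }

    /** Returns the pair to merge, or empty. The caller records lastMergeAtMs when a pair is returned. */
    static Optional<Pair> mergePass(List<Segment> activeInHashOrder, Limits l,
                                    long nowMs, long lastMergeAtMs, long mergeCooldownMs) {
        if (nowMs - lastMergeAtMs < mergeCooldownMs || activeInHashOrder.size() <= l.minSegments()) {
            return Optional.empty();
        }
        Pair best = null;
        double bestRate = Double.MAX_VALUE;
        for (int i = 0; i + 1 < activeInHashOrder.size(); i++) {
            Segment a = activeInHashOrder.get(i);
            Segment b = activeInHashOrder.get(i + 1);
            if (!mergeable(a, l, nowMs) || !mergeable(b, l, nowMs)) {
                continue;
            }
            // Assumed ranking: combined message rate (in + out) of both segments.
            double combined = a.msgRateIn() + a.msgRateOut() + b.msgRateIn() + b.msgRateOut();
            if (combined < bestRate) {
                bestRate = combined;
                best = new Pair(a.id(), b.id());
            }
        }
        return Optional.ofNullable(best);
    }
}
```

Note how the window check needs no history buffer: because the load record is rewritten only on a material change, an old modified time on a cold record is taken as evidence that the segment has stayed cold for that long.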
+### Anti-flip-flop: three independent guards
+
+1. **Threshold gap (hysteresis).** Split thresholds are well above merge 
thresholds for every metric. The dead-band between them is what prevents a 
just-merged segment from immediately re-qualifying for a split.
+2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1 min) 
that only coalesces bursts. Merges: a longer `mergeCooldown` (default 5 min) 
plus a `mergeWindow` (default 5 min) during which the segment must have stayed 
cold. A pair must be *durably* cold to merge, but a segment can split again 
within a minute of getting hot.
+3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how many 
merges a given lineage can accumulate. Once reached, that lineage stops being a 
merge candidate — **but load-driven splits still fire.** This bounds the number 
of split↔merge cycles a hash range can churn through while never blocking a 
split that throughput requires.
+
+> **Design note — direction of the depth cap.** The cap restricts *merges*, 
not splits. Splits are needed for correctness/performance and must always be 
available; merges are the optional efficiency step and are the ones that, 
combined with splits, could oscillate. `dagDepth` therefore counts **merges in 
a segment's lineage**, derived from the existing `parentIds`/`childIds` DAG in 
`ScalableTopicMetadata` — splits do not consume depth budget.
+
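One way to derive a merge count from the parent links is sketched below. The text does not specify the exact derivation, so this example makes two assumptions: a segment with two or more parents is treated as the product of a merge (a split child has exactly one parent), and a lineage's depth is the largest number of merges found on any path back to a root. The class name `DagDepthSketch` and the `Map<Long, List<Long>>` representation of `parentIds` are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DagDepthSketch {
    /** Counts merges in the lineage of segmentId; parentIds maps a segment to its parents. */
    static int dagDepth(long segmentId, Map<Long, List<Long>> parentIds) {
        return dagDepth(segmentId, parentIds, new HashMap<>());
    }

    private static int dagDepth(long id, Map<Long, List<Long>> parentIds, Map<Long, Integer> memo) {
        Integer cached = memo.get(id);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(id, List.of());
        int deepestParent = 0;
        for (long parent : parents) {
            deepestParent = Math.max(deepestParent, dagDepth(parent, parentIds, memo));
        }
        // Assumption: two or more parents means this segment was created by a merge.
        int depth = deepestParent + (parents.size() >= 2 ? 1 : 0);
        memo.put(id, depth);
        return depth;
    }
}
```

Under these assumptions, a root segment that is split, merged back, split again, and merged again yields a final segment of depth 2, while the split children in between inherit the depth of their parent without adding to it.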
+### Caps
+
+| Cap | Default | Effect |
+|-----|---------|--------|
+| `maxSegments` | 64 | Splits stop once `activeSegments >= maxSegments`. |
+| `minSegments` | 1 | Merges stop once `activeSegments <= minSegments`. |
+| `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
+
+### Manual operations and cooldown
+
+- Manual `admin.scalableTopics().splitSegment(...)` **sets `lastSplitAtMs`**, 
so a manual split also starts the short auto-split cooldown.
+- Manual `admin.scalableTopics().mergeSegments(...)` **sets `lastMergeAtMs`**, 
so the operator's manual efficiency action also rate-limits the controller's 
automatic merges.
+
+### Evaluation triggers
+
+The controller leader evaluates auto split/merge from two sources:
+
+- **Event-driven (within seconds)** — when a STREAM/CHECKPOINT consumer registers with or unregisters from the controller, the controller immediately evaluates the consumer-count split rule. No polling: consumer registration already flows through the controller (PIP-468).
+- **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC tick 
from PIP-468), default cadence `scalableTopicAutoScaleInterval = 60s`, 
evaluates the traffic-driven split rules and the merge pass. Per tick it does 
one metadata batch-read of the topic's `segments/*/load` records (or maintains 
a watch cache), evaluates, and dispatches.
+
+Both sources call the same `AutoScalePolicyEvaluator`; the event-driven path 
only needs the consumer-count rule, so it is cheap. Both are cancelled on 
leadership loss / close.
+
+---
+
+## Public-Facing Changes
+
+### Configuration (`broker.conf`)
+
+Auto-scaling is **default-on cluster-wide**; these are the defaults applied to 
every scalable topic that does not override them.
+
+| Property | Default | Description |
+|----------|---------|-------------|
+| `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
+| `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
+| `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
+| `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
+| `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
+| `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
+| `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
+| `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
+| `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
+| `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
+| `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
+| `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
+| `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
+| `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
+| `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
+| `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
+| `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
+| `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
+
+### Policy resolution (namespace + topic overrides)
+
+Following the existing `autoTopicCreationOverride` pattern, an 
`AutoScalePolicyOverride` can be set at two levels; resolution is 
most-specific-wins, falling back to `broker.conf`:
+
+1. **Per-topic** — a new `autoScalePolicy` field on `ScalableTopicMetadata`, 
set via `admin.scalableTopics().setAutoScalePolicy(topic, policy)` / 
`getAutoScalePolicy(topic)`.
+2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on 
`Policies`, set via `admin.namespaces().setScalableTopicAutoScalePolicy(ns, 
policy)`.
+
+`AutoScalePolicyOverride` carries the same knobs as the broker config (all 
optional; unset fields fall through). Setting `enabled = false` opts a topic or 
namespace out entirely.
+
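The most-specific-wins resolution described above might look like the following sketch. It is illustrative only: `PolicyResolutionSketch`, the trimmed-down `Override` and `Effective` records, and the three fields shown are hypothetical stand-ins for the full `AutoScalePolicyOverride`, with `null` modelling an unset field.

```java
final class PolicyResolutionSketch {
    /** Hypothetical subset of an override; a null field means "unset, fall through". */
    record Override(Boolean enabled, Integer maxSegments, Long mergeCooldownMs) {}

    /** Fully resolved policy; also used to carry the broker.conf defaults. */
    record Effective(boolean enabled, int maxSegments, long mergeCooldownMs) {}

    private static final Override UNSET = new Override(null, null, null);

    /** Topic value wins over namespace value, which wins over the broker default. */
    static <T> T pick(T topicValue, T namespaceValue, T brokerDefault) {
        if (topicValue != null) {
            return topicValue;
        }
        return namespaceValue != null ? namespaceValue : brokerDefault;
    }

    /** Either override may be null when nothing is configured at that level. */
    static Effective resolve(Override topic, Override namespace, Effective brokerDefaults) {
        Override t = topic != null ? topic : UNSET;
        Override n = namespace != null ? namespace : UNSET;
        return new Effective(
                pick(t.enabled(), n.enabled(), brokerDefaults.enabled()),
                pick(t.maxSegments(), n.maxSegments(), brokerDefaults.maxSegments()),
                pick(t.mergeCooldownMs(), n.mergeCooldownMs(), brokerDefaults.mergeCooldownMs()));
    }
}
```

Resolution is per field, so a namespace can cap `maxSegments` while a single topic in it only flips `enabled` to `false`; every other knob still falls through to `broker.conf`.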
+### Admin Client API
+
+```java
+interface ScalableTopics {
+    // ... existing ...
+    void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
+    AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
+    void removeAutoScalePolicy(String topic) throws PulsarAdminException;
+}
+
+interface Namespaces {
+    // ... existing ...
+    void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
+    AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
+    void removeScalableTopicAutoScalePolicy(String namespace) ...;
+}
+```
+
+### Metadata Store Paths
+
+| Path | Content | Writer |
+|------|---------|--------|
+| `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
+
+(`autoScalePolicy` rides inside the existing `ScalableTopicMetadata` blob; the 
namespace override rides inside `Policies`. No other new paths.)
+
+### Observability
+
+- New per-topic metrics: `pulsar_scalable_topic_active_segments`, 
`pulsar_scalable_topic_auto_splits_total`, 
`pulsar_scalable_topic_auto_merges_total`, 
`pulsar_scalable_topic_split_suppressed_max_segments_total`, 
`pulsar_scalable_topic_merge_suppressed_max_depth_total`.
+- The existing `ScalableTopicStats` is extended with the most recent 
`SegmentLoadStats` per segment and the resolved effective policy, so operators 
can see *why* the controller did or did not act.
+
+---
+
+## Operational Safety
+
+The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and threshold-gap 
guards together bound both the rate and the total amount of structural change a 
topic can undergo, so enabling auto split/merge cannot cause unbounded segment 
growth or split/merge storms.
+
+Operators who want manual-only control set 
`scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false` override 
(namespace/topic).
+
+> **Compatibility:** scalable topics are a new, as-yet-unreleased feature 
([PIP-460](pip-460.md)), so there is no backward/forward compatibility to 
consider — `SegmentLoadStats`, the policy fields, and the config knobs all ship 
together with the rest of the scalable-topic feature.
+
+---
+
+## Security Considerations
+
+`setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace variants) 
require the same admin permissions as the corresponding existing scalable-topic 
and namespace policy operations. `SegmentLoadStats` is written by brokers via 
their authenticated internal identity and is not client-writable.
+
+---
+
+## Links
+
+- Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
+- Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
+- V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
+- Mailing List discussion thread: TBD
+- Mailing List voting thread: TBD
