This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 20dc8f9b089 [improve][broker] PIP-483: scalable topic auto split/merge
(#25980)
20dc8f9b089 is described below
commit 20dc8f9b0892150ebc83032071628d16e5e294f1
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 19:34:45 2026 -0700
[improve][broker] PIP-483: scalable topic auto split/merge (#25980)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 146 +++++++++
.../broker/resources/ScalableTopicResources.java | 58 +++-
.../pulsar/broker/service/BrokerService.java | 94 +++++-
.../broker/service/scalable/AutoScaleConfig.java | 146 +++++++++
.../broker/service/scalable/AutoScaleDecision.java | 42 +++
.../service/scalable/AutoScalePolicyEvaluator.java | 257 ++++++++++++++++
.../service/scalable/ScalableTopicController.java | 252 +++++++++++++++-
.../broker/service/scalable/SegmentLayout.java | 41 +++
.../service/scalable/SegmentLoadReporter.java | 141 +++++++++
.../broker/service/scalable/SegmentLoadSample.java | 35 +++
.../service/scalable/AutoScaleConfigTest.java | 125 ++++++++
.../scalable/AutoScalePolicyEvaluatorTest.java | 335 +++++++++++++++++++++
.../ScalableTopicControllerAutoScaleTest.java | 286 ++++++++++++++++++
.../scalable/ScalableTopicControllerTest.java | 7 +
.../service/scalable/ScalableTopicServiceTest.java | 11 +
.../broker/service/scalable/SegmentLayoutTest.java | 33 ++
.../service/scalable/SegmentLoadReporterTest.java | 176 +++++++++++
.../client/api/v5/V5SegmentLoadReporterTest.java | 85 ++++++
.../pulsar/common/scalable/SegmentLoadStats.java | 46 +++
19 files changed, 2310 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ae4ec5777de..71ad014d7d8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1366,6 +1366,152 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int scalableTopicConsumerSessionGracePeriodSeconds = 60;
+ /**** --- Scalable topic auto split/merge (PIP-483). --- ****/
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Cluster-wide default for scalable-topic auto split/merge.
When true, the controller "
+ + "leader automatically splits hot segments and merges
cold ones, within the caps "
+ + "below. Can be overridden per-namespace and per-topic."
+ )
+ private boolean scalableTopicAutoScaleEnabled = true;
+
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_POLICIES,
+ doc = "Cadence (seconds) of the controller's periodic
traffic-driven auto split/merge "
+ + "evaluation. Consumer-count changes are handled
event-driven and are not affected "
+ + "by this interval. Read when a controller wins
leadership; not dynamic."
+ )
+ private int scalableTopicAutoScaleIntervalSeconds = 60;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Hard ceiling on the number of active segments a scalable
topic can be auto-scaled to. "
+ + "Splits stop firing once this is reached."
+ )
+ private int scalableTopicMaxSegments = 64;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Hard floor on the number of active segments. Merges stop
firing once this is reached."
+ )
+ private int scalableTopicMinSegments = 1;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Max number of merges allowed in a segment's lineage. Once a
segment reaches this depth "
+ + "it stops being a merge candidate (load-driven splits
are still allowed), bounding "
+ + "split/merge flip-flopping."
+ )
+ private int scalableTopicMaxDagDepth = 10;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Minimum time (seconds) between automatic splits on a topic.
Deliberately short — it "
+ + "only coalesces a burst of near-simultaneous triggers
(e.g. a consumer group "
+ + "connecting at once)."
+ )
+ private int scalableTopicSplitCooldownSeconds = 60;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Minimum time (seconds) between automatic merges on a topic."
+ )
+ private int scalableTopicMergeCooldownSeconds = 300;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "How long (seconds) a segment must continuously stay below
every merge threshold before "
+ + "it becomes merge-eligible."
+ )
+ private int scalableTopicMergeWindowSeconds = 300;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Inbound messages/second above which a segment is split."
+ )
+ private double scalableTopicSplitMsgRateInThreshold = 10_000;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Inbound bytes/second above which a segment is split."
+ )
+ private long scalableTopicSplitBytesRateInThreshold = 50_000_000L;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Outbound (dispatched) messages/second above which a segment
is split."
+ )
+ private double scalableTopicSplitMsgRateOutThreshold = 50_000;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Outbound bytes/second above which a segment is split."
+ )
+ private long scalableTopicSplitBytesRateOutThreshold = 250_000_000L;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Inbound messages/second below which a segment counts as
cold for merging."
+ )
+ private double scalableTopicMergeMsgRateInThreshold = 1_000;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Inbound bytes/second below which a segment counts as cold
for merging."
+ )
+ private long scalableTopicMergeBytesRateInThreshold = 5_000_000L;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Outbound messages/second below which a segment counts as
cold for merging."
+ )
+ private double scalableTopicMergeMsgRateOutThreshold = 5_000;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Outbound bytes/second below which a segment counts as cold
for merging."
+ )
+ private long scalableTopicMergeBytesRateOutThreshold = 25_000_000L;
+
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_POLICIES,
+ doc = "Interval (seconds) at which the segment-owning broker
samples its segment topics to "
+ + "report load for auto split/merge. Read at broker start;
not dynamic."
+ )
+ private int scalableTopicLoadReportIntervalSeconds = 10;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Minimum relative change in any segment rate (e.g. 0.25 =
25%) since the last write that "
+ + "triggers a new load record. Keeps metadata write volume
bounded; a steady-state "
+ + "segment writes once and goes quiet.\n"
+ + "Note: the band is anchored at the last written value,
not at the split/merge "
+ + "thresholds. A rate that settles within the band of the
last record is never "
+ + "re-reported, so a segment can sustain up to this factor
beyond a split/merge "
+ + "threshold without triggering — the cost of bounded
write volume. Lower the "
+ + "threshold for tighter tracking at the price of more
metadata writes."
+ )
+ private double scalableTopicLoadReportRateChangeThreshold = 0.25;
+
@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 5647feef52a..8d5999c061b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -32,9 +32,11 @@ import java.util.stream.Collectors;
import lombok.CustomLog;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -64,6 +66,8 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
private static final String SCALABLE_TOPIC_PATH = "/topics";
private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
private static final String CONSUMERS_SEGMENT = "consumers";
+ private static final String SEGMENTS_SEGMENT = "segments";
+ private static final String LOAD_SEGMENT = "load";
/**
* Use the topic's {@code properties} map verbatim as the secondary-index
entries.
@@ -77,6 +81,7 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
private final MetadataCache<SubscriptionMetadata> subscriptionCache;
private final MetadataCache<ConsumerRegistration>
consumerRegistrationCache;
+ private final MetadataCache<SegmentLoadStats> segmentLoadCache;
/**
* Per-path listeners for scalable-topic metadata events. Each listener
watches a
@@ -107,6 +112,7 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
this.subscriptionCache =
store.getMetadataCache(SubscriptionMetadata.class);
this.consumerRegistrationCache =
store.getMetadataCache(ConsumerRegistration.class);
+ this.segmentLoadCache = store.getMetadataCache(SegmentLoadStats.class);
// Single shared metadata-store listener fans out to both per-path and
// per-namespace subscribers. Per-subscriber lifecycle goes through the
// register / deregister methods below.
@@ -260,7 +266,10 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
}
public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
- return deleteAsync(topicPath(tn));
+ // Recursive: the topic record has children — the controller leader
lock, the
+ // subscriptions (and their consumer registrations), and the
per-segment load
+ // records — all of which must go with the topic.
+ return getStore().deleteRecursive(topicPath(tn));
}
public CompletableFuture<Boolean> scalableTopicExistsAsync(TopicName tn) {
@@ -432,6 +441,53 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
/**
* Get the metadata store path for the controller leader lock.
*/
+ // --- Segment load records (PIP-483 auto split/merge) ---
+
+    /**
+     * Store the latest load sample for a segment, creating the record on the first write.
+     *
+     * <p>The writer is the broker that owns the segment's {@code segment://} topic. Whether
+     * a sample differs enough from the previous one to be worth reporting is decided upstream
+     * (in {@code SegmentLoadReporter}), not here.
+     *
+     * <p>A sample equal to the stored value is dropped without touching the store. The
+     * controller reads the record's {@code Stat} modification time as the "cold since" mark
+     * for the merge window, so rewriting an unchanged value — for instance the first report
+     * after ownership moved to a broker whose last-written cache is empty — would restart
+     * that window and could starve merges under frequent rebalancing.
+     *
+     * @param tn        the parent scalable topic
+     * @param segmentId id of the segment the sample belongs to
+     * @param stats     the load sample to store
+     * @return a future that completes once the record is written, or without any write when
+     *         the stored value already equals {@code stats}
+     */
+    public CompletableFuture<Void> reportSegmentLoadAsync(TopicName tn, long segmentId,
+                                                          SegmentLoadStats stats) {
+        final String loadPath = segmentLoadPath(tn, segmentId);
+        return segmentLoadCache.get(loadPath).thenCompose(current -> {
+            final boolean unchanged = current.isPresent() && current.get().equals(stats);
+            return unchanged
+                    ? CompletableFuture.<Void>completedFuture(null)
+                    : segmentLoadCache.readModifyUpdateOrCreate(loadPath, previous -> stats)
+                            .thenApply(written -> (Void) null);
+        });
+    }
+
+    /**
+     * Fetch a segment's load record along with its metadata {@link Stat}.
+     *
+     * <p>The controller's auto-scaling evaluator relies on
+     * {@code stat.getModificationTimestamp()} to work out how long the segment has held its
+     * current load (the "cold for at least mergeWindow" check).
+     *
+     * @param tn        the parent scalable topic
+     * @param segmentId id of the segment whose record is read
+     * @return the value with its stat, or empty when no record has been written yet
+     */
+    public CompletableFuture<Optional<CacheGetResult<SegmentLoadStats>>> getSegmentLoadAsync(
+            TopicName tn, long segmentId) {
+        final String loadPath = segmentLoadPath(tn, segmentId);
+        return segmentLoadCache.getWithStats(loadPath);
+    }
+
+    /**
+     * Remove a segment's load record. Best-effort: a record that is already missing is
+     * tolerated rather than reported as a failure.
+     *
+     * @param tn        the parent scalable topic
+     * @param segmentId id of the segment whose record is removed
+     * @return a future that completes when the delete has been handled
+     */
+    public CompletableFuture<Void> deleteSegmentLoadAsync(TopicName tn, long segmentId) {
+        final String loadPath = segmentLoadPath(tn, segmentId);
+        return segmentLoadCache.delete(loadPath).exceptionally(ignoreMissing());
+    }
+
+    /**
+     * Build the metadata-store path of a segment's load record: the topic's own path
+     * followed by the {@code segments} node, the decimal segment id and the {@code load} node.
+     *
+     * @param tn        the parent scalable topic
+     * @param segmentId id of the segment
+     * @return the path under which the segment's load record is stored
+     */
+    public String segmentLoadPath(TopicName tn, long segmentId) {
+        final String segmentNode = String.valueOf(segmentId);
+        return joinPath(topicPath(tn), SEGMENTS_SEGMENT, segmentNode, LOAD_SEGMENT);
+    }
+
+    /**
+     * Get the metadata store path for the controller leader lock: the {@code controller}
+     * node directly under the topic's own path.
+     */
public String controllerLockPath(TopicName tn) {
return joinPath(topicPath(tn), "controller");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f702ffc1afe..fe4296710e1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -138,6 +138,7 @@ import
org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryCla
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.scalable.SegmentLoadReporter;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -186,6 +187,8 @@ import org.apache.pulsar.common.policies.data.TopicType;
import
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
@@ -286,6 +289,9 @@ public class BrokerService implements Closeable {
private final SingleThreadNonConcurrentFixedRateScheduler
compactionMonitor;
private final SingleThreadNonConcurrentFixedRateScheduler
consumedLedgersMonitor;
private SingleThreadNonConcurrentFixedRateScheduler
deduplicationSnapshotMonitor;
+ /** PIP-483: periodic sweep that writes per-segment load records for auto
split/merge. */
+ private SingleThreadNonConcurrentFixedRateScheduler
segmentLoadReporterMonitor;
+ private SegmentLoadReporter segmentLoadReporter;
protected final PublishRateLimiter brokerPublishRateLimiter;
private final DispatchRateLimiterFactory dispatchRateLimiterFactory;
protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
@@ -683,9 +689,69 @@ public class BrokerService implements Closeable {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
+ this.startSegmentLoadReporter();
this.startClearInvalidateTopicNameCacheTask();
}
+    /**
+     * Launch the periodic per-segment load reporter (PIP-483).
+     *
+     * <p>Every tick walks the topics hosted by this broker and hands each one to
+     * {@code reportSegmentLoad}, which samples {@code segment://} topics and writes a
+     * {@link SegmentLoadStats} record to the metadata store only when a rate moved
+     * materially since the last write (see {@link SegmentLoadReporter}). The controller
+     * leader reads these records to drive auto split/merge.
+     *
+     * <p>Nothing is started when scalable topics are disabled, when no scalable-topic
+     * resources are available, or when the configured report interval is not positive.
+     * The rate-change threshold is passed as a lambda that reads the broker configuration
+     * each time it is invoked.
+     */
+    protected void startSegmentLoadReporter() {
+        final ServiceConfiguration brokerConf = pulsar().getConfiguration();
+        if (!brokerConf.isScalableTopicsEnabled()) {
+            return;
+        }
+
+        var scalableTopicResources = pulsar().getPulsarResources().getScalableTopicResources();
+        if (scalableTopicResources == null) {
+            return;
+        }
+
+        final int periodSeconds = brokerConf.getScalableTopicLoadReportIntervalSeconds();
+        if (periodSeconds <= 0) {
+            return;
+        }
+
+        this.segmentLoadReporter = new SegmentLoadReporter(scalableTopicResources,
+                () -> pulsar().getConfiguration().getScalableTopicLoadReportRateChangeThreshold());
+
+        var scheduler =
+                new SingleThreadNonConcurrentFixedRateScheduler("scalable-segment-load-reporter");
+        this.segmentLoadReporterMonitor = scheduler;
+        // First sweep runs one full period after start, then repeats at the same cadence.
+        scheduler.scheduleAtFixedRateNonConcurrently(
+                () -> forEachTopic(this::reportSegmentLoad),
+                periodSeconds, periodSeconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Trigger a single load-report sweep over this broker's topics, outside the scheduled
+     * cadence. Test hook only.
+     */
+    @VisibleForTesting
+    public void runSegmentLoadReportOnceForTest() {
+        forEachTopic(hosted -> reportSegmentLoad(hosted));
+    }
+
+    /**
+     * Sample one topic and pass its rates to the load reporter.
+     *
+     * <p>Does nothing when the reporter has not been started or when the topic is not in the
+     * {@code segment} domain. Failures while sampling or reporting are best-effort: they are
+     * logged at debug level and never propagate to the sweep.
+     *
+     * @param topic a topic currently hosted by this broker
+     */
+    private void reportSegmentLoad(Topic topic) {
+        final SegmentLoadReporter loadReporter = this.segmentLoadReporter;
+        if (loadReporter == null) {
+            return;
+        }
+        final TopicName segmentTopic = TopicName.get(topic.getName());
+        if (segmentTopic.getDomain() != TopicDomain.segment) {
+            return;
+        }
+        try {
+            final TopicName parentTopic = SegmentTopicName.getParentTopicName(segmentTopic);
+            final long id = SegmentTopicName.getSegmentId(segmentTopic);
+            var sampled = topic.getStats(false, false, false);
+            final SegmentLoadStats sample = new SegmentLoadStats(
+                    sampled.msgRateIn, sampled.msgThroughputIn,
+                    sampled.msgRateOut, sampled.msgThroughputOut);
+            var pending = loadReporter.reportIfChanged(parentTopic, id, sample);
+            pending.exceptionally(failure -> {
+                log.debug().attr("segment", segmentTopic).exceptionMessage(failure)
+                        .log("Failed to report segment load");
+                return null;
+            });
+        } catch (Exception e) {
+            log.debug().attr("segment", segmentTopic).exceptionMessage(e)
+                    .log("Failed to sample segment load");
+        }
+    }
+
protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache =
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRateNonConcurrently(
@@ -967,7 +1033,8 @@ public class BrokerService implements Closeable {
consumedLedgersMonitor,
backlogQuotaChecker,
topicOrderedExecutor,
- deduplicationSnapshotMonitor)
+ deduplicationSnapshotMonitor,
+ segmentLoadReporterMonitor)
.handle());
CompletableFuture<Void> combined =
@@ -2707,9 +2774,34 @@ public class BrokerService implements Closeable {
if (compactor != null) {
compactor.getStats().removeTopic(topic);
}
+ forgetSegmentLoad(topic);
topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD,
EventStage.SUCCESS);
}
+    /**
+     * Evict the load reporter's last-written cache entry for a segment topic this broker
+     * has stopped owning (unload / delete).
+     *
+     * <p>Keeping stale entries would let the cache grow without bound as segments churn, and
+     * after a later re-acquire the first sample could be wrongly suppressed as immaterial.
+     * Does nothing when the reporter has not been started or when the name is not in the
+     * {@code segment} domain; failures are logged at debug level and otherwise ignored.
+     *
+     * @param topic full name of the topic that was removed from this broker
+     */
+    private void forgetSegmentLoad(String topic) {
+        final SegmentLoadReporter loadReporter = this.segmentLoadReporter;
+        if (loadReporter != null) {
+            final TopicName segmentTopic = TopicName.get(topic);
+            if (segmentTopic.getDomain() == TopicDomain.segment) {
+                try {
+                    final TopicName parentTopic = SegmentTopicName.getParentTopicName(segmentTopic);
+                    final long id = SegmentTopicName.getSegmentId(segmentTopic);
+                    loadReporter.forget(parentTopic, id);
+                } catch (Exception e) {
+                    log.debug().attr("segment", segmentTopic).exceptionMessage(e)
+                            .log("Failed to forget segment load cache entry");
+                }
+            }
+        }
+    }
+
public long getNumberOfNamespaceBundles() {
this.numberOfNamespaceBundles = 0;
this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
new file mode 100644
index 00000000000..b739b102a76
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.time.Duration;
+import lombok.Builder;
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+/**
+ * Fully-resolved auto split/merge policy for a single scalable topic (PIP-483).
+ *
+ * <p>This is the flattened result of merging broker config defaults with any namespace and
+ * topic overrides. The {@link AutoScalePolicyEvaluator} reads it directly — it never sees
+ * the partial override objects or the broker config.
+ *
+ * <p>All thresholds are absolute (msg/s and bytes/s). Split thresholds must sit strictly
+ * above the corresponding merge thresholds: the dead-band between them is the hysteresis
+ * that prevents a just-merged segment from immediately re-qualifying for a split.
+ *
+ * <p>Building an instance performs no checks by itself; call {@link #validated()} on the
+ * final, fully-layered policy before handing it to the evaluator.
+ *
+ * @param enabled           whether auto split/merge is active for this topic; when false the
+ *                          evaluator always returns {@code NoAction}
+ * @param maxSegments       hard ceiling on active segments; splits stop once reached
+ * @param minSegments       hard floor on active segments; merges stop once reached
+ * @param maxDagDepth       max merges allowed in a segment's lineage; a pair is merge-eligible
+ *                          only while neither side has reached this depth (splits are
+ *                          unaffected)
+ * @param splitCooldown     minimum time between automatic splits on the topic; short, only to
+ *                          coalesce a burst of near-simultaneous triggers
+ * @param mergeCooldown     minimum time between automatic merges on the topic
+ * @param mergeWindow       how long a segment must continuously stay below every merge
+ *                          threshold before it becomes merge-eligible (measured from the load
+ *                          record's metadata-store last-modified time)
+ * @param splitMsgRateIn    inbound msg/s above which a segment is split
+ * @param splitBytesRateIn  inbound bytes/s above which a segment is split
+ * @param splitMsgRateOut   outbound (dispatched) msg/s above which a segment is split
+ * @param splitBytesRateOut outbound bytes/s above which a segment is split
+ * @param mergeMsgRateIn    inbound msg/s below which a segment counts as cold for merging
+ * @param mergeBytesRateIn  inbound bytes/s below which a segment counts as cold for merging
+ * @param mergeMsgRateOut   outbound msg/s below which a segment counts as cold for merging
+ * @param mergeBytesRateOut outbound bytes/s below which a segment counts as cold for merging
+ */
+@Builder(toBuilder = true)
+public record AutoScaleConfig(
+        boolean enabled,
+        int maxSegments,
+        int minSegments,
+        int maxDagDepth,
+        Duration splitCooldown,
+        Duration mergeCooldown,
+        Duration mergeWindow,
+        double splitMsgRateIn,
+        double splitBytesRateIn,
+        double splitMsgRateOut,
+        double splitBytesRateOut,
+        double mergeMsgRateIn,
+        double mergeBytesRateIn,
+        double mergeMsgRateOut,
+        double mergeBytesRateOut
+) {
+
+    /**
+     * Build the cluster-wide default policy from broker configuration. Per-namespace and
+     * per-topic overrides (when added) are layered on top of this via {@code toBuilder()}.
+     *
+     * @param conf the broker service configuration
+     * @return the resolved policy reflecting the {@code scalableTopic*} settings
+     * @throws IllegalArgumentException if the configured values violate an invariant checked
+     *                                  by {@link #validated()}
+     */
+    public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) {
+        return AutoScaleConfig.builder()
+                .enabled(conf.isScalableTopicAutoScaleEnabled())
+                .maxSegments(conf.getScalableTopicMaxSegments())
+                .minSegments(conf.getScalableTopicMinSegments())
+                .maxDagDepth(conf.getScalableTopicMaxDagDepth())
+                .splitCooldown(Duration.ofSeconds(conf.getScalableTopicSplitCooldownSeconds()))
+                .mergeCooldown(Duration.ofSeconds(conf.getScalableTopicMergeCooldownSeconds()))
+                .mergeWindow(Duration.ofSeconds(conf.getScalableTopicMergeWindowSeconds()))
+                .splitMsgRateIn(conf.getScalableTopicSplitMsgRateInThreshold())
+                .splitBytesRateIn(conf.getScalableTopicSplitBytesRateInThreshold())
+                .splitMsgRateOut(conf.getScalableTopicSplitMsgRateOutThreshold())
+                .splitBytesRateOut(conf.getScalableTopicSplitBytesRateOutThreshold())
+                .mergeMsgRateIn(conf.getScalableTopicMergeMsgRateInThreshold())
+                .mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold())
+                .mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold())
+                .mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold())
+                .build()
+                .validated();
+    }
+
+    /**
+     * Validate the invariants the evaluator depends on; returns {@code this} for chaining.
+     *
+     * <p>In particular every split threshold must be strictly positive — the evaluator
+     * scores overload as {@code rate / splitThreshold}, and a zero threshold would make any
+     * positive rate score {@code Infinity} (permanent split pressure) while a zero rate
+     * scores {@code NaN} (silently ignored). Catching misconfiguration here surfaces a
+     * clear error at the policy-resolution layer instead.
+     *
+     * <p>The three durations must be set: a policy assembled through the builder with one of
+     * them left out carries {@code null}, which is reported as an invalid configuration
+     * rather than surfacing as a {@code NullPointerException}.
+     *
+     * @return this policy, unchanged
+     * @throws IllegalArgumentException if any invariant is violated
+     */
+    public AutoScaleConfig validated() {
+        check(minSegments >= 1, "minSegments must be >= 1");
+        check(maxSegments >= minSegments, "maxSegments must be >= minSegments");
+        check(maxDagDepth >= 0, "maxDagDepth must be >= 0");
+        // Null checks come first so the isNegative() calls below cannot dereference null.
+        check(splitCooldown != null, "splitCooldown must be set");
+        check(mergeCooldown != null, "mergeCooldown must be set");
+        check(mergeWindow != null, "mergeWindow must be set");
+        check(!splitCooldown.isNegative(), "splitCooldown must not be negative");
+        check(!mergeCooldown.isNegative(), "mergeCooldown must not be negative");
+        check(!mergeWindow.isNegative(), "mergeWindow must not be negative");
+        // A NaN threshold fails every comparison below and is therefore rejected as well.
+        check(splitMsgRateIn > 0, "splitMsgRateInThreshold must be > 0");
+        check(splitBytesRateIn > 0, "splitBytesRateInThreshold must be > 0");
+        check(splitMsgRateOut > 0, "splitMsgRateOutThreshold must be > 0");
+        check(splitBytesRateOut > 0, "splitBytesRateOutThreshold must be > 0");
+        check(mergeMsgRateIn >= 0, "mergeMsgRateInThreshold must be >= 0");
+        check(mergeBytesRateIn >= 0, "mergeBytesRateInThreshold must be >= 0");
+        check(mergeMsgRateOut >= 0, "mergeMsgRateOutThreshold must be >= 0");
+        check(mergeBytesRateOut >= 0, "mergeBytesRateOutThreshold must be >= 0");
+        check(splitMsgRateIn > mergeMsgRateIn,
+                "splitMsgRateInThreshold must be > mergeMsgRateInThreshold (hysteresis)");
+        check(splitBytesRateIn > mergeBytesRateIn,
+                "splitBytesRateInThreshold must be > mergeBytesRateInThreshold (hysteresis)");
+        check(splitMsgRateOut > mergeMsgRateOut,
+                "splitMsgRateOutThreshold must be > mergeMsgRateOutThreshold (hysteresis)");
+        check(splitBytesRateOut > mergeBytesRateOut,
+                "splitBytesRateOutThreshold must be > mergeBytesRateOutThreshold (hysteresis)");
+        return this;
+    }
+
+    /** Throw {@link IllegalArgumentException} with a common prefix when {@code condition} is false. */
+    private static void check(boolean condition, String message) {
+        if (!condition) {
+            throw new IllegalArgumentException("Invalid auto split/merge configuration: " + message);
+        }
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
new file mode 100644
index 00000000000..e8a1713e1f0
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleDecision.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+/**
+ * The outcome of one {@link AutoScalePolicyEvaluator} evaluation (PIP-483): split one
+ * segment, merge two adjacent segments, or do nothing. Each non-{@link NoAction} variant
+ * carries a short {@code reason} string used for logging and metrics.
+ *
+ * <p>The hierarchy is sealed, so the three variants below are the only possible outcomes.
+ */
+public sealed interface AutoScaleDecision
+ permits AutoScaleDecision.Split, AutoScaleDecision.Merge,
AutoScaleDecision.NoAction {
+
+ /**
+ * Split {@code segmentId} at its midpoint.
+ *
+ * @param segmentId id of the segment to split
+ * @param reason short explanation of why the split was chosen (logging / metrics)
+ */
+ record Split(long segmentId, String reason) implements AutoScaleDecision {
+ }
+
+ /**
+ * Merge the two adjacent active segments {@code segmentId1} and {@code segmentId2}.
+ *
+ * @param segmentId1 id of the first segment of the pair
+ * @param segmentId2 id of the second segment of the pair
+ * @param reason short explanation of why the merge was chosen (logging / metrics)
+ */
+ record Merge(long segmentId1, long segmentId2, String reason) implements
AutoScaleDecision {
+ }
+
+ /** No action this evaluation. Carries no data. */
+ record NoAction() implements AutoScaleDecision {
+ }
+
+ /** Shared {@link NoAction} instance (an interface field, hence implicitly a constant). */
+ NoAction NONE = new NoAction();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java
new file mode 100644
index 00000000000..bcb48ffd6a9
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluator.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * Pure, side-effect-free decision function for scalable-topic auto
split/merge (PIP-483).
+ *
+ * <p>Given a snapshot of the current layout, per-segment load samples,
per-subscription
+ * stream/checkpoint consumer counts, the resolved policy, and the current
time, it returns
+ * exactly one {@link AutoScaleDecision}. It performs no I/O and holds no
state — the caller
+ * (the controller leader) collects the inputs and dispatches the result.
+ *
+ * <p>It runs two passes and emits at most one action:
+ * <ol>
+ * <li><b>Split</b> (fast, lightly coalesced by {@code splitCooldown}):
consumer-count
+ * scale-up first, then traffic-driven scale-up.</li>
+ * <li><b>Merge</b> (lazy, gated by {@code mergeCooldown} + {@code
mergeWindow} +
+ * {@code maxDagDepth}): only if no split fired.</li>
+ * </ol>
+ */
+public final class AutoScalePolicyEvaluator {
+
+ /** Static utility class: the private constructor prevents instantiation. */
+ private AutoScalePolicyEvaluator() {
+ }
+
+    /**
+     * Runs one policy evaluation and returns the single action to take.
+     *
+     * <p>A disabled policy always yields {@link AutoScaleDecision#NONE}. Otherwise the split
+     * pass runs first and its result wins; the merge pass is consulted only when the split
+     * pass produced no action.
+     *
+     * @param layout              the current segment layout; its active segments are the candidates
+     * @param loadBySegment       load sample keyed by segment id; a segment without an entry counts
+     *                            as zero load for the split pass and is never merge-eligible
+     * @param streamConsumerCount consumer count keyed by subscription name, for the
+     *                            controller-managed (STREAM/CHECKPOINT) subscriptions
+     * @param config              the resolved policy
+     * @param nowMs               current wall-clock time, epoch millis
+     * @param lastSplitAtMs       epoch millis of the last split, or {@code Long.MIN_VALUE} if none
+     * @param lastMergeAtMs       epoch millis of the last merge, or {@code Long.MIN_VALUE} if none
+     * @return a split, a merge, or {@link AutoScaleDecision#NONE}
+     */
+    public static AutoScaleDecision decide(
+            SegmentLayout layout,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            Map<String, Integer> streamConsumerCount,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs,
+            long lastMergeAtMs) {
+        if (config.enabled()) {
+            List<SegmentInfo> activeSegments = new ArrayList<>(layout.getActiveSegments().values());
+            AutoScaleDecision splitOutcome = trySplit(
+                    activeSegments, loadBySegment, streamConsumerCount, config, nowMs, lastSplitAtMs);
+            if (splitOutcome instanceof AutoScaleDecision.NoAction) {
+                // Nothing to split: fall through to the lazier merge pass.
+                return tryMerge(activeSegments, layout, loadBySegment, config, nowMs, lastMergeAtMs);
+            }
+            return splitOutcome;
+        }
+        return AutoScaleDecision.NONE;
+    }
+
+ // --- Split pass ---
+
+    /**
+     * Split pass. Produces a split when either a subscription has more consumers than there
+     * are active segments, or at least one segment exceeds a split threshold. Produces no
+     * action when the topic is already at {@code maxSegments} or the split cooldown is running.
+     */
+    private static AutoScaleDecision trySplit(
+            List<SegmentInfo> active,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            Map<String, Integer> streamConsumerCount,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastSplitAtMs) {
+        if (active.size() >= config.maxSegments()
+                || withinCooldown(nowMs, lastSplitAtMs, config.splitCooldown().toMillis())) {
+            return AutoScaleDecision.NONE;
+        }
+
+        // Rule (a), consumer-driven: when the largest managed subscription has more consumers
+        // than there are active segments, add a segment so the extra consumer can get one of
+        // its own. The segment with the highest msgRateIn is the one that gets split.
+        int largestSubscription = 0;
+        for (int consumers : streamConsumerCount.values()) {
+            largestSubscription = Math.max(largestSubscription, consumers);
+        }
+        if (largestSubscription > active.size()) {
+            SegmentInfo busiest = busiestByMsgRateIn(active, loadBySegment);
+            if (busiest != null) {
+                return new AutoScaleDecision.Split(busiest.segmentId(), "consumer-count");
+            }
+        }
+
+        // Rule (b), load-driven: each segment's score is its largest rate/threshold ratio. The
+        // winner is the segment with the highest score strictly above 1.0; the reason names the
+        // metric that produced that score.
+        final String[] metricNames = {"msgRateIn", "bytesRateIn", "msgRateOut", "bytesRateOut"};
+        SegmentInfo winner = null;
+        String winnerMetric = null;
+        double winnerScore = 1.0;
+        for (SegmentInfo candidate : active) {
+            SegmentLoadStats load = statsOf(candidate.segmentId(), loadBySegment);
+            double[] overload = {
+                    load.msgRateIn() / config.splitMsgRateIn(),
+                    load.bytesRateIn() / config.splitBytesRateIn(),
+                    load.msgRateOut() / config.splitMsgRateOut(),
+                    load.bytesRateOut() / config.splitBytesRateOut(),
+            };
+            int peakIndex = -1;
+            double peak = 0.0;
+            for (int m = 0; m < overload.length; m++) {
+                if (overload[m] > peak) {
+                    peak = overload[m];
+                    peakIndex = m;
+                }
+            }
+            // winnerScore never drops below 1.0, so this one comparison also enforces the
+            // "strictly over threshold" requirement (and guarantees peakIndex >= 0).
+            if (peak > winnerScore) {
+                winnerScore = peak;
+                winner = candidate;
+                winnerMetric = metricNames[peakIndex];
+            }
+        }
+        if (winner == null) {
+            return AutoScaleDecision.NONE;
+        }
+        return new AutoScaleDecision.Split(winner.segmentId(), winnerMetric);
+    }
+
+ // --- Merge pass ---
+
+    /**
+     * Merge pass. Among all pairs of adjacent active segments whose merge depth is below
+     * {@code maxDagDepth} and which are both cold enough, picks the pair with the lowest
+     * combined rate. Produces no action when the topic is at or below {@code minSegments},
+     * the merge cooldown is running, or no pair qualifies.
+     */
+    private static AutoScaleDecision tryMerge(
+            List<SegmentInfo> active,
+            SegmentLayout layout,
+            Map<Long, SegmentLoadSample> loadBySegment,
+            AutoScaleConfig config,
+            long nowMs,
+            long lastMergeAtMs) {
+        if (active.size() <= config.minSegments()
+                || withinCooldown(nowMs, lastMergeAtMs, config.mergeCooldown().toMillis())) {
+            return AutoScaleDecision.NONE;
+        }
+
+        final long windowMs = config.mergeWindow().toMillis();
+        final int segmentCount = active.size();
+        AutoScaleDecision best = AutoScaleDecision.NONE;
+        double bestLoad = Double.MAX_VALUE;
+        for (int left = 0; left < segmentCount; left++) {
+            SegmentInfo first = active.get(left);
+            for (int right = left + 1; right < segmentCount; right++) {
+                SegmentInfo second = active.get(right);
+                boolean eligible = first.hashRange().isAdjacentTo(second.hashRange())
+                        && layout.mergeDepth(first.segmentId()) < config.maxDagDepth()
+                        && layout.mergeDepth(second.segmentId()) < config.maxDagDepth()
+                        && coldEnough(first.segmentId(), loadBySegment, config, nowMs, windowMs)
+                        && coldEnough(second.segmentId(), loadBySegment, config, nowMs, windowMs);
+                if (!eligible) {
+                    continue;
+                }
+                double pairLoad = combinedRate(first.segmentId(), loadBySegment)
+                        + combinedRate(second.segmentId(), loadBySegment);
+                // Strict comparison: on a tie the pair encountered first is kept.
+                if (pairLoad < bestLoad) {
+                    bestLoad = pairLoad;
+                    best = new AutoScaleDecision.Merge(first.segmentId(), second.segmentId(), "cold");
+                }
+            }
+        }
+        return best;
+    }
+
+    /**
+     * Tells whether a segment qualifies as cold for merging: it must have a load record, that
+     * record must be at least {@code mergeWindowMs} old, and all four recorded rates must be
+     * strictly below their merge thresholds. A segment with no record is never eligible, since
+     * there is no evidence it has been cold for the whole window.
+     *
+     * <p>{@code nowMs} comes from the caller's clock while {@code modifiedAtMs} is the
+     * timestamp stored with the record, so clock skew between the two shifts the effective
+     * window.
+     */
+    private static boolean coldEnough(long segmentId, Map<Long, SegmentLoadSample> loadBySegment,
+                                      AutoScaleConfig config, long nowMs, long mergeWindowMs) {
+        SegmentLoadSample observed = loadBySegment.get(segmentId);
+        if (observed == null || nowMs - observed.modifiedAtMs() < mergeWindowMs) {
+            return false;
+        }
+        SegmentLoadStats rates = observed.stats();
+        boolean ingestCold = rates.msgRateIn() < config.mergeMsgRateIn()
+                && rates.bytesRateIn() < config.mergeBytesRateIn();
+        boolean dispatchCold = rates.msgRateOut() < config.mergeMsgRateOut()
+                && rates.bytesRateOut() < config.mergeBytesRateOut();
+        return ingestCold && dispatchCold;
+    }
+
+ // --- Helpers ---
+
+    /**
+     * Returns {@code true} while fewer than {@code cooldownMs} millis have elapsed since
+     * {@code lastAtMs}. {@code Long.MIN_VALUE} means "never happened" and is never in cooldown.
+     */
+    private static boolean withinCooldown(long nowMs, long lastAtMs, long cooldownMs) {
+        if (lastAtMs == Long.MIN_VALUE) {
+            return false;
+        }
+        long elapsedMs = nowMs - lastAtMs;
+        return elapsedMs < cooldownMs;
+    }
+
+    /** Returns the recorded stats for a segment, or {@code SegmentLoadStats.ZERO} if it has no sample. */
+    private static SegmentLoadStats statsOf(long segmentId, Map<Long, SegmentLoadSample> load) {
+        SegmentLoadSample found = load.get(segmentId);
+        if (found == null) {
+            return SegmentLoadStats.ZERO;
+        }
+        return found.stats();
+    }
+
+    /** Sum of a segment's four recorded rates; used only to rank merge candidates. */
+    private static double combinedRate(long segmentId, Map<Long, SegmentLoadSample> load) {
+        SegmentLoadStats rates = statsOf(segmentId, load);
+        // Accumulate left to right so the floating-point result is independent of formatting.
+        double total = rates.msgRateIn();
+        total += rates.bytesRateIn();
+        total += rates.msgRateOut();
+        total += rates.bytesRateOut();
+        return total;
+    }
+
+    /**
+     * Returns the active segment with the highest {@code msgRateIn}, or {@code null} when the
+     * list is empty. Equal rates are resolved in favour of the lower segment id, so the choice
+     * does not depend on iteration order.
+     */
+    private static SegmentInfo busiestByMsgRateIn(List<SegmentInfo> active,
+                                                  Map<Long, SegmentLoadSample> load) {
+        SegmentInfo leader = null;
+        double leaderRate = -1.0;
+        for (SegmentInfo candidate : active) {
+            double rate = statsOf(candidate.segmentId(), load).msgRateIn();
+            boolean takesLead;
+            if (leader != null && rate == leaderRate) {
+                // Tie: the lower segment id wins.
+                takesLead = candidate.segmentId() < leader.segmentId();
+            } else {
+                takesLead = rate > leaderRate;
+            }
+            if (takesLead) {
+                leader = candidate;
+                leaderRate = rate;
+            }
+        }
+        return leader;
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index c5501db56e2..0b8c70d66b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.scalable;
+import com.google.common.annotations.VisibleForTesting;
import io.github.merlimat.slog.Logger;
import java.time.Clock;
import java.time.Duration;
@@ -31,8 +32,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
@@ -84,6 +87,27 @@ public class ScalableTopicController {
/** Sealed-segment GC scheduled task. Non-null only while this broker is
leader. */
private volatile ScheduledFuture<?> gcTask;
+ /** Periodic auto split/merge evaluation task (PIP-483). Non-null only
while leader. */
+ private volatile ScheduledFuture<?> autoScaleTask;
+
+ /**
+ * Serializes auto split/merge: an evaluation acquires this before
deciding and holds it
+ * for the whole split/merge it dispatches, so concurrent ticks /
consumer-change triggers
+ * never launch overlapping auto operations.
+ */
+ private final AtomicBoolean autoScaleInFlight = new AtomicBoolean(false);
+
+ /**
+ * Set when a trigger arrives while an evaluation is in flight; the
in-flight run
+ * re-evaluates once on completion so coalesced triggers are not lost
until the next tick.
+ */
+ private final AtomicBoolean autoScaleReEvaluate = new AtomicBoolean(false);
+
+ /** Epoch millis of the last split on this topic (manual or auto);
MIN_VALUE if none. */
+ private volatile long lastSplitAtMs = Long.MIN_VALUE;
+ /** Epoch millis of the last merge on this topic (manual or auto);
MIN_VALUE if none. */
+ private volatile long lastMergeAtMs = Long.MIN_VALUE;
+
@Getter
private volatile LeaderElectionState leaderState =
LeaderElectionState.NoLeader;
@@ -126,10 +150,11 @@ public class ScalableTopicController {
private void onLeaderStateChange(LeaderElectionState state) {
log.info().attr("state", state).log("Leader state change for scalable
topic");
if (state != LeaderElectionState.Leading) {
- // Stepped down (or never was leader). Stop the GC tick so the
deposed leader
- // doesn't race the new one on layout writes / backing-topic
deletes. The new
- // leader's initialize() will reschedule.
+ // Stepped down (or never was leader). Stop the GC and auto-scale
ticks so the
+ // deposed leader doesn't race the new one on layout writes /
backing-topic
+ // deletes. The new leader's initialize() will reschedule.
cancelGcTask();
+ cancelAutoScaleTask();
}
if (state == LeaderElectionState.NoLeader && !closed) {
initialize().exceptionally(ex -> {
@@ -160,7 +185,9 @@ public class ScalableTopicController {
})
.thenCompose(__ -> {
if (isLeader()) {
+ seedAutoScaleCooldownsFromLayout();
scheduleGcTask();
+ scheduleAutoScaleTask();
return ensureActiveSegmentsExist()
.thenCompose(___ ->
restoreSessionsFromStore());
}
@@ -168,6 +195,29 @@ public class ScalableTopicController {
});
}
+    /**
+     * Rebuilds the in-memory split/merge cooldown timestamps from the current layout.
+     *
+     * <p>A segment with exactly one parent is treated as the product of a split, one with two
+     * or more parents as the product of a merge. The newest {@code createdAtMs} in each group
+     * becomes {@code lastSplitAtMs} / {@code lastMergeAtMs}; a group with no members leaves
+     * its timestamp at {@code Long.MIN_VALUE}. Called after winning leadership so that a
+     * failover does not reset the cooldowns.
+     */
+    private void seedAutoScaleCooldownsFromLayout() {
+        long latestSplit = Long.MIN_VALUE;
+        long latestMerge = Long.MIN_VALUE;
+        for (SegmentInfo info : currentLayout.getAllSegments().values()) {
+            int parentCount = info.parentIds().size();
+            if (parentCount == 0) {
+                // Root segment: produced by neither a split nor a merge.
+                continue;
+            }
+            long createdAt = info.createdAtMs();
+            if (parentCount == 1) {
+                latestSplit = Math.max(latestSplit, createdAt);
+            } else {
+                latestMerge = Math.max(latestMerge, createdAt);
+            }
+        }
+        lastSplitAtMs = latestSplit;
+        lastMergeAtMs = latestMerge;
+    }
+
/**
* Recovery path for active segments whose backing topics are missing —
e.g.,
* a {@code createScalableTopic} call that committed metadata but failed to
@@ -237,6 +287,186 @@ public class ScalableTopicController {
}
}
+ // --- Auto split/merge (PIP-483) ---
+
+    /**
+     * Starts the periodic auto split/merge tick if it is not already running.
+     *
+     * <p>Does nothing when the controller is closed, a tick is already scheduled, the broker
+     * configuration is unavailable, or the configured interval is not positive. The tick is
+     * scheduled whether or not auto-scaling is currently enabled: the enabled flag is checked
+     * again on every evaluation, so a disabled tick simply returns early.
+     */
+    private synchronized void scheduleAutoScaleTask() {
+        boolean alreadyScheduled = autoScaleTask != null;
+        if (closed || alreadyScheduled) {
+            return;
+        }
+        ServiceConfiguration brokerConf = brokerConfig();
+        if (brokerConf != null) {
+            long periodMs = Duration.ofSeconds(
+                    brokerConf.getScalableTopicAutoScaleIntervalSeconds()).toMillis();
+            if (periodMs > 0) {
+                autoScaleTask = scheduler().scheduleAtFixedRate(
+                        () -> runAutoScaleSafely("tick"), periodMs, periodMs, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /** Stops the periodic tick, if one is scheduled, without interrupting a run in progress. */
+    private synchronized void cancelAutoScaleTask() {
+        ScheduledFuture<?> scheduled = autoScaleTask;
+        if (scheduled == null) {
+            return;
+        }
+        scheduled.cancel(false);
+        autoScaleTask = null;
+    }
+
+    /**
+     * Hook for consumer registration changes. Runs an auto-scale evaluation right away, tagged
+     * {@code "consumer-change"}, so the consumer-count split rule does not have to wait for the
+     * next periodic tick.
+     */
+    private void onConsumerCountChanged() {
+        final String trigger = "consumer-change";
+        runAutoScaleSafely(trigger);
+    }
+
+    /**
+     * Runs one evaluation on the leader and never lets a failure escape: both an exceptionally
+     * completed evaluation and a synchronous throw are logged at WARN and swallowed. A no-op
+     * when this broker is not the leader or the controller is closed.
+     *
+     * @param trigger label describing what caused the evaluation; included in log lines
+     */
+    private void runAutoScaleSafely(String trigger) {
+        boolean eligible = isLeader() && !closed;
+        if (!eligible) {
+            return;
+        }
+        try {
+            CompletableFuture<Void> evaluation = evaluateAndAct(trigger);
+            evaluation.exceptionally(failure -> {
+                log.warn().attr("trigger", trigger).exceptionMessage(failure)
+                        .log("Auto split/merge evaluation failed");
+                return null;
+            });
+        } catch (Throwable unexpected) {
+            log.warn().attr("trigger", trigger).exception(unexpected)
+                    .log("Auto split/merge evaluation threw");
+        }
+    }
+
+ /**
+ * Runs one auto split/merge evaluation: gathers the per-subscription consumer counts and
+ * the per-segment load samples, asks {@link AutoScalePolicyEvaluator#decide} for a decision
+ * and dispatches it.
+ *
+ * <p>Returns an already-completed future without evaluating when the broker configuration
+ * is unavailable, when auto-scaling is disabled, or when another evaluation already holds
+ * {@link #autoScaleInFlight}. In the last case {@link #autoScaleReEvaluate} is set so the
+ * running evaluation schedules one more run when it finishes.
+ *
+ * <p>{@link #autoScaleInFlight} is held from before the decision until the dispatched
+ * split/merge completes (normally or exceptionally), so at most one auto operation runs at
+ * a time.
+ *
+ * @param trigger label describing what caused this evaluation; forwarded to dispatch
+ * @return a future that completes once the evaluation and any dispatched operation finish
+ */
+ private CompletableFuture<Void> evaluateAndAct(String trigger) {
+ ServiceConfiguration brokerConfig = brokerConfig();
+ if (brokerConfig == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Resolve the policy on every call so a changed broker setting applies to this evaluation.
+ AutoScaleConfig config =
AutoScaleConfig.fromBrokerConfig(brokerConfig);
+ if (!config.enabled()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (!autoScaleInFlight.compareAndSet(false, true)) {
+ // Another evaluation or auto operation is already running. Don't drop the trigger:
+ // mark it pending so the in-flight run re-evaluates on completion. Otherwise a
+ // consumer registering mid-evaluation would not be considered until the next tick.
+ // NOTE(review): if the in-flight run clears autoScaleReEvaluate between the failed
+ // CAS above and the set(true) below, the flag stays pending until some later
+ // evaluation completes. Looks benign (the periodic tick picks it up), but confirm.
+ autoScaleReEvaluate.set(true);
+ return CompletableFuture.completedFuture(null);
+ }
+ try {
+ return collectConsumerCounts()
+ .thenCombine(collectLoadSamples(), (consumers, load) ->
+ AutoScalePolicyEvaluator.decide(currentLayout,
load, consumers, config,
+ clock.millis(), lastSplitAtMs,
lastMergeAtMs))
+ .thenCompose(decision -> dispatch(decision, config,
trigger))
+ .whenComplete((__, ex) -> {
+ // Release the flag first, so the re-run scheduled below can acquire it.
+ autoScaleInFlight.set(false);
+ if (autoScaleReEvaluate.getAndSet(false)) {
+ // Re-run off the completion thread for the trigger(s) coalesced
+ // while this evaluation was in flight.
+ scheduler().execute(() ->
runAutoScaleSafely("coalesced"));
+ }
+ });
+ } catch (Throwable t) {
+ // A synchronous throw between the CAS and the future chain would otherwise leave
+ // the in-flight flag set forever, silently disabling auto-scaling on this topic.
+ autoScaleInFlight.set(false);
+ throw t;
+ }
+ }
+
+    /**
+     * Carries out a decision. A split is logged, executed through {@code splitSegment} and,
+     * when it succeeds, followed by a scheduled follow-up evaluation. A merge is logged and
+     * executed through {@code mergeSegments}. Any other decision completes immediately.
+     *
+     * @param decision the evaluator's decision
+     * @param config   the policy the decision was made under; supplies the follow-up delay
+     * @param trigger  label describing what caused the evaluation; included in the log line
+     * @return a future completing when the dispatched operation (if any) finishes
+     */
+    private CompletableFuture<Void> dispatch(AutoScaleDecision decision, AutoScaleConfig config,
+                                             String trigger) {
+        if (decision instanceof AutoScaleDecision.Merge mergeAction) {
+            log.info().attr("segmentId1", mergeAction.segmentId1())
+                    .attr("segmentId2", mergeAction.segmentId2())
+                    .attr("reason", mergeAction.reason())
+                    .attr("trigger", trigger)
+                    .log("Auto merge");
+            return mergeSegments(mergeAction.segmentId1(), mergeAction.segmentId2())
+                    .thenAccept(newLayout -> { });
+        }
+        if (decision instanceof AutoScaleDecision.Split splitAction) {
+            log.info().attr("segmentId", splitAction.segmentId())
+                    .attr("reason", splitAction.reason())
+                    .attr("trigger", trigger)
+                    .log("Auto split");
+            return splitSegment(splitAction.segmentId())
+                    .thenRun(() -> scheduleFollowUpEvaluation(config));
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Queues a single extra evaluation, tagged {@code "post-split"}, to run one millisecond
+     * after the split cooldown ends. This lets a sequence of needed splits proceed at the
+     * cooldown's pace instead of the periodic tick's. Skipped when the controller is closed
+     * or this broker is no longer the leader.
+     */
+    private void scheduleFollowUpEvaluation(AutoScaleConfig config) {
+        if (!closed && isLeader()) {
+            long afterCooldownMs = config.splitCooldown().toMillis() + 1;
+            scheduler().schedule(
+                    () -> runAutoScaleSafely("post-split"), afterCooldownMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Snapshots the number of consumers of every subscription that has a coordinator in this
+     * controller, keyed by subscription name. The result is available immediately; the future
+     * return type only lets it compose with the asynchronous load-sample read.
+     */
+    private CompletableFuture<Map<String, Integer>> collectConsumerCounts() {
+        Map<String, Integer> perSubscription = new LinkedHashMap<>();
+        for (var entry : subscriptions.entrySet()) {
+            perSubscription.put(entry.getKey(), entry.getValue().getConsumers().size());
+        }
+        return CompletableFuture.completedFuture(perSubscription);
+    }
+
+    /**
+     * Reads the stored load record of every active segment. Each record that exists becomes a
+     * {@link SegmentLoadSample} made of its value and its {@code Stat} modification timestamp;
+     * segments without a record are simply absent from the returned map.
+     */
+    private CompletableFuture<Map<Long, SegmentLoadSample>> collectLoadSamples() {
+        Map<Long, SegmentLoadSample> collected = new ConcurrentHashMap<>();
+        CompletableFuture<?>[] reads = currentLayout.getActiveSegments().keySet().stream()
+                .map(id -> resources.getSegmentLoadAsync(topicName, id)
+                        .thenAccept(maybeRecord -> maybeRecord.ifPresent(rec -> collected.put(id,
+                                new SegmentLoadSample(rec.getValue(),
+                                        rec.getStat().getModificationTimestamp())))))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(reads).thenApply(ignored -> collected);
+    }
+
+    /** Returns the broker's configuration, obtained through the broker service. */
+    private ServiceConfiguration brokerConfig() {
+        var pulsar = brokerService.getPulsar();
+        return pulsar.getConfig();
+    }
+
+    /**
+     * Test hook: runs a single auto split/merge evaluation, tagged {@code "test"}, and returns
+     * its future so a test can wait for the outcome.
+     */
+    @VisibleForTesting
+    CompletableFuture<Void> evaluateAutoScaleForTest() {
+        final String trigger = "test";
+        return evaluateAndAct(trigger);
+    }
+
/**
* Load persisted subscriptions and consumer registrations from the
metadata store and
* install them into per-subscription {@link SubscriptionCoordinator}
instances. Called
@@ -432,6 +662,9 @@ public class ScalableTopicController {
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
.thenCompose(optMd -> {
currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+ // Start the auto-split cooldown only now that the split
actually happened
+ // (covers manual and auto splits; a failed attempt doesn't burn
the cooldown).
+ lastSplitAtMs = nowMs;
// Step 5: Notify subscriptions of layout change (triggers
consumer reassignment)
return notifySubscriptions(currentLayout);
@@ -478,6 +711,9 @@ public class ScalableTopicController {
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
.thenCompose(optMd -> {
currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+ // Start the auto-merge cooldown only now that the merge
actually happened
+ // (covers manual and auto merges; a failed attempt doesn't burn
the cooldown).
+ lastMergeAtMs = nowMs;
return notifySubscriptions(currentLayout);
}).thenApply(__ -> currentLayout);
}
@@ -516,6 +752,9 @@ public class ScalableTopicController {
}
return coordinator.registerConsumer(consumerName, consumerId, cnx)
.thenApply(assignments -> {
+ // A new consumer may now outnumber the segments —
evaluate the
+ // consumer-count split rule promptly rather than waiting
for the tick.
+ onConsumerCountChanged();
// Look up by name since the key may have been an existing
session
return assignments.entrySet().stream()
.filter(e ->
consumerName.equals(e.getKey().getConsumerName()))
@@ -961,7 +1200,11 @@ public class ScalableTopicController {
})
.thenCompose(__ -> {
CompletableFuture<?>[] deletes = drained.stream()
- .map(this::deleteSegmentBackingTopic)
+ .map(s -> deleteSegmentBackingTopic(s)
+ // The segment is gone from the layout — drop
its load record
+ // too, or the .../segments/{id}/load entry
leaks forever.
+ .thenCompose(___ ->
+
resources.deleteSegmentLoadAsync(topicName, s.segmentId())))
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(deletes);
})
@@ -1094,6 +1337,7 @@ public class ScalableTopicController {
public CompletableFuture<Void> close() {
closed = true;
cancelGcTask();
+ cancelAutoScaleTask();
// Stop each coordinator's drain poller before clearing — otherwise
the scheduler
// task keeps running after the controller goes away.
subscriptions.values().forEach(SubscriptionCoordinator::close);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index aed7dd3f8e1..a3f1f0c5ef1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.broker.service.scalable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
@@ -112,6 +116,43 @@ public class SegmentLayout {
.collect(Collectors.toList());
}
+    /**
+     * Counts the merge operations in a segment's ancestry, the segment itself included.
+     *
+     * <p>A segment counts as a merge product when it has two or more parents. The walk follows
+     * {@code parentIds} upward from {@code segmentId}, visits every ancestor at most once (so
+     * an ancestor reachable through several paths is counted once), and ignores ids that are
+     * not present in the layout.
+     *
+     * <p>Auto split/merge (PIP-483) compares this value against the configured maximum depth
+     * to decide whether a segment may take part in another merge.
+     *
+     * @param segmentId the segment whose lineage is measured
+     * @return the number of merge products in the lineage; 0 if there are none
+     */
+    public int mergeDepth(long segmentId) {
+        Set<Long> seen = new HashSet<>();
+        Deque<Long> pending = new ArrayDeque<>();
+        pending.add(segmentId);
+        int merges = 0;
+        Long next;
+        // ArrayDeque never holds null, so a null from poll() means the queue is drained.
+        while ((next = pending.poll()) != null) {
+            if (!seen.add(next)) {
+                continue;
+            }
+            SegmentInfo info = allSegments.get(next);
+            if (info != null) {
+                if (info.parentIds().size() > 1) {
+                    merges++;
+                }
+                pending.addAll(info.parentIds());
+            }
+        }
+        return merges;
+    }
+
/**
* Produce a new layout by splitting a segment at its midpoint.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
new file mode 100644
index 00000000000..56001951c4a
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.DoubleSupplier;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * Writes per-segment {@link SegmentLoadStats} to the metadata store on behalf
of the broker
+ * that owns a segment's {@code segment://} topic (PIP-483).
+ *
+ * <p>To keep metadata write volume bounded, a sample is persisted only when
it differs
+ * materially from the last value this broker wrote for that segment — i.e.
some rate moved
+ * by more than the configured relative threshold (default 25%), or crossed
to/from zero. A
+ * steady-state segment therefore writes once and then stays quiet; the
controller's
+ * windowing relies on the stored record's {@code Stat} modification time
staying put while
+ * load is unchanged.
+ *
+ * <p><b>Known blind spot</b> — the materiality band is anchored at the last
<em>written</em>
+ * value, not at the policy thresholds. A true rate that settles inside the
band never
+ * produces a new record, so a segment can sustain up to {@code threshold}
beyond a split or
+ * merge threshold indefinitely without the controller seeing it. This is
path-dependent
+ * (whether a given load triggers depends on what was last written) but
one-directional: it
+ * can only delay an action, never cause a spurious one, and its magnitude is
bounded by the
+ * configured threshold. Accepted as the cost of bounded write volume;
operators wanting
+ * tighter tracking lower {@code scalableTopicLoadReportRateChangeThreshold}.
+ *
+ * <p>This class owns only the materiality decision and the last-written
cache. Sampling the
+ * live {@code TopicStats} and scheduling the periodic sweep are wired in by
the broker.
+ */
+public class SegmentLoadReporter {
+
+ private final ScalableTopicResources resources;
+ /** Re-read on every sample so the broker config knob is honored
dynamically. */
+ private final DoubleSupplier rateChangeThreshold;
+
+ /** Last value written per load-record path, so we can skip immaterial
updates. */
+ private final ConcurrentHashMap<String, SegmentLoadStats> lastWritten =
new ConcurrentHashMap<>();
+
+ /**
+ * Creates a reporter whose materiality threshold is read from a supplier.
+ *
+ * @param resources access to the stored segment load records
+ * @param rateChangeThreshold supplier of the relative change threshold; queried on every
+ * {@code reportIfChanged} call
+ */
+ public SegmentLoadReporter(ScalableTopicResources resources,
DoubleSupplier rateChangeThreshold) {
+ this.resources = resources;
+ this.rateChangeThreshold = rateChangeThreshold;
+ }
+
+ /**
+ * Creates a reporter with a fixed materiality threshold.
+ *
+ * @param resources access to the stored segment load records
+ * @param rateChangeThreshold the relative change threshold, wrapped in a constant supplier
+ */
+ public SegmentLoadReporter(ScalableTopicResources resources, double
rateChangeThreshold) {
+ this(resources, () -> rateChangeThreshold);
+ }
+
+    /**
+     * Reports a segment's current load, persisting it only when it differs materially from the
+     * baseline (or when there is no baseline at all).
+     *
+     * <p>The baseline is the value this reporter last wrote. When the local cache has no entry
+     * for the segment, the record currently in the store is read first and, if present, used
+     * as the baseline, so the first sample after a cache miss does not rewrite an unchanged
+     * record and move its modification time.
+     *
+     * @param topic     the scalable topic
+     * @param segmentId the segment being reported
+     * @param current   the freshly sampled rates
+     * @return a future completing with {@code true} if the sample was written, {@code false}
+     *         if it was immaterial and skipped
+     */
+    public CompletableFuture<Boolean> reportIfChanged(TopicName topic, long segmentId,
+                                                      SegmentLoadStats current) {
+        final String recordPath = resources.segmentLoadPath(topic, segmentId);
+        final double band = rateChangeThreshold.getAsDouble();
+        SegmentLoadStats cached = lastWritten.get(recordPath);
+        if (cached != null) {
+            // Fast path: compare against what this broker last wrote.
+            if (isMaterialChange(cached, current, band)) {
+                return write(topic, segmentId, recordPath, current);
+            }
+            return CompletableFuture.completedFuture(false);
+        }
+        // Cache miss: seed the baseline from the stored record before applying the gate.
+        return resources.getSegmentLoadAsync(topic, segmentId).thenCompose(stored -> {
+            stored.ifPresent(existing -> lastWritten.putIfAbsent(recordPath, existing.getValue()));
+            SegmentLoadStats seeded = lastWritten.get(recordPath);
+            if (seeded == null || isMaterialChange(seeded, current, band)) {
+                return write(topic, segmentId, recordPath, current);
+            }
+            return CompletableFuture.completedFuture(false);
+        });
+    }
+
+    /**
+     * Persists {@code current} as the segment's load record and, once the store write
+     * succeeds, remembers it as the new baseline under {@code path}.
+     *
+     * @return a future completing with {@code true} after a successful write
+     */
+    private CompletableFuture<Boolean> write(TopicName topic, long segmentId, String path,
+                                             SegmentLoadStats current) {
+        CompletableFuture<?> stored = resources.reportSegmentLoadAsync(topic, segmentId, current);
+        return stored.thenApply(ignored -> {
+            lastWritten.put(path, current);
+            return true;
+        });
+    }
+
+    /**
+     * Discards the cached baseline of a segment. Intended for when this broker no longer
+     * serves the segment, so the cache does not keep growing as segments come and go. If the
+     * segment is reported here again, the baseline is re-read from the stored record.
+     */
+    public void forget(TopicName topic, long segmentId) {
+        String recordPath = resources.segmentLoadPath(topic, segmentId);
+        lastWritten.remove(recordPath);
+    }
+
+    /**
+     * Tells whether {@code current} differs materially from {@code last}: at least one of the
+     * four rates (message/byte rate, in/out) must have changed according to
+     * {@code changed(double, double, double)}.
+     *
+     * @param last      the baseline stats
+     * @param current   the newly sampled stats
+     * @param threshold the relative change threshold applied to each rate
+     */
+    static boolean isMaterialChange(SegmentLoadStats last, SegmentLoadStats current,
+                                    double threshold) {
+        if (changed(last.msgRateIn(), current.msgRateIn(), threshold)) {
+            return true;
+        }
+        if (changed(last.bytesRateIn(), current.bytesRateIn(), threshold)) {
+            return true;
+        }
+        if (changed(last.msgRateOut(), current.msgRateOut(), threshold)) {
+            return true;
+        }
+        return changed(last.bytesRateOut(), current.bytesRateOut(), threshold);
+    }
+
+    /**
+     * Tells whether a single rate moved materially between two samples.
+     *
+     * <p>Crossing zero in either direction (idle to active, or active to idle) is always
+     * material, whatever the threshold; staying at zero never is. Otherwise the move is
+     * material when its size relative to {@code last} is strictly greater than
+     * {@code threshold}.
+     *
+     * @param last      the baseline rate
+     * @param current   the newly sampled rate
+     * @param threshold the relative change threshold (e.g. {@code 0.25} for 25%)
+     * @return {@code true} if the change must be reported
+     */
+    private static boolean changed(double last, double current, double threshold) {
+        if (last == 0.0 || current == 0.0) {
+            // Handle both directions explicitly. A drop to zero is a relative change of exactly
+            // 1.0, which a threshold of 1.0 or more would otherwise hide forever.
+            return last != current;
+        }
+        return Math.abs(current - last) / last > threshold;
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
new file mode 100644
index 00000000000..4fb21d0c954
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLoadSample.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+
+/**
+ * A segment's load record as the controller sees it: the persisted {@link SegmentLoadStats}
+ * plus the metadata store's last-modified timestamp for the record (PIP-483).
+ *
+ * <p>This is an in-memory evaluator input, never persisted — the timestamp comes from the
+ * metadata {@code Stat}, not from the stored value. {@code modifiedAtMs} is what the merge
+ * pass uses to require a segment has stayed cold for at least {@code mergeWindow}.
+ *
+ * @param stats the persisted rates
+ * @param modifiedAtMs metadata-store last-modified time of the load record, in epoch millis
+ */
+public record SegmentLoadSample(SegmentLoadStats stats, long modifiedAtMs) {
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
new file mode 100644
index 00000000000..acc104c2773
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+public class AutoScaleConfigTest {
+
+    /** An untouched {@link ServiceConfiguration} must resolve to the expected auto-scale defaults. */
+    @Test
+    public void testDefaultsMatchBrokerConfig() {
+        ServiceConfiguration brokerDefaults = new ServiceConfiguration();
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(brokerDefaults);
+
+        // Switch and structural limits.
+        assertTrue(resolved.enabled());
+        assertEquals(resolved.maxSegments(), 64);
+        assertEquals(resolved.minSegments(), 1);
+        assertEquals(resolved.maxDagDepth(), 10);
+
+        // Timing knobs.
+        assertEquals(resolved.splitCooldown(), Duration.ofSeconds(60));
+        assertEquals(resolved.mergeCooldown(), Duration.ofMinutes(5));
+        assertEquals(resolved.mergeWindow(), Duration.ofMinutes(5));
+
+        // Split thresholds.
+        assertEquals(resolved.splitMsgRateIn(), 10_000.0);
+        assertEquals(resolved.splitBytesRateIn(), 50_000_000.0);
+        assertEquals(resolved.splitMsgRateOut(), 50_000.0);
+        assertEquals(resolved.splitBytesRateOut(), 250_000_000.0);
+
+        // Merge thresholds.
+        assertEquals(resolved.mergeMsgRateIn(), 1_000.0);
+        assertEquals(resolved.mergeBytesRateIn(), 5_000_000.0);
+        assertEquals(resolved.mergeMsgRateOut(), 5_000.0);
+        assertEquals(resolved.mergeBytesRateOut(), 25_000_000.0);
+
+        // The hysteresis dead-band: every split threshold sits strictly above its merge
+        // counterpart.
+        assertTrue(resolved.splitMsgRateIn() > resolved.mergeMsgRateIn());
+        assertTrue(resolved.splitBytesRateIn() > resolved.mergeBytesRateIn());
+        assertTrue(resolved.splitMsgRateOut() > resolved.mergeMsgRateOut());
+        assertTrue(resolved.splitBytesRateOut() > resolved.mergeBytesRateOut());
+    }
+
+    /** Each malformed broker configuration must be rejected with an IllegalArgumentException. */
+    @Test
+    public void testValidationRejectsBadConfig() {
+        // A split threshold of zero: the evaluator scores rate/threshold, so 0 would yield
+        // Infinity (or NaN for a zero rate) and has to be refused at resolution time.
+        ServiceConfiguration zeroSplitThreshold = new ServiceConfiguration();
+        zeroSplitThreshold.setScalableTopicSplitMsgRateInThreshold(0);
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.fromBrokerConfig(zeroSplitThreshold));
+
+        // Inverted hysteresis: the merge threshold is raised to the split threshold.
+        ServiceConfiguration invertedHysteresis = new ServiceConfiguration();
+        invertedHysteresis.setScalableTopicMergeMsgRateInThreshold(
+                invertedHysteresis.getScalableTopicSplitMsgRateInThreshold());
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.fromBrokerConfig(invertedHysteresis));
+
+        // Segment bounds the wrong way round: min above max.
+        ServiceConfiguration minAboveMax = new ServiceConfiguration();
+        minAboveMax.setScalableTopicMinSegments(10);
+        minAboveMax.setScalableTopicMaxSegments(5);
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.fromBrokerConfig(minAboveMax));
+
+        // A cooldown below zero.
+        ServiceConfiguration negativeSplitCooldown = new ServiceConfiguration();
+        negativeSplitCooldown.setScalableTopicSplitCooldownSeconds(-1);
+        assertThrows(IllegalArgumentException.class,
+                () -> AutoScaleConfig.fromBrokerConfig(negativeSplitCooldown));
+    }
+
+    /**
+     * Zero merge thresholds are a legitimate "never merge" setting — no rate is ever strictly
+     * below 0, so no segment qualifies as cold — and must pass validation.
+     */
+    @Test
+    public void testZeroMergeThresholdsAllowedAsMergeDisable() {
+        ServiceConfiguration mergeDisabled = new ServiceConfiguration();
+        mergeDisabled.setScalableTopicMergeMsgRateInThreshold(0);
+        mergeDisabled.setScalableTopicMergeBytesRateInThreshold(0L);
+        mergeDisabled.setScalableTopicMergeMsgRateOutThreshold(0);
+        mergeDisabled.setScalableTopicMergeBytesRateOutThreshold(0L);
+
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(mergeDisabled);
+        assertEquals(resolved.mergeMsgRateIn(), 0.0);
+    }
+
+    /** Values overridden on the broker configuration must show up in the resolved config. */
+    @Test
+    public void testOverriddenBrokerConfigPropagates() {
+        ServiceConfiguration overrides = new ServiceConfiguration();
+        overrides.setScalableTopicAutoScaleEnabled(false);
+        overrides.setScalableTopicMaxSegments(8);
+        overrides.setScalableTopicMinSegments(2);
+        overrides.setScalableTopicMaxDagDepth(3);
+        overrides.setScalableTopicSplitCooldownSeconds(30);
+        overrides.setScalableTopicMergeCooldownSeconds(120);
+        overrides.setScalableTopicMergeWindowSeconds(90);
+        overrides.setScalableTopicSplitMsgRateInThreshold(1234);
+        overrides.setScalableTopicMergeBytesRateOutThreshold(99L);
+
+        AutoScaleConfig resolved = AutoScaleConfig.fromBrokerConfig(overrides);
+
+        assertFalse(resolved.enabled());
+        assertEquals(resolved.maxSegments(), 8);
+        assertEquals(resolved.minSegments(), 2);
+        assertEquals(resolved.maxDagDepth(), 3);
+        assertEquals(resolved.splitCooldown(), Duration.ofSeconds(30));
+        assertEquals(resolved.mergeCooldown(), Duration.ofSeconds(120));
+        assertEquals(resolved.mergeWindow(), Duration.ofSeconds(90));
+        assertEquals(resolved.splitMsgRateIn(), 1234.0);
+        assertEquals(resolved.mergeBytesRateOut(), 99.0);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
new file mode 100644
index 00000000000..b85845de647
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScalePolicyEvaluatorTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the pure {@link AutoScalePolicyEvaluator} decision function (PIP-483).
+ * All inputs are constructed in-memory; there is no broker or metadata store.
+ */
+public class AutoScalePolicyEvaluatorTest {
+
+ private static final long NOW = 1_700_000_000_000L;
+ private static final long NO_PRIOR = Long.MIN_VALUE;
+
+ // Split thresholds sit well above merge thresholds — the dead-band is the hysteresis.
+ private static final double SPLIT_MSG_IN = 10_000;
+ private static final double SPLIT_BYTES_IN = 50_000_000;
+ private static final double SPLIT_MSG_OUT = 50_000;
+ private static final double SPLIT_BYTES_OUT = 250_000_000;
+ private static final double MERGE_MSG_IN = 1_000;
+ private static final double MERGE_BYTES_IN = 5_000_000;
+ private static final double MERGE_MSG_OUT = 5_000;
+ private static final double MERGE_BYTES_OUT = 25_000_000;
+
+    /**
+     * Builder pre-populated with the baseline used by every test: auto-scale enabled, 1..64
+     * segments, DAG depth 10, 1 minute split cooldown, 5 minute merge cooldown and window, and
+     * the threshold constants of this class. Tests override single fields before building.
+     */
+    private static AutoScaleConfig.AutoScaleConfigBuilder baseConfig() {
+        AutoScaleConfig.AutoScaleConfigBuilder limits = AutoScaleConfig.builder()
+                .enabled(true)
+                .maxSegments(64)
+                .minSegments(1)
+                .maxDagDepth(10);
+        AutoScaleConfig.AutoScaleConfigBuilder timed = limits
+                .splitCooldown(Duration.ofMinutes(1))
+                .mergeCooldown(Duration.ofMinutes(5))
+                .mergeWindow(Duration.ofMinutes(5));
+        AutoScaleConfig.AutoScaleConfigBuilder withSplitThresholds = timed
+                .splitMsgRateIn(SPLIT_MSG_IN)
+                .splitBytesRateIn(SPLIT_BYTES_IN)
+                .splitMsgRateOut(SPLIT_MSG_OUT)
+                .splitBytesRateOut(SPLIT_BYTES_OUT);
+        return withSplitThresholds
+                .mergeMsgRateIn(MERGE_MSG_IN)
+                .mergeBytesRateIn(MERGE_BYTES_IN)
+                .mergeMsgRateOut(MERGE_MSG_OUT)
+                .mergeBytesRateOut(MERGE_BYTES_OUT);
+    }
+
+    /** Layout of a freshly created topic with {@code segments} initial segments and no properties. */
+    private static SegmentLayout initialLayout(int segments) {
+        var freshMetadata = ScalableTopicController.createInitialMetadata(segments, Map.of());
+        return SegmentLayout.fromMetadata(freshMetadata);
+    }
+
+    /**
+     * Builds a load sample carrying the given four rates whose record was last modified
+     * {@code ageMs} milliseconds before {@link #NOW}.
+     */
+    private static SegmentLoadSample sample(double msgIn, double bytesIn, double msgOut,
+                                            double bytesOut, long ageMs) {
+        SegmentLoadStats rates = new SegmentLoadStats(msgIn, bytesIn, msgOut, bytesOut);
+        long modifiedAtMs = NOW - ageMs;
+        return new SegmentLoadSample(rates, modifiedAtMs);
+    }
+
+ private static SegmentLoadSample cold(long ageMs) {
+ return sample(0, 0, 0, 0, ageMs);
+ }
+
+    /** An age of ten minutes in millis — longer than the 5 minute merge window of the base config. */
+    private static long old() {
+        Duration tenMinutes = Duration.ofMinutes(10);
+        return tenMinutes.toMillis();
+    }
+
+ private static AutoScaleDecision decide(SegmentLayout layout,
+ Map<Long, SegmentLoadSample> load,
+ Map<String, Integer> consumers,
+ AutoScaleConfig config) {
+ return AutoScalePolicyEvaluator.decide(layout, load, consumers,
config, NOW,
+ NO_PRIOR, NO_PRIOR);
+ }
+
+ // --- enable switch ---
+
+    /** With auto-scale disabled the evaluator does nothing, even for a hot, crowded segment. */
+    @Test
+    public void testDisabledReturnsNoAction() {
+        AutoScaleConfig disabled = baseConfig().enabled(false).build();
+        SegmentLayout singleSegment = initialLayout(1);
+        Map<Long, SegmentLoadSample> hotLoad = Map.of(0L, sample(1_000_000, 0, 0, 0, old()));
+        Map<String, Integer> manyConsumers = Map.of("s", 100);
+
+        AutoScaleDecision decision = decide(singleSegment, hotLoad, manyConsumers, disabled);
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction);
+    }
+
+ // --- consumer-driven split ---
+
+    /**
+     * Three consumers on one subscription but only two segments: a third segment is needed,
+     * and the split goes to the segment with the higher msgRateIn.
+     */
+    @Test
+    public void testConsumerDrivenSplitTargetsBusiestSegment() {
+        SegmentLayout twoSegments = initialLayout(2);
+        SegmentLoadSample quieter = sample(100, 0, 0, 0, old());
+        SegmentLoadSample busier = sample(200, 0, 0, 0, old());
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, quieter, 1L, busier);
+
+        AutoScaleDecision decision =
+                decide(twoSegments, samples, Map.of("sub", 3), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.Split, decision.toString());
+        AutoScaleDecision.Split split = (AutoScaleDecision.Split) decision;
+        assertEquals(split.segmentId(), 1L, "should split the busiest segment by msgRateIn");
+        assertEquals(split.reason(), "consumer-count");
+    }
+
+    /**
+     * Two subscriptions with two consumers each: the demand is the per-subscription maximum
+     * (2), not the sum (4), so two segments are already enough.
+     */
+    @Test
+    public void testConsumerCountUsesPerSubscriptionMaxNotSum() {
+        SegmentLayout twoSegments = initialLayout(2);
+        // Age 0 keeps both samples inside the merge window, so only the consumer check can act.
+        Map<Long, SegmentLoadSample> freshCold = Map.of(0L, cold(0), 1L, cold(0));
+        Map<String, Integer> consumersPerSubscription = Map.of("a", 2, "b", 2);
+
+        AutoScaleDecision decision =
+                decide(twoSegments, freshCold, consumersPerSubscription, baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction,
+                "per-subscription max (2) does not exceed 2 segments");
+    }
+
+ @Test
+ public void testConsumerDrivenSplitRespectsMaxSegments() {
+ SegmentLayout layout = initialLayout(2);
+ // Fresh samples so the merge pass can't fire — isolates the split
suppression.
+ Map<Long, SegmentLoadSample> load = Map.of(0L, cold(0), 1L, cold(0));
+ AutoScaleDecision d = decide(layout, load, Map.of("sub", 5),
+ baseConfig().maxSegments(2).build());
+ assertTrue(d instanceof AutoScaleDecision.NoAction, "at maxSegments,
no split");
+ }
+
+ @Test
+ public void testConsumerDrivenSplitRespectsSplitCooldown() {
+ SegmentLayout layout = initialLayout(2);
+ Map<Long, SegmentLoadSample> load = Map.of(0L, cold(0), 1L, cold(0));
+ long recentSplit = NOW - Duration.ofSeconds(30).toMillis(); // < 1m
cooldown
+ AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load,
Map.of("sub", 3),
+ baseConfig().build(), NOW, recentSplit, NO_PRIOR);
+ assertTrue(d instanceof AutoScaleDecision.NoAction, "within split
cooldown, no split");
+ }
+
+ // --- load-driven split ---
+
+    /** msgRateIn just above its split threshold splits the segment with reason "msgRateIn". */
+    @Test
+    public void testLoadDrivenSplitOnMsgRateIn() {
+        SegmentLayout singleSegment = initialLayout(1);
+        SegmentLoadSample overMsgRateIn = sample(SPLIT_MSG_IN + 1, 0, 0, 0, old());
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, overMsgRateIn);
+
+        AutoScaleDecision decision = decide(singleSegment, samples, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.Split, decision.toString());
+        AutoScaleDecision.Split split = (AutoScaleDecision.Split) decision;
+        assertEquals(split.segmentId(), 0L);
+        assertEquals(split.reason(), "msgRateIn");
+    }
+
+    /** bytesRateOut just above its split threshold splits with reason "bytesRateOut". */
+    @Test
+    public void testLoadDrivenSplitOnBytesRateOut() {
+        SegmentLayout singleSegment = initialLayout(1);
+        SegmentLoadSample overBytesRateOut = sample(0, 0, 0, SPLIT_BYTES_OUT + 1, old());
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, overBytesRateOut);
+
+        AutoScaleDecision decision = decide(singleSegment, samples, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.Split, decision.toString());
+        AutoScaleDecision.Split split = (AutoScaleDecision.Split) decision;
+        assertEquals(split.reason(), "bytesRateOut");
+    }
+
+    /** With two segments over the split threshold, the more overloaded one is split. */
+    @Test
+    public void testLoadDrivenSplitPicksMostOverloaded() {
+        SegmentLayout twoSegments = initialLayout(2);
+        // Relative to the msgRateIn split threshold: seg0 at 1.1x, seg1 at 1.5x.
+        SegmentLoadSample slightlyOver = sample(SPLIT_MSG_IN * 1.1, 0, 0, 0, old());
+        SegmentLoadSample farOver = sample(SPLIT_MSG_IN * 1.5, 0, 0, 0, old());
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, slightlyOver, 1L, farOver);
+
+        AutoScaleDecision decision = decide(twoSegments, samples, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.Split, decision.toString());
+        AutoScaleDecision.Split split = (AutoScaleDecision.Split) decision;
+        assertEquals(split.segmentId(), 1L, "should split the more overloaded segment");
+    }
+
+    /** All four rates just below their split thresholds: nothing to split. */
+    @Test
+    public void testNoSplitWhenAllUnderThreshold() {
+        SegmentLayout singleSegment = initialLayout(1);
+        // Age 0: the sample is freshly updated, so it is not merge-eligible either.
+        SegmentLoadSample justUnderEverything = sample(
+                SPLIT_MSG_IN - 1, SPLIT_BYTES_IN - 1, SPLIT_MSG_OUT - 1, SPLIT_BYTES_OUT - 1, 0);
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, justUnderEverything);
+
+        AutoScaleDecision decision = decide(singleSegment, samples, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction);
+    }
+
+ // --- merge ---
+
+ @Test
+ public void testMergeColdAdjacentPair() {
+ SegmentLayout layout = initialLayout(2);
+ Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L,
cold(old()));
+ AutoScaleDecision d = decide(layout, load, Map.of(),
baseConfig().build());
+ assertTrue(d instanceof AutoScaleDecision.Merge, d.toString());
+ AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d;
+ assertTrue((m.segmentId1() == 0L && m.segmentId2() == 1L)
+ || (m.segmentId1() == 1L && m.segmentId2() == 0L));
+ }
+
+ @Test
+ public void testMergeRespectsMinSegments() {
+ SegmentLayout layout = initialLayout(2);
+ Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L,
cold(old()));
+ AutoScaleDecision d = decide(layout, load, Map.of(),
baseConfig().minSegments(2).build());
+ assertTrue(d instanceof AutoScaleDecision.NoAction, "at minSegments,
no merge");
+ }
+
+ @Test
+ public void testMergeRespectsMergeCooldown() {
+ SegmentLayout layout = initialLayout(2);
+ Map<Long, SegmentLoadSample> load = Map.of(0L, cold(old()), 1L,
cold(old()));
+ long recentMerge = NOW - Duration.ofMinutes(1).toMillis(); // < 5m
cooldown
+ AutoScaleDecision d = AutoScalePolicyEvaluator.decide(layout, load,
Map.of(),
+ baseConfig().build(), NOW, NO_PRIOR, recentMerge);
+ assertTrue(d instanceof AutoScaleDecision.NoAction, "within merge
cooldown, no merge");
+ }
+
+    /** Cold records modified one minute ago have not yet been cold for the 5 minute window. */
+    @Test
+    public void testMergeRequiresColdForFullWindow() {
+        SegmentLayout twoSegments = initialLayout(2);
+        long oneMinuteAgo = Duration.ofMinutes(1).toMillis();
+        Map<Long, SegmentLoadSample> recentlyCold =
+                Map.of(0L, cold(oneMinuteAgo), 1L, cold(oneMinuteAgo));
+
+        AutoScaleDecision decision =
+                decide(twoSegments, recentlyCold, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction,
+                "segment must stay cold for the full mergeWindow");
+    }
+
+    /** Without any load record there is no evidence of durable coldness, so nothing is merged. */
+    @Test
+    public void testMergeRequiresLoadRecordPresent() {
+        SegmentLayout twoSegments = initialLayout(2);
+        Map<Long, SegmentLoadSample> noRecords = new HashMap<>();
+
+        AutoScaleDecision decision = decide(twoSegments, noRecords, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction);
+    }
+
+    /**
+     * A segment whose msgRateIn lies between the merge and the split threshold is neither hot
+     * enough to split nor cold enough to merge with its idle neighbour.
+     */
+    @Test
+    public void testHysteresisDeadBandNoSplitNoMerge() {
+        SegmentLayout twoSegments = initialLayout(2);
+        SegmentLoadSample inDeadBand = sample(MERGE_MSG_IN + 1, 0, 0, 0, old());
+        Map<Long, SegmentLoadSample> samples = Map.of(0L, inDeadBand, 1L, cold(old()));
+
+        AutoScaleDecision decision = decide(twoSegments, samples, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.NoAction,
+                "in the dead-band, neither split nor merge");
+    }
+
+    /**
+     * When one segment is hot and, at the same time, a genuinely merge-eligible cold pair
+     * exists, the evaluator must choose the split.
+     *
+     * <p>Four segments are used so that the cold segments form adjacent pairs that do not
+     * involve the hot segment; with only two segments the hot one would disqualify the single
+     * possible pair and the test could not tell priority from absence of a merge candidate.
+     */
+    @Test
+    public void testSplitTakesPriorityOverMerge() {
+        SegmentLayout layout = initialLayout(4);
+
+        // Precondition: with every segment cold the merge pass does fire on this layout.
+        Map<Long, SegmentLoadSample> allCold = Map.of(
+                0L, cold(old()), 1L, cold(old()), 2L, cold(old()), 3L, cold(old()));
+        AutoScaleDecision mergeOnly = decide(layout, allCold, Map.of(), baseConfig().build());
+        assertTrue(mergeOnly instanceof AutoScaleDecision.Merge, mergeOnly.toString());
+
+        // Same layout, but seg0 is now hot while seg1..seg3 remain cold and mergeable.
+        Map<Long, SegmentLoadSample> load = Map.of(
+                0L, sample(SPLIT_MSG_IN + 1, 0, 0, 0, old()),
+                1L, cold(old()), 2L, cold(old()), 3L, cold(old()));
+        AutoScaleDecision d = decide(layout, load, Map.of(), baseConfig().build());
+        assertTrue(d instanceof AutoScaleDecision.Split, "split has priority");
+        assertEquals(((AutoScaleDecision.Split) d).segmentId(), 0L,
+                "the hot segment is the one that must be split");
+    }
+
+    /**
+     * Segments whose ancestry already contains {@code maxDagDepth} merges must not be merged
+     * again; below the cap the merge goes ahead.
+     */
+    @Test
+    public void testMergeRespectsMaxDagDepth() {
+        // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}. The two active segments 4 and 5
+        // therefore each carry one merge (node 3) in their ancestry.
+        SegmentLayout afterFirstSplit = initialLayout(1).splitSegment(0, NOW);
+        SegmentLayout afterMerge = afterFirstSplit.mergeSegments(1, 2, NOW);
+        SegmentLayout layout = afterMerge.splitSegment(3, NOW);
+
+        List<Long> activeIds = layout.getActiveSegments().keySet().stream().sorted().toList();
+        assertEquals(activeIds, List.of(4L, 5L));
+        assertEquals(layout.mergeDepth(4L), 1);
+        assertEquals(layout.mergeDepth(5L), 1);
+
+        Map<Long, SegmentLoadSample> longCold = Map.of(4L, cold(old()), 5L, cold(old()));
+
+        // Cap of 1: both segments are already at the cap, so the merge is refused.
+        AutoScaleConfig capOfOne = baseConfig().maxDagDepth(1).build();
+        AutoScaleDecision atCap = decide(layout, longCold, Map.of(), capOfOne);
+        assertTrue(atCap instanceof AutoScaleDecision.NoAction, "merge blocked at maxDagDepth");
+
+        // Cap of 2: both segments are below the cap, so the merge is permitted.
+        AutoScaleConfig capOfTwo = baseConfig().maxDagDepth(2).build();
+        AutoScaleDecision belowCap = decide(layout, longCold, Map.of(), capOfTwo);
+        assertTrue(belowCap instanceof AutoScaleDecision.Merge, "merge allowed below maxDagDepth");
+    }
+
+ @Test
+ public void testMergePicksColdestAdjacentPair() {
+ SegmentLayout layout = initialLayout(4); // ranges tile [0,MAX] in 4
adjacent quarters
+ // seg0+seg1 combined hotter than seg2+seg3; all below merge
thresholds and old.
+ Map<Long, SegmentLoadSample> load = Map.of(
+ 0L, sample(900, 0, 0, 0, old()),
+ 1L, sample(900, 0, 0, 0, old()),
+ 2L, sample(10, 0, 0, 0, old()),
+ 3L, sample(10, 0, 0, 0, old()));
+ AutoScaleDecision d = decide(layout, load, Map.of(),
baseConfig().build());
+ assertTrue(d instanceof AutoScaleDecision.Merge, d.toString());
+ AutoScaleDecision.Merge m = (AutoScaleDecision.Merge) d;
+ // The coldest adjacent pair is {2,3}.
+ assertTrue((m.segmentId1() == 2L && m.segmentId2() == 3L)
+ || (m.segmentId1() == 3L && m.segmentId2() == 2L),
+ "should pick the coldest adjacent pair {2,3}, got " + m);
+ }
+
+    /** Whatever pair the evaluator picks for a merge, its two hash ranges must be adjacent. */
+    @Test
+    public void testMergedPairIsAlwaysAdjacent() {
+        SegmentLayout fourSegments = initialLayout(4);
+        Map<Long, SegmentLoadSample> allLongCold = Map.of(
+                0L, cold(old()),
+                1L, cold(old()),
+                2L, cold(old()),
+                3L, cold(old()));
+
+        AutoScaleDecision decision =
+                decide(fourSegments, allLongCold, Map.of(), baseConfig().build());
+
+        assertTrue(decision instanceof AutoScaleDecision.Merge, decision.toString());
+        AutoScaleDecision.Merge merge = (AutoScaleDecision.Merge) decision;
+        SegmentInfo first = fourSegments.getAllSegments().get(merge.segmentId1());
+        SegmentInfo second = fourSegments.getAllSegments().get(merge.segmentId2());
+        assertTrue(first.hashRange().isAdjacentTo(second.hashRange()),
+                "merged pair must be hash-range adjacent");
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
new file mode 100644
index 00000000000..74ad5e8a5a3
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.ScalableTopics;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for the controller's auto split/merge wiring (PIP-483): the periodic /
+ * event-driven evaluation reads load records + consumer counts, runs the evaluator, and
+ * dispatches to the real splitSegment / mergeSegments paths (against an in-memory metadata
+ * store and a mocked cross-broker admin client). The decision logic itself is unit-tested in
+ * {@link AutoScalePolicyEvaluatorTest}; here we verify the plumbing actually fires.
+ */
+public class ScalableTopicControllerAutoScaleTest {
+
+ private static final String BROKER_ID = "broker-test";
+
+ private MetadataStoreExtended store;
+ private CoordinationService coordinationService;
+ private ScalableTopicResources resources;
+ private ScheduledExecutorService scheduler;
+ private BrokerService brokerService;
+ private PulsarService pulsar;
+ private ServiceConfiguration config;
+ private ScalableTopics scalableTopics;
+ private ScalableTopicController controller;
+ private TopicName topicName;
+
+    /**
+     * Creates a fresh in-memory metadata store, coordination service and resources, an
+     * auto-scale configuration without cooldowns or merge window, and the mocked broker,
+     * Pulsar service and admin client the controller talks to.
+     */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        // Real in-memory backing services.
+        store = new LocalMemoryMetadataStore("memory:local", MetadataStoreConfig.builder().build());
+        coordinationService = new CoordinationServiceImpl(store);
+        resources = new ScalableTopicResources(store, 30);
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        topicName = TopicName.get("topic://tenant/ns/my-topic");
+
+        // Cooldowns and merge window are zeroed so that one evaluateAutoScaleForTest() call acts
+        // right away; the msgRateIn split threshold is kept low-ish.
+        config = new ServiceConfiguration();
+        config.setScalableTopicAutoScaleEnabled(true);
+        config.setScalableTopicMaxSegments(64);
+        config.setScalableTopicMinSegments(1);
+        config.setScalableTopicSplitCooldownSeconds(0);
+        config.setScalableTopicMergeCooldownSeconds(0);
+        config.setScalableTopicMergeWindowSeconds(0);
+        config.setScalableTopicSplitMsgRateInThreshold(10_000);
+
+        // Mocked collaborators.
+        brokerService = mock(BrokerService.class);
+        pulsar = mock(PulsarService.class);
+        PulsarAdmin adminClient = mock(PulsarAdmin.class);
+        Topics topicsApi = mock(Topics.class);
+        scalableTopics = mock(ScalableTopics.class);
+
+        // Broker service: hands out the Pulsar service and reports no locally loaded topic.
+        when(brokerService.getPulsar()).thenReturn(pulsar);
+        when(brokerService.getTopicIfExists(anyString()))
+                .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        // Pulsar service: identity, executor, configuration and admin client.
+        when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
+        when(pulsar.getExecutor()).thenReturn(scheduler);
+        when(pulsar.getConfig()).thenReturn(config);
+        when(pulsar.getAdminClient()).thenReturn(adminClient);
+
+        // Admin client: segment creation and termination succeed immediately.
+        when(adminClient.topics()).thenReturn(topicsApi);
+        when(adminClient.scalableTopics()).thenReturn(scalableTopics);
+        when(scalableTopics.createSegmentAsync(anyString(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(scalableTopics.terminateSegmentAsync(anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    /**
+     * Releases everything {@code setUp()} / {@code startController()} created, in the order
+     * controller, coordination service, metadata store, executor.
+     *
+     * <p>The closes are chained through {@code finally} blocks so that a failure while closing
+     * one resource cannot leak the ones after it (in particular the executor thread). The
+     * controller reference is cleared so a later test never closes a stale controller again.
+     *
+     * @throws Exception the first failure raised while closing a resource
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        try {
+            if (controller != null) {
+                controller.close().join();
+            }
+        } finally {
+            controller = null;
+            try {
+                if (coordinationService != null) {
+                    coordinationService.close();
+                }
+            } finally {
+                try {
+                    if (store != null) {
+                        store.close();
+                    }
+                } finally {
+                    if (scheduler != null) {
+                        scheduler.shutdownNow();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the scalable topic with {@code initialSegments} segments in the metadata store,
+     * then constructs and initializes the controller under test.
+     */
+    private void startController(int initialSegments) throws Exception {
+        var initialMetadata = ScalableTopicController.createInitialMetadata(initialSegments, Map.of());
+        resources.createScalableTopicAsync(topicName, initialMetadata).get();
+
+        controller = new ScalableTopicController(topicName, resources, brokerService,
+                coordinationService);
+        controller.initialize().get();
+    }
+
+    /** Number of active segments in the controller's current layout. */
+    private int activeSegmentCount() throws Exception {
+        var currentLayout = controller.getLayout().get();
+        return currentLayout.getActiveSegments().size();
+    }
+
+ @Test
+ public void testLoadDrivenSplit() throws Exception {
+ startController(2);
+ assertEquals(activeSegmentCount(), 2);
+
+ // Segment 0 is hot on msgRateIn → expect a split.
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "hot segment should have been
split");
+ }
+
+ @Test
+ public void testNoSplitWhenUnderThreshold() throws Exception {
+ startController(2);
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(100, 0, 0, 0)).get();
+
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 2, "no segment over threshold → no
change");
+ }
+
+ @Test
+ public void testDisabledConfigIsNoOp() throws Exception {
+ config.setScalableTopicAutoScaleEnabled(false);
+ startController(2);
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(1_000_000, 0, 0, 0)).get();
+
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 2, "disabled → no action even when
hot");
+ }
+
+    /** Four near-idle segments: one evaluation merges exactly one adjacent pair. */
+    @Test
+    public void testColdSegmentsMerge() throws Exception {
+        startController(4);
+        // Write an explicit near-idle record (1 msg/s in) for every segment, so each one has a
+        // load record on file. The merge window is 0 in this fixture, so the records are
+        // eligible immediately.
+        for (long segmentId = 0; segmentId < 4; segmentId++) {
+            SegmentLoadStats nearIdle = new SegmentLoadStats(1, 0, 0, 0);
+            resources.reportSegmentLoadAsync(topicName, segmentId, nearIdle).get();
+        }
+
+        controller.evaluateAutoScaleForTest().get();
+
+        assertEquals(activeSegmentCount(), 3, "a cold adjacent pair should have been merged");
+    }
+
+    /**
+     * Two consumers of one subscription on a single-segment topic need a second segment.
+     * Registering a consumer triggers a fire-and-forget evaluation, so the effect is awaited.
+     */
+    @Test
+    public void testConsumerDrivenSplit() throws Exception {
+        startController(1);
+        assertEquals(activeSegmentCount(), 1);
+
+        TransportCnx firstCnx = mock(TransportCnx.class);
+        controller.registerConsumer("sub", "c1", 1L, ScalableConsumerType.STREAM, firstCnx).get();
+        TransportCnx secondCnx = mock(TransportCnx.class);
+        controller.registerConsumer("sub", "c2", 2L, ScalableConsumerType.STREAM, secondCnx).get();
+
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(10))
+                .untilAsserted(() -> assertEquals(activeSegmentCount(), 2,
+                        "2 consumers on 1 segment should drive a split"));
+    }
+
+    /**
+     * Four consumers joining back to back must end up with one segment each, driven only by
+     * the event-driven evaluations and the post-split follow-up chain — the test issues no
+     * periodic tick and no manual evaluation.
+     */
+    @Test
+    public void testConsumerBurstConvergesWithoutTicks() throws Exception {
+        startController(1);
+
+        for (int i = 1; i <= 4; i++) {
+            TransportCnx cnx = mock(TransportCnx.class);
+            controller.registerConsumer("sub", "c" + i, i, ScalableConsumerType.STREAM, cnx).get();
+        }
+
+        Awaitility.await()
+                .atMost(Duration.ofSeconds(20))
+                .untilAsserted(() -> assertEquals(activeSegmentCount(), 4,
+                        "4 consumers must drive convergence to 4 segments"));
+    }
+
+ @Test
+ public void testSplitCooldownBlocksSecondSplit() throws Exception {
+ config.setScalableTopicSplitCooldownSeconds(3600); // 1h — blocks a
second split
+ startController(2);
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "first split happens");
+
+ // Still hot, but within cooldown → no second split.
+ resources.reportSegmentLoadAsync(topicName, 1,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "second split blocked by
cooldown");
+ }
+
+ @Test
+ public void testSplitCooldownSurvivesLeaderFailover() throws Exception {
+ config.setScalableTopicSplitCooldownSeconds(3600);
+ startController(2);
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "first split happens");
+
+ // Leadership moves: close this controller and elect a fresh one. The
new leader's
+ // in-memory cooldown clocks must be re-seeded from the layout (the
children's
+ // createdAtMs records when the last split ran), not reset to "never".
+ controller.close().join();
+ controller = new ScalableTopicController(topicName, resources,
brokerService,
+ coordinationService);
+ controller.initialize().get();
+
+ resources.reportSegmentLoadAsync(topicName, 1,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3,
+ "split cooldown must survive failover via layout-derived
seeding");
+ }
+
+    /**
+     * A split that fails must not start the split cooldown: once the failure is gone, the next
+     * evaluation splits right away despite a one-hour cooldown being configured.
+     */
+    @Test
+    public void testFailedSplitDoesNotBurnCooldown() throws Exception {
+        config.setScalableTopicSplitCooldownSeconds(3600);
+        startController(2);
+        SegmentLoadStats hot = new SegmentLoadStats(20_000, 0, 0, 0);
+        resources.reportSegmentLoadAsync(topicName, 0, hot).get();
+
+        // Inject a failure into segment-topic creation. The evaluation future reports it (the
+        // production tick wrapper logs-and-swallows it).
+        CompletableFuture<Void> creationFailure =
+                CompletableFuture.failedFuture(new RuntimeException("injected"));
+        when(scalableTopics.createSegmentAsync(anyString(), any())).thenReturn(creationFailure);
+        assertThrows(java.util.concurrent.ExecutionException.class,
+                () -> controller.evaluateAutoScaleForTest().get());
+        assertEquals(activeSegmentCount(), 2, "failed split leaves the layout unchanged");
+
+        // Clear the injected failure; the retry must go through without waiting out the hour.
+        when(scalableTopics.createSegmentAsync(anyString(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        controller.evaluateAutoScaleForTest().get();
+        assertEquals(activeSegmentCount(), 3, "retry after a failed split is not cooldown-blocked");
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 4a0cca1eb78..dccd8d0e307 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -693,6 +693,10 @@ public class ScalableTopicControllerTest {
controller.splitSegment(0).get();
assertEquals(controller.sealedSegmentCount(), sealedBefore + 1);
+ // Give the doomed segment a load record, as the owning broker's
reporter would.
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1,
1, 1)).get();
+
// Tick at the seal time — retention not yet elapsed; nothing pruned.
controller.runGcTickAsync().get();
assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
@@ -705,6 +709,9 @@ public class ScalableTopicControllerTest {
"tick past retention must prune the sealed segment");
// Backing topic delete was issued via the segment-aware admin call.
verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean());
+ // The pruned segment's load record is deleted along with it.
+ assertFalse(resources.getSegmentLoadAsync(topicName,
0).get().isPresent(),
+ "prune must delete the segment's load record");
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
index 47271c2dc02..8ece3651e5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
@@ -327,9 +327,20 @@ public class ScalableTopicServiceTest {
service.createScalableTopic(tn, 2).get();
assertTrue(resources.scalableTopicExistsAsync(tn).get());
+ // Seed child records under the topic — a subscription and a
per-segment load
+ // record — to verify the delete is recursive and takes everything
with it.
+ resources.createSubscriptionAsync(tn, "sub-a",
+
org.apache.pulsar.broker.resources.SubscriptionType.STREAM).get();
+ resources.reportSegmentLoadAsync(tn, 0,
+ new org.apache.pulsar.common.scalable.SegmentLoadStats(1, 1,
1, 1)).get();
+
service.deleteScalableTopic(tn).get();
assertFalse(resources.scalableTopicExistsAsync(tn).get());
+ assertFalse(resources.subscriptionExistsAsync(tn, "sub-a").get(),
+ "topic delete must remove subscription records");
+ assertFalse(resources.getSegmentLoadAsync(tn, 0).get().isPresent(),
+ "topic delete must remove segment load records");
verify(scalableTopicsAdmin, org.mockito.Mockito.atLeast(2))
.deleteSegmentAsync(anyString(), anyBoolean());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index b936820072f..364c9299a11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -291,4 +291,37 @@ public class SegmentLayoutTest {
SegmentLayout split2 = split1.splitSegment(1, 0L);
assertEquals(split2.getNextSegmentId(), 6);
}
+
+    /**
+     * Segments of a freshly created layout, and the children produced by splitting one of
+     * them, all report a merge depth of zero.
+     */
+    @Test
+    public void testMergeDepthZeroForNeverMergedSegments() {
+        SegmentLayout initial = SegmentLayout.fromMetadata(
+                ScalableTopicController.createInitialMetadata(2, Map.of()));
+        for (long segmentId = 0; segmentId <= 1; segmentId++) {
+            assertEquals(initial.mergeDepth(segmentId), 0);
+        }
+
+        // A split on its own leaves the merge depth of both children (2 and 3) at zero.
+        SegmentLayout split = initial.splitSegment(0, 0L);
+        for (long childId = 2; childId <= 3; childId++) {
+            assertEquals(split.mergeDepth(childId), 0);
+        }
+    }
+
+    /**
+     * Merge depth counts the merges in a segment's lineage: 1 for a merged segment and
+     * for the children a later split produces from it, 2 once those children are merged
+     * again.
+     */
+    @Test
+    public void testMergeDepthCountsMergesInLineage() {
+        // split(0) → {1,2}; merge(1,2) → {3}; split(3) → {4,5}.
+        SegmentLayout layout = SegmentLayout
+                .fromMetadata(ScalableTopicController.createInitialMetadata(1, Map.of()))
+                .splitSegment(0, 0L)
+                .mergeSegments(1, 2, 0L)
+                .splitSegment(3, 0L);
+
+        // The merged node 3 contributes one merge to the ancestry of 4 and 5.
+        assertEquals(layout.mergeDepth(3), 1);
+        assertEquals(layout.mergeDepth(4), 1);
+        assertEquals(layout.mergeDepth(5), 1);
+
+        // A second merge in the lineage bumps it to 2.
+        SegmentLayout twice = layout.mergeSegments(4, 5, 0L);
+        // The segment created by the merge holds the last id handed out.
+        long mergedAgain = twice.getNextSegmentId() - 1;
+        assertEquals(twice.mergeDepth(mergedAgain), 2);
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java
new file mode 100644
index 00000000000..d35c074105e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLoadReporterTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentLoadStats;
+import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link SegmentLoadReporter}: the static {@code isMaterialChange} check, and
+ * {@code reportIfChanged} running against an in-memory metadata store.
+ */
+public class SegmentLoadReporterTest {
+
+    private static final double THRESHOLD = 0.25;
+    private static final TopicName TOPIC = TopicName.get("topic://tenant/ns/my-topic");
+
+    private MetadataStoreExtended store;
+    private ScalableTopicResources resources;
+    private SegmentLoadReporter reporter;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local", MetadataStoreConfig.builder().build());
+        resources = new ScalableTopicResources(store, 30);
+        reporter = new SegmentLoadReporter(resources, THRESHOLD);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /** Builds a sample with the given {@code msgRateIn} and the other three rates at zero. */
+    private static SegmentLoadStats stats(double msgIn) {
+        return new SegmentLoadStats(msgIn, 0, 0, 0);
+    }
+
+    // --- isMaterialChange (pure) ---
+
+    @Test
+    public void testMaterialChangeRelativeThreshold() {
+        // +30% on msgRateIn exceeds the 25% threshold.
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), stats(1300), THRESHOLD));
+        // +10% does not.
+        assertFalse(SegmentLoadReporter.isMaterialChange(stats(1000), stats(1100), THRESHOLD));
+        // -30% (drop) is symmetric and material.
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1000), stats(700), THRESHOLD));
+    }
+
+    @Test
+    public void testMaterialChangeCrossingZero() {
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(0), stats(1), THRESHOLD));
+        assertTrue(SegmentLoadReporter.isMaterialChange(stats(1), stats(0), THRESHOLD));
+        assertFalse(SegmentLoadReporter.isMaterialChange(stats(0), stats(0), THRESHOLD));
+    }
+
+    @Test
+    public void testMaterialChangeAnyMetric() {
+        SegmentLoadStats last = new SegmentLoadStats(1000, 1000, 1000, 1000);
+        // Only bytesRateOut moves materially; still counts.
+        SegmentLoadStats current = new SegmentLoadStats(1000, 1000, 1000, 2000);
+        assertTrue(SegmentLoadReporter.isMaterialChange(last, current, THRESHOLD));
+    }
+
+    // --- reportIfChanged (against an in-memory store) ---
+
+    @Test
+    public void testFirstReportAlwaysWrites() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(500)).get());
+        Optional<CacheGetResult<SegmentLoadStats>> got =
+                resources.getSegmentLoadAsync(TOPIC, 0).get();
+        assertTrue(got.isPresent());
+        assertEquals(got.get().getValue().msgRateIn(), 500.0);
+    }
+
+    @Test
+    public void testImmaterialSecondReportSkipped() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        long firstModified = resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp();
+
+        // +10% is immaterial → no write → stored value and timestamp unchanged.
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1100)).get());
+        CacheGetResult<SegmentLoadStats> after = resources.getSegmentLoadAsync(TOPIC, 0).get().get();
+        assertEquals(after.getValue().msgRateIn(), 1000.0);
+        assertEquals(after.getStat().getModificationTimestamp(), firstModified);
+    }
+
+    @Test
+    public void testMaterialSecondReportWrites() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        // +50% is material → write.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1500)).get());
+        assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get().get().getValue().msgRateIn(),
+                1500.0);
+    }
+
+    @Test
+    public void testForgetReSeedsBaselineFromStore() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        // Without forget, an immaterial sample is skipped.
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get());
+        // After forget (unload + re-acquire), the baseline re-seeds from the stored record:
+        // an immaterial sample is still skipped (so the merge window isn't reset)...
+        reporter.forget(TOPIC, 0);
+        assertFalse(reporter.reportIfChanged(TOPIC, 0, stats(1050)).get());
+        // ...while a material one writes.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(2000)).get());
+    }
+
+    @Test
+    public void testNewOwnerSeedsBaselineFromStore() throws Exception {
+        // Old owner writes a record.
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        long modified = resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp();
+
+        // Ownership moves: a fresh reporter (empty lastWritten cache) samples a rate within
+        // the materiality band of the STORED value. It must seed its baseline from the store
+        // and skip the write — otherwise every rebalance would reset the record's
+        // modification time and starve the controller's merge window.
+        SegmentLoadReporter newOwner = new SegmentLoadReporter(resources, THRESHOLD);
+        assertFalse(newOwner.reportIfChanged(TOPIC, 0, stats(1100)).get());
+        assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp(), modified);
+
+        // A materially different sample still writes.
+        assertTrue(newOwner.reportIfChanged(TOPIC, 0, stats(2000)).get());
+    }
+
+    @Test
+    public void testIdenticalValueDoesNotBumpModificationTime() throws Exception {
+        assertTrue(reporter.reportIfChanged(TOPIC, 0, stats(1000)).get());
+        long modified = resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp();
+
+        // Bit-identical re-report through the resources layer is a no-op write.
+        resources.reportSegmentLoadAsync(TOPIC, 0, stats(1000)).get();
+        assertEquals(resources.getSegmentLoadAsync(TOPIC, 0).get()
+                .get().getStat().getModificationTimestamp(), modified);
+    }
+
+    @Test
+    public void testDeleteSegmentLoadToleratesMissing() throws Exception {
+        // No record yet — delete must not fail.
+        resources.deleteSegmentLoadAsync(TOPIC, 7).get();
+        // Assert the write went through, so the final check cannot pass vacuously on a
+        // record that was never created.
+        assertTrue(reporter.reportIfChanged(TOPIC, 7, stats(100)).get());
+        resources.deleteSegmentLoadAsync(TOPIC, 7).get();
+        assertFalse(resources.getSegmentLoadAsync(TOPIC, 7).get().isPresent());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java
new file mode 100644
index 00000000000..086d82fcce7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentLoadReporterTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the broker-side segment load reporter sweep (PIP-483): after a
+ * scalable topic has live segment topics on the broker, the periodic sweep (driven manually
+ * here for determinism) must write a {@link org.apache.pulsar.common.scalable.SegmentLoadStats}
+ * record to the metadata store for each hosted segment, which is what the controller's auto
+ * split/merge reads.
+ */
+public class V5SegmentLoadReporterTest extends V5ClientBaseTest {
+
+    /**
+     * Produces keyed messages to a two-segment scalable topic, then drives the sweep until
+     * load records for segments 0 and 1 are present in the metadata store.
+     */
+    @Test
+    public void testSweepWritesSegmentLoadRecords() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<byte[]> producer = v5Client.newProducer(Schema.bytes())
+                .topic(topic)
+                .create();
+        // Produce across keys so both initial segments get a live segment topic on the broker.
+        for (int i = 0; i < 50; i++) {
+            // Explicit charset: the payload must not depend on the platform default.
+            producer.newMessage().key("k-" + i)
+                    .value(("v-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)).send();
+        }
+
+        ScalableTopicResources resources =
+                getPulsar().getPulsarResources().getScalableTopicResources();
+        TopicName parent = TopicName.get(topic);
+
+        // Drive the sweep directly rather than waiting for the 10s scheduled tick.
+        getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+
+        // Both initial segments (0 and 1) should now have a load record.
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+            assertTrue(resources.getSegmentLoadAsync(parent, 0).get().isPresent(),
+                    "segment 0 load record must be written by the sweep");
+            assertTrue(resources.getSegmentLoadAsync(parent, 1).get().isPresent(),
+                    "segment 1 load record must be written by the sweep");
+        });
+    }
+
+    /**
+     * Runs the sweep while a plain persistent topic is loaded on the broker; the sweep call
+     * must complete without throwing.
+     */
+    @Test
+    public void testNonSegmentTopicsProduceNoLoadRecords() throws Exception {
+        // A plain persistent topic must not produce any scalable-segment load record.
+        @Cleanup
+        org.apache.pulsar.client.api.PulsarClient v4 =
+                org.apache.pulsar.client.api.PulsarClient.builder()
+                        .serviceUrl(getBrokerServiceUrl()).build();
+        String plain = "persistent://" + getNamespace() + "/plain-" + System.nanoTime();
+        @Cleanup
+        org.apache.pulsar.client.api.Producer<byte[]> p =
+                v4.newProducer().topic(plain).create();
+        p.send("x".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+        // Sweep must not throw on non-segment topics (and obviously writes nothing for them).
+        // NOTE(review): only the no-throw half is checked here; nothing asserts that no load
+        // record was written.
+        getPulsar().getBrokerService().runSegmentLoadReportOnceForTest();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java
new file mode 100644
index 00000000000..00701568a3e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentLoadStats.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.scalable;
+
+/**
+ * Per-segment load sample for scalable-topic auto split/merge (PIP-483).
+ *
+ * <p>Written by the broker that owns a segment's {@code segment://} topic, directly to the
+ * metadata store under {@code .../segments/{segmentId}/load}, and read by the controller
+ * leader's auto-scaling evaluator. To keep write volume bounded, the owning broker only
+ * rewrites this record when one of the rates changes by more than a significant threshold
+ * (default ±25%) since the last write — see {@code SegmentLoadReporter}.
+ *
+ * <p>The record carries no timestamp of its own: the metadata store exposes the record's
+ * last-modified time via its {@code Stat}, and the controller uses that for the "cold for
+ * at least mergeWindow" check.
+ *
+ * @param msgRateIn    inbound messages per second (60s rolling average, from {@code TopicStats})
+ * @param bytesRateIn  inbound bytes per second
+ * @param msgRateOut   outbound (dispatched) messages per second, summed across subscriptions
+ * @param bytesRateOut outbound bytes per second
+ */
+public record SegmentLoadStats(
+        double msgRateIn,
+        double bytesRateIn,
+        double msgRateOut,
+        double bytesRateOut
+) {
+    /** Sample with all four rates set to zero. */
+    public static final SegmentLoadStats ZERO = new SegmentLoadStats(0, 0, 0, 0);
+}