This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0fda82bea79 [improve][broker] PIP-483: namespace + topic auto
split/merge policy overrides (#26008)
0fda82bea79 is described below
commit 0fda82bea79a4ce4abfec0aa80ca20f11cc386b7
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jun 12 13:44:32 2026 -0700
[improve][broker] PIP-483: namespace + topic auto split/merge policy
overrides (#26008)
---
.../broker/resources/ScalableTopicMetadata.java | 7 +
.../pulsar/broker/admin/impl/NamespacesBase.java | 32 +++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 97 +++++++++++++
.../pulsar/broker/admin/v2/ScalableTopics.java | 151 +++++++++++++++++++++
.../broker/service/scalable/AutoScaleConfig.java | 81 ++++++++++-
.../service/scalable/ScalableTopicController.java | 67 +++++++--
.../broker/service/scalable/SegmentLayout.java | 9 +-
.../admin/ScalableTopicAutoScalePolicyTest.java | 133 ++++++++++++++++++
.../service/scalable/AutoScaleConfigTest.java | 46 +++++++
.../ScalableTopicControllerAutoScaleTest.java | 124 +++++++++++++++++
.../broker/service/scalable/SegmentLayoutTest.java | 7 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 67 +++++++++
.../apache/pulsar/client/admin/ScalableTopics.java | 49 +++++++
.../policies/data/AutoScalePolicyOverride.java | 90 ++++++++++++
.../pulsar/common/policies/data/Policies.java | 5 +-
.../policies/data/ScalableTopicMetadata.java | 6 +
.../policies/data/ScalableTopicMetadataTest.java | 5 +-
.../client/admin/internal/NamespacesImpl.java | 40 ++++++
.../client/admin/internal/ScalableTopicsImpl.java | 38 ++++++
.../pulsar/common/policies/data/PolicyName.java | 1 +
20 files changed, 1035 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
index 8af85231651..228e2c25e65 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicMetadata.java
@@ -24,6 +24,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.scalable.SegmentInfo;
/**
@@ -51,4 +52,10 @@ public class ScalableTopicMetadata {
/** User-defined topic properties. */
@Builder.Default
private Map<String, String> properties = Map.of();
+
+ /**
+ * Per-topic auto split/merge policy override (PIP-483). {@code null}
means no override:
+ * the namespace policy and then the broker configuration apply.
+ */
+ private AutoScalePolicyOverride autoScalePolicy;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bff682e0d1b..911d3de2b04 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.scalable.AutoScaleConfig;
import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
import org.apache.pulsar.broker.web.RestException;
@@ -83,6 +84,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1307,6 +1309,36 @@ public abstract class NamespacesBase extends
AdminResource {
}));
}
+ protected CompletableFuture<AutoScalePolicyOverride>
internalGetScalableTopicAutoScalePolicyAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.SCALABLE_TOPIC_AUTO_SCALE,
+ PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.scalableTopicAutoScalePolicy);
+ }
+
+ protected CompletableFuture<Void>
internalSetScalableTopicAutoScalePolicyAsync(
+ AutoScalePolicyOverride override) {
+ return validateNamespacePolicyOperationAsync(namespaceName,
+ PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> {
+ if (override != null) {
+ // The override only has to be valid in combination
with the broker
+ // defaults — resolve and let the invariant checks
reject bad values
+ // (e.g. zero split thresholds, split <= merge
hysteresis inversion).
+ try {
+ AutoScaleConfig.resolve(pulsar().getConfig(),
override, null);
+ } catch (IllegalArgumentException e) {
+ throw new
RestException(Status.PRECONDITION_FAILED, e.getMessage());
+ }
+ }
+ })
+ .thenCompose(__ ->
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
+ policies.scalableTopicAutoScalePolicy = override;
+ return policies;
+ }));
+ }
+
protected CompletableFuture<Void>
internalSetAutoSubscriptionCreationAsync(AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
// Force to read the data s.t. the watch to the cache content is setup.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 77c18f4169d..33f4eb76858 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -880,6 +881,102 @@ public class Namespaces extends NamespacesBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+ @Operation(summary = "Get the scalable-topic auto split/merge policy
override for a namespace")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "200",
+ description = "The scalable-topic auto split/merge policy
override for the namespace",
+ content = @Content(schema = @Schema(implementation =
AutoScalePolicyOverride.class))),
+ @ApiResponse(responseCode = "204", description = "No override is
set on this namespace"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission"),
+ @ApiResponse(responseCode = "404", description = "Tenant or
namespace doesn't exist")})
+ public void getScalableTopicAutoScalePolicy(@Suspended AsyncResponse
asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalGetScalableTopicAutoScalePolicyAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error()
+ .attr("namespace", namespaceName)
+ .exception(ex)
+ .log("Failed to get scalableTopicAutoScalePolicy
for namespace");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+ @Operation(summary = "Override the broker's scalable-topic auto
split/merge settings for a namespace")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "204", description = "Operation
successful"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission"),
+ @ApiResponse(responseCode = "404", description = "Tenant or
cluster or namespace doesn't exist"),
+ @ApiResponse(responseCode = "412",
+ description = "The resolved auto split/merge policy
violates an invariant")})
+ public void setScalableTopicAutoScalePolicy(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
+ @RequestBody(description = "Auto split/merge policy override",
required = true)
+ AutoScalePolicyOverride override) {
+ validateNamespaceName(tenant, namespace);
+ internalSetScalableTopicAutoScalePolicyAsync(override)
+ .thenAccept(__ -> {
+ log.info()
+ .attr("namespace", namespaceName)
+ .log("Successfully set
scalableTopicAutoScalePolicy on namespace");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error()
+ .attr("namespace", namespaceName)
+ .exception(ex)
+ .log("Failed to set scalableTopicAutoScalePolicy
on namespace");
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/scalableTopicAutoScalePolicy")
+ @Operation(summary = "Remove the scalable-topic auto split/merge policy
override from a namespace")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "204", description = "Operation
successful"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission"),
+ @ApiResponse(responseCode = "404", description = "Tenant or
cluster or namespace doesn't exist") })
+ public void removeScalableTopicAutoScalePolicy(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace")
String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetScalableTopicAutoScalePolicyAsync(null)
+ .thenAccept(__ -> {
+ log.info()
+ .attr("namespace", namespaceName)
+ .log("Successfully removed
scalableTopicAutoScalePolicy on namespace");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ log.error()
+ .attr("namespace", namespaceName)
+ .exception(ex)
+ .log("Failed to remove
scalableTopicAutoScalePolicy on namespace");
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
+ }
+
@POST
@Path("/{tenant}/{namespace}/autoSubscriptionCreation")
@Operation(summary = "Override broker's allowAutoSubscriptionCreation
setting for a namespace")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 13afb2abe2e..5a365ffa523 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -55,13 +55,17 @@ import lombok.CustomLog;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.scalable.AutoScaleConfig;
import org.apache.pulsar.broker.service.scalable.ScalableTopicController;
import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.scalable.ScalableTopicConstants;
@@ -477,6 +481,153 @@ public class ScalableTopics extends AdminResource {
});
}
+ // --- Auto split/merge policy override (PIP-483) ---
+
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+ @Operation(summary = "Get the per-topic auto split/merge policy override.")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "200", description = "The per-topic
auto split/merge policy override.",
+ content = @Content(schema = @Schema(implementation =
AutoScalePolicyOverride.class))),
+ @ApiResponse(responseCode = "204", description = "No override is
set on this topic"),
+ @ApiResponse(responseCode = "401",
+ description = "Don't have permission to administrate
resources on this tenant"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission on the namespace"),
+ @ApiResponse(responseCode = "404", description = "Scalable topic
doesn't exist"),
+ @ApiResponse(responseCode = "500", description = "Internal server
error")})
+ public void getAutoScalePolicy(
+ @Suspended final AsyncResponse asyncResponse,
+ @Parameter(description = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @Parameter(description = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @Parameter(description = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateNamespaceName(tenant, namespace);
+ TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
+
+ validateTopicPolicyOperationAsync(tn,
PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.READ)
+ .thenCompose(__ ->
resources().getScalableTopicMetadataAsync(tn))
+ .thenAccept(optMd -> {
+ if (optMd.isEmpty()) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Scalable topic not found: " + tn));
+ } else {
+ asyncResponse.resume(optMd.get().getAutoScalePolicy());
+ }
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("topic", tn)
+ .exception(ex).log("Failed to get autoScalePolicy
for scalable topic");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+ @Operation(summary = "Set the per-topic auto split/merge policy override.")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "204", description = "Override set
successfully"),
+ @ApiResponse(responseCode = "401",
+ description = "Don't have permission to administrate
resources on this tenant"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission on the namespace"),
+ @ApiResponse(responseCode = "404", description = "Scalable topic
doesn't exist"),
+ @ApiResponse(responseCode = "412",
+ description = "The resolved auto split/merge policy
violates an invariant"),
+ @ApiResponse(responseCode = "500", description = "Internal server
error")})
+ public void setAutoScalePolicy(
+ @Suspended final AsyncResponse asyncResponse,
+ @Parameter(description = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @Parameter(description = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @Parameter(description = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @RequestBody(description = "Auto split/merge policy override",
required = true)
+ AutoScalePolicyOverride override) {
+ validateNamespaceName(tenant, namespace);
+ TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
+ internalSetAutoScalePolicy(asyncResponse, tn, override);
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/autoScalePolicy")
+ @Operation(summary = "Remove the per-topic auto split/merge policy
override.")
+ @ApiResponses(value = {
+ @ApiResponse(responseCode = "204", description = "Override removed
successfully"),
+ @ApiResponse(responseCode = "401",
+ description = "Don't have permission to administrate
resources on this tenant"),
+ @ApiResponse(responseCode = "403", description = "Don't have admin
permission on the namespace"),
+ @ApiResponse(responseCode = "404", description = "Scalable topic
doesn't exist"),
+ @ApiResponse(responseCode = "500", description = "Internal server
error")})
+ public void removeAutoScalePolicy(
+ @Suspended final AsyncResponse asyncResponse,
+ @Parameter(description = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @Parameter(description = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @Parameter(description = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateNamespaceName(tenant, namespace);
+ TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
+ internalSetAutoScalePolicy(asyncResponse, tn, null);
+ }
+
+ private void internalSetAutoScalePolicy(AsyncResponse asyncResponse,
TopicName tn,
+ AutoScalePolicyOverride override) {
+ validateTopicPolicyOperationAsync(tn,
PolicyName.SCALABLE_TOPIC_AUTO_SCALE, PolicyOperation.WRITE)
+ .thenCompose(__ -> {
+ if (override == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Validate the override in combination with the layers it
will actually
+ // be resolved with: the broker defaults AND the current
namespace
+ // override — two layers that are each valid against the
defaults can
+ // still combine into an invalid policy (e.g. the
namespace raises a
+ // merge threshold and the topic lowers the matching split
threshold).
+ // This check is best-effort: the namespace override can
still change
+ // afterwards, and broker defaults can differ across
restarts — the
+ // controller handles a combination that has become
invalid by falling
+ // back to disabled (see
ScalableTopicController.resolveAutoScaleConfig).
+ return
pulsar().getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(namespaceName)
+ .thenAccept(optPolicies -> {
+ AutoScalePolicyOverride nsOverride =
optPolicies
+ .map(p ->
p.scalableTopicAutoScalePolicy)
+ .orElse(null);
+ try {
+
AutoScaleConfig.resolve(pulsar().getConfig(), nsOverride, override);
+ } catch (IllegalArgumentException e) {
+ throw new
RestException(Response.Status.PRECONDITION_FAILED,
+ e.getMessage());
+ }
+ });
+ })
+ .thenCompose(__ -> resources().updateScalableTopicAsync(tn, md
-> {
+ md.setAutoScalePolicy(override);
+ return md;
+ }))
+ .thenAccept(__ -> {
+ log.info().attr("clientAppId",
clientAppId()).attr("topic", tn)
+ .attr("removed", override == null)
+ .log("Updated autoScalePolicy on scalable topic");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(e -> {
+ Throwable ex = FutureUtil.unwrapCompletionException(e);
+ if (ex instanceof
MetadataStoreException.NotFoundException) {
+ asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
+ "Scalable topic not found: " + tn));
+ return null;
+ }
+ log.error().attr("clientAppId",
clientAppId()).attr("topic", tn)
+ .exception(ex).log("Failed to update
autoScalePolicy for scalable topic");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
// --- Delete ---
@DELETE
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
index b739b102a76..848b59cfa14 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfig.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.scalable;
import java.time.Duration;
import lombok.Builder;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
/**
* Fully-resolved auto split/merge policy for a single scalable topic
(PIP-483).
@@ -81,6 +82,30 @@ public record AutoScaleConfig(
* @return the resolved policy reflecting the {@code scalableTopic*}
settings
*/
public static AutoScaleConfig fromBrokerConfig(ServiceConfiguration conf) {
+ return brokerDefaults(conf).validated();
+ }
+
+ /**
+ * Resolve the effective policy for a topic: broker defaults, overlaid
with the namespace
+ * override, overlaid with the topic override (most-specific wins per
field; {@code null}
+ * override fields fall through).
+ *
+ * @param conf the broker configuration (cluster-wide
defaults)
+ * @param namespaceOverride the namespace-level override, or {@code null}
if none
+ * @param topicOverride the topic-level override, or {@code null} if
none
+ * @return the validated effective policy
+ * @throws IllegalArgumentException if the resolved policy violates an
invariant
+ */
+ public static AutoScaleConfig resolve(ServiceConfiguration conf,
+ AutoScalePolicyOverride
namespaceOverride,
+ AutoScalePolicyOverride
topicOverride) {
+ AutoScaleConfig config = brokerDefaults(conf);
+ config = applyOverride(config, namespaceOverride);
+ config = applyOverride(config, topicOverride);
+ return config.validated();
+ }
+
+ private static AutoScaleConfig brokerDefaults(ServiceConfiguration conf) {
return AutoScaleConfig.builder()
.enabled(conf.isScalableTopicAutoScaleEnabled())
.maxSegments(conf.getScalableTopicMaxSegments())
@@ -97,8 +122,60 @@ public record AutoScaleConfig(
.mergeBytesRateIn(conf.getScalableTopicMergeBytesRateInThreshold())
.mergeMsgRateOut(conf.getScalableTopicMergeMsgRateOutThreshold())
.mergeBytesRateOut(conf.getScalableTopicMergeBytesRateOutThreshold())
- .build()
- .validated();
+ .build();
+ }
+
+ private static AutoScaleConfig applyOverride(AutoScaleConfig base,
AutoScalePolicyOverride o) {
+ if (o == null) {
+ return base;
+ }
+ AutoScaleConfigBuilder b = base.toBuilder();
+ if (o.getEnabled() != null) {
+ b.enabled(o.getEnabled());
+ }
+ if (o.getMaxSegments() != null) {
+ b.maxSegments(o.getMaxSegments());
+ }
+ if (o.getMinSegments() != null) {
+ b.minSegments(o.getMinSegments());
+ }
+ if (o.getMaxDagDepth() != null) {
+ b.maxDagDepth(o.getMaxDagDepth());
+ }
+ if (o.getSplitCooldownSeconds() != null) {
+ b.splitCooldown(Duration.ofSeconds(o.getSplitCooldownSeconds()));
+ }
+ if (o.getMergeCooldownSeconds() != null) {
+ b.mergeCooldown(Duration.ofSeconds(o.getMergeCooldownSeconds()));
+ }
+ if (o.getMergeWindowSeconds() != null) {
+ b.mergeWindow(Duration.ofSeconds(o.getMergeWindowSeconds()));
+ }
+ if (o.getSplitMsgRateInThreshold() != null) {
+ b.splitMsgRateIn(o.getSplitMsgRateInThreshold());
+ }
+ if (o.getSplitBytesRateInThreshold() != null) {
+ b.splitBytesRateIn(o.getSplitBytesRateInThreshold());
+ }
+ if (o.getSplitMsgRateOutThreshold() != null) {
+ b.splitMsgRateOut(o.getSplitMsgRateOutThreshold());
+ }
+ if (o.getSplitBytesRateOutThreshold() != null) {
+ b.splitBytesRateOut(o.getSplitBytesRateOutThreshold());
+ }
+ if (o.getMergeMsgRateInThreshold() != null) {
+ b.mergeMsgRateIn(o.getMergeMsgRateInThreshold());
+ }
+ if (o.getMergeBytesRateInThreshold() != null) {
+ b.mergeBytesRateIn(o.getMergeBytesRateInThreshold());
+ }
+ if (o.getMergeMsgRateOutThreshold() != null) {
+ b.mergeMsgRateOut(o.getMergeMsgRateOutThreshold());
+ }
+ if (o.getMergeBytesRateOutThreshold() != null) {
+ b.mergeBytesRateOut(o.getMergeBytesRateOutThreshold());
+ }
+ return b.build();
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 0b8c70d66b1..a85187df28c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.scalable.HashRange;
@@ -359,10 +360,6 @@ public class ScalableTopicController {
if (brokerConfig == null) {
return CompletableFuture.completedFuture(null);
}
- AutoScaleConfig config =
AutoScaleConfig.fromBrokerConfig(brokerConfig);
- if (!config.enabled()) {
- return CompletableFuture.completedFuture(null);
- }
if (!autoScaleInFlight.compareAndSet(false, true)) {
// Another evaluation or auto operation is already running. Don't
drop the
// trigger: mark it pending so the in-flight run re-evaluates on
completion —
@@ -372,11 +369,18 @@ public class ScalableTopicController {
return CompletableFuture.completedFuture(null);
}
try {
- return collectConsumerCounts()
- .thenCombine(collectLoadSamples(), (consumers, load) ->
- AutoScalePolicyEvaluator.decide(currentLayout,
load, consumers, config,
- clock.millis(), lastSplitAtMs,
lastMergeAtMs))
- .thenCompose(decision -> dispatch(decision, config,
trigger))
+ return resolveAutoScaleConfig(brokerConfig)
+ .thenCompose(config -> {
+ if (!config.enabled()) {
+ return
CompletableFuture.<Void>completedFuture(null);
+ }
+ return collectConsumerCounts()
+ .thenCombine(collectLoadSamples(), (consumers,
load) ->
+
AutoScalePolicyEvaluator.decide(currentLayout, load,
+ consumers, config,
clock.millis(),
+ lastSplitAtMs, lastMergeAtMs))
+ .thenCompose(decision -> dispatch(decision,
config, trigger));
+ })
.whenComplete((__, ex) -> {
autoScaleInFlight.set(false);
if (autoScaleReEvaluate.getAndSet(false)) {
@@ -393,6 +397,45 @@ public class ScalableTopicController {
}
}
+ /**
+ * Resolve the effective auto split/merge policy for this topic: broker
defaults overlaid
+ * with the namespace-level override ({@code
Policies.scalableTopicAutoScalePolicy}) and
+ * then the per-topic override ({@code
ScalableTopicMetadata.autoScalePolicy}). Both reads
+ * are metadata-cache-backed, so this is cheap per evaluation and override
changes take
+ * effect on the next tick without controller restarts.
+ *
+ * <p>Set-time validation is best-effort only (the namespace override can
change after a
+ * topic override was validated against it, and broker defaults can change
across
+ * restarts), so the stored combination can be invalid here. In that case
auto split/merge
+ * is treated as <b>disabled</b> for the topic — predictable, and loudly
logged on every
+ * evaluation until an operator fixes the overrides — rather than failing
the evaluation
+ * chain.
+ */
+ private CompletableFuture<AutoScaleConfig> resolveAutoScaleConfig(
+ ServiceConfiguration brokerConfig) {
+ CompletableFuture<AutoScalePolicyOverride> namespaceOverride =
+
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenApply(opt -> opt.map(p ->
p.scalableTopicAutoScalePolicy)
+ .orElse(null));
+ CompletableFuture<AutoScalePolicyOverride> topicOverride =
+ resources.getScalableTopicMetadataAsync(topicName)
+ .thenApply(opt ->
opt.map(ScalableTopicMetadata::getAutoScalePolicy)
+ .orElse(null));
+ return namespaceOverride.thenCombine(topicOverride, (ns, topic) -> {
+ try {
+ return AutoScaleConfig.resolve(brokerConfig, ns, topic);
+ } catch (IllegalArgumentException e) {
+ log.warn().attr("reason", e.getMessage())
+ .log("Resolved auto split/merge policy is invalid;
treating auto "
+ + "split/merge as disabled for this topic
until the namespace "
+ + "or topic override is fixed");
+ return AutoScaleConfig.fromBrokerConfig(brokerConfig)
+ .toBuilder().enabled(false).build();
+ }
+ });
+ }
+
private CompletableFuture<Void> dispatch(AutoScaleDecision decision,
AutoScaleConfig config,
String trigger) {
if (decision instanceof AutoScaleDecision.Split split) {
@@ -657,7 +700,7 @@ public class ScalableTopicController {
.thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md
-> {
SegmentLayout latest = SegmentLayout.fromMetadata(md);
SegmentLayout updated = latest.splitSegment(segmentId, nowMs);
- return updated.toMetadata(md.getProperties());
+ return updated.toMetadata(md);
}))
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
.thenCompose(optMd -> {
@@ -706,7 +749,7 @@ public class ScalableTopicController {
.thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md
-> {
SegmentLayout latest = SegmentLayout.fromMetadata(md);
SegmentLayout updated = latest.mergeSegments(segmentId1,
segmentId2, nowMs);
- return updated.toMetadata(md.getProperties());
+ return updated.toMetadata(md);
}))
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
.thenCompose(optMd -> {
@@ -1192,7 +1235,7 @@ public class ScalableTopicController {
updated = updated.pruneSegment(s.segmentId());
}
}
- return updated == latest ? md :
updated.toMetadata(md.getProperties());
+ return updated == latest ? md : updated.toMetadata(md);
}).thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
.thenCompose(optMd -> {
currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index a3f1f0c5ef1..c823fb75e91 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -276,14 +276,17 @@ public class SegmentLayout {
}
/**
- * Convert back to metadata for persistence.
+ * Convert back to metadata for persistence, carrying over the non-layout
fields
+ * (properties, per-topic auto-scale policy) from the record being
replaced. Layout
+ * mutations must never silently drop fields they don't model.
*/
- public ScalableTopicMetadata toMetadata(Map<String, String> properties) {
+ public ScalableTopicMetadata toMetadata(ScalableTopicMetadata original) {
return ScalableTopicMetadata.builder()
.epoch(epoch)
.nextSegmentId(nextSegmentId)
.segments(new LinkedHashMap<>(allSegments))
- .properties(properties)
+ .properties(original.getProperties())
+ .autoScalePolicy(original.getAutoScalePolicy())
.build();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
new file mode 100644
index 00000000000..85c65f9b9ec
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicAutoScalePolicyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the auto split/merge policy override admin API
(PIP-483), at both
+ * levels, through the full HTTP path against a real shared broker.
+ *
+ * <p>Endpoints under test:
+ *
+ * <ul>
+ * <li>namespace: {@code
/admin/v2/namespaces/{tenant}/{ns}/scalableTopicAutoScalePolicy}</li>
+ * <li>topic: {@code
/admin/v2/scalable/{tenant}/{ns}/{topic}/autoScalePolicy}</li>
+ * </ul>
+ */
+public class ScalableTopicAutoScalePolicyTest extends SharedPulsarBaseTest {
+
+ private String newScalableTopic() throws Exception {
+ String topic = "topic://" + getNamespace() + "/autoscale-"
+ + UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic(topic, 1);
+ return topic;
+ }
+
+ @Test
+ public void testTopicLevelRoundTrip() throws Exception {
+ String topic = newScalableTopic();
+
+ // No override initially.
+ assertNull(admin.scalableTopics().getAutoScalePolicy(topic));
+
+ AutoScalePolicyOverride override = AutoScalePolicyOverride.builder()
+ .enabled(true)
+ .maxSegments(8)
+ .splitMsgRateInThreshold(5_000.0)
+ .build();
+ admin.scalableTopics().setAutoScalePolicy(topic, override);
+ assertEquals(admin.scalableTopics().getAutoScalePolicy(topic),
override);
+
+ admin.scalableTopics().removeAutoScalePolicy(topic);
+ assertNull(admin.scalableTopics().getAutoScalePolicy(topic));
+ }
+
+ @Test
+ public void testNamespaceLevelRoundTrip() throws Exception {
+ String namespace = getNamespace();
+
+
assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace));
+
+ AutoScalePolicyOverride override = AutoScalePolicyOverride.builder()
+ .enabled(false)
+ .build();
+ admin.namespaces().setScalableTopicAutoScalePolicy(namespace,
override);
+
assertEquals(admin.namespaces().getScalableTopicAutoScalePolicy(namespace),
override);
+
+ admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+
assertNull(admin.namespaces().getScalableTopicAutoScalePolicy(namespace));
+ }
+
+ @Test
+ public void testInvalidOverrideRejected() throws Exception {
+ String topic = newScalableTopic();
+
+ // minSegments above the broker-default maxSegments breaks min <= max
on resolution.
+ AutoScalePolicyOverride bad = AutoScalePolicyOverride.builder()
+ .minSegments(1_000_000)
+ .build();
+ assertThrows(PulsarAdminException.PreconditionFailedException.class,
+ () -> admin.scalableTopics().setAutoScalePolicy(topic, bad));
+ assertThrows(PulsarAdminException.PreconditionFailedException.class,
+ () ->
admin.namespaces().setScalableTopicAutoScalePolicy(getNamespace(), bad));
+ }
+
+ @Test
+ public void testTopicOverrideValidatedAgainstNamespaceOverride() throws
Exception {
+ // Each layer is valid against the broker defaults, but combined they
invert the
+ // hysteresis invariant: ns raises mergeMsgRateIn to 5000, topic lowers
+ // splitMsgRateIn to 2000 → split <= merge. The topic-level set must
see the
+ // current namespace override and reject with 412.
+ String namespace = getNamespace();
+ String topic = newScalableTopic();
+ try {
+ admin.namespaces().setScalableTopicAutoScalePolicy(namespace,
+
AutoScalePolicyOverride.builder().mergeMsgRateInThreshold(5_000.0).build());
+
+
assertThrows(PulsarAdminException.PreconditionFailedException.class,
+ () -> admin.scalableTopics().setAutoScalePolicy(topic,
+ AutoScalePolicyOverride.builder()
+
.splitMsgRateInThreshold(2_000.0).build()));
+
+ // The same topic override is accepted once the conflicting
namespace layer
+ // is gone.
+ admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+ admin.scalableTopics().setAutoScalePolicy(topic,
+
AutoScalePolicyOverride.builder().splitMsgRateInThreshold(2_000.0).build());
+ } finally {
+ admin.namespaces().removeScalableTopicAutoScalePolicy(namespace);
+ }
+ }
+
+ @Test
+ public void testTopicLevelOnMissingTopicIs404() {
+ String missing = "topic://" + getNamespace() + "/does-not-exist";
+ assertThrows(PulsarAdminException.NotFoundException.class,
+ () -> admin.scalableTopics().setAutoScalePolicy(missing,
+
AutoScalePolicyOverride.builder().enabled(false).build()));
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
index acc104c2773..7a991834461 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/AutoScaleConfigTest.java
@@ -55,6 +55,52 @@ public class AutoScaleConfigTest {
assertTrue(c.splitBytesRateOut() > c.mergeBytesRateOut());
}
+ @Test
+ public void testResolveLayersOverrides() {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ org.apache.pulsar.common.policies.data.AutoScalePolicyOverride ns =
+
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+ .maxSegments(16)
+ .splitMsgRateInThreshold(20_000.0)
+ .build();
+ org.apache.pulsar.common.policies.data.AutoScalePolicyOverride topic =
+
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+ .maxSegments(4)
+ .splitCooldownSeconds(5L)
+ .build();
+
+ AutoScaleConfig c = AutoScaleConfig.resolve(conf, ns, topic);
+ // Topic wins where both set.
+ assertEquals(c.maxSegments(), 4);
+ // Namespace applies where the topic is silent.
+ assertEquals(c.splitMsgRateIn(), 20_000.0);
+ // Topic-only field applies.
+ assertEquals(c.splitCooldown(), Duration.ofSeconds(5));
+ // Untouched fields fall through to the broker defaults.
+ assertEquals(c.mergeCooldown(), Duration.ofMinutes(5));
+ assertTrue(c.enabled());
+ }
+
+ @Test
+ public void testResolveNullOverridesEqualsBrokerConfig() {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ assertEquals(AutoScaleConfig.resolve(conf, null, null),
+ AutoScaleConfig.fromBrokerConfig(conf));
+ }
+
+ @Test
+ public void testResolveRejectsInvalidCombination() {
+ // The override is only invalid in combination: a merge threshold
raised above the
+ // broker-default split threshold breaks the hysteresis invariant.
+ ServiceConfiguration conf = new ServiceConfiguration();
+ org.apache.pulsar.common.policies.data.AutoScalePolicyOverride bad =
+
org.apache.pulsar.common.policies.data.AutoScalePolicyOverride.builder()
+
.mergeMsgRateInThreshold(conf.getScalableTopicSplitMsgRateInThreshold())
+ .build();
+ assertThrows(IllegalArgumentException.class,
+ () -> AutoScaleConfig.resolve(conf, null, bad));
+ }
+
@Test
public void testValidationRejectsBadConfig() {
// Zero split threshold: the evaluator scores rate/threshold, so 0
would yield
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
index 74ad5e8a5a3..6b1fc907696 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerAutoScaleTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.TransportCnx;
@@ -39,7 +41,10 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.scalable.SegmentLoadStats;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -70,6 +75,10 @@ public class ScalableTopicControllerAutoScaleTest {
private PulsarService pulsar;
private ServiceConfiguration config;
private ScalableTopics scalableTopics;
+ private PulsarResources pulsarResources;
+ private NamespaceResources namespaceResources;
+ /** Namespace policies served by the mocked resources; null = namespace
has no policies. */
+ private Policies namespacePolicies;
private ScalableTopicController controller;
private TopicName topicName;
@@ -104,6 +113,19 @@ public class ScalableTopicControllerAutoScaleTest {
when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
when(pulsar.getExecutor()).thenReturn(scheduler);
when(pulsar.getConfig()).thenReturn(config);
+
+ // Namespace policies feed the per-namespace auto-scale override
resolution.
+ // Default: no policies → broker config applies. Tests install
overrides via
+ // namespacePolicies. Reset explicitly — TestNG reuses the test
instance.
+ namespacePolicies = null;
+ pulsarResources = mock(PulsarResources.class);
+ namespaceResources = mock(NamespaceResources.class);
+ when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
+
when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources);
+ when(namespaceResources.getPoliciesAsync(any(NamespaceName.class)))
+ .thenAnswer(__ -> CompletableFuture.completedFuture(
+ Optional.ofNullable(namespacePolicies)));
+
when(pulsar.getAdminClient()).thenReturn(admin);
when(admin.topics()).thenReturn(topics);
when(admin.scalableTopics()).thenReturn(scalableTopics);
@@ -205,6 +227,108 @@ public class ScalableTopicControllerAutoScaleTest {
"2 consumers on 1 segment should drive a split"));
}
+ @Test
+ public void testNamespaceOverrideDisablesAutoScale() throws Exception {
+ namespacePolicies = new Policies();
+ namespacePolicies.scalableTopicAutoScalePolicy =
+ AutoScalePolicyOverride.builder().enabled(false).build();
+
+ startController(2);
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 2,
+ "namespace override enabled=false must suppress the split");
+ }
+
+ @Test
+ public void testTopicOverrideWinsOverNamespace() throws Exception {
+ // Namespace disables auto-scale; the topic explicitly re-enables it.
+ namespacePolicies = new Policies();
+ namespacePolicies.scalableTopicAutoScalePolicy =
+ AutoScalePolicyOverride.builder().enabled(false).build();
+
+ startController(2);
+ resources.updateScalableTopicAsync(topicName, md -> {
+
md.setAutoScalePolicy(AutoScalePolicyOverride.builder().enabled(true).build());
+ return md;
+ }).get();
+
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3,
+ "topic override enabled=true must win over the namespace
disable");
+ }
+
+ @Test
+ public void testTopicOverrideMaxSegmentsCapsSplit() throws Exception {
+ startController(2);
+ resources.updateScalableTopicAsync(topicName, md -> {
+
md.setAutoScalePolicy(AutoScalePolicyOverride.builder().maxSegments(2).build());
+ return md;
+ }).get();
+
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 2,
+ "per-topic maxSegments=2 must cap the split despite the hot
segment");
+ }
+
+ @Test
+ public void testTopicOverrideSurvivesSplit() throws Exception {
+ // The override must survive a layout mutation (toMetadata must carry
it over).
+ startController(2);
+ AutoScalePolicyOverride override =
+ AutoScalePolicyOverride.builder().maxSegments(3).build();
+ resources.updateScalableTopicAsync(topicName, md -> {
+ md.setAutoScalePolicy(override);
+ return md;
+ }).get();
+
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "first split allowed up to
maxSegments=3");
+ assertEquals(resources.getScalableTopicMetadataAsync(topicName).get()
+ .orElseThrow().getAutoScalePolicy(), override,
+ "the per-topic override must survive the split's metadata
rewrite");
+
+ // At the cap now — further hot reports must not split.
+ resources.reportSegmentLoadAsync(topicName, 1,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 3, "capped at the per-topic
maxSegments");
+ }
+
+ @Test
+ public void testInvalidOverrideCombinationFallsBackToDisabled() throws
Exception {
+ // Namespace and topic overrides that are each valid against the
broker defaults but
+ // invalid combined: the namespace raises the merge threshold, the
topic lowers the
+ // matching split threshold below it (hysteresis inversion). The
controller must not
+ // fail the evaluation chain — it treats auto split/merge as disabled
until fixed.
+ namespacePolicies = new Policies();
+ namespacePolicies.scalableTopicAutoScalePolicy =
AutoScalePolicyOverride.builder()
+ .mergeMsgRateInThreshold(5_000.0)
+ .build();
+
+ startController(2);
+ resources.updateScalableTopicAsync(topicName, md -> {
+ md.setAutoScalePolicy(AutoScalePolicyOverride.builder()
+ .splitMsgRateInThreshold(2_000.0)
+ .build());
+ return md;
+ }).get();
+
+ resources.reportSegmentLoadAsync(topicName, 0,
+ new SegmentLoadStats(20_000, 0, 0, 0)).get();
+ // Must complete normally (no IllegalArgumentException) and take no
action.
+ controller.evaluateAutoScaleForTest().get();
+ assertEquals(activeSegmentCount(), 2,
+ "invalid override combination must disable auto split/merge,
not split");
+ }
+
@Test
public void testConsumerBurstConvergesWithoutTicks() throws Exception {
// A group of consumers joining in quick succession must converge to
one segment
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index 364c9299a11..afcab4c8b98 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -269,14 +269,19 @@ public class SegmentLayoutTest {
@Test
public void testToMetadata() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
+ // A layout mutation must round-trip every non-layout field, not just
properties.
+
metadata.setAutoScalePolicy(org.apache.pulsar.common.policies.data.AutoScalePolicyOverride
+ .builder().enabled(false).maxSegments(8).build());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
SegmentLayout afterSplit = layout.splitSegment(0, 0L);
- ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key",
"value"));
+ ScalableTopicMetadata restored = afterSplit.toMetadata(metadata);
assertEquals(restored.getEpoch(), 1);
assertEquals(restored.getNextSegmentId(), 4);
assertEquals(restored.getSegments().size(), 4);
assertEquals(restored.getProperties().get("key"), "value");
+ assertEquals(restored.getAutoScalePolicy(),
metadata.getAutoScalePolicy(),
+ "split/merge must not drop the per-topic auto-scale policy");
}
@Test
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index cc3f5a92964..b1063e8b542 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -29,6 +29,7 @@ import
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -1366,6 +1367,72 @@ public interface Namespaces {
*/
CompletableFuture<Void> removeAutoTopicCreationAsync(String namespace);
+ /**
+ * Sets the scalable-topic auto split/merge policy override for a
namespace (PIP-483),
+ * overriding the broker's defaults for every scalable topic in the
namespace that does
+ * not carry its own per-topic override.
+ *
+ * @param namespace
+ * Namespace name
+ * @param override
+ * the override; unset fields fall through to the broker
configuration
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setScalableTopicAutoScalePolicy(String namespace,
AutoScalePolicyOverride override)
+ throws PulsarAdminException;
+
+ /**
+ * Sets the scalable-topic auto split/merge policy override for a
namespace asynchronously.
+ *
+ * @param namespace
+ * Namespace name
+ * @param override
+ * the override; unset fields fall through to the broker
configuration
+ */
+ CompletableFuture<Void> setScalableTopicAutoScalePolicyAsync(
+ String namespace, AutoScalePolicyOverride override);
+
+ /**
+ * Get the scalable-topic auto split/merge policy override for a namespace.
+ *
+ * @param namespace
+ * Namespace name
+ * @return the override, or {@code null} if none is set
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace)
throws PulsarAdminException;
+
+ /**
+ * Get the scalable-topic auto split/merge policy override for a namespace
asynchronously.
+ *
+ * @param namespace
+ * Namespace name
+ * @return the override, or {@code null} if none is set
+ */
+ CompletableFuture<AutoScalePolicyOverride>
getScalableTopicAutoScalePolicyAsync(String namespace);
+
+ /**
+ * Removes the scalable-topic auto split/merge policy override from a
namespace, letting
+ * the broker configuration apply.
+ *
+ * @param namespace
+ * Namespace name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeScalableTopicAutoScalePolicy(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Removes the scalable-topic auto split/merge policy override from a
namespace
+ * asynchronously.
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> removeScalableTopicAutoScalePolicyAsync(String
namespace);
+
/**
* Sets the autoSubscriptionCreation policy for a given namespace,
overriding broker settings.
* <p/>
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index debf54231b9..b2f7632951e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -145,6 +146,54 @@ public interface ScalableTopics {
*/
CompletableFuture<ScalableTopicMetadata> getMetadataAsync(String topic);
+ /**
+ * Set the per-topic auto split/merge policy override (PIP-483). Overrides
the namespace
+ * policy and the broker defaults for this topic; unset fields fall
through.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @param override the override to apply
+ */
+ void setAutoScalePolicy(String topic, AutoScalePolicyOverride override)
throws PulsarAdminException;
+
+ /**
+ * Set the per-topic auto split/merge policy override asynchronously.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @param override the override to apply
+ */
+ CompletableFuture<Void> setAutoScalePolicyAsync(String topic,
AutoScalePolicyOverride override);
+
+ /**
+ * Get the per-topic auto split/merge policy override.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @return the override, or {@code null} if none is set
+ */
+ AutoScalePolicyOverride getAutoScalePolicy(String topic) throws
PulsarAdminException;
+
+ /**
+ * Get the per-topic auto split/merge policy override asynchronously.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @return the override, or {@code null} if none is set
+ */
+ CompletableFuture<AutoScalePolicyOverride> getAutoScalePolicyAsync(String
topic);
+
+ /**
+ * Remove the per-topic auto split/merge policy override, letting the
namespace policy
+ * and broker defaults apply.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ */
+ void removeAutoScalePolicy(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the per-topic auto split/merge policy override asynchronously.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ */
+ CompletableFuture<Void> removeAutoScalePolicyAsync(String topic);
+
/**
* Delete a scalable topic and all its underlying segment topics.
*
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java
new file mode 100644
index 00000000000..30ecd78e5a7
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoScalePolicyOverride.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Override of the scalable-topic auto split/merge policy (PIP-483), settable
at the
+ * namespace level (field on {@link Policies}) and at the topic level (field
on the
+ * scalable-topic metadata).
+ *
+ * <p>Every field is optional: an unset ({@code null}) field falls through to
the next
+ * resolution layer — topic override → namespace override → broker
configuration. Setting
+ * {@code enabled = false} opts the namespace or topic out of auto split/merge
entirely.
+ *
+ * <p>The resolved policy must satisfy the same invariants as the broker
configuration
+ * (positive split thresholds, split thresholds strictly above merge
thresholds,
+ * {@code minSegments <= maxSegments}, non-negative cooldowns); an override
that would
+ * violate them is rejected when it is set.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public final class AutoScalePolicyOverride {
+
+ /** Master switch; {@code false} opts this namespace/topic out of auto
split/merge. */
+ private Boolean enabled;
+
+ /** Hard ceiling on active segments; splits stop once reached. */
+ private Integer maxSegments;
+
+ /** Hard floor on active segments; merges stop once reached. */
+ private Integer minSegments;
+
+ /** Max merges in a segment's lineage before merges are disabled for it. */
+ private Integer maxDagDepth;
+
+ /** Minimum seconds between automatic splits (coalesces bursts). */
+ private Long splitCooldownSeconds;
+
+ /** Minimum seconds between automatic merges. */
+ private Long mergeCooldownSeconds;
+
+ /** Seconds a segment pair must stay cold before becoming merge-eligible.
*/
+ private Long mergeWindowSeconds;
+
+ /** Inbound msg/s above which a segment is split. */
+ private Double splitMsgRateInThreshold;
+
+ /** Inbound bytes/s above which a segment is split. */
+ private Long splitBytesRateInThreshold;
+
+ /** Outbound (dispatched) msg/s above which a segment is split. */
+ private Double splitMsgRateOutThreshold;
+
+ /** Outbound bytes/s above which a segment is split. */
+ private Long splitBytesRateOutThreshold;
+
+ /** Inbound msg/s below which a segment counts as cold for merging. */
+ private Double mergeMsgRateInThreshold;
+
+ /** Inbound bytes/s below which a segment counts as cold for merging. */
+ private Long mergeBytesRateInThreshold;
+
+ /** Outbound msg/s below which a segment counts as cold for merging. */
+ private Double mergeMsgRateOutThreshold;
+
+ /** Outbound bytes/s below which a segment counts as cold for merging. */
+ private Long mergeBytesRateOutThreshold;
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index db0aa0620d5..df3cf53a3ca 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -61,6 +61,8 @@ public class Policies {
public AutoTopicCreationOverride autoTopicCreationOverride = null;
// If set, it will override the broker settings for allowing auto
subscription creation
public AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
null;
+ // If set, it will override the broker's scalable-topic auto split/merge
settings (PIP-483)
+ public AutoScalePolicyOverride scalableTopicAutoScalePolicy = null;
public Map<String, PublishRate> publishMaxMessageRate = new HashMap<>();
@SuppressWarnings("checkstyle:MemberName")
@@ -158,7 +160,7 @@ public class Policies {
backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
topicDispatchRate, subscriptionDispatchRate,
replicatorDispatchRate,
clusterSubscribeRate, deduplicationEnabled,
autoTopicCreationOverride,
- autoSubscriptionCreationOverride, persistence,
+ autoSubscriptionCreationOverride,
scalableTopicAutoScalePolicy, persistence,
bundles, latency_stats_sample_rate,
message_ttl_in_seconds, subscription_expiration_time_minutes,
retention_policies,
encryption_required, delayed_delivery_policies,
inactive_topic_policies,
@@ -198,6 +200,7 @@ public class Policies {
&& Objects.equals(deduplicationEnabled,
other.deduplicationEnabled)
&& Objects.equals(autoTopicCreationOverride,
other.autoTopicCreationOverride)
&& Objects.equals(autoSubscriptionCreationOverride,
other.autoSubscriptionCreationOverride)
+ && Objects.equals(scalableTopicAutoScalePolicy,
other.scalableTopicAutoScalePolicy)
&& Objects.equals(persistence, other.persistence) &&
Objects.equals(bundles, other.bundles)
&& Objects.equals(latency_stats_sample_rate,
other.latency_stats_sample_rate)
&& Objects.equals(message_ttl_in_seconds,
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
index 6f3123228f1..ce246a1340d 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadata.java
@@ -54,6 +54,12 @@ public class ScalableTopicMetadata {
@Builder.Default
private Map<String, String> properties = Map.of();
+ /**
+ * Per-topic auto split/merge policy override (PIP-483). {@code null}
means no override:
+ * the namespace policy and then the broker configuration apply.
+ */
+ private AutoScalePolicyOverride autoScalePolicy;
+
/**
* Describes a single segment in a scalable topic's DAG.
*/
diff --git
a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
index 4784ea9b1b9..6259d9799a4 100644
---
a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
+++
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/common/policies/data/ScalableTopicMetadataTest.java
@@ -79,13 +79,16 @@ public class ScalableTopicMetadataTest {
segments.put(0L, activeSegment(0L, 0, 0xFFFF, 0L));
Map<String, String> props = Map.of("retention", "7d");
+ AutoScalePolicyOverride autoScalePolicy =
+ AutoScalePolicyOverride.builder().enabled(false).build();
- ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments,
props);
+ ScalableTopicMetadata md = new ScalableTopicMetadata(3L, 1L, segments,
props, autoScalePolicy);
assertEquals(md.getEpoch(), 3L);
assertEquals(md.getNextSegmentId(), 1L);
assertEquals(md.getSegments(), segments);
assertEquals(md.getProperties(), props);
+ assertEquals(md.getAutoScalePolicy(), autoScalePolicy);
}
@Test
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 75a331a45db..fd0e5c5bec0 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -566,6 +567,45 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
return asyncDeleteRequest(path);
}
+ @Override
+ public void setScalableTopicAutoScalePolicy(String namespace,
+ AutoScalePolicyOverride override) throws PulsarAdminException {
+ sync(() -> setScalableTopicAutoScalePolicyAsync(namespace, override));
+ }
+
+ @Override
+ public CompletableFuture<Void> setScalableTopicAutoScalePolicyAsync(
+ String namespace, AutoScalePolicyOverride override) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy");
+ return asyncPostRequest(path, Entity.entity(override,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String
namespace)
+ throws PulsarAdminException {
+ return sync(() -> getScalableTopicAutoScalePolicyAsync(namespace));
+ }
+
+ @Override
+ public CompletableFuture<AutoScalePolicyOverride>
getScalableTopicAutoScalePolicyAsync(String namespace) {
+ return asyncGetNamespaceParts(new
FutureCallback<AutoScalePolicyOverride>() {
+ }, namespace,
+ "scalableTopicAutoScalePolicy");
+ }
+
+ @Override
+ public void removeScalableTopicAutoScalePolicy(String namespace) throws
PulsarAdminException {
+ sync(() -> removeScalableTopicAutoScalePolicyAsync(namespace));
+ }
+
+ @Override
+ public CompletableFuture<Void>
removeScalableTopicAutoScalePolicyAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "scalableTopicAutoScalePolicy");
+ return asyncDeleteRequest(path);
+ }
+
@Override
public void setAutoSubscriptionCreation(String namespace,
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride)
throws PulsarAdminException {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index e11c6cfe747..e4905aa63c8 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoScalePolicyOverride;
import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -141,6 +142,43 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncGetRequest(topicPath(tn), ScalableTopicMetadata.class);
}
+ // --- Auto split/merge policy override (PIP-483) ---
+
+ @Override
+ public void setAutoScalePolicy(String topic, AutoScalePolicyOverride
override)
+ throws PulsarAdminException {
+ sync(() -> setAutoScalePolicyAsync(topic, override));
+ }
+
+ @Override
+ public CompletableFuture<Void> setAutoScalePolicyAsync(String topic,
AutoScalePolicyOverride override) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn).path("autoScalePolicy");
+ return asyncPostRequest(path, Entity.entity(override,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public AutoScalePolicyOverride getAutoScalePolicy(String topic) throws
PulsarAdminException {
+ return sync(() -> getAutoScalePolicyAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<AutoScalePolicyOverride>
getAutoScalePolicyAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ return asyncGetRequest(topicPath(tn).path("autoScalePolicy"),
AutoScalePolicyOverride.class);
+ }
+
+ @Override
+ public void removeAutoScalePolicy(String topic) throws
PulsarAdminException {
+ sync(() -> removeAutoScalePolicyAsync(topic));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeAutoScalePolicyAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ return asyncDeleteRequest(topicPath(tn).path("autoScalePolicy"));
+ }
+
// --- Delete ---
@Override
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index fd05ee40e76..2a2b56c599b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -55,6 +55,7 @@ public enum PolicyName {
DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
ALLOW_CLUSTERS,
ALLOW_CUSTOM_METRIC_LABELS,
+ SCALABLE_TOPIC_AUTO_SCALE,
// cluster policies
CLUSTER_MIGRATION,