This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d200fd5e85f [improve][broker] Add authn/authz to Scalable Topics Admin API (PIP-460) (#25618)
d200fd5e85f is described below

commit d200fd5e85f3e343be29cc373008dc928c7fd370
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 29 23:17:04 2026 +0300

    [improve][broker] Add authn/authz to Scalable Topics Admin API (PIP-460) (#25618)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     | 122 +++++------
 .../apache/pulsar/broker/admin/v2/Segments.java    |  27 ++-
 .../broker/admin/ScalableTopicsAuthZTest.java      | 228 +++++++++++++++++++++
 3 files changed, 311 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index f4c5726fe51..6e7aa7f6723 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -28,6 +28,7 @@ import java.net.URI;
 import java.net.URL;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
@@ -49,9 +50,12 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.scalable.ScalableTopicController;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -91,7 +95,8 @@ public class ScalableTopics extends AdminResource {
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        resources().listScalableTopicsAsync(namespaceName)
+        validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> 
resources().listScalableTopicsAsync(namespaceName))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error().attr("clientAppId", 
clientAppId()).attr("namespace", namespaceName)
@@ -128,19 +133,19 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        if (numInitialSegments < 1) {
-            asyncResponse.resume(new 
RestException(Response.Status.fromStatusCode(412),
-                    "numInitialSegments must be >= 1"));
-            return;
-        }
-
-        Map<String, String> props = properties != null ? properties : Map.of();
-        ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(
-                numInitialSegments, props);
-
-        // Segment persistent topics are auto-created on demand when clients 
connect,
-        // so we only need to store the metadata here.
-        resources().createScalableTopicAsync(tn, metadata)
+        validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.CREATE_TOPIC)
+                .thenCompose(__ -> {
+                    if (numInitialSegments < 1) {
+                        throw new 
RestException(Response.Status.fromStatusCode(412),
+                                "numInitialSegments must be >= 1");
+                    }
+                    Map<String, String> props = properties != null ? 
properties : Map.of();
+                    ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(
+                            numInitialSegments, props);
+                    // Segment persistent topics are auto-created on demand 
when clients connect,
+                    // so we only need to store the metadata here.
+                    return resources().createScalableTopicAsync(tn, metadata);
+                })
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", 
clientAppId()).attr("topic", tn)
                             .attr("numInitialSegments", numInitialSegments)
@@ -182,7 +187,8 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        resources().getScalableTopicMetadataAsync(tn)
+        validateTopicOperationAsync(tn, TopicOperation.GET_METADATA)
+                .thenCompose(__ -> 
resources().getScalableTopicMetadataAsync(tn))
                 .thenAccept(optMd -> {
                     if (optMd.isEmpty()) {
                         asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
@@ -223,7 +229,8 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        resources().getScalableTopicMetadataAsync(tn)
+        validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.DELETE_TOPIC)
+                .thenCompose(__ -> 
resources().getScalableTopicMetadataAsync(tn))
                 .thenCompose(optMd -> {
                     if (optMd.isEmpty()) {
                         throw new RestException(Response.Status.NOT_FOUND,
@@ -268,14 +275,8 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        var scalableTopicService = 
pulsar().getBrokerService().getScalableTopicService();
-        if (scalableTopicService == null) {
-            asyncResponse.resume(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
-                    "Scalable topic service not available"));
-            return;
-        }
-
-        scalableTopicService.getStats(tn)
+        validateTopicOperationAsync(tn, TopicOperation.GET_STATS)
+                .thenCompose(__ -> withScalableTopicService(svc -> 
svc.getStats(tn)))
                 .thenAccept(asyncResponse::resume)
                 .exceptionally(ex -> {
                     log.error().attr("clientAppId", 
clientAppId()).attr("topic", tn)
@@ -313,15 +314,9 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        var scalableTopicService = 
pulsar().getBrokerService().getScalableTopicService();
-        if (scalableTopicService == null) {
-            asyncResponse.resume(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
-                    "Scalable topic service not available"));
-            return;
-        }
-
-        redirectToControllerLeaderIfNeeded(tn)
-                .thenCompose(__ -> scalableTopicService.createSubscription(tn, 
subscription, type))
+        validateTopicOperationAsync(tn, TopicOperation.SUBSCRIBE, subscription)
+                .thenCompose(__ -> onControllerLeader(tn,
+                        svc -> svc.createSubscription(tn, subscription, type)))
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", clientAppId())
                             .attr("subscription", subscription).attr("topic", 
tn)
@@ -359,15 +354,9 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        var scalableTopicService = 
pulsar().getBrokerService().getScalableTopicService();
-        if (scalableTopicService == null) {
-            asyncResponse.resume(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
-                    "Scalable topic service not available"));
-            return;
-        }
-
-        redirectToControllerLeaderIfNeeded(tn)
-                .thenCompose(__ -> scalableTopicService.deleteSubscription(tn, 
subscription))
+        validateTopicOperationAsync(tn, TopicOperation.UNSUBSCRIBE, 
subscription)
+                .thenCompose(__ -> onControllerLeader(tn,
+                        svc -> svc.deleteSubscription(tn, subscription)))
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", clientAppId())
                             .attr("subscription", subscription).attr("topic", 
tn)
@@ -406,15 +395,8 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        var scalableTopicService = 
pulsar().getBrokerService().getScalableTopicService();
-        if (scalableTopicService == null) {
-            asyncResponse.resume(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
-                    "Scalable topic service not available"));
-            return;
-        }
-
-        redirectToControllerLeaderIfNeeded(tn)
-                .thenCompose(__ -> scalableTopicService.splitSegment(tn, 
segmentId))
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> onControllerLeader(tn, svc -> 
svc.splitSegment(tn, segmentId)))
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", clientAppId())
                             .attr("segmentId", segmentId).attr("topic", tn)
@@ -453,15 +435,9 @@ public class ScalableTopics extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
 
-        var scalableTopicService = 
pulsar().getBrokerService().getScalableTopicService();
-        if (scalableTopicService == null) {
-            asyncResponse.resume(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
-                    "Scalable topic service not available"));
-            return;
-        }
-
-        redirectToControllerLeaderIfNeeded(tn)
-                .thenCompose(__ -> scalableTopicService.mergeSegments(tn, 
segmentId1, segmentId2))
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> onControllerLeader(tn,
+                        svc -> svc.mergeSegments(tn, segmentId1, segmentId2)))
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", clientAppId())
                             .attr("segmentId1", segmentId1).attr("segmentId2", 
segmentId2)
@@ -480,6 +456,32 @@ public class ScalableTopics extends AdminResource {
 
     // --- Internal helpers ---
 
+    /**
+     * Resolve the local scalable-topic service and pass it to {@code op}. 
Surfaces a 503
+     * RestException if the service is not available on this broker.
+     */
+    private <T> CompletableFuture<T> withScalableTopicService(
+            Function<ScalableTopicService, CompletableFuture<T>> op) {
+        ScalableTopicService svc = 
pulsar().getBrokerService().getScalableTopicService();
+        if (svc == null) {
+            return FutureUtil.failedFuture(new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
+                    "Scalable topic service not available"));
+        }
+        return op.apply(svc);
+    }
+
+    /**
+     * Invoke {@code op} on the controller leader for {@code tn}. Combines the
+     * service-availability check with {@link 
#redirectToControllerLeaderIfNeeded(TopicName)}
+     * so that endpoints requiring the elected controller leader can express 
the operation
+     * as a single chained step.
+     */
+    private <T> CompletableFuture<T> onControllerLeader(TopicName tn,
+            Function<ScalableTopicService, CompletableFuture<T>> op) {
+        return withScalableTopicService(svc -> 
redirectToControllerLeaderIfNeeded(tn)
+                .thenCompose(__ -> op.apply(svc)));
+    }
+
     /**
      * If this broker is not the elected controller leader for {@code tn}, 
redirect the
      * request to the leader via HTTP 307. Read-only endpoints (like {@code 
getStats}) do
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 6e81be7e672..e01961f9a30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -51,6 +51,12 @@ import org.apache.pulsar.common.naming.TopicName;
  *
  * <p>These endpoints route to the broker owning the segment's namespace bundle
  * via {@code validateTopicOwnershipAsync}.
+ *
+ * <p><b>Authorization:</b> all endpoints in this resource are state-modifying
+ * cross-broker coordination primitives invoked by the controller broker during
+ * scalable-topic split/merge. They require <b>super-user</b> access. End users
+ * (including tenant admins) should use the {@link ScalableTopics} resource
+ * instead, which provides the user-facing operations on scalable topics.
  */
 @CustomLog
 @Path("/segments")
@@ -66,9 +72,11 @@ public class Segments extends AdminResource {
 
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/{descriptor}")
-    @ApiOperation(value = "Create a segment topic on the owning broker.")
+    @ApiOperation(value = "Create a segment topic on the owning broker. 
Super-user only.")
     @ApiResponses(value = {
             @ApiResponse(code = 204, message = "Segment topic created 
successfully"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void createSegment(
             @Suspended final AsyncResponse asyncResponse,
@@ -87,7 +95,8 @@ public class Segments extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
 
-        validateTopicOwnershipAsync(segmentTopic, authoritative)
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
                 .thenCompose(__ -> 
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
                 .thenCompose(topic -> {
                     log.info().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
@@ -116,9 +125,11 @@ public class Segments extends AdminResource {
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/{descriptor}/terminate")
-    @ApiOperation(value = "Terminate a segment topic so no more messages can 
be published.")
+    @ApiOperation(value = "Terminate a segment topic so no more messages can 
be published. Super-user only.")
     @ApiResponses(value = {
             @ApiResponse(code = 204, message = "Segment topic terminated 
successfully"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
             @ApiResponse(code = 404, message = "Segment topic not found"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void terminateSegment(
@@ -136,7 +147,8 @@ public class Segments extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
 
-        validateTopicOwnershipAsync(segmentTopic, authoritative)
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
                 .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
                 .thenCompose(optTopic -> {
                     if (optTopic.isEmpty()) {
@@ -164,9 +176,11 @@ public class Segments extends AdminResource {
 
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/{descriptor}")
-    @ApiOperation(value = "Delete a segment topic.")
+    @ApiOperation(value = "Delete a segment topic. Super-user only.")
     @ApiResponses(value = {
             @ApiResponse(code = 204, message = "Segment topic deleted 
successfully"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void deleteSegment(
             @Suspended final AsyncResponse asyncResponse,
@@ -185,7 +199,8 @@ public class Segments extends AdminResource {
         validateNamespaceName(tenant, namespace);
         TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
 
-        validateTopicOwnershipAsync(segmentTopic, authoritative)
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
                 .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
                 .thenCompose(optTopic -> {
                     if (optTopic.isEmpty()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java
new file mode 100644
index 00000000000..0d6f7acab39
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import java.util.List;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class ScalableTopicsAuthZTest extends AuthZTest {
+
+    private static final String NAMESPACE = "public/default";
+
+    @SneakyThrows
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start();
+        this.superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+                .build();
+        final TenantInfo tenantInfo = 
superUserAdmin.tenants().getTenantInfo("public");
+        tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+        superUserAdmin.tenants().updateTenant("public", tenantInfo);
+        this.tenantManagerAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+                .build();
+    }
+
+    @SneakyThrows
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        close();
+    }
+
+    private PulsarAdmin nobodyAdmin() throws Exception {
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(NOBODY_TOKEN))
+                .build();
+    }
+
+    private static String randomTopic() {
+        return NAMESPACE + "/" + UUID.randomUUID();
+    }
+
+    /** A segment URL is segment://tenant/namespace/topic/descriptor. */
+    private static String segmentTopicOf(String topic) {
+        return "segment://" + topic + "/0000-7fff-1";
+    }
+
+    /** Asserts that calling {@code call} throws {@link 
PulsarAdminException.NotAuthorizedException}. */
+    private static void assertNotAuthorized(ThrowingRunnable call) {
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, 
call::run);
+    }
+
+    /**
+     * Asserts that {@code call} either succeeds or fails with anything other 
than NotAuthorized.
+     * Used to verify that the role is allowed past the authz check, without 
depending on the
+     * scalable-topic happy path being fully wired up in the test harness.
+     */
+    private static void assertAuthorized(ThrowingRunnable call) {
+        try {
+            call.run();
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            Assert.fail("Operation should have been authorized, got 
NotAuthorized instead", e);
+        } catch (Throwable ignored) {
+            // Any other failure is fine — we only care about the authz 
contract.
+        }
+    }
+
+    @FunctionalInterface
+    private interface ThrowingRunnable {
+        void run() throws Throwable;
+    }
+
+    // -------- ScalableTopics: tenant-admin-allowed endpoints --------
+
+    @Test
+    public void testListScalableTopicsAuthZ() throws Exception {
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().listScalableTopics(NAMESPACE));
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().listScalableTopics(NAMESPACE));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().listScalableTopics(NAMESPACE));
+    }
+
+    @Test
+    public void testCreateScalableTopicAuthZ() throws Exception {
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().createScalableTopic(randomTopic(), 1));
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().createScalableTopic(randomTopic(), 1));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().createScalableTopic(randomTopic(), 1));
+    }
+
+    @Test
+    public void testGetScalableTopicMetadataAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> nobody.scalableTopics().getMetadata(topic));
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().getMetadata(topic));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().getMetadata(topic));
+    }
+
+    @Test
+    public void testDeleteScalableTopicAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().deleteScalableTopic(topic, true));
+        // tenant admin allowed; we let it succeed (or get NotFound on a 
second attempt).
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().deleteScalableTopic(topic, true));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().deleteScalableTopic(randomTopic(), true));
+    }
+
+    @Test
+    public void testGetStatsAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> nobody.scalableTopics().getStats(topic));
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().getStats(topic));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().getStats(topic));
+    }
+
+    @Test
+    public void testCreateSubscriptionAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> nobody.scalableTopics()
+                .createSubscription(topic, "sub-nobody", 
ScalableSubscriptionType.STREAM));
+        assertAuthorized(() -> tenantManagerAdmin.scalableTopics()
+                .createSubscription(topic, "sub-tenant", 
ScalableSubscriptionType.STREAM));
+        assertAuthorized(() -> superUserAdmin.scalableTopics()
+                .createSubscription(topic, "sub-super", 
ScalableSubscriptionType.STREAM));
+    }
+
+    @Test
+    public void testDeleteSubscriptionAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().deleteSubscription(topic, "sub-x"));
+        assertAuthorized(() -> 
tenantManagerAdmin.scalableTopics().deleteSubscription(topic, "sub-x"));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().deleteSubscription(topic, "sub-x"));
+    }
+
+    // -------- ScalableTopics: super-user-only endpoints --------
+
+    @Test
+    public void testSplitSegmentAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> nobody.scalableTopics().splitSegment(topic, 
1L));
+        assertNotAuthorized(() -> 
tenantManagerAdmin.scalableTopics().splitSegment(topic, 1L));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().splitSegment(topic, 1L));
+    }
+
+    @Test
+    public void testMergeSegmentsAuthZ() throws Exception {
+        final String topic = randomTopic();
+        superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> nobody.scalableTopics().mergeSegments(topic, 
1L, 2L));
+        assertNotAuthorized(() -> 
tenantManagerAdmin.scalableTopics().mergeSegments(topic, 1L, 2L));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().mergeSegments(topic, 1L, 2L));
+    }
+
+    // -------- Segments resource: super-user only on all endpoints --------
+
+    @Test
+    public void testCreateSegmentAuthZ() throws Exception {
+        final String segment = segmentTopicOf(randomTopic());
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().createSegment(segment, List.of()));
+        assertNotAuthorized(() -> 
tenantManagerAdmin.scalableTopics().createSegment(segment, List.of()));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().createSegment(segment, List.of()));
+    }
+
+    @Test
+    public void testTerminateSegmentAuthZ() throws Exception {
+        final String segment = segmentTopicOf(randomTopic());
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().terminateSegment(segment));
+        assertNotAuthorized(() -> 
tenantManagerAdmin.scalableTopics().terminateSegment(segment));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().terminateSegment(segment));
+    }
+
+    @Test
+    public void testDeleteSegmentAuthZ() throws Exception {
+        final String segment = segmentTopicOf(randomTopic());
+        @Cleanup PulsarAdmin nobody = nobodyAdmin();
+        assertNotAuthorized(() -> 
nobody.scalableTopics().deleteSegment(segment, true));
+        assertNotAuthorized(() -> 
tenantManagerAdmin.scalableTopics().deleteSegment(segment, true));
+        assertAuthorized(() -> 
superUserAdmin.scalableTopics().deleteSegment(segment, true));
+    }
+}

Reply via email to