This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d200fd5e85f [improve][broker] Add authn/authz to Scalable Topics Admin API (PIP-460) (#25618)
d200fd5e85f is described below
commit d200fd5e85f3e343be29cc373008dc928c7fd370
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 29 23:17:04 2026 +0300
[improve][broker] Add authn/authz to Scalable Topics Admin API (PIP-460) (#25618)
---
.../pulsar/broker/admin/v2/ScalableTopics.java | 122 +++++------
.../apache/pulsar/broker/admin/v2/Segments.java | 27 ++-
.../broker/admin/ScalableTopicsAuthZTest.java | 228 +++++++++++++++++++++
3 files changed, 311 insertions(+), 66 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index f4c5726fe51..6e7aa7f6723 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -28,6 +28,7 @@ import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
@@ -49,9 +50,12 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.scalable.ScalableTopicController;
+import org.apache.pulsar.broker.service.scalable.ScalableTopicService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -91,7 +95,8 @@ public class ScalableTopics extends AdminResource {
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- resources().listScalableTopicsAsync(namespaceName)
+ validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ ->
resources().listScalableTopicsAsync(namespaceName))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error().attr("clientAppId",
clientAppId()).attr("namespace", namespaceName)
@@ -128,19 +133,19 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- if (numInitialSegments < 1) {
- asyncResponse.resume(new
RestException(Response.Status.fromStatusCode(412),
- "numInitialSegments must be >= 1"));
- return;
- }
-
- Map<String, String> props = properties != null ? properties : Map.of();
- ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(
- numInitialSegments, props);
-
- // Segment persistent topics are auto-created on demand when clients
connect,
- // so we only need to store the metadata here.
- resources().createScalableTopicAsync(tn, metadata)
+ validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.CREATE_TOPIC)
+ .thenCompose(__ -> {
+ if (numInitialSegments < 1) {
+ throw new
RestException(Response.Status.fromStatusCode(412),
+ "numInitialSegments must be >= 1");
+ }
+ Map<String, String> props = properties != null ?
properties : Map.of();
+ ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(
+ numInitialSegments, props);
+ // Segment persistent topics are auto-created on demand
when clients connect,
+ // so we only need to store the metadata here.
+ return resources().createScalableTopicAsync(tn, metadata);
+ })
.thenAccept(__ -> {
log.info().attr("clientAppId",
clientAppId()).attr("topic", tn)
.attr("numInitialSegments", numInitialSegments)
@@ -182,7 +187,8 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- resources().getScalableTopicMetadataAsync(tn)
+ validateTopicOperationAsync(tn, TopicOperation.GET_METADATA)
+ .thenCompose(__ ->
resources().getScalableTopicMetadataAsync(tn))
.thenAccept(optMd -> {
if (optMd.isEmpty()) {
asyncResponse.resume(new
RestException(Response.Status.NOT_FOUND,
@@ -223,7 +229,8 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- resources().getScalableTopicMetadataAsync(tn)
+ validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.DELETE_TOPIC)
+ .thenCompose(__ ->
resources().getScalableTopicMetadataAsync(tn))
.thenCompose(optMd -> {
if (optMd.isEmpty()) {
throw new RestException(Response.Status.NOT_FOUND,
@@ -268,14 +275,8 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- var scalableTopicService =
pulsar().getBrokerService().getScalableTopicService();
- if (scalableTopicService == null) {
- asyncResponse.resume(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
- "Scalable topic service not available"));
- return;
- }
-
- scalableTopicService.getStats(tn)
+ validateTopicOperationAsync(tn, TopicOperation.GET_STATS)
+ .thenCompose(__ -> withScalableTopicService(svc ->
svc.getStats(tn)))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error().attr("clientAppId",
clientAppId()).attr("topic", tn)
@@ -313,15 +314,9 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- var scalableTopicService =
pulsar().getBrokerService().getScalableTopicService();
- if (scalableTopicService == null) {
- asyncResponse.resume(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
- "Scalable topic service not available"));
- return;
- }
-
- redirectToControllerLeaderIfNeeded(tn)
- .thenCompose(__ -> scalableTopicService.createSubscription(tn,
subscription, type))
+ validateTopicOperationAsync(tn, TopicOperation.SUBSCRIBE, subscription)
+ .thenCompose(__ -> onControllerLeader(tn,
+ svc -> svc.createSubscription(tn, subscription, type)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic",
tn)
@@ -359,15 +354,9 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- var scalableTopicService =
pulsar().getBrokerService().getScalableTopicService();
- if (scalableTopicService == null) {
- asyncResponse.resume(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
- "Scalable topic service not available"));
- return;
- }
-
- redirectToControllerLeaderIfNeeded(tn)
- .thenCompose(__ -> scalableTopicService.deleteSubscription(tn,
subscription))
+ validateTopicOperationAsync(tn, TopicOperation.UNSUBSCRIBE,
subscription)
+ .thenCompose(__ -> onControllerLeader(tn,
+ svc -> svc.deleteSubscription(tn, subscription)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("subscription", subscription).attr("topic",
tn)
@@ -406,15 +395,8 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- var scalableTopicService =
pulsar().getBrokerService().getScalableTopicService();
- if (scalableTopicService == null) {
- asyncResponse.resume(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
- "Scalable topic service not available"));
- return;
- }
-
- redirectToControllerLeaderIfNeeded(tn)
- .thenCompose(__ -> scalableTopicService.splitSegment(tn,
segmentId))
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> onControllerLeader(tn, svc ->
svc.splitSegment(tn, segmentId)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("segmentId", segmentId).attr("topic", tn)
@@ -453,15 +435,9 @@ public class ScalableTopics extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
- var scalableTopicService =
pulsar().getBrokerService().getScalableTopicService();
- if (scalableTopicService == null) {
- asyncResponse.resume(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
- "Scalable topic service not available"));
- return;
- }
-
- redirectToControllerLeaderIfNeeded(tn)
- .thenCompose(__ -> scalableTopicService.mergeSegments(tn,
segmentId1, segmentId2))
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> onControllerLeader(tn,
+ svc -> svc.mergeSegments(tn, segmentId1, segmentId2)))
.thenAccept(__ -> {
log.info().attr("clientAppId", clientAppId())
.attr("segmentId1", segmentId1).attr("segmentId2",
segmentId2)
@@ -480,6 +456,32 @@ public class ScalableTopics extends AdminResource {
// --- Internal helpers ---
+ /**
+ * Resolve the local scalable-topic service and pass it to {@code op}. Surfaces a 503
+ * RestException if the service is not available on this broker.
+ *
+ * <p>The service is looked up via
+ * {@code pulsar().getBrokerService().getScalableTopicService()}. When the lookup
+ * returns {@code null}, {@code op} is never invoked and the failure is handed back
+ * as the return value of {@code FutureUtil.failedFuture} rather than thrown here, so
+ * callers can use this method as a step in a {@code thenCompose} chain. Otherwise
+ * {@code op} is applied directly on the calling thread and its future is returned
+ * unchanged.
+ *
+ * @param op operation to run against the resolved service
+ * @param <T> result type of the operation's future
+ * @return the future produced by {@code op}, or the failed future carrying a
+ * {@code SERVICE_UNAVAILABLE} RestException when no service is available
+ */
+ private <T> CompletableFuture<T> withScalableTopicService(
+ Function<ScalableTopicService, CompletableFuture<T>> op) {
+ ScalableTopicService svc =
pulsar().getBrokerService().getScalableTopicService();
+ if (svc == null) {
+ return FutureUtil.failedFuture(new
RestException(Response.Status.SERVICE_UNAVAILABLE,
+ "Scalable topic service not available"));
+ }
+ return op.apply(svc);
+ }
+
+ /**
+ * Invoke {@code op} on the controller leader for {@code tn}. Combines the
+ * service-availability check with {@link #redirectToControllerLeaderIfNeeded(TopicName)}
+ * so that endpoints requiring the elected controller leader can express the operation
+ * as a single chained step.
+ *
+ * <p>Ordering: the local service is resolved first (through
+ * {@code withScalableTopicService}), then the redirect check runs, and {@code op} is
+ * applied only once the redirect future has completed normally. When the service is
+ * unavailable the redirect check is not attempted at all.
+ * NOTE(review): this assumes the redirect helper reports "not the leader" through its
+ * returned future rather than by throwing synchronously - confirm against that method.
+ *
+ * @param tn topic whose controller leader must handle the request
+ * @param op operation to run against the local scalable-topic service
+ * @param <T> result type of the operation's future
+ * @return a future for the result of {@code op}, or a failed future originating from
+ * the availability check or the redirect step
+ */
+ private <T> CompletableFuture<T> onControllerLeader(TopicName tn,
+ Function<ScalableTopicService, CompletableFuture<T>> op) {
+ return withScalableTopicService(svc ->
redirectToControllerLeaderIfNeeded(tn)
+ .thenCompose(__ -> op.apply(svc)));
+ }
+
/**
* If this broker is not the elected controller leader for {@code tn},
redirect the
* request to the leader via HTTP 307. Read-only endpoints (like {@code
getStats}) do
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 6e81be7e672..e01961f9a30 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -51,6 +51,12 @@ import org.apache.pulsar.common.naming.TopicName;
*
* <p>These endpoints route to the broker owning the segment's namespace bundle
* via {@code validateTopicOwnershipAsync}.
+ *
+ * <p><b>Authorization:</b> all endpoints in this resource are state-modifying
+ * cross-broker coordination primitives invoked by the controller broker during
+ * scalable-topic split/merge. They require <b>super-user</b> access. End users
+ * (including tenant admins) should use the {@link ScalableTopics} resource
+ * instead, which provides the user-facing operations on scalable topics.
*/
@CustomLog
@Path("/segments")
@@ -66,9 +72,11 @@ public class Segments extends AdminResource {
@PUT
@Path("/{tenant}/{namespace}/{topic}/{descriptor}")
- @ApiOperation(value = "Create a segment topic on the owning broker.")
+ @ApiOperation(value = "Create a segment topic on the owning broker.
Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Segment topic created
successfully"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
@ApiResponse(code = 500, message = "Internal server error")})
public void createSegment(
@Suspended final AsyncResponse asyncResponse,
@@ -87,7 +95,8 @@ public class Segments extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
- validateTopicOwnershipAsync(segmentTopic, authoritative)
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
.thenCompose(__ ->
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
.thenCompose(topic -> {
log.info().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
@@ -116,9 +125,11 @@ public class Segments extends AdminResource {
@POST
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/terminate")
- @ApiOperation(value = "Terminate a segment topic so no more messages can
be published.")
+ @ApiOperation(value = "Terminate a segment topic so no more messages can
be published. Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Segment topic terminated
successfully"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
@ApiResponse(code = 404, message = "Segment topic not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public void terminateSegment(
@@ -136,7 +147,8 @@ public class Segments extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
- validateTopicOwnershipAsync(segmentTopic, authoritative)
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
.thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
@@ -164,9 +176,11 @@ public class Segments extends AdminResource {
@DELETE
@Path("/{tenant}/{namespace}/{topic}/{descriptor}")
- @ApiOperation(value = "Delete a segment topic.")
+ @ApiOperation(value = "Delete a segment topic. Super-user only.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Segment topic deleted
successfully"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
@ApiResponse(code = 500, message = "Internal server error")})
public void deleteSegment(
@Suspended final AsyncResponse asyncResponse,
@@ -185,7 +199,8 @@ public class Segments extends AdminResource {
validateNamespaceName(tenant, namespace);
TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
- validateTopicOwnershipAsync(segmentTopic, authoritative)
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
.thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java
new file mode 100644
index 00000000000..0d6f7acab39
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsAuthZTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import java.util.List;
+import java.util.UUID;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class ScalableTopicsAuthZTest extends AuthZTest {
+
+ private static final String NAMESPACE = "public/default";
+
+ @SneakyThrows
+ @BeforeClass(alwaysRun = true)
+ public void setup() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ final TenantInfo tenantInfo =
superUserAdmin.tenants().getTenantInfo("public");
+ tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
+ superUserAdmin.tenants().updateTenant("public", tenantInfo);
+ this.tenantManagerAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
+ .build();
+ }
+
+ @SneakyThrows
+ @AfterClass(alwaysRun = true)
+ public void cleanup() {
+ close();
+ }
+
+ private PulsarAdmin nobodyAdmin() throws Exception {
+ return PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(NOBODY_TOKEN))
+ .build();
+ }
+
+ private static String randomTopic() {
+ return NAMESPACE + "/" + UUID.randomUUID();
+ }
+
+ /**
+ * Builds a segment topic name for {@code topic} by prefixing {@code segment://} and
+ * appending the fixed descriptor {@code 0000-7fff-1}.
+ *
+ * <p>A segment URL is segment://tenant/namespace/topic/descriptor, so {@code topic}
+ * is expected in {@code tenant/namespace/topic} form, which is what
+ * {@link #randomTopic()} returns. The descriptor value is hard-coded; the tests in
+ * this class only exercise the authorization check, not the descriptor's meaning.
+ */
+ private static String segmentTopicOf(String topic) {
+ return "segment://" + topic + "/0000-7fff-1";
+ }
+
+ /**
+ * Asserts that calling {@code call} throws
+ * {@link PulsarAdminException.NotAuthorizedException}.
+ *
+ * <p>Delegates to {@code Assert.assertThrows}, so a call that succeeds, or that
+ * fails with a different exception type, fails the test.
+ */
+ private static void assertNotAuthorized(ThrowingRunnable call) {
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
call::run);
+ }
+
+ /**
+ * Asserts that {@code call} either succeeds or fails with anything other than
+ * NotAuthorized. Used to verify that the role is allowed past the authz check,
+ * without depending on the scalable-topic happy path being fully wired up in the
+ * test harness.
+ *
+ * <p>Reviewer note: every {@code Throwable} other than
+ * {@link PulsarAdminException.NotAuthorizedException} is swallowed, so this helper
+ * also passes when the call fails for an unrelated reason (for example a missing
+ * topic or a server-side error). It therefore proves only "not rejected as
+ * unauthorized", never that the operation actually worked.
+ */
+ private static void assertAuthorized(ThrowingRunnable call) {
+ try {
+ call.run();
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+ Assert.fail("Operation should have been authorized, got
NotAuthorized instead", e);
+ } catch (Throwable ignored) {
+ // Any other failure is fine — we only care about the authz
contract.
+ }
+ }
+
+ @FunctionalInterface
+ private interface ThrowingRunnable {
+ void run() throws Throwable;
+ }
+
+ // -------- ScalableTopics: tenant-admin-allowed endpoints --------
+
+ @Test
+ public void testListScalableTopicsAuthZ() throws Exception {
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().listScalableTopics(NAMESPACE));
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().listScalableTopics(NAMESPACE));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().listScalableTopics(NAMESPACE));
+ }
+
+ @Test
+ public void testCreateScalableTopicAuthZ() throws Exception {
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().createScalableTopic(randomTopic(), 1));
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().createScalableTopic(randomTopic(), 1));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().createScalableTopic(randomTopic(), 1));
+ }
+
+ @Test
+ public void testGetScalableTopicMetadataAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() -> nobody.scalableTopics().getMetadata(topic));
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().getMetadata(topic));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().getMetadata(topic));
+ }
+
+ @Test
+ public void testDeleteScalableTopicAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().deleteScalableTopic(topic, true));
+ // tenant admin allowed; we let it succeed (or get NotFound on a
second attempt).
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().deleteScalableTopic(topic, true));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().deleteScalableTopic(randomTopic(), true));
+ }
+
+ @Test
+ public void testGetStatsAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() -> nobody.scalableTopics().getStats(topic));
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().getStats(topic));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().getStats(topic));
+ }
+
+ @Test
+ public void testCreateSubscriptionAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() -> nobody.scalableTopics()
+ .createSubscription(topic, "sub-nobody",
ScalableSubscriptionType.STREAM));
+ assertAuthorized(() -> tenantManagerAdmin.scalableTopics()
+ .createSubscription(topic, "sub-tenant",
ScalableSubscriptionType.STREAM));
+ assertAuthorized(() -> superUserAdmin.scalableTopics()
+ .createSubscription(topic, "sub-super",
ScalableSubscriptionType.STREAM));
+ }
+
+ @Test
+ public void testDeleteSubscriptionAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().deleteSubscription(topic, "sub-x"));
+ assertAuthorized(() ->
tenantManagerAdmin.scalableTopics().deleteSubscription(topic, "sub-x"));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().deleteSubscription(topic, "sub-x"));
+ }
+
+ // -------- ScalableTopics: super-user-only endpoints --------
+
+ @Test
+ public void testSplitSegmentAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() -> nobody.scalableTopics().splitSegment(topic,
1L));
+ assertNotAuthorized(() ->
tenantManagerAdmin.scalableTopics().splitSegment(topic, 1L));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().splitSegment(topic, 1L));
+ }
+
+ @Test
+ public void testMergeSegmentsAuthZ() throws Exception {
+ final String topic = randomTopic();
+ superUserAdmin.scalableTopics().createScalableTopic(topic, 1);
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() -> nobody.scalableTopics().mergeSegments(topic,
1L, 2L));
+ assertNotAuthorized(() ->
tenantManagerAdmin.scalableTopics().mergeSegments(topic, 1L, 2L));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().mergeSegments(topic, 1L, 2L));
+ }
+
+ // -------- Segments resource: super-user only on all endpoints --------
+
+ @Test
+ public void testCreateSegmentAuthZ() throws Exception {
+ final String segment = segmentTopicOf(randomTopic());
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().createSegment(segment, List.of()));
+ assertNotAuthorized(() ->
tenantManagerAdmin.scalableTopics().createSegment(segment, List.of()));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().createSegment(segment, List.of()));
+ }
+
+ @Test
+ public void testTerminateSegmentAuthZ() throws Exception {
+ final String segment = segmentTopicOf(randomTopic());
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().terminateSegment(segment));
+ assertNotAuthorized(() ->
tenantManagerAdmin.scalableTopics().terminateSegment(segment));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().terminateSegment(segment));
+ }
+
+ @Test
+ public void testDeleteSegmentAuthZ() throws Exception {
+ final String segment = segmentTopicOf(randomTopic());
+ @Cleanup PulsarAdmin nobody = nobodyAdmin();
+ assertNotAuthorized(() ->
nobody.scalableTopics().deleteSegment(segment, true));
+ assertNotAuthorized(() ->
tenantManagerAdmin.scalableTopics().deleteSegment(segment, true));
+ assertAuthorized(() ->
superUserAdmin.scalableTopics().deleteSegment(segment, true));
+ }
+}