This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch mattison/authorization-operation-metrics in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 78657b803369d81ef01aa69bece3f43e81663c59 Author: mattisonchao <[email protected]> AuthorDate: Mon Apr 13 18:19:59 2026 +0800 Add authorization operation metrics --- .../broker/authorization/AuthorizationService.java | 103 ++++++++++++++------- .../metrics/AuthorizationMetrics.java | 44 +++++++++ .../authorization/AuthorizationServiceTest.java | 45 +++++++++ 3 files changed, 160 insertions(+), 32 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 2964db30a55..3acc7eb400e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -32,6 +32,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationParameters; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; @@ -102,12 +103,13 @@ public class AuthorizationService { } public CompletableFuture<Boolean> isSuperUser(String user, AuthenticationDataSource authenticationData) { - return provider.isSuperUser(user, authenticationData, conf); + return recordAuthorizationDenial(provider.isSuperUser(user, authenticationData, conf), "superuser", "check"); } public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { - return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData); + return recordAuthorizationDenial(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), + "tenant_admin", "check"); } /** @@ -529,7 +531,8 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTenantOperationAsync(tenantName, role, operation, authData); + return recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, operation, authData), + "tenant", operation.name().toLowerCase()); } public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName, @@ -538,7 +541,7 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("tenant", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTenantOperationAsync( @@ -559,18 +562,23 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("broker", brokerOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + "broker", brokerOperation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, + authData), + "broker", brokerOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData); + return recordAuthorizationDenial( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + "broker", brokerOperation.name().toLowerCase()); } } @@ -580,18 +588,22 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("cluster", clusterOperation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + "cluster", clusterOperation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), + "cluster", clusterOperation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData); + return recordAuthorizationDenial( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + "cluster", clusterOperation.name().toLowerCase()); } } @@ -602,18 +614,22 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("cluster_policy", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, role, - policy, operation, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, originalRole, - policy, operation, authData); + final var isRoleAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); + final var isOriginalAuthorizedFuture = recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData); + return recordAuthorizationDenial( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + "cluster_policy", operation.name().toLowerCase()); } } @@ -657,7 +673,8 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData); + return recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, role, operation, + authData), "namespace", operation.name().toLowerCase()); } public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -666,7 +683,7 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("namespace", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespaceOperationAsync( @@ -700,7 +717,8 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); + return recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, + role, authData), "namespace_policy", operation.name().toLowerCase()); } public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -710,7 +728,7 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("namespace_policy", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -763,7 +781,8 @@ public class AuthorizationService { if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData); + return recordAuthorizationDenial(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, + authData), "topic_policy", operation.name().toLowerCase()); } public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topicName, @@ -773,7 +792,7 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("topic_policy", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicPolicyOperationAsync( @@ -833,8 +852,9 @@ public class AuthorizationService { return CompletableFuture.completedFuture(true); } - CompletableFuture<Boolean> allowFuture = - provider.allowTopicOperationAsync(topicName, role, operation, authData); + CompletableFuture<Boolean> allowFuture = recordAuthorizationDenial( + provider.allowTopicOperationAsync(topicName, role, operation, authData), + "topic", operation.name().toLowerCase()); if (log.isDebugEnabled()) { return allowFuture.whenComplete((allowed, exception) -> { if (exception == null) { @@ -870,7 +890,7 @@ public class AuthorizationService { AuthenticationDataSource originalAuthData, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("topic", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -890,7 +910,7 @@ public class AuthorizationService { String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture("topic", operation.name().toLowerCase()); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture<Boolean> isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -938,4 +958,23 @@ public class AuthorizationService { public CompletableFuture<Map<String, Set<String>>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) { return provider.getSubscriptionPermissionsAsync(namespaceName); } + + private CompletableFuture<Boolean> deniedFuture(String resourceType, String operation) { + AuthorizationMetrics.recordFailure(resourceType, operation); + return CompletableFuture.completedFuture(false); + } + + private CompletableFuture<Boolean> recordAuthorizationDenial(CompletableFuture<Boolean> authorizationFuture, + String resourceType, + String operation) { + return authorizationFuture.whenComplete((allowed, exception) -> { + if (exception == null) { + if (Boolean.TRUE.equals(allowed)) { + AuthorizationMetrics.recordSuccess(resourceType, operation); + } else if (Boolean.FALSE.equals(allowed)) { + AuthorizationMetrics.recordFailure(resourceType, operation); + } + } + }); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java new file mode 100644 index 00000000000..5f9cbf32272 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authorization.metrics; + +import io.prometheus.client.Counter; + +public final class AuthorizationMetrics { + public static final String AUTHORIZATION_OPERATIONS_METRIC_NAME = "pulsar_authorization_operations_total"; + public static final String RESULT_SUCCESS = "success"; + public static final String RESULT_FAILURE = "failure"; + + private static final Counter authorizationOperations = Counter.build() + .name(AUTHORIZATION_OPERATIONS_METRIC_NAME) + .help("Pulsar authorization operations") + .labelNames("resource_type", "operation", "result") + .register(); + + private AuthorizationMetrics() { + } + + public static void recordSuccess(String resourceType, String operation) { + authorizationOperations.labels(resourceType, operation, RESULT_SUCCESS).inc(); + } + + public static void recordFailure(String resourceType, String operation) { + authorizationOperations.labels(resourceType, operation, RESULT_FAILURE).inc(); + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java index 6f9dffa11b9..481a633625b 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java @@ -20,9 +20,11 @@ package org.apache.pulsar.broker.authorization; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; +import io.prometheus.client.CollectorRegistry; import java.util.HashSet; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -132,4 +134,47 @@ public class AuthorizationServiceTest { PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get(); checkResult(shouldPass, isAuthorized); } + + @Test + public void testAuthorizationFailureMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "fail.client", null).get(); + double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "failure"); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() throws Exception { + double before = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), + "failure"); + boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), + NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); + double after = getAuthorizationOperations("namespace", NamespaceOperation.PACKAGES.name().toLowerCase(), + "failure"); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationSuccessMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "pass.client", null).get(); + double after = getAuthorizationOperations("topic", TopicOperation.PRODUCE.name().toLowerCase(), "success"); + + assertTrue(isAuthorized); + assertTrue(after - before == 1.0d); + } + + private double getAuthorizationOperations(String resourceType, String operation, String result) { + Double sample = CollectorRegistry.defaultRegistry.getSampleValue( + AuthorizationMetrics.AUTHORIZATION_OPERATIONS_METRIC_NAME, + new String[] {"resource_type", "operation", "result"}, + new String[] {resourceType, operation, result}); + return sample == null ? 0.0d : sample; + } }
