This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch mattison/authorization-operation-metrics
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 78657b803369d81ef01aa69bece3f43e81663c59
Author: mattisonchao <[email protected]>
AuthorDate: Mon Apr 13 18:19:59 2026 +0800

    Add authorization operation metrics
---
 .../broker/authorization/AuthorizationService.java | 103 ++++++++++++++-------
 .../metrics/AuthorizationMetrics.java              |  44 +++++++++
 .../authorization/AuthorizationServiceTest.java    |  45 +++++++++
 3 files changed, 160 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 2964db30a55..3acc7eb400e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
+import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
 import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
@@ -102,12 +103,13 @@ public class AuthorizationService {
     }
 
     public CompletableFuture<Boolean> isSuperUser(String user, 
AuthenticationDataSource authenticationData) {
-        return provider.isSuperUser(user, authenticationData, conf);
+        return recordAuthorizationDenial(provider.isSuperUser(user, 
authenticationData, conf), "superuser", "check");
     }
 
     public CompletableFuture<Boolean> isTenantAdmin(String tenant, String 
role, TenantInfo tenantInfo,
                                                     AuthenticationDataSource 
authenticationData) {
-        return provider.isTenantAdmin(tenant, role, tenantInfo, 
authenticationData);
+        return recordAuthorizationDenial(provider.isTenantAdmin(tenant, role, 
tenantInfo, authenticationData),
+                "tenant_admin", "check");
     }
 
     /**
@@ -529,7 +531,8 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.allowTenantOperationAsync(tenantName, role, operation, 
authData);
+        return 
recordAuthorizationDenial(provider.allowTenantOperationAsync(tenantName, role, 
operation, authData),
+                "tenant", operation.name().toLowerCase());
     }
 
     public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName,
@@ -538,7 +541,7 @@ public class AuthorizationService {
                                                                 String role,
                                                                 
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("tenant", operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTenantOperationAsync(
@@ -559,18 +562,23 @@ public class AuthorizationService {
                                                                 String role,
                                                                 
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("broker", 
brokerOperation.name().toLowerCase());
         }
 
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
-            final var isRoleAuthorizedFuture = 
provider.allowBrokerOperationAsync(clusterName, brokerId,
-                    brokerOperation, role, authData);
-            final var isOriginalAuthorizedFuture =  
provider.allowBrokerOperationAsync(clusterName, brokerId,
-                    brokerOperation, originalRole, authData);
+            final var isRoleAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowBrokerOperationAsync(clusterName, brokerId, 
brokerOperation, role, authData),
+                    "broker", brokerOperation.name().toLowerCase());
+            final var isOriginalAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowBrokerOperationAsync(clusterName, brokerId, 
brokerOperation, originalRole,
+                            authData),
+                    "broker", brokerOperation.name().toLowerCase());
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
                     (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
-            return provider.allowBrokerOperationAsync(clusterName, brokerId, 
brokerOperation, role, authData);
+            return recordAuthorizationDenial(
+                    provider.allowBrokerOperationAsync(clusterName, brokerId, 
brokerOperation, role, authData),
+                    "broker", brokerOperation.name().toLowerCase());
         }
     }
 
@@ -580,18 +588,22 @@ public class AuthorizationService {
                                                                 String role,
                                                                 
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("cluster", 
clusterOperation.name().toLowerCase());
         }
 
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
-            final var isRoleAuthorizedFuture = 
provider.allowClusterOperationAsync(clusterName,
-                    clusterOperation, role, authData);
-            final var isOriginalAuthorizedFuture =  
provider.allowClusterOperationAsync(clusterName,
-                    clusterOperation, originalRole, authData);
+            final var isRoleAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowClusterOperationAsync(clusterName, 
clusterOperation, role, authData),
+                    "cluster", clusterOperation.name().toLowerCase());
+            final var isOriginalAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowClusterOperationAsync(clusterName, 
clusterOperation, originalRole, authData),
+                    "cluster", clusterOperation.name().toLowerCase());
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
                     (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
-            return provider.allowClusterOperationAsync(clusterName, 
clusterOperation, role, authData);
+            return recordAuthorizationDenial(
+                    provider.allowClusterOperationAsync(clusterName, 
clusterOperation, role, authData),
+                    "cluster", clusterOperation.name().toLowerCase());
         }
     }
 
@@ -602,18 +614,22 @@ public class AuthorizationService {
                                                                  String role,
                                                                  
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("cluster_policy", 
operation.name().toLowerCase());
         }
 
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
-            final var isRoleAuthorizedFuture = 
provider.allowClusterPolicyOperationAsync(clusterName, role,
-                    policy, operation, authData);
-            final var isOriginalAuthorizedFuture =  
provider.allowClusterPolicyOperationAsync(clusterName, originalRole,
-                    policy, operation, authData);
+            final var isRoleAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowClusterPolicyOperationAsync(clusterName, 
role, policy, operation, authData),
+                    "cluster_policy", operation.name().toLowerCase());
+            final var isOriginalAuthorizedFuture = recordAuthorizationDenial(
+                    provider.allowClusterPolicyOperationAsync(clusterName, 
originalRole, policy, operation, authData),
+                    "cluster_policy", operation.name().toLowerCase());
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
                     (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
-            return provider.allowClusterPolicyOperationAsync(clusterName, 
role, policy, operation, authData);
+            return recordAuthorizationDenial(
+                    provider.allowClusterPolicyOperationAsync(clusterName, 
role, policy, operation, authData),
+                    "cluster_policy", operation.name().toLowerCase());
         }
     }
 
@@ -657,7 +673,8 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.allowNamespaceOperationAsync(namespaceName, role, 
operation, authData);
+        return 
recordAuthorizationDenial(provider.allowNamespaceOperationAsync(namespaceName, 
role, operation,
+                authData), "namespace", operation.name().toLowerCase());
     }
 
     public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
@@ -666,7 +683,7 @@ public class AuthorizationService {
                                                                    String role,
                                                                    
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("namespace", operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespaceOperationAsync(
@@ -700,7 +717,8 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.allowNamespacePolicyOperationAsync(namespaceName, 
policy, operation, role, authData);
+        return 
recordAuthorizationDenial(provider.allowNamespacePolicyOperationAsync(namespaceName,
 policy, operation,
+                role, authData), "namespace_policy", 
operation.name().toLowerCase());
     }
 
     public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
@@ -710,7 +728,7 @@ public class AuthorizationService {
                                                                          
String role,
                                                                          
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("namespace_policy", 
operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
@@ -763,7 +781,8 @@ public class AuthorizationService {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
-        return provider.allowTopicPolicyOperationAsync(topicName, role, 
policy, operation, authData);
+        return 
recordAuthorizationDenial(provider.allowTopicPolicyOperationAsync(topicName, 
role, policy, operation,
+                authData), "topic_policy", operation.name().toLowerCase());
     }
 
     public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName 
topicName,
@@ -773,7 +792,7 @@ public class AuthorizationService {
                                                                      String 
role,
                                                                      
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("topic_policy", 
operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicPolicyOperationAsync(
@@ -833,8 +852,9 @@ public class AuthorizationService {
             return CompletableFuture.completedFuture(true);
         }
 
-        CompletableFuture<Boolean> allowFuture =
-                provider.allowTopicOperationAsync(topicName, role, operation, 
authData);
+        CompletableFuture<Boolean> allowFuture = recordAuthorizationDenial(
+                provider.allowTopicOperationAsync(topicName, role, operation, 
authData),
+                "topic", operation.name().toLowerCase());
         if (log.isDebugEnabled()) {
             return allowFuture.whenComplete((allowed, exception) -> {
                 if (exception == null) {
@@ -870,7 +890,7 @@ public class AuthorizationService {
                                                                
AuthenticationDataSource originalAuthData,
                                                                
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("topic", operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicOperationAsync(
@@ -890,7 +910,7 @@ public class AuthorizationService {
                                                                String role,
                                                                
AuthenticationDataSource authData) {
         if (!isValidOriginalPrincipal(role, originalRole, authData)) {
-            return CompletableFuture.completedFuture(false);
+            return deniedFuture("topic", operation.name().toLowerCase());
         }
         if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicOperationAsync(
@@ -938,4 +958,23 @@ public class AuthorizationService {
     public CompletableFuture<Map<String, Set<String>>> 
getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
         return provider.getSubscriptionPermissionsAsync(namespaceName);
     }
+
+    private CompletableFuture<Boolean> deniedFuture(String resourceType, 
String operation) {
+        AuthorizationMetrics.recordFailure(resourceType, operation);
+        return CompletableFuture.completedFuture(false);
+    }
+
+    private CompletableFuture<Boolean> 
recordAuthorizationDenial(CompletableFuture<Boolean> authorizationFuture,
+                                                                 String 
resourceType,
+                                                                 String 
operation) {
+        return authorizationFuture.whenComplete((allowed, exception) -> {
+            if (exception == null) {
+                if (Boolean.TRUE.equals(allowed)) {
+                    AuthorizationMetrics.recordSuccess(resourceType, 
operation);
+                } else if (Boolean.FALSE.equals(allowed)) {
+                    AuthorizationMetrics.recordFailure(resourceType, 
operation);
+                }
+            }
+        });
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java
new file mode 100644
index 00000000000..5f9cbf32272
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization.metrics;
+
+import io.prometheus.client.Counter;
+
+public final class AuthorizationMetrics {
+    public static final String AUTHORIZATION_OPERATIONS_METRIC_NAME = 
"pulsar_authorization_operations_total";
+    public static final String RESULT_SUCCESS = "success";
+    public static final String RESULT_FAILURE = "failure";
+
+    private static final Counter authorizationOperations = Counter.build()
+            .name(AUTHORIZATION_OPERATIONS_METRIC_NAME)
+            .help("Pulsar authorization operations")
+            .labelNames("resource_type", "operation", "result")
+            .register();
+
+    private AuthorizationMetrics() {
+    }
+
+    public static void recordSuccess(String resourceType, String operation) {
+        authorizationOperations.labels(resourceType, operation, 
RESULT_SUCCESS).inc();
+    }
+
+    public static void recordFailure(String resourceType, String operation) {
+        authorizationOperations.labels(resourceType, operation, 
RESULT_FAILURE).inc();
+    }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
index 6f9dffa11b9..481a633625b 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java
@@ -20,9 +20,11 @@ package org.apache.pulsar.broker.authorization;
 
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
+import io.prometheus.client.CollectorRegistry;
 import java.util.HashSet;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -132,4 +134,47 @@ public class AuthorizationServiceTest {
                 PolicyName.ALL, PolicyOperation.READ, originalRole, role, 
null).get();
         checkResult(shouldPass, isAuthorized);
     }
+
+    @Test
+    public void testAuthorizationFailureMetricForTopicOperation() throws 
Exception {
+        double before = getAuthorizationOperations("topic", 
TopicOperation.PRODUCE.name().toLowerCase(), "failure");
+        boolean isAuthorized = 
authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+                TopicOperation.PRODUCE, null, "fail.client", null).get();
+        double after = getAuthorizationOperations("topic", 
TopicOperation.PRODUCE.name().toLowerCase(), "failure");
+
+        assertFalse(isAuthorized);
+        assertTrue(after - before == 1.0d);
+    }
+
+    @Test
+    public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() 
throws Exception {
+        double before = getAuthorizationOperations("namespace", 
NamespaceOperation.PACKAGES.name().toLowerCase(),
+                "failure");
+        boolean isAuthorized = 
authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"),
+                NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", 
null).get();
+        double after = getAuthorizationOperations("namespace", 
NamespaceOperation.PACKAGES.name().toLowerCase(),
+                "failure");
+
+        assertFalse(isAuthorized);
+        assertTrue(after - before == 1.0d);
+    }
+
+    @Test
+    public void testAuthorizationSuccessMetricForTopicOperation() throws 
Exception {
+        double before = getAuthorizationOperations("topic", 
TopicOperation.PRODUCE.name().toLowerCase(), "success");
+        boolean isAuthorized = 
authorizationService.allowTopicOperationAsync(TopicName.get("topic"),
+                TopicOperation.PRODUCE, null, "pass.client", null).get();
+        double after = getAuthorizationOperations("topic", 
TopicOperation.PRODUCE.name().toLowerCase(), "success");
+
+        assertTrue(isAuthorized);
+        assertTrue(after - before == 1.0d);
+    }
+
+    private double getAuthorizationOperations(String resourceType, String 
operation, String result) {
+        Double sample = CollectorRegistry.defaultRegistry.getSampleValue(
+                AuthorizationMetrics.AUTHORIZATION_OPERATIONS_METRIC_NAME,
+                new String[] {"resource_type", "operation", "result"},
+                new String[] {resourceType, operation, result});
+        return sample == null ? 0.0d : sample;
+    }
 }

Reply via email to