This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ba486bf78b [#10412] feat(server-common): Support group ownership in 
JcasbinAuthorizer (#10867)
ba486bf78b is described below

commit ba486bf78b7eb7a1d8519d6892d94c0f1f643632
Author: Bharath Krishna <[email protected]>
AuthorDate: Sat Apr 25 02:57:47 2026 -0700

    [#10412] feat(server-common): Support group ownership in JcasbinAuthorizer 
(#10867)
    
    ### What changes were proposed in this pull request?
    
    Extend `JcasbinAuthorizer` to recognize group-based ownership. When a
    metadata object is owned by a group, all members of that group are now
    treated as owners and granted owner privileges.
    
    Key changes:
    - **`OwnerInfo` nested class**: Replaces the raw `Long` owner ID in the
    cache with a value holder that stores `id`, `type` (USER/GROUP), and `name`.
    - **`ownerRel` cache type**: Changed from `Cache<Long, Optional<Long>>`
    to `Cache<Long, Optional<OwnerInfo>>`.
    - **`loadOwnerPolicy()`**: Now handles `GroupEntity` alongside
    `UserEntity` when populating the owner cache.
    - **`checkOwnership()`**: New method that resolves both user and group
    owners — for USER owners it compares entity IDs, for GROUP owners it
    checks whether the principal's groups include the owning group.
    - **`isOwner()` and `authorizeByJcasbin()`**: Refactored to delegate to
    `checkOwnership()`, eliminating duplicated ownership logic.
    - **Documentation**: Removed the "group ownership not supported" info boxes
    from `docs/security/access-control.md` and added a group ownership bullet.
    
    ### Why are the changes needed?
    
    Currently, when a metadata object's owner is set to a group (supported
    since #10848), the `JcasbinAuthorizer` does not recognize group
    ownership — only individual user ownership is checked. This means group
    members are denied owner privileges even when the group is the
    registered owner.
    
    This PR is part 2 of a 3-PR series for #10412:
    1. **#10848** — Core/API: `OwnerManager.setOwner` accepts GROUP
    (**merged**)
    2. **This PR** — Enforcement: `JcasbinAuthorizer` recognizes group
    ownership
    3. **Planned** — Role inheritance: `loadRolePrivilege` queries
    `ROLE_GROUP_REL`
    
    Fix: #10412
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users who assign group ownership to metadata objects will now have
    all group members recognized as owners by the JCasbin authorization
    plugin.
    
    ### How was this patch tested?
    
    Unit tests in `TestJcasbinAuthorizer`:
    - `testAuthorizeByGroupOwner`: Verifies that a principal whose groups
    include the owning group is recognized as owner, that a principal whose
    groups do not include the owning group is denied, and that the ownership
    check returns false once ownership has been cleared.
    - Existing tests (`testIsOwner`, `testOwnerCacheInvalidation`,
    `testCacheInitialization`) updated for `OwnerInfo` type.
    - All 8 tests pass: `./gradlew :server-common:test --tests
    "...TestJcasbinAuthorizer"`
---
 docs/security/access-control.md                    |   9 +-
 .../authorization/jcasbin/JcasbinAuthorizer.java   |  93 +++++++++++++++---
 .../jcasbin/TestJcasbinAuthorizer.java             | 104 ++++++++++++++++++++-
 3 files changed, 182 insertions(+), 24 deletions(-)

diff --git a/docs/security/access-control.md b/docs/security/access-control.md
index 4e023c5164..76858304c7 100644
--- a/docs/security/access-control.md
+++ b/docs/security/access-control.md
@@ -132,10 +132,7 @@ Every securable object in Gravitino has an owner - the 
user with administrative
 - **Automatic assignment**: The creator of an object automatically becomes its 
owner
 - **Administrative privileges**: Owners have implicit management privileges 
(e.g., drop, alter)
 - **Exclusive control**: Only the owner can fully manage the object
-
-:::info
-Group ownership is not currently supported. Only user ownership is available.
-:::
+- **Group ownership**: Ownership can be assigned to a group, granting all 
members of that group owner privileges
 
 **Supported Objects:**
 
@@ -173,10 +170,6 @@ A group is a collection of users that simplifies 
permission management by allowi
 
 All users in a group inherit the roles and privileges granted to that group.
 
-:::info
-Groups can be granted roles and privileges, but they cannot be owners of 
securable objects. Only users can be owners.
-:::
-
 ### Metadata Objects
 
 Metadata objects are entities managed by Gravitino, such as catalogs, schemas, 
tables, filesets, topics, models, functions, roles, and metalakes.
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 7630f71ae3..e675277ab2 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Configs;
@@ -45,6 +46,8 @@ import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.UserGroup;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.AuthorizationUtils;
@@ -52,6 +55,7 @@ import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.exceptions.NoSuchUserException;
+import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
@@ -87,7 +91,7 @@ public class JcasbinAuthorizer implements GravitinoAuthorizer 
{
    */
   private Cache<Long, Boolean> loadedRoles;
 
-  private Cache<Long, Optional<Long>> ownerRel;
+  private Cache<Long, Optional<OwnerInfo>> ownerRel;
 
   private Executor executor = null;
 
@@ -218,15 +222,11 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       String metalake,
       MetadataObject metadataObject,
       AuthorizationRequestContext requestContext) {
-    Long userId;
     boolean result;
     try {
       Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
       loadOwnerPolicy(metalake, metadataObject, metadataId);
-      UserEntity userEntity = getUserEntity(principal.getName(), metalake);
-      userId = userEntity.id();
-      metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      result = Objects.equals(Optional.of(userId), 
ownerRel.getIfPresent(metadataId));
+      result = checkOwnership(principal, metalake, metadataId);
     } catch (Exception e) {
       LOG.debug("Can not get entity id", e);
       result = false;
@@ -457,14 +457,17 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         return false;
       }
       loadRolePrivilege(metalake, username, userId, requestContext);
-      return authorizeByJcasbin(userId, metadataObject, metadataId, privilege);
+      return authorizeByJcasbin(userId, metalake, metadataObject, metadataId, 
privilege);
     }
 
     private boolean authorizeByJcasbin(
-        Long userId, MetadataObject metadataObject, Long metadataId, String 
privilege) {
+        Long userId,
+        String metalake,
+        MetadataObject metadataObject,
+        Long metadataId,
+        String privilege) {
       if (AuthConstants.OWNER.equals(privilege)) {
-        Optional<Long> owner = ownerRel.getIfPresent(metadataId);
-        return Objects.equals(Optional.of(userId), owner);
+        return checkOwnership(PrincipalUtils.getCurrentPrincipal(), metalake, 
metadataId);
       }
       return enforcer.enforce(
           String.valueOf(userId),
@@ -556,7 +559,14 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         for (Entity ownerEntity : owners) {
           if (ownerEntity instanceof UserEntity) {
             UserEntity user = (UserEntity) ownerEntity;
-            ownerRel.put(metadataId, Optional.of(user.id()));
+            ownerRel.put(
+                metadataId,
+                Optional.of(new OwnerInfo(user.id(), Entity.EntityType.USER, 
user.name())));
+          } else if (ownerEntity instanceof GroupEntity) {
+            GroupEntity group = (GroupEntity) ownerEntity;
+            ownerRel.put(
+                metadataId,
+                Optional.of(new OwnerInfo(group.id(), Entity.EntityType.GROUP, 
group.name())));
           }
         }
       }
@@ -601,4 +611,65 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       }
     }
   }
+
+  /**
+   * Checks whether the given principal is the owner of the metadata object 
identified by
+   * metadataId. Supports both user and group ownership.
+   */
+  private boolean checkOwnership(Principal principal, String metalake, Long 
metadataId) {
+    Optional<OwnerInfo> ownerOpt = ownerRel.getIfPresent(metadataId);
+    if (ownerOpt == null || !ownerOpt.isPresent()) {
+      return false;
+    }
+    OwnerInfo owner = ownerOpt.get();
+    // We compare by entity ID rather than name to guard against stale cache 
entries.
+    // If a user/group is deleted and recreated with the same name, the cached 
OwnerInfo
+    // still holds the old ID. A name-only comparison would incorrectly grant 
ownership
+    // to the new entity. The extra IO to fetch the current entity ensures 
correctness.
+    if (owner.type == Entity.EntityType.USER) {
+      try {
+        UserEntity userEntity = getUserEntity(principal.getName(), metalake);
+        return Objects.equals(userEntity.id(), owner.id);
+      } catch (Exception e) {
+        LOG.debug("Can not get user entity for ownership check", e);
+        return false;
+      }
+    } else if (owner.type == Entity.EntityType.GROUP) {
+      if (principal instanceof UserPrincipal) {
+        List<UserGroup> groups = ((UserPrincipal) principal).getGroups();
+        if (groups.isEmpty()) {
+          return false;
+        }
+        try {
+          List<NameIdentifier> groupIdents =
+              groups.stream()
+                  .map(g -> NameIdentifierUtil.ofGroup(metalake, 
g.getGroupname()))
+                  .collect(Collectors.toList());
+          List<GroupEntity> groupEntities =
+              GravitinoEnv.getInstance()
+                  .entityStore()
+                  .batchGet(groupIdents, Entity.EntityType.GROUP, 
GroupEntity.class);
+          return groupEntities.stream().anyMatch(ge -> Objects.equals(ge.id(), 
owner.id));
+        } catch (Exception e) {
+          LOG.debug("Can not get group entities for ownership check", e);
+          return false;
+        }
+      }
+      return false;
+    }
+    return false;
+  }
+
+  /** Holds the owner identity for a metadata object in the owner cache. */
+  static class OwnerInfo {
+    final Long id;
+    final Entity.EntityType type;
+    final String name;
+
+    OwnerInfo(Long id, Entity.EntityType type, String name) {
+      this.id = id;
+      this.type = type;
+      this.name = name;
+    }
+  }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 7bafa045db..fb925cc915 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -51,17 +51,20 @@ import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.UserGroup;
 import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.ServerConfig;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
+import 
org.apache.gravitino.server.authorization.jcasbin.JcasbinAuthorizer.OwnerInfo;
 import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
 import org.apache.gravitino.storage.relational.service.OwnerMetaService;
 import org.apache.gravitino.storage.relational.utils.POConverters;
@@ -91,6 +94,10 @@ public class TestJcasbinAuthorizer {
 
   private static final String METALAKE = "testMetalake";
 
+  private static final Long GROUP_ID = 6L;
+
+  private static final String GROUP_NAME = "testGroup";
+
   private static EntityStore entityStore = mock(EntityStore.class);
 
   private static GravitinoEnv gravitinoEnv = mock(GravitinoEnv.class);
@@ -259,6 +266,84 @@ public class TestJcasbinAuthorizer {
     assertFalse(doAuthorizeOwner(currentPrincipal));
   }
 
+  @Test
+  public void testAuthorizeByGroupOwner() throws Exception {
+    // Set up a UserPrincipal whose groups include GROUP_NAME
+    UserPrincipal groupPrincipal =
+        new UserPrincipal(USERNAME, ImmutableList.of(new 
UserGroup(Optional.empty(), GROUP_NAME)));
+    
principalUtilsMockedStatic.when(PrincipalUtils::getCurrentPrincipal).thenReturn(groupPrincipal);
+
+    NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, 
"testCatalog");
+
+    // Mock entityStore.batchGet for group entity lookup (needed for ID-based 
ownership
+    // verification)
+    when(entityStore.batchGet(
+            eq(ImmutableList.of(NameIdentifierUtil.ofGroup(METALAKE, 
GROUP_NAME))),
+            eq(Entity.EntityType.GROUP),
+            eq(GroupEntity.class)))
+        .thenReturn(ImmutableList.of(getGroupEntity()));
+
+    // For non-member principal, mock batchGet for "otherGroup"
+    when(entityStore.batchGet(
+            eq(ImmutableList.of(NameIdentifierUtil.ofGroup(METALAKE, 
"otherGroup"))),
+            eq(Entity.EntityType.GROUP),
+            eq(GroupEntity.class)))
+        .thenReturn(
+            ImmutableList.of(
+                GroupEntity.builder()
+                    .withId(99L)
+                    .withName("otherGroup")
+                    .withNamespace(Namespace.of(METALAKE, "group"))
+                    .withAuditInfo(AuditInfo.EMPTY)
+                    .build()));
+
+    // Mock owner relation returning a GroupEntity
+    List<GroupEntity> owners = ImmutableList.of(getGroupEntity());
+    doReturn(owners)
+        .when(supportsRelationOperations)
+        .listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.OWNER_REL),
+            eq(catalogIdent),
+            eq(Entity.EntityType.CATALOG));
+    getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
+
+    // The principal belongs to the owning group, so isOwner should return true
+    assertTrue(doAuthorizeOwner(groupPrincipal));
+
+    // Clear owner and verify it returns false
+    doReturn(new ArrayList<>())
+        .when(supportsRelationOperations)
+        .listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.OWNER_REL),
+            eq(catalogIdent),
+            eq(Entity.EntityType.CATALOG));
+    jcasbinAuthorizer.handleMetadataOwnerChange(
+        METALAKE, GROUP_ID, catalogIdent, Entity.EntityType.CATALOG);
+    assertFalse(doAuthorizeOwner(groupPrincipal));
+
+    // Verify a principal whose groups do NOT include the owner group gets 
denied
+    UserPrincipal nonMemberPrincipal =
+        new UserPrincipal(
+            USERNAME, ImmutableList.of(new UserGroup(Optional.empty(), 
"otherGroup")));
+    principalUtilsMockedStatic
+        .when(PrincipalUtils::getCurrentPrincipal)
+        .thenReturn(nonMemberPrincipal);
+    // Re-populate the owner cache with the group owner
+    doReturn(ImmutableList.of(getGroupEntity()))
+        .when(supportsRelationOperations)
+        .listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.OWNER_REL),
+            eq(catalogIdent),
+            eq(Entity.EntityType.CATALOG));
+    getOwnerRelCache(jcasbinAuthorizer).invalidateAll();
+    assertFalse(doAuthorizeOwner(nonMemberPrincipal));
+
+    // Restore the original principal mock
+    principalUtilsMockedStatic
+        .when(PrincipalUtils::getCurrentPrincipal)
+        .thenReturn(new UserPrincipal(USERNAME));
+  }
+
   private Boolean doAuthorize(Principal currentPrincipal) {
     return jcasbinAuthorizer.authorize(
         currentPrincipal,
@@ -285,6 +370,15 @@ public class TestJcasbinAuthorizer {
         .build();
   }
 
+  private static GroupEntity getGroupEntity() {
+    return GroupEntity.builder()
+        .withId(GROUP_ID)
+        .withName(GROUP_NAME)
+        .withNamespace(Namespace.of(METALAKE, "group"))
+        .withAuditInfo(AuditInfo.EMPTY)
+        .build();
+  }
+
   private static RoleEntity getRoleEntity(
       Long roleId, String roleName, List<SecurableObject> securableObjects) {
     Namespace namespace = NamespaceUtil.ofRole(METALAKE);
@@ -383,10 +477,10 @@ public class TestJcasbinAuthorizer {
   @Test
   public void testOwnerCacheInvalidation() throws Exception {
     // Get the ownerRel cache via reflection
-    Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
+    Cache<Long, Optional<OwnerInfo>> ownerRel = 
getOwnerRelCache(jcasbinAuthorizer);
 
     // Manually add an owner relation to the cache
-    ownerRel.put(CATALOG_ID, Optional.of(USER_ID));
+    ownerRel.put(CATALOG_ID, Optional.of(new OwnerInfo(USER_ID, 
Entity.EntityType.USER, USERNAME)));
 
     // Verify it's in the cache
     assertNotNull(ownerRel.getIfPresent(CATALOG_ID));
@@ -441,7 +535,7 @@ public class TestJcasbinAuthorizer {
   public void testCacheInitialization() throws Exception {
     // Verify that caches are initialized
     Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
-    Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
+    Cache<Long, Optional<OwnerInfo>> ownerRel = 
getOwnerRelCache(jcasbinAuthorizer);
 
     assertNotNull(loadedRoles, "loadedRoles cache should be initialized");
     assertNotNull(ownerRel, "ownerRel cache should be initialized");
@@ -597,11 +691,11 @@ public class TestJcasbinAuthorizer {
   }
 
   @SuppressWarnings("unchecked")
-  private static Cache<Long, Optional<Long>> 
getOwnerRelCache(JcasbinAuthorizer authorizer)
+  private static Cache<Long, Optional<OwnerInfo>> 
getOwnerRelCache(JcasbinAuthorizer authorizer)
       throws Exception {
     Field field = JcasbinAuthorizer.class.getDeclaredField("ownerRel");
     field.setAccessible(true);
-    return (Cache<Long, Optional<Long>>) field.get(authorizer);
+    return (Cache<Long, Optional<OwnerInfo>>) field.get(authorizer);
   }
 
   private static Enforcer getAllowEnforcer(JcasbinAuthorizer authorizer) 
throws Exception {

Reply via email to