This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch 1.2.0-hotfix
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/1.2.0-hotfix by this push:
     new 15eaa8bfb4 [#10907] fix(auth): Backport JCasbin policy lookup 
optimization to 1.2.0-hotfix (#10930)
15eaa8bfb4 is described below

commit 15eaa8bfb4b59c792d9579b9ac6bcd186faad5b0
Author: Qi Yu <[email protected]>
AuthorDate: Wed May 6 17:06:02 2026 +0800

    [#10907] fix(auth): Backport JCasbin policy lookup optimization to 
1.2.0-hotfix (#10930)
    
    ### What changes were proposed in this pull request?
    
    This PR backports the optimization from #10908 to `1.2.0-hotfix`.
    
    The backport includes:
    - caching the current request's user role IDs in
    `AuthorizationRequestContext`
    - building and using a per-role policy index in `JcasbinAuthorizer`
    - keeping user-role assignment changes immediately visible during
    authorization
    - backporting the related regression tests
    
    ### Why are the changes needed?
    
    The previous JCasbin authorization path relied on `enforce` for each
    privilege probe and could still observe stale user-role links cached in
    the enforcer.
    
    This backport keeps the hotfix branch aligned with the optimized
    authorization path from branch-1.2, reduces the lookup cost on the
    authorization hot path, and makes role assignment changes take effect
    immediately.
    
    Fix: #10907
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `./gradlew --no-daemon :server-common:test --tests
    org.apache.gravitino.server.authorization.jcasbin.TestJcasbinAuthorizer
    :core:test --tests
    org.apache.gravitino.authorization.TestAuthorizationRequestContext
    -PskipITs -PskipDockerTests=false`
---
 .../authorization/AuthorizationRequestContext.java |  44 ++
 .../TestAuthorizationRequestContext.java           |  40 +
 .../authorization/jcasbin/JcasbinAuthorizer.java   | 280 ++++---
 .../jcasbin/TestJcasbinAuthorizer.java             | 845 ++++++++++++++++++++-
 4 files changed, 1094 insertions(+), 115 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
index 392eb13c27..050c093ce3 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
@@ -18,11 +18,15 @@
 package org.apache.gravitino.authorization;
 
 import java.security.Principal;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.gravitino.MetadataObject;
 
 public class AuthorizationRequestContext {
@@ -36,6 +40,13 @@ public class AuthorizationRequestContext {
   /** Used to determine whether the role has already been loaded. */
   private final AtomicBoolean hasLoadRole = new AtomicBoolean();
 
+  /**
+   * Role IDs of the current principal in the metalake under check. Populated 
once per request from
+   * {@code loadRole} and reused by every {@code authorize}/{@code deny} call 
so we don't re-derive
+   * the user→role linkage on each enforcer probe.
+   */
+  private volatile Set<Long> userRoleIds = Collections.emptySet();
+
   private volatile String originalAuthorizationExpression;
 
   /**
@@ -103,6 +114,39 @@ public class AuthorizationRequestContext {
     this.originalAuthorizationExpression = originalAuthorizationExpression;
   }
 
+  /**
+   * Returns the user role IDs associated with the current authorization 
request.
+   *
+   * <p>This context is request-scoped and is expected to be used only for the 
lifetime of a single
+   * authorization request.
+   *
+   * <p>The returned set is immutable and safe to iterate without defensive 
copying.
+   *
+   * @return the user role IDs for the current request, or an empty set if 
none have been set
+   */
+  public Set<Long> getUserRoleIds() {
+    return userRoleIds;
+  }
+
+  /**
+   * Sets the user role IDs associated with the current authorization request.
+   *
+   * <p>This context is request-scoped and is expected to be used only for the 
lifetime of a single
+   * authorization request.
+   *
+   * <p>The provided set is defensively copied before being stored, so 
subsequent caller-side
+   * mutations are not reflected in this context.
+   *
+   * @param userRoleIds the user role IDs for the current request; if {@code 
null}, an empty set is
+   *     stored
+   */
+  public void setUserRoleIds(@Nullable Set<Long> userRoleIds) {
+    this.userRoleIds =
+        userRoleIds == null || userRoleIds.isEmpty()
+            ? Collections.emptySet()
+            : Collections.unmodifiableSet(new HashSet<>(userRoleIds));
+  }
+
   public static class AuthorizationKey {
     private Principal principal;
     private String metalake;
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
index 2216a7dcfe..619cd77c22 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
@@ -19,8 +19,14 @@ package org.apache.gravitino.authorization;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.google.common.collect.ImmutableSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -101,4 +107,38 @@ public class TestAuthorizationRequestContext {
     context.loadRole(counter::incrementAndGet);
     assertEquals(2, counter.get(), "After a successful loadRole, further calls 
must be ignored.");
   }
+
+  @Test
+  public void testUserRoleIdsDefaultsToEmptySet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    assertEquals(Collections.emptySet(), context.getUserRoleIds());
+    assertTrue(context.getUserRoleIds().isEmpty());
+  }
+
+  @Test
+  public void testSetUserRoleIdsWithNullNormalizesToEmptySet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    context.setUserRoleIds(ImmutableSet.of(1L, 2L));
+    context.setUserRoleIds(null);
+    // Null must not propagate; downstream callers iterate the set without a 
null check.
+    assertSame(Collections.emptySet(), context.getUserRoleIds());
+  }
+
+  @Test
+  public void testSetUserRoleIdsRoundTrip() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    Set<Long> roles = ImmutableSet.of(7L, 11L, 13L);
+    context.setUserRoleIds(roles);
+    assertEquals(roles, context.getUserRoleIds());
+  }
+
+  @Test
+  public void testSetUserRoleIdsDefensivelyCopiesAndReturnsImmutableSet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    Set<Long> roles = new HashSet<>(ImmutableSet.of(17L, 19L));
+    context.setUserRoleIds(roles);
+    roles.add(23L);
+    assertEquals(ImmutableSet.of(17L, 19L), context.getUserRoleIds());
+    assertThrows(UnsupportedOperationException.class, () -> 
context.getUserRoleIds().add(29L));
+  }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index cea47e353b..3a2b1847eb 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -27,14 +27,20 @@ import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Configs;
@@ -75,17 +81,11 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   /** Jcasbin deny enforcer is used for metadata authorization. */
   private Enforcer denyEnforcer;
 
-  /** allow internal authorizer */
-  private InternalAuthorizer allowInternalAuthorizer;
-
-  /** deny internal authorizer */
-  private InternalAuthorizer denyInternalAuthorizer;
-
   /**
-   * loadedRoles is used to cache roles that have loaded permissions. When the 
permissions of a role
-   * are updated, they should be removed from it.
+   * loadedRoles caches the indexed privileges for each loaded role. When a 
role's privileges are
+   * updated, the role should be removed from this cache.
    */
-  private Cache<Long, Boolean> loadedRoles;
+  private Cache<Long, Map<PolicyKey, Effect>> loadedRoles;
 
   private Cache<Long, Optional<Long>> ownerRel;
 
@@ -104,9 +104,7 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
 
     // Initialize enforcers before the caches that reference them in removal 
listeners
     allowEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
-    allowInternalAuthorizer = new InternalAuthorizer(allowEnforcer);
     denyEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
-    denyInternalAuthorizer = new InternalAuthorizer(denyEnforcer);
 
     loadedRoles =
         Caffeine.newBuilder()
@@ -164,12 +162,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             metadataObject,
             privilege,
             (authorizationKey) ->
-                allowInternalAuthorizer.authorizeInternal(
-                    authorizationKey.getPrincipal().getName(),
-                    authorizationKey.getMetalake(),
-                    authorizationKey.getMetadataObject(),
-                    authorizationKey.getPrivilege().name(),
-                    requestContext));
+                loadAndResolveEffect(
+                        authorizationKey.getPrincipal().getName(),
+                        authorizationKey.getMetalake(),
+                        authorizationKey.getMetadataObject(),
+                        authorizationKey.getPrivilege().name(),
+                        requestContext)
+                    == Effect.ALLOW);
     LOG.debug(
         "Authorization expression: {},privilege {}, result {}\n, principal 
{},metalake {},metadata object {}",
         requestContext.getOriginalAuthorizationExpression(),
@@ -195,12 +194,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             metadataObject,
             privilege,
             (authorizationKey) ->
-                denyInternalAuthorizer.authorizeInternal(
-                    authorizationKey.getPrincipal().getName(),
-                    authorizationKey.getMetalake(),
-                    authorizationKey.getMetadataObject(),
-                    authorizationKey.getPrivilege().name(),
-                    requestContext));
+                loadAndResolveEffect(
+                        authorizationKey.getPrincipal().getName(),
+                        authorizationKey.getMetalake(),
+                        authorizationKey.getMetadataObject(),
+                        authorizationKey.getPrivilege().name(),
+                        requestContext)
+                    == Effect.DENY);
     LOG.debug(
         "Authorization expression: {},privilege {},deny result {}\n, principal 
{},metalake {},metadata object {}",
         requestContext.getOriginalAuthorizationExpression(),
@@ -405,56 +405,81 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     }
   }
 
-  private class InternalAuthorizer {
-
-    Enforcer enforcer;
-
-    public InternalAuthorizer(Enforcer enforcer) {
-      this.enforcer = enforcer;
+  /**
+   * Loads role privileges (if not yet loaded for this request) and resolves 
the effect of a single
+   * privilege probe. Returns {@code null} when the user/metadata cannot be 
looked up, so the caller
+   * treats it as "no rule applies".
+   */
+  @Nullable
+  private Effect loadAndResolveEffect(
+      String username,
+      String metalake,
+      MetadataObject metadataObject,
+      String privilege,
+      AuthorizationRequestContext requestContext) {
+    Long metadataId;
+    Long userId;
+    try {
+      UserEntity userEntity = getUserEntity(username, metalake);
+      userId = userEntity.id();
+      metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+    } catch (Exception e) {
+      LOG.debug("Can not get entity id", e);
+      return null;
     }
+    loadRolePrivilege(metalake, username, userId, requestContext);
+    return resolveEffect(userId, metadataObject, metadataId, privilege, 
requestContext);
+  }
 
-    private boolean authorizeInternal(
-        String username,
-        String metalake,
-        MetadataObject metadataObject,
-        String privilege,
-        AuthorizationRequestContext requestContext) {
-      return loadPrivilegeAndAuthorize(
-          username, metalake, metadataObject, privilege, requestContext);
+  /**
+   * Resolve a single privilege probe against the per-role policy index. 
Replaces the previous
+   * {@code enforcer.enforce} call, which scanned every policy line in the 
enforcer for each probe.
+   * Per-probe cost goes from {@code O(total_policies)} to {@code 
O(roles_per_user)} hash lookups.
+   *
+   * <p>Returns {@link Effect#ALLOW} or {@link Effect#DENY} when a matching 
rule is found, or {@code
+   * null} when no role grants or denies this key. The top-level {@code 
authorize}/{@code deny}
+   * entrypoints share this resolver and only differ in how they compare the 
returned effect to a
+   * boolean.
+   *
+   * <p>Cross-role priority: any role with {@link Effect#DENY} short-circuits 
and beats {@link
+   * Effect#ALLOW} from other roles. Within a single role DENY also beats 
ALLOW; that ordering is
+   * enforced by {@link #loadPolicyByRoleEntity} when building the index.
+   *
+   * <p>OWNER is resolved against {@link #ownerRel} rather than the role 
index: being the owner maps
+   * to {@link Effect#ALLOW}, otherwise {@code null} (so {@code deny(OWNER)} 
is never {@code true}:
+   * this path never yields {@link Effect#DENY}, and role policies are not consulted for OWNER).
+   */
+  @Nullable
+  private Effect resolveEffect(
+      Long userId,
+      MetadataObject metadataObject,
+      Long metadataId,
+      String privilege,
+      AuthorizationRequestContext requestContext) {
+    if (AuthConstants.OWNER.equals(privilege)) {
+      Optional<Long> owner = ownerRel.getIfPresent(metadataId);
+      return Objects.equals(Optional.of(userId), owner) ? Effect.ALLOW : null;
     }
-
-    private boolean loadPrivilegeAndAuthorize(
-        String username,
-        String metalake,
-        MetadataObject metadataObject,
-        String privilege,
-        AuthorizationRequestContext requestContext) {
-      Long metadataId;
-      Long userId;
-      try {
-        UserEntity userEntity = getUserEntity(username, metalake);
-        userId = userEntity.id();
-        metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      } catch (Exception e) {
-        LOG.debug("Can not get entity id", e);
-        return false;
-      }
-      loadRolePrivilege(metalake, username, userId, requestContext);
-      return authorizeByJcasbin(userId, metadataObject, metadataId, privilege);
+    Set<Long> roleIds = requestContext.getUserRoleIds();
+    if (roleIds.isEmpty()) {
+      return null;
     }
-
-    private boolean authorizeByJcasbin(
-        Long userId, MetadataObject metadataObject, Long metadataId, String 
privilege) {
-      if (AuthConstants.OWNER.equals(privilege)) {
-        Optional<Long> owner = ownerRel.getIfPresent(metadataId);
-        return Objects.equals(Optional.of(userId), owner);
+    PolicyKey key = new PolicyKey(metadataObject.type().name(), metadataId, 
privilege);
+    Effect resolved = null;
+    for (Long roleId : roleIds) {
+      Map<PolicyKey, Effect> idx = loadedRoles.getIfPresent(roleId);
+      if (idx == null) {
+        continue;
+      }
+      Effect effect = idx.get(key);
+      if (effect == Effect.DENY) {
+        return Effect.DENY;
+      }
+      if (effect == Effect.ALLOW) {
+        resolved = Effect.ALLOW;
       }
-      return enforcer.enforce(
-          String.valueOf(userId),
-          String.valueOf(metadataObject.type()),
-          String.valueOf(metadataId),
-          privilege);
     }
+    return resolved;
   }
 
   private static UserEntity getUserEntity(String username, String metalake) 
throws IOException {
@@ -482,36 +507,40 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
                         SupportsRelationOperations.Type.ROLE_USER_REL,
                         userNameIdentifier,
                         Entity.EntityType.USER);
+            Set<Long> roleIds = new HashSet<>(entities.size());
             List<CompletableFuture<Void>> loadRoleFutures = new ArrayList<>();
             for (RoleEntity role : entities) {
               Long roleId = role.id();
+              roleIds.add(roleId);
               allowEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
               denyEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
               if (loadedRoles.getIfPresent(roleId) != null) {
                 continue;
               }
               CompletableFuture<Void> loadRoleFuture =
-                  CompletableFuture.supplyAsync(
-                          () -> {
-                            try {
-                              return entityStore.get(
-                                  NameIdentifierUtil.ofRole(metalake, 
role.name()),
-                                  Entity.EntityType.ROLE,
-                                  RoleEntity.class);
-                            } catch (Exception e) {
-                              throw new RuntimeException("Failed to load role: 
" + role.name(), e);
-                            }
-                          },
-                          executor)
-                      .thenAcceptAsync(
-                          roleEntity -> {
-                            loadPolicyByRoleEntity(roleEntity);
-                            loadedRoles.put(roleId, true);
-                          },
-                          executor);
+                  CompletableFuture.runAsync(
+                      () -> {
+                        loadedRoles.get(
+                            roleId,
+                            unused -> {
+                              try {
+                                RoleEntity roleEntity =
+                                    entityStore.get(
+                                        NameIdentifierUtil.ofRole(metalake, 
role.name()),
+                                        Entity.EntityType.ROLE,
+                                        RoleEntity.class);
+                                return loadPolicyByRoleEntity(roleEntity);
+                              } catch (Exception e) {
+                                throw new RuntimeException(
+                                    "Failed to load role: " + role.name(), e);
+                              }
+                            });
+                      },
+                      executor);
               loadRoleFutures.add(loadRoleFuture);
             }
             CompletableFuture.allOf(loadRoleFutures.toArray(new 
CompletableFuture[0])).join();
+            requestContext.setUserRoleIds(roleIds);
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -548,22 +577,27 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     }
   }
 
-  private void loadPolicyByRoleEntity(RoleEntity roleEntity) {
+  private Map<PolicyKey, Effect> loadPolicyByRoleEntity(RoleEntity roleEntity) 
{
     String metalake = 
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
     List<SecurableObject> securableObjects = roleEntity.securableObjects();
+    Long roleId = roleEntity.id();
+    String roleIdStr = String.valueOf(roleId);
+    Map<PolicyKey, Effect> index = new ConcurrentHashMap<>();
 
     for (SecurableObject securableObject : securableObjects) {
+      Long metadataId = MetadataIdConverter.getID(securableObject, metalake);
+      String metadataIdStr = String.valueOf(metadataId);
+      String typeName = securableObject.type().name();
       for (Privilege privilege : securableObject.privileges()) {
         Privilege.Condition condition = privilege.condition();
-        if (AuthConstants.DENY.equalsIgnoreCase(condition.name())) {
+        String privilegeName =
+            AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
+                .name()
+                .toUpperCase(Locale.ROOT);
+        boolean isDeny = AuthConstants.DENY.equalsIgnoreCase(condition.name());
+        if (isDeny) {
           denyEnforcer.addPolicy(
-              String.valueOf(roleEntity.id()),
-              securableObject.type().name(),
-              String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
-              AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
-                  .name()
-                  .toUpperCase(java.util.Locale.ROOT),
-              AuthConstants.ALLOW);
+              roleIdStr, typeName, metadataIdStr, privilegeName, 
AuthConstants.ALLOW);
         }
         // Since different roles of a user may simultaneously hold both 
"allow" and "deny"
         // permissions
@@ -574,14 +608,58 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         // roles should receive a false result when calling the authorize 
method.
 
         allowEnforcer.addPolicy(
-            String.valueOf(roleEntity.id()),
-            securableObject.type().name(),
-            String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
-            AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
-                .name()
-                .toUpperCase(java.util.Locale.ROOT),
-            condition.name().toLowerCase(java.util.Locale.ROOT));
+            roleIdStr,
+            typeName,
+            metadataIdStr,
+            privilegeName,
+            condition.name().toLowerCase(Locale.ROOT));
+
+        // Populate the per-role index. Within a single role DENY wins over 
ALLOW so that the
+        // index agrees with the allowEnforcer's policy_effect (some allow && 
!some deny).
+        PolicyKey key = new PolicyKey(typeName, metadataId, privilegeName);
+        Effect effect = isDeny ? Effect.DENY : Effect.ALLOW;
+        index.merge(
+            key, effect, (existing, incoming) -> existing == Effect.DENY ? 
existing : incoming);
       }
     }
+    return index;
+  }
+
+  /** Composite key for the per-role policy index. */
+  static final class PolicyKey {
+    private final String type;
+    private final Long metadataId;
+    private final String privilege;
+    private final int hash;
+
+    PolicyKey(String type, Long metadataId, String privilege) {
+      this.type = type;
+      this.metadataId = metadataId;
+      this.privilege = privilege;
+      this.hash = Objects.hash(type, metadataId, privilege);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof PolicyKey)) {
+        return false;
+      }
+      PolicyKey other = (PolicyKey) o;
+      return hash == other.hash
+          && Objects.equals(metadataId, other.metadataId)
+          && Objects.equals(type, other.type)
+          && Objects.equals(privilege, other.privilege);
+    }
+
+    @Override
+    public int hashCode() {
+      return hash;
+    }
+  }
+
+  /** Per-role per-key effect; DENY beats ALLOW within a role and across 
roles. */
+  enum Effect {
+    ALLOW,
+    DENY
   }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 4388ef0952..fa0c86fee2 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -17,8 +17,13 @@
 
 package org.apache.gravitino.server.authorization.jcasbin;
 
+import static org.apache.gravitino.authorization.Privilege.Name.RUN_JOB;
+import static org.apache.gravitino.authorization.Privilege.Name.SELECT_TABLE;
 import static org.apache.gravitino.authorization.Privilege.Name.USE_CATALOG;
+import static org.apache.gravitino.authorization.Privilege.Name.USE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,9 +42,17 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Entity;
@@ -70,6 +83,7 @@ import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
@@ -149,6 +163,13 @@ public class TestJcasbinAuthorizer {
         .thenReturn(baseMetalake);
   }
 
+  @BeforeEach
+  public void resetUserRoleStubBetweenTests() throws IOException {
+    // Restore the default empty role assignment for "tester" so that one 
test's stubbed roles
+    // don't leak into the next test. Each test re-stubs its own role list 
before asserting.
+    mockUserRoles(NameIdentifierUtil.ofUser(METALAKE, USERNAME));
+  }
+
   @AfterAll
   public static void stop() {
     if (principalUtilsMockedStatic != null) {
@@ -185,9 +206,10 @@ public class TestJcasbinAuthorizer {
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(allowRole));
     assertTrue(doAuthorize(currentPrincipal));
-    // Test role cache.
-    // When permissions are changed but handleRolePrivilegeChange is not 
executed, the system will
-    // use the cached permissions in JCasbin, so authorize can succeed.
+    // After re-assigning the user from allowRole to a role with no 
privileges, authorize must
+    // return false even though allowRole's policies are still cached in the 
enforcer. Each
+    // request iterates the user's fresh role list, so removed role 
assignments take effect
+    // immediately without waiting for a handleRolePrivilegeChange call.
     Long newRoleId = -1L;
     RoleEntity tempNewRole = getRoleEntity(newRoleId, "tempNewRole", 
ImmutableList.of());
     when(entityStore.get(
@@ -200,10 +222,11 @@ public class TestJcasbinAuthorizer {
             eq(userNameIdentifier),
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(tempNewRole));
-    assertTrue(doAuthorize(currentPrincipal));
-    // After clearing the cache, authorize will fail
+    assertFalse(doAuthorize(currentPrincipal));
+    // Invalidating the role cache is still a no-op in this scenario; we left 
it in to exercise
+    // the invalidation path.
     jcasbinAuthorizer.handleRolePrivilegeChange(ALLOW_ROLE_ID);
-    //    assertFalse(doAuthorize(currentPrincipal));
+    assertFalse(doAuthorize(currentPrincipal));
     // When the user is re-assigned the correct role, the authorization will 
succeed.
     when(supportsRelationOperations.listEntitiesByRelation(
             eq(SupportsRelationOperations.Type.ROLE_USER_REL),
@@ -363,11 +386,12 @@ public class TestJcasbinAuthorizer {
     makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
 
     // Get the loadedRoles cache via reflection
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
 
     // Manually add a role to the cache
     Long testRoleId = 100L;
-    loadedRoles.put(testRoleId, true);
+    loadedRoles.put(testRoleId, Collections.emptyMap());
 
     // Verify it's in the cache
     assertNotNull(loadedRoles.getIfPresent(testRoleId));
@@ -410,7 +434,8 @@ public class TestJcasbinAuthorizer {
     Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
 
     // Get the loadedRoles cache
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
 
     // Add a role and its policy to the enforcer
     Long testRoleId = 300L;
@@ -421,7 +446,7 @@ public class TestJcasbinAuthorizer {
     denyEnforcer.addPolicy(roleIdStr, "CATALOG", "999", "USE_CATALOG", 
"allow");
 
     // Add role to cache
-    loadedRoles.put(testRoleId, true);
+    loadedRoles.put(testRoleId, Collections.emptyMap());
 
     // Verify role exists in enforcer (has policy)
     assertTrue(allowEnforcer.hasPolicy(roleIdStr, "CATALOG", "999", 
"USE_CATALOG", "allow"));
@@ -436,10 +461,759 @@ public class TestJcasbinAuthorizer {
     assertFalse(denyEnforcer.hasPolicy(roleIdStr, "CATALOG", "999", 
"USE_CATALOG", "allow"));
   }
 
+  /**
+   * Expects both {@code authorize} and {@code deny} to return false for USE_CATALOG on
+   * "testCatalog" after {@code mockUserRoles} is called for the user with no role arguments.
+   * Both probes share one {@link AuthorizationRequestContext}.
+   */
+  @Test
+  public void testAuthorizeAndDenyReturnFalseForUserWithNoRoles() throws 
Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    // No role arguments are passed; presumably this stubs an empty role list for the user.
+    mockUserRoles(userIdent);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+    assertFalse(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+  }
+
+  /**
+   * Expects {@code deny} to return true and {@code authorize} to return false for USE_CATALOG on
+   * "testCatalog" when the only role mocked for the user (id 1001) carries a single DENY
+   * securable object for that privilege. Both probes share one request context.
+   */
+  @Test
+  public void testDenyEndpointReturnsTrueForExplicitDenyRole() throws 
Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1001L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            roleId,
+            "denyOnlyRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "DENY")));
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, denyRole);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+  }
+
+  /**
+   * Expects {@code authorize} to return true and {@code deny} to return false for USE_CATALOG on
+   * "testCatalog" when the only role mocked for the user (id 1002) carries a single ALLOW
+   * securable object for that privilege. Both probes share one request context.
+   */
+  @Test
+  public void testDenyEndpointReturnsFalseForAllowOnlyRole() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1002L;
+    RoleEntity allowRole =
+        getRoleEntity(
+            roleId,
+            "allowOnlyRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(allowRole);
+    mockUserRoles(userIdent, allowRole);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertTrue(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+    assertFalse(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+  }
+
+  /**
+   * After a successful {@code authorize} call for a user whose role (id 2010) grants ALLOW
+   * USE_CATALOG on "testCatalog", inspects the authorizer's internal state:
+   *
+   * <ul>
+   *   <li>the allow enforcer holds the (role, CATALOG, CATALOG_ID, USE_CATALOG, allow) policy;
+   *   <li>the deny enforcer does not hold that policy;
+   *   <li>the loaded-roles cache maps the matching {@code PolicyKey} to {@code Effect.ALLOW}.
+   * </ul>
+   */
+  @Test
+  public void testLoadPolicyByRoleEntityAddsAllowOnlyToAllowEnforcer() throws 
Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2010L;
+    RoleEntity allowRole =
+        getRoleEntity(
+            roleId,
+            "allowLoadRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(allowRole);
+    mockUserRoles(userIdent, allowRole);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+
+    String roleIdStr = String.valueOf(roleId);
+    String metadataIdStr = String.valueOf(CATALOG_ID);
+    Enforcer allowEnforcer = getAllowEnforcer(jcasbinAuthorizer);
+    Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
+    assertTrue(
+        allowEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, 
"USE_CATALOG", "allow"));
+    assertFalse(
+        denyEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, 
"USE_CATALOG", "allow"));
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    assertEquals(
+        JcasbinAuthorizer.Effect.ALLOW,
+        loadedRoles
+            .getIfPresent(roleId)
+            .get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, 
"USE_CATALOG")));
+  }
+
+  /**
+   * After {@code authorize} (expected false) and {@code deny} (expected true) for a user whose
+   * role (id 2011) carries DENY USE_CATALOG on "testCatalog", inspects the authorizer's internal
+   * state:
+   *
+   * <ul>
+   *   <li>the allow enforcer holds the rule with effect "deny";
+   *   <li>the deny enforcer holds the rule with effect "allow";
+   *   <li>the loaded-roles cache maps the matching {@code PolicyKey} to {@code Effect.DENY}.
+   * </ul>
+   */
+  @Test
+  public void 
testLoadPolicyByRoleEntityAddsDenyToBothEnforcersWithExpectedEffects()
+      throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2011L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            roleId,
+            "denyLoadRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "DENY")));
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, denyRole);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+    assertTrue(
+        jcasbinAuthorizer.deny(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+
+    String roleIdStr = String.valueOf(roleId);
+    String metadataIdStr = String.valueOf(CATALOG_ID);
+    Enforcer allowEnforcer = getAllowEnforcer(jcasbinAuthorizer);
+    Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
+    // NOTE(review): the deny enforcer is expected to store the DENY rule with effect "allow",
+    // presumably so that a positive match in that enforcer means "denied" - confirm against
+    // JcasbinAuthorizer.
+    assertTrue(allowEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, 
"USE_CATALOG", "deny"));
+    assertTrue(denyEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, 
"USE_CATALOG", "allow"));
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    assertEquals(
+        JcasbinAuthorizer.Effect.DENY,
+        loadedRoles
+            .getIfPresent(roleId)
+            .get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, 
"USE_CATALOG")));
+  }
+
+  /**
+   * Mocks two roles for the same user: role 1007 with ALLOW USE_CATALOG and role 1008 with DENY
+   * USE_CATALOG, both on "testCatalog". Expects {@code authorize} to return false and {@code
+   * deny} to return true, i.e. the DENY from one role wins over the ALLOW from the other.
+   */
+  @Test
+  public void testCrossRoleDenyBeatsAllowOnBothEndpoints() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long allowRoleId = 1007L;
+    Long denyRoleId = 1008L;
+    RoleEntity allowRole =
+        getRoleEntity(
+            allowRoleId,
+            "mixedAllow" + allowRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog",
+                    MetadataObject.Type.CATALOG,
+                    allowRoleId,
+                    USE_CATALOG,
+                    "ALLOW")));
+    RoleEntity denyRole =
+        getRoleEntity(
+            denyRoleId,
+            "mixedDeny" + denyRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, denyRoleId, 
USE_CATALOG, "DENY")));
+    mockRoleEntity(allowRole);
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, allowRole, denyRole);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+  }
+
+  /**
+   * Calls {@code deny} twice, each time with a fresh {@link AuthorizationRequestContext}: first
+   * with a DENY role (id 1009) mocked for the user (expected true), then after re-mocking the
+   * user with no roles (expected false). No explicit cache invalidation happens in between.
+   */
+  @Test
+  public void testRoleAssignmentChangeImmediatelyVisibleToDenyEndpoint() 
throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long denyRoleId = 1009L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            denyRoleId,
+            "denyAssignRole" + denyRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, denyRoleId, 
USE_CATALOG, "DENY")));
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, denyRole);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertTrue(
+        jcasbinAuthorizer.deny(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+    // Revoke the deny role assignment from the user. Even though the enforcer 
still has the
+    // user→role edge from the previous request and the role's policy is still 
cached, the new
+    // request reads a fresh userRoleIds and therefore must no longer see the 
deny.
+    mockUserRoles(userIdent);
+    assertFalse(
+        jcasbinAuthorizer.deny(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+  }
+
+  /**
+   * Populates the loaded-roles cache through a successful {@code authorize} call for role 1003,
+   * then expects {@code handleRolePrivilegeChange(roleId)} to leave no cache entry for that role.
+   */
+  @Test
+  public void testHandleRolePrivilegeChangeRemovesRoleFromIndex() throws 
Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1003L;
+    RoleEntity role =
+        getRoleEntity(
+            roleId,
+            "indexRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(role);
+    mockUserRoles(userIdent, role);
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG),
+            USE_CATALOG,
+            new AuthorizationRequestContext()));
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    // The authorize call above is expected to have cached the role's policy index.
+    assertNotNull(loadedRoles.getIfPresent(roleId));
+    jcasbinAuthorizer.handleRolePrivilegeChange(roleId);
+    assertNull(loadedRoles.getIfPresent(roleId));
+  }
+
+  /**
+   * Authorizes once with role 1004 granting ALLOW USE_CATALOG (expected true), then re-mocks the
+   * same role id with an empty securable-object list, calls {@code handleRolePrivilegeChange},
+   * and expects the next {@code authorize} call to return false.
+   */
+  @Test
+  public void testRoleIndexReloadAfterPrivilegeChange() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1004L;
+    RoleEntity withPrivilege =
+        getRoleEntity(
+            roleId,
+            "reloadRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(withPrivilege);
+    mockUserRoles(userIdent, withPrivilege);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+    // Revoke the privilege from the role and invalidate. The index must be 
repopulated
+    // from the new (empty) policy set, not stay stuck on the old ALLOW entry.
+    RoleEntity withoutPrivilege = getRoleEntity(roleId, "reloadRole" + roleId, 
ImmutableList.of());
+    mockRoleEntity(withoutPrivilege);
+    jcasbinAuthorizer.handleRolePrivilegeChange(roleId);
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+  }
+
+  /**
+   * Verifies that two threads requesting the same role id from the loaded-roles cache at the
+   * same time trigger exactly one loader invocation and both receive the same policy index.
+   *
+   * <p>Each worker signals {@code ready}, waits on {@code start}, and then calls {@code
+   * loadedRoles.get(roleId, loader)}. The loader sleeps for 100 ms so the first load is still in
+   * flight when the second request arrives. The workers assert that the start signal actually
+   * arrived; without that check a timed-out wait would let them run unsynchronised and the test
+   * could pass without exercising the concurrent path.
+   */
+  @Test
+  public void testConcurrentRoleLoadsShareOnePolicyIndex() throws Exception {
+    Long roleId = 1010L;
+    JcasbinAuthorizer.PolicyKey policyKey =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG");
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> expectedIndex =
+        Collections.singletonMap(policyKey, JcasbinAuthorizer.Effect.ALLOW);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    // Counts how many times a loader function actually runs across both workers.
+    AtomicInteger roleLoadCount = new AtomicInteger();
+    // ready: both workers have started; start: main thread releases them together.
+    CountDownLatch ready = new CountDownLatch(2);
+    CountDownLatch start = new CountDownLatch(1);
+    ExecutorService requestExecutor = Executors.newFixedThreadPool(2);
+    try {
+      Future<Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> first =
+          requestExecutor.submit(
+              () -> {
+                ready.countDown();
+                // Fail this worker rather than race ahead if the start signal never arrives.
+                assertTrue(start.await(5, TimeUnit.SECONDS));
+                return loadedRoles.get(
+                    roleId,
+                    unused -> {
+                      roleLoadCount.incrementAndGet();
+                      try {
+                        // Keep the load in flight long enough for the other worker to arrive.
+                        Thread.sleep(100L);
+                      } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                      }
+                      return expectedIndex;
+                    });
+              });
+      Future<Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> second =
+          requestExecutor.submit(
+              () -> {
+                ready.countDown();
+                // Fail this worker rather than race ahead if the start signal never arrives.
+                assertTrue(start.await(5, TimeUnit.SECONDS));
+                return loadedRoles.get(
+                    roleId,
+                    unused -> {
+                      roleLoadCount.incrementAndGet();
+                      try {
+                        // Keep the load in flight long enough for the other worker to arrive.
+                        Thread.sleep(100L);
+                      } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                      }
+                      return expectedIndex;
+                    });
+              });
+      assertTrue(ready.await(5, TimeUnit.SECONDS));
+      start.countDown();
+      assertEquals(expectedIndex, first.get(5, TimeUnit.SECONDS));
+      assertEquals(expectedIndex, second.get(5, TimeUnit.SECONDS));
+    } finally {
+      requestExecutor.shutdownNow();
+    }
+    // Only one of the two loader functions may have run, and its result must be cached.
+    assertEquals(1, roleLoadCount.get());
+    assertEquals(expectedIndex, loadedRoles.getIfPresent(roleId));
+  }
+
+  /**
+   * After a successful {@code authorize} call for role 1011 (ALLOW USE_CATALOG on
+   * "testCatalog"), expects the loaded-roles cache to hold a non-null policy index for the role
+   * that maps the (CATALOG, CATALOG_ID, USE_CATALOG) key to {@code Effect.ALLOW}.
+   */
+  @Test
+  public void testLoadedRoleCacheStoresPolicyIndexValue() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1011L;
+    RoleEntity role =
+        getRoleEntity(
+            roleId,
+            "cachedIndexRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(role);
+    mockUserRoles(userIdent, role);
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new 
AuthorizationRequestContext()));
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> policyIndex =
+        getLoadedRolesCache(jcasbinAuthorizer).getIfPresent(roleId);
+    assertNotNull(policyIndex);
+    assertEquals(
+        JcasbinAuthorizer.Effect.ALLOW,
+        policyIndex.get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, 
"USE_CATALOG")));
+  }
+
+  /**
+   * Pre-populates the loaded-roles cache for role 1012 with an ALLOW entry for (CATALOG,
+   * CATALOG_ID, USE_CATALOG) and stubs {@code entityStore.get} for that role to throw an {@link
+   * AssertionError}. The role entity mocked for the user has no securable objects, so a true
+   * result from {@code authorize} indicates the cached index was used rather than a fresh load.
+   */
+  @Test
+  public void testCachedRolePolicyIndexSkipsRoleEntityLoad() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1012L;
+    RoleEntity role = getRoleEntity(roleId, "cachedRole" + roleId, 
ImmutableList.of());
+    mockUserRoles(userIdent, role);
+    getLoadedRolesCache(jcasbinAuthorizer)
+        .put(
+            roleId,
+            Collections.singletonMap(
+                new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, 
"USE_CATALOG"),
+                JcasbinAuthorizer.Effect.ALLOW));
+    // Any attempt to load the role entity from the store fails the test.
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, role.name())),
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenThrow(new AssertionError("Cached roles should not be loaded 
again."));
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG),
+            USE_CATALOG,
+            new AuthorizationRequestContext()));
+  }
+
+  /**
+   * Checks {@code JcasbinAuthorizer.PolicyKey} value semantics: two keys built from the same
+   * (type, id, privilege) are equal and share a hash code, while a key differing in any one of
+   * the three components, {@code null}, and an object of another class are all unequal to it.
+   */
+  @Test
+  public void testPolicyKeyEqualityAndHash() {
+    JcasbinAuthorizer.PolicyKey base =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey same =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey differentType =
+        new JcasbinAuthorizer.PolicyKey("SCHEMA", 1L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey differentId =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 2L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey differentPrivilege =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "SELECT_TABLE");
+    assertEquals(base, same);
+    assertEquals(base.hashCode(), same.hashCode());
+    assertNotEquals(base, differentType);
+    assertNotEquals(base, differentId);
+    assertNotEquals(base, differentPrivilege);
+    assertNotEquals(base, null);
+    assertNotEquals(base, "not a policy key");
+  }
+
+  /**
+   * With role 1005 granting ALLOW SELECT_TABLE on the TABLE
+   * "icebergCat.icebergSchema.icebergTable", issues three {@code authorize} probes, each with a
+   * fresh request context: SELECT_TABLE on the table (expected true), USE_CATALOG on the same
+   * table (expected false), and SELECT_TABLE on the CATALOG "icebergCat" (expected false).
+   */
+  @Test
+  public void testIcebergTableLevelAuthorizeMatchesByEntityType() throws 
Exception {
+    // Iceberg API auth resolves to a TABLE-typed MetadataObject and probes 
SELECT_TABLE on it.
+    // The per-role index must key on entity type, so a TABLE-level grant must 
NOT satisfy a
+    // CATALOG-level probe (different type → different PolicyKey).
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1005L;
+    RoleEntity tableRole =
+        getRoleEntity(
+            roleId,
+            "icebergTableRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "icebergCat.icebergSchema.icebergTable",
+                    MetadataObject.Type.TABLE,
+                    roleId,
+                    SELECT_TABLE,
+                    "ALLOW")));
+    mockRoleEntity(tableRole);
+    mockUserRoles(userIdent, tableRole);
+
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(
+                ImmutableList.of("icebergCat", "icebergSchema", 
"icebergTable"),
+                MetadataObject.Type.TABLE),
+            SELECT_TABLE,
+            new AuthorizationRequestContext()));
+
+    // Different privilege on the same TABLE: no policy → false.
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(
+                ImmutableList.of("icebergCat", "icebergSchema", 
"icebergTable"),
+                MetadataObject.Type.TABLE),
+            USE_CATALOG,
+            new AuthorizationRequestContext()));
+
+    // Different entity type, same id: PolicyKey differs by `type`, so this 
must be false.
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(null, "icebergCat", 
MetadataObject.Type.CATALOG),
+            SELECT_TABLE,
+            new AuthorizationRequestContext()));
+  }
+
+  /**
+   * With role 1006 granting ALLOW USE_SCHEMA on the SCHEMA "cat1.schema1", expects {@code
+   * authorize} to return true for USE_SCHEMA on that schema and false for USE_CATALOG on the
+   * CATALOG "cat1". Each probe uses a fresh request context.
+   */
+  @Test
+  public void testGravitinoApiSchemaLevelAuthorize() throws Exception {
+    // Gravitino API auth flows through AuthorizationExpressionEvaluator → 
JcasbinAuthorizer with
+    // SCHEMA-typed objects for schema-scoped operations. Verify that a 
SCHEMA-level USE_SCHEMA
+    // grant via a role authorizes the schema and only the schema.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 1006L;
+    RoleEntity schemaRole =
+        getRoleEntity(
+            roleId,
+            "schemaRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "cat1.schema1", MetadataObject.Type.SCHEMA, roleId, 
USE_SCHEMA, "ALLOW")));
+    mockRoleEntity(schemaRole);
+    mockUserRoles(userIdent, schemaRole);
+
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(ImmutableList.of("cat1", "schema1"), 
MetadataObject.Type.SCHEMA),
+            USE_SCHEMA,
+            new AuthorizationRequestContext()));
+
+    // USE_CATALOG on the parent catalog has no matching policy.
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal,
+            METALAKE,
+            MetadataObjects.of(null, "cat1", MetadataObject.Type.CATALOG),
+            USE_CATALOG,
+            new AuthorizationRequestContext()));
+  }
+
+  /**
+   * Gives a single role (id 2020) two securable objects for the same catalog and privilege, one
+   * ALLOW and one DENY, listed in that order. Expects {@code authorize} false, {@code deny}
+   * true, and the cached policy index for the role to map the key to {@code Effect.DENY}.
+   */
+  @Test
+  public void testIntraRoleDenyBeatsAllowForSameKey() throws Exception {
+    // Within a single role, granting both ALLOW and DENY for the same (type, 
metadataId,
+    // privilege) must resolve to DENY. The unified resolveEffect relies on
+    // loadPolicyByRoleEntity's merge function (existing == DENY ? existing : 
incoming) to enforce
+    // intra-role priority, so authorize/deny see DENY regardless of insertion 
order.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2020L;
+    SecurableObject allow =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, 
"ALLOW");
+    SecurableObject deny =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, 
"DENY");
+    RoleEntity mixedRole =
+        getRoleEntity(roleId, "mixedSameKey" + roleId, ImmutableList.of(allow, 
deny));
+    mockRoleEntity(mixedRole);
+    mockUserRoles(userIdent, mixedRole);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    assertEquals(
+        JcasbinAuthorizer.Effect.DENY,
+        loadedRoles
+            .getIfPresent(roleId)
+            .get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, 
"USE_CATALOG")));
+  }
+
+  /**
+   * With role 2021 granting only ALLOW USE_CATALOG on "testCatalog", probes SELECT_TABLE on the
+   * same catalog and expects both {@code authorize} and {@code deny} to return false. Both
+   * probes share one request context.
+   */
+  @Test
+  public void testAuthorizeAndDenyBothFalseWhenRoleHasNoRuleForKey() throws 
Exception {
+    // The role grants USE_CATALOG on the catalog but the request probes 
SELECT_TABLE. The shared
+    // resolver returns null (no matching rule), so both endpoints must return 
false. This guards
+    // against the unified resolveEffect collapsing "no rule" into either 
ALLOW or DENY.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2021L;
+    RoleEntity role =
+        getRoleEntity(
+            roleId,
+            "noRuleRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, 
USE_CATALOG, "ALLOW")));
+    mockRoleEntity(role);
+    mockUserRoles(userIdent, role);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(
+        jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, catalog, 
SELECT_TABLE, ctx));
+    assertFalse(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
SELECT_TABLE, ctx));
+  }
+
+  /**
+   * Gives a single role (id 2022) ALLOW USE_CATALOG and DENY USE_SCHEMA on the same catalog,
+   * then probes both privileges on both endpoints with one shared request context. Expected:
+   * USE_CATALOG is authorized and not denied; USE_SCHEMA is denied and not authorized.
+   */
+  @Test
+  public void testAuthorizeAndDenyShareSameResolveAcrossPrivileges() throws 
Exception {
+    // A single role grants ALLOW on USE_CATALOG and DENY on USE_SCHEMA for 
the same metadata
+    // object. The unified resolveEffect must return the corresponding Effect 
for each PolicyKey
+    // independently, so authorize/deny disagree per privilege rather than 
collapsing to one
+    // verdict for the whole role. This is the core property roryqi asked for: 
one resolver, two
+    // endpoints differing only in how they compare the result.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2022L;
+    SecurableObject allowUseCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, 
"ALLOW");
+    SecurableObject denyUseSchema =
+        makeSecurableObject("testCatalog", MetadataObject.Type.CATALOG, 
roleId, USE_SCHEMA, "DENY");
+    RoleEntity role =
+        getRoleEntity(
+            roleId, "mixedPrivRole" + roleId, 
ImmutableList.of(allowUseCatalog, denyUseSchema));
+    mockRoleEntity(role);
+    mockUserRoles(userIdent, role);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", 
MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertTrue(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_CATALOG, ctx));
+    assertFalse(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_CATALOG, ctx));
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
catalog, USE_SCHEMA, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, 
USE_SCHEMA, ctx));
+  }
+
+  /**
+   * With role 3001 carrying only DENY RUN_JOB on the METALAKE-typed object "testMetalake",
+   * expects {@code authorize} to return false and {@code deny} to return true for RUN_JOB on the
+   * metalake object built from the {@code METALAKE} constant.
+   *
+   * <p>NOTE(review): the securable object uses the literal "testMetalake" while the probe uses
+   * {@code METALAKE}; presumably the constant has that same value - confirm.
+   */
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeDenyOnly() throws 
Exception {
+    // Reproduces the case roryqi flagged: an OGNL expression like 
`METALAKE::RUN_JOB` is rewritten
+    // by AuthorizationExpressionConverter to a *single* 
`authorizer.authorize(...)` call — there is
+    // no companion `authorizer.deny(...)` call (which only ANY_xxx aliases 
generate). So the
+    // authorize endpoint must independently respect explicit DENY policies; a 
role that grants
+    // only DENY RUN_JOB on the metalake must cause authorize() to return 
false.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 3001L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            roleId,
+            "denyRunJobRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, roleId, 
RUN_JOB, "DENY")));
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, denyRole);
+
+    MetadataObject metalake = MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
metalake, RUN_JOB, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, metalake, 
RUN_JOB, ctx));
+  }
+
+  /**
+   * Mocks two roles for the user on the METALAKE-typed object "testMetalake": role 3010 with
+   * DENY RUN_JOB and role 3011 with ALLOW RUN_JOB. Expects {@code authorize} to return false and
+   * {@code deny} to return true for RUN_JOB, with both probes sharing one request context.
+   */
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeDenyBeatsAllowRole() 
throws Exception {
+    // Companion to testAuthorizeReturnsFalseForDirectPrivilegeDenyOnly per 
roryqi's review: also
+    // assign the user a sibling role that ALLOWs RUN_JOB on the same 
metalake, so the test pins
+    // down DENY-beats-ALLOW on the authorize endpoint when the OGNL 
expression `METALAKE::RUN_JOB`
+    // emits only an authorize() call (no deny() probe). Distinct role IDs 
from any other test to
+    // avoid cache-pollution interactions.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long denyRoleId = 3010L;
+    Long allowRoleId = 3011L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            denyRoleId,
+            "denyRunJobRole" + denyRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, denyRoleId, 
RUN_JOB, "DENY")));
+    RoleEntity allowRole =
+        getRoleEntity(
+            allowRoleId,
+            "allowRunJobRole" + allowRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, allowRoleId, 
RUN_JOB, "ALLOW")));
+    mockRoleEntity(denyRole);
+    mockRoleEntity(allowRole);
+    mockUserRoles(userIdent, denyRole, allowRole);
+
+    MetadataObject metalake = MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
metalake, RUN_JOB, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, metalake, 
RUN_JOB, ctx));
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeIntraRoleDeny() throws Exception {
+    // A single role carries both an ALLOW and a DENY for METALAKE::RUN_JOB. The expression
+    // converter emits just one authorize() call (no deny() probe), so DENY has to win over
+    // ALLOW inside the role and propagate through resolveEffect to the authorize endpoint.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long roleId = 3002L;
+    // ALLOW is listed first and DENY second, both owned by the same role.
+    RoleEntity allowAndDenyRole =
+        getRoleEntity(
+            roleId,
+            "mixedRunJobRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, roleId, RUN_JOB, "ALLOW"),
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, roleId, RUN_JOB, "DENY")));
+    mockRoleEntity(allowAndDenyRole);
+    mockUserRoles(user, allowAndDenyRole);
+
+    MetadataObject target = MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext reqCtx = new AuthorizationRequestContext();
+
+    boolean allowed = jcasbinAuthorizer.authorize(principal, METALAKE, target, RUN_JOB, reqCtx);
+    assertFalse(allowed);
+    boolean denied = jcasbinAuthorizer.deny(principal, METALAKE, target, RUN_JOB, reqCtx);
+    assertTrue(denied);
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeCrossRoleDeny() throws Exception {
+    // Two roles disagree about RUN_JOB on the same metalake: one ALLOWs it, the other DENYs
+    // it. The direct-privilege expression `METALAKE::RUN_JOB` only triggers authorize(), so
+    // cross-role DENY priority has to take effect on the authorize side without relying on a
+    // paired deny() call from an ANY_xxx expansion.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long allowRoleId = 3003L;
+    Long denyRoleId = 3004L;
+
+    // The ALLOW role is built and registered first here, the DENY role second.
+    SecurableObject allowGrant =
+        makeSecurableObject(
+            "testMetalake", MetadataObject.Type.METALAKE, allowRoleId, RUN_JOB, "ALLOW");
+    RoleEntity allowRole =
+        getRoleEntity(allowRoleId, "allowRunJob" + allowRoleId, ImmutableList.of(allowGrant));
+
+    SecurableObject denyGrant =
+        makeSecurableObject(
+            "testMetalake", MetadataObject.Type.METALAKE, denyRoleId, RUN_JOB, "DENY");
+    RoleEntity denyRole =
+        getRoleEntity(denyRoleId, "denyRunJob" + denyRoleId, ImmutableList.of(denyGrant));
+
+    mockRoleEntity(allowRole);
+    mockRoleEntity(denyRole);
+    mockUserRoles(user, allowRole, denyRole);
+
+    MetadataObject target = MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext reqCtx = new AuthorizationRequestContext();
+
+    boolean allowed = jcasbinAuthorizer.authorize(principal, METALAKE, target, RUN_JOB, reqCtx);
+    assertFalse(allowed);
+    boolean denied = jcasbinAuthorizer.deny(principal, METALAKE, target, RUN_JOB, reqCtx);
+    assertTrue(denied);
+  }
+
+  @Test
+  public void testIsOwnerReturnsFalseWhenAnotherUserIsOwner() throws Exception {
+    // Existing testAuthorizeByOwner only covers "user is owner" and "no owner cached". This
+    // case pins the third branch: ownerRel resolves to a *different* user, so isOwner must
+    // return false even though the cache entry exists.
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    Long otherUserId = USER_ID + 1000L;
+    Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
+    ownerRel.invalidateAll();
+    ownerRel.put(CATALOG_ID, Optional.of(otherUserId));
+    try {
+      NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, "testCatalog");
+      // We pre-seeded ownerRel above, so loadOwnerPolicy's `getIfPresent != null` short-circuit
+      // fires and the relation lookup is never reached. Stub the lookup anyway so a future
+      // refactor that drops the short-circuit doesn't silently change this test's semantics
+      // by overwriting the manually-injected entry.
+      doReturn(ImmutableList.of())
+          .when(supportsRelationOperations)
+          .listEntitiesByRelation(
+              eq(SupportsRelationOperations.Type.OWNER_REL),
+              eq(catalogIdent),
+              eq(Entity.EntityType.CATALOG));
+
+      assertFalse(doAuthorizeOwner(currentPrincipal));
+    } finally {
+      // The owner cache belongs to the shared authorizer; drop the hand-seeded foreign-owner
+      // entry so it cannot leak into whichever test runs next.
+      ownerRel.invalidate(CATALOG_ID);
+    }
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseWhenMetadataIdLookupFails() throws Exception {
+    // loadAndResolveEffect swallows exceptions raised by the user/metadata id lookup and
+    // returns null, which is treated as "no rule". That protects against an unauthenticated
+    // user or a transient catalog-resolution error leaking through as an ALLOW or a DENY
+    // verdict: both endpoints must answer false.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    MetadataObject missing =
+        MetadataObjects.of(null, "missingCatalog", MetadataObject.Type.CATALOG);
+
+    // Make the id lookup blow up for exactly this object.
+    metadataIdConverterMockedStatic
+        .when(() -> MetadataIdConverter.getID(eq(missing), eq(METALAKE)))
+        .thenThrow(new RuntimeException("simulated id lookup failure"));
+    try {
+      AuthorizationRequestContext reqCtx = new AuthorizationRequestContext();
+
+      boolean allowed =
+          jcasbinAuthorizer.authorize(principal, METALAKE, missing, USE_CATALOG, reqCtx);
+      assertFalse(allowed);
+
+      boolean denied = jcasbinAuthorizer.deny(principal, METALAKE, missing, USE_CATALOG, reqCtx);
+      assertFalse(denied);
+    } finally {
+      // Put a returning stub back so subsequent tests still resolve to CATALOG_ID.
+      metadataIdConverterMockedStatic
+          .when(() -> MetadataIdConverter.getID(eq(missing), eq(METALAKE)))
+          .thenReturn(CATALOG_ID);
+    }
+  }
+
   @Test
   public void testCacheInitialization() throws Exception {
     // Verify that caches are initialized
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
     Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
 
     assertNotNull(loadedRoles, "loadedRoles cache should be initialized");
@@ -447,11 +1221,12 @@ public class TestJcasbinAuthorizer {
   }
 
   @SuppressWarnings("unchecked")
+  // Test-only accessor: reads the private `loadedRoles` field of the authorizer reflectively.
+  // Field.get returns Object, hence the unchecked cast to the per-role policy-map cache type.
-  private static Cache<Long, Boolean> getLoadedRolesCache(JcasbinAuthorizer 
authorizer)
-      throws Exception {
+  private static Cache<Long, Map<JcasbinAuthorizer.PolicyKey, 
JcasbinAuthorizer.Effect>>
+      getLoadedRolesCache(JcasbinAuthorizer authorizer) throws Exception {
     Field field = JcasbinAuthorizer.class.getDeclaredField("loadedRoles");
     field.setAccessible(true);
-    return (Cache<Long, Boolean>) field.get(authorizer);
+    return (Cache<Long, Map<JcasbinAuthorizer.PolicyKey, 
JcasbinAuthorizer.Effect>>)
+        field.get(authorizer);
   }
 
   @SuppressWarnings("unchecked")
@@ -473,4 +1248,46 @@ public class TestJcasbinAuthorizer {
     field.setAccessible(true);
     return (Enforcer) field.get(authorizer);
   }
+
+  /**
+   * Builds a {@link SecurableObject} carrying exactly one privilege with one condition.
+   *
+   * <p>The object is assembled as a {@link SecurableObjectPO} (privilege names and conditions
+   * are stored as JSON arrays) and converted through {@link POConverters}. The PO always uses
+   * {@code CATALOG_ID} as its metadata object id, regardless of {@code type}.
+   *
+   * @param name name handed to the converter for the resulting securable object
+   * @param type metadata object type, stored in the PO as its string form
+   * @param roleId id of the role that owns the securable object
+   * @param privilege the single privilege to grant or deny
+   * @param condition the privilege condition, e.g. {@code "ALLOW"} or {@code "DENY"}
+   * @return the converted securable object
+   * @throws RuntimeException if JSON serialization of the privilege data fails
+   */
+  private static SecurableObject makeSecurableObject(
+      String name,
+      MetadataObject.Type type,
+      Long roleId,
+      Privilege.Name privilege,
+      String condition) {
+    try {
+      // Single-element JSON arrays: one privilege name, one matching condition.
+      String namesJson = objectMapper.writeValueAsString(ImmutableList.of(privilege.name()));
+      String conditionsJson = objectMapper.writeValueAsString(ImmutableList.of(condition));
+
+      SecurableObjectPO securableObjectPo =
+          SecurableObjectPO.builder()
+              .withType(String.valueOf(type))
+              .withMetadataObjectId(CATALOG_ID)
+              .withRoleId(roleId)
+              .withPrivilegeNames(namesJson)
+              .withPrivilegeConditions(conditionsJson)
+              .withDeletedAt(0L)
+              .withCurrentVersion(1L)
+              .withLastVersion(1L)
+              .build();
+      return POConverters.fromSecurableObjectPO(name, securableObjectPo, type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Stubs the ROLE_USER_REL relation lookup so that {@code userIdent} resolves to exactly the
+   * given roles, in the given order.
+   *
+   * @param userIdent identifier of the user whose role list is being stubbed
+   * @param roles roles the lookup should return; an immutable copy is handed to the stub
+   * @throws IOException declared by the stubbed relation lookup
+   */
+  private static void mockUserRoles(NameIdentifier userIdent, RoleEntity... roles)
+      throws IOException {
+    doReturn(ImmutableList.copyOf(roles))
+        .when(supportsRelationOperations)
+        .listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.ROLE_USER_REL),
+            eq(userIdent),
+            eq(Entity.EntityType.USER));
+  }
+
+  /**
+   * Stubs the entity store so that looking up {@code role} by its name within {@code METALAKE}
+   * returns the given entity.
+   *
+   * @param role the role entity the store should hand back
+   * @throws IOException declared by the stubbed entity-store lookup
+   */
+  private static void mockRoleEntity(RoleEntity role) throws IOException {
+    NameIdentifier roleIdent = NameIdentifierUtil.ofRole(METALAKE, role.name());
+    when(entityStore.get(eq(roleIdent), eq(Entity.EntityType.ROLE), eq(RoleEntity.class)))
+        .thenReturn(role);
+  }
 }

Reply via email to